Add parallelism limit for polling feeds

This commit is contained in:
Tulir Asokan 2025-08-27 14:44:12 +03:00
parent 93984bef86
commit 7df6c62f6b
2 changed files with 23 additions and 3 deletions

View file

@ -18,7 +18,7 @@ from __future__ import annotations
from typing import Any, Iterable
from datetime import datetime
from string import Template
from time import mktime, time
from time import mktime, monotonic, time
import asyncio
import hashlib
import html
@ -55,6 +55,7 @@ class Config(BaseProxyConfig):
helper.copy("command_prefix")
helper.copy("notification_template")
helper.copy("admins")
helper.copy("poll_parallelism_limit")
class BoolArgument(command.Argument):
@ -75,6 +76,7 @@ class BoolArgument(command.Argument):
class RSSBot(Plugin):
dbm: DBManager
poll_task: asyncio.Future
poll_sema: asyncio.Semaphore | None
http: aiohttp.ClientSession
power_level_cache: dict[RoomID, tuple[int, PowerLevelStateEventContent]]
@ -88,12 +90,19 @@ class RSSBot(Plugin):
async def start(self) -> None:
await super().start()
self.config.load_and_update()
self.on_external_config_update()
self.dbm = DBManager(self.database)
self.http = self.client.api.session
self.power_level_cache = {}
self.poll_task = asyncio.create_task(self.poll_feeds())
def on_external_config_update(self) -> None:
self.config.load_and_update()
poll_parallelism_limit = self.config["poll_parallelism_limit"]
self.poll_sema = (
asyncio.Semaphore(poll_parallelism_limit) if poll_parallelism_limit > 0 else None
)
async def stop(self) -> None:
await super().stop()
self.poll_task.cancel()
@ -142,6 +151,7 @@ class RSSBot(Plugin):
if not subs:
return
now = int(time())
start = monotonic()
tasks = [self.try_parse_feed(feed=feed) for feed in subs if feed.next_retry < now]
feed: Feed
entries: Iterable[Entry]
@ -176,7 +186,8 @@ class RSSBot(Plugin):
await self.dbm.add_entries(new_entry_list)
for entry in new_entry_list:
await self._broadcast(feed, entry, feed.subscriptions)
self.log.info(f"Finished polling {len(tasks)} feeds")
duration = monotonic() - now
self.log.info(f"Finished polling {len(tasks)} feeds in {duration:.2f} seconds")
async def _poll_feeds(self) -> None:
self.log.debug("Polling started")
@ -190,6 +201,13 @@ class RSSBot(Plugin):
await asyncio.sleep(self.config["update_interval"] * 60)
async def try_parse_feed(self, feed: Feed | None = None) -> tuple[Feed, list[Entry]]:
if self.poll_sema is not None:
async with self.poll_sema:
return await self._try_parse_feed(feed)
else:
return await self._try_parse_feed(feed)
async def _try_parse_feed(self, feed: Feed | None = None) -> tuple[Feed, list[Entry]]:
try:
self.log.trace(
f"Trying to fetch {feed.id} / {feed.url} "