From 7df6c62f6b20b16fbf01a37818d60c84868987ae Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Wed, 27 Aug 2025 14:44:12 +0300 Subject: [PATCH] Add parallelism limit for polling feeds --- base-config.yaml | 2 ++ rss/bot.py | 24 +++++++++++++++++++++--- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/base-config.yaml b/base-config.yaml index 52c92d4..6df33fe 100644 --- a/base-config.yaml +++ b/base-config.yaml @@ -2,6 +2,8 @@ update_interval: 60 # Maximum backoff in minutes when failing to fetch feeds (defaults to 5 days) max_backoff: 7200 +# How many feeds to poll in parallel? Set to 0 to disable limit. +poll_parallelism_limit: 10 # The time to sleep between send requests when broadcasting a new feed entry. # Set to 0 to disable sleep or -1 to run all requests asynchronously at once. spam_sleep: 2 diff --git a/rss/bot.py b/rss/bot.py index 74c1681..91c745f 100644 --- a/rss/bot.py +++ b/rss/bot.py @@ -18,7 +18,7 @@ from __future__ import annotations from typing import Any, Iterable from datetime import datetime from string import Template -from time import mktime, time +from time import mktime, monotonic, time import asyncio import hashlib import html @@ -55,6 +55,7 @@ class Config(BaseProxyConfig): helper.copy("command_prefix") helper.copy("notification_template") helper.copy("admins") + helper.copy("poll_parallelism_limit") class BoolArgument(command.Argument): @@ -75,6 +76,7 @@ class BoolArgument(command.Argument): class RSSBot(Plugin): dbm: DBManager poll_task: asyncio.Future + poll_sema: asyncio.Semaphore | None http: aiohttp.ClientSession power_level_cache: dict[RoomID, tuple[int, PowerLevelStateEventContent]] @@ -88,12 +90,19 @@ class RSSBot(Plugin): async def start(self) -> None: await super().start() - self.config.load_and_update() + self.on_external_config_update() self.dbm = DBManager(self.database) self.http = self.client.api.session self.power_level_cache = {} self.poll_task = asyncio.create_task(self.poll_feeds()) + def 
on_external_config_update(self) -> None: + self.config.load_and_update() + poll_parallelism_limit = self.config["poll_parallelism_limit"] + self.poll_sema = ( + asyncio.Semaphore(poll_parallelism_limit) if poll_parallelism_limit > 0 else None + ) + async def stop(self) -> None: await super().stop() self.poll_task.cancel() @@ -142,6 +151,7 @@ class RSSBot(Plugin): if not subs: return now = int(time()) + start = monotonic() tasks = [self.try_parse_feed(feed=feed) for feed in subs if feed.next_retry < now] feed: Feed entries: Iterable[Entry] @@ -176,7 +186,8 @@ class RSSBot(Plugin): await self.dbm.add_entries(new_entry_list) for entry in new_entry_list: await self._broadcast(feed, entry, feed.subscriptions) - self.log.info(f"Finished polling {len(tasks)} feeds") + duration = monotonic() - start + self.log.info(f"Finished polling {len(tasks)} feeds in {duration:.2f} seconds") async def _poll_feeds(self) -> None: self.log.debug("Polling started") @@ -190,6 +201,13 @@ class RSSBot(Plugin): await asyncio.sleep(self.config["update_interval"] * 60) async def try_parse_feed(self, feed: Feed | None = None) -> tuple[Feed, list[Entry]]: + if self.poll_sema is not None: + async with self.poll_sema: + return await self._try_parse_feed(feed) + else: + return await self._try_parse_feed(feed) + + async def _try_parse_feed(self, feed: Feed | None = None) -> tuple[Feed, list[Entry]]: try: self.log.trace( f"Trying to fetch {feed.id} / {feed.url} "