From 5efba56c3bb2f380b57ceb512995fb2ff64e8535 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Sat, 10 Apr 2021 00:30:10 +0300 Subject: [PATCH] Add backoff for fetching feeds that are down --- base-config.yaml | 2 ++ maubot.yaml | 2 +- rss/bot.py | 37 +++++++++++++++++++++++++++---------- rss/db.py | 38 +++++++++++++++++++++++++++----------- 4 files changed, 57 insertions(+), 22 deletions(-) diff --git a/base-config.yaml b/base-config.yaml index 90f649a..053b8b4 100644 --- a/base-config.yaml +++ b/base-config.yaml @@ -1,5 +1,7 @@ # Feed update interval in minutes update_interval: 60 +# Maximum backoff in minutes when failing to fetch feeds (defaults to 5 days) +max_backoff: 7200 # The time to sleep between send requests when broadcasting a new feed entry. # Set to 0 to disable sleep or -1 to run all requests asynchronously at once. spam_sleep: 2 diff --git a/maubot.yaml b/maubot.yaml index 87ac32e..e226e69 100644 --- a/maubot.yaml +++ b/maubot.yaml @@ -1,6 +1,6 @@ maubot: 0.1.0 id: xyz.maubot.rss -version: 0.2.3 +version: 0.2.4 license: AGPL-3.0-or-later modules: - rss diff --git a/rss/bot.py b/rss/bot.py index 1e98a78..85343fd 100644 --- a/rss/bot.py +++ b/rss/bot.py @@ -1,5 +1,5 @@ # rss - A maubot plugin to subscribe to RSS/Atom feeds. -# Copyright (C) 2020 Tulir Asokan +# Copyright (C) 2021 Tulir Asokan # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -18,6 +18,7 @@ from datetime import datetime from time import mktime, time from string import Template import asyncio +import time import aiohttp import hashlib @@ -31,10 +32,13 @@ from maubot.handlers import command, event from .db import Database, Feed, Entry, Subscription +rss_change_level = EventType.find("xyz.maubot.rss", t_class=EventType.Class.STATE) + class Config(BaseProxyConfig): def do_update(self, helper: ConfigUpdateHelper) -> None: helper.copy("update_interval") + helper.copy("max_backoff") helper.copy("spam_sleep") helper.copy("command_prefix") helper.copy("admins") @@ -108,14 +112,25 @@ class RSSBot(Plugin): subs = self.db.get_feeds() if not subs: return - for res in asyncio.as_completed([self.try_parse_feed(feed=feed) for feed in subs]): + now = int(time.time()) + tasks = [self.try_parse_feed(feed=feed) for feed in subs if feed.next_retry < now] + feed: Feed + entries: Iterable[Entry] + for res in asyncio.as_completed(tasks): feed, entries = await res if not entries: + error_count = feed.error_count + 1 + next_retry_delay = self.config["update_interval"] * 60 * error_count + next_retry_delay = min(next_retry_delay, self.config["max_backoff"] * 60) + next_retry = int(time.time() + next_retry_delay) + self.db.set_backoff(feed, error_count, next_retry) continue + elif feed.error_count > 0: + self.db.set_backoff(feed, error_count=0, next_retry=0) try: new_entries = {entry.id: entry for entry in entries} except Exception: - self.log.exception(f"Error items of {feed.url}") + self.log.exception(f"Weird error in items of {feed.url}") continue for old_entry in self.db.get_entries(feed.id): new_entries.pop(old_entry.id, None) @@ -137,8 +152,8 @@ class RSSBot(Plugin): async def try_parse_feed(self, feed: Optional[Feed] = None) -> Tuple[Feed, Iterable[Entry]]: try: return await self.parse_feed(feed=feed) - except Exception: - self.log.exception(f"Failed to parse feed {feed.id} / {feed.url}") + except Exception as e: + self.log.warning(f"Failed to parse feed {feed.id} / {feed.url}: {e}") return feed, [] async def parse_feed(self, *, feed: Optional[Feed] = None, url: Optional[str] = None @@ -146,7 +161,7 @@ class RSSBot(Plugin): if feed is None: if url is None: raise ValueError("Either feed or url must be set") - feed = Feed(-1, url, "", "", "", []) + feed = Feed(-1, url, "", "", "", 0, 0, []) elif url is not None: raise ValueError("Only one of feed or url must be set") resp = await self.http.get(feed.url) @@ -167,7 +182,7 @@ class RSSBot(Plugin): raise ValueError("Feed is not a valid JSON feed (items is not a list)") feed = Feed(id=feed.id, title=content["title"], subtitle=content.get("subtitle", ""), url=feed.url, link=content.get("home_page_url", ""), - subscriptions=feed.subscriptions) + next_retry=0, error_count=0, subscriptions=feed.subscriptions) return feed, (cls._parse_json_entry(feed.id, entry) for entry in content["items"]) @classmethod @@ -203,7 +218,7 @@ class RSSBot(Plugin): feed_data = parsed_data.get("feed", {}) feed = Feed(id=feed.id, url=feed.url, title=feed_data.get("title", feed.url), subtitle=feed_data.get("description", ""), link=feed_data.get("link", ""), - subscriptions=feed.subscriptions) + error_count=0, next_retry=0, subscriptions=feed.subscriptions) return feed, (cls._parse_rss_entry(feed.id, entry) for entry in parsed_data.entries) @classmethod @@ -249,8 +264,8 @@ class RSSBot(Plugin): return True levels = await self.get_power_levels(evt.room_id) user_level = levels.get_user_level(evt.sender) - state_level = levels.events.get("xyz.maubot.rss", levels.state_default) - if type(state_level) != int: + state_level = levels.get_event_level(rss_change_level) + if not isinstance(state_level, int): state_level = 50 if user_level < state_level: await evt.reply("You don't have the permission to " @@ -278,6 +293,8 @@ class RSSBot(Plugin): return feed = self.db.create_feed(info) self.db.add_entries(entries, override_feed_id=feed.id) + elif feed.error_count > 0: + self.db.set_backoff(feed, error_count=feed.error_count, next_retry=0) self.db.subscribe(feed.id, evt.room_id, evt.sender) await evt.reply(f"Subscribed to feed ID {feed.id}: [{feed.title}]({feed.url})") diff --git a/rss/db.py b/rss/db.py index bccdd5e..3e71691 100644 --- a/rss/db.py +++ b/rss/db.py @@ -26,8 +26,8 @@ from mautrix.types import UserID, RoomID Subscription = NamedTuple("Subscription", feed_id=int, room_id=RoomID, user_id=UserID, notification_template=Template, send_notice=bool) -Feed = NamedTuple("Feed", id=int, url=str, title=str, subtitle=str, link=str, - subscriptions=List[Subscription]) +Feed = NamedTuple("Feed", id=int, url=str, title=str, subtitle=str, link=str, next_retry=int, + error_count=int, subscriptions=List[Subscription]) Entry = NamedTuple("Entry", feed_id=int, id=str, date=datetime, title=str, summary=str, link=str) @@ -46,7 +46,9 @@ class Database: Column("url", Text, nullable=False, unique=True), Column("title", Text, nullable=False), Column("subtitle", Text, nullable=False), - Column("link", Text, nullable=False)) + Column("link", Text, nullable=False), + Column("next_retry", Integer, nullable=False), + Column("error_count", Integer, nullable=False)) self.subscription = Table("subscription", metadata, Column("feed_id", Integer, ForeignKey("feed.id"), primary_key=True), @@ -104,6 +106,10 @@ class Database: if version == 1: self.db.execute("ALTER TABLE subscription ADD COLUMN send_notice BOOLEAN DEFAULT true") version = 2 + if version == 2: + self.db.execute("ALTER TABLE feed ADD COLUMN next_retry BIGINT DEFAULT 0") + self.db.execute("ALTER TABLE feed ADD COLUMN error_count BIGINT DEFAULT 0") + version = 3 self.db.execute(self.version.delete()) self.db.execute(self.version.insert().values(version=version)) @@ -116,9 +122,10 @@ class Database: .where(self.subscription.c.feed_id == self.feed.c.id)) map: Dict[int, Feed] = {} for row in rows: - (feed_id, url, title, subtitle, link, + (feed_id, url, title, subtitle, link, next_retry, error_count, room_id, user_id, notification_template, send_notice) = row - map.setdefault(feed_id, Feed(feed_id, url, title, subtitle, link, subscriptions=[])) + map.setdefault(feed_id, Feed(feed_id, url, title, subtitle, link, next_retry, + error_count, subscriptions=[])) map[feed_id].subscriptions.append( Subscription(feed_id=feed_id, room_id=room_id, user_id=user_id, notification_template=Template(notification_template), @@ -126,8 +133,10 @@ class Database: return map.values() def get_feeds_by_room(self, room_id: RoomID) -> Iterable[Tuple[Feed, UserID]]: - return ((Feed(feed_id, url, title, subtitle, link, subscriptions=[]), user_id) - for (feed_id, url, title, subtitle, link, user_id) in + return ((Feed(feed_id, url, title, subtitle, link, next_retry, error_count, + subscriptions=[]), + user_id) + for (feed_id, url, title, subtitle, link, next_retry, error_count, user_id) in self.db.execute(select([self.feed, self.subscription.c.user_id]) .where(and_(self.subscription.c.room_id == room_id, self.subscription.c.feed_id == self.feed.c.id)))) @@ -174,12 +183,12 @@ class Database: .where(and_(tbl.c.feed_id == feed_id, tbl.c.room_id == room_id, self.feed.c.id == feed_id))) try: - (feed_id, url, title, subtitle, link, + (feed_id, url, title, subtitle, link, next_retry, error_count, room_id, user_id, template, send_notice) = next(rows) notification_template = Template(template) return (Subscription(feed_id, room_id, user_id, notification_template, send_notice) if room_id else None, - Feed(feed_id, url, title, subtitle, link, [])) + Feed(feed_id, url, title, subtitle, link, next_retry, error_count, [])) except (ValueError, StopIteration): return None, None @@ -190,9 +199,16 @@ class Database: def create_feed(self, info: Feed) -> Feed: res = self.db.execute(self.feed.insert().values(url=info.url, title=info.title, - subtitle=info.subtitle, link=info.link)) + subtitle=info.subtitle, link=info.link, + next_retry=info.next_retry)) return Feed(id=res.inserted_primary_key[0], url=info.url, title=info.title, - subtitle=info.subtitle, link=info.link, subscriptions=[]) + subtitle=info.subtitle, link=info.link, next_retry=info.next_retry, + error_count=info.error_count, subscriptions=[]) + + def set_backoff(self, info: Feed, error_count: int, next_retry: int) -> None: + self.db.execute(self.feed.update() + .where(self.feed.c.id == info.id) + .values(error_count=error_count, next_retry=next_retry)) def subscribe(self, feed_id: int, room_id: RoomID, user_id: UserID) -> None: self.db.execute(self.subscription.insert().values(