Add backoff for fetching feeds that are down

This commit is contained in:
Tulir Asokan 2021-04-10 00:30:10 +03:00
parent 794d8e1bb9
commit 5efba56c3b
4 changed files with 57 additions and 22 deletions

View File

@ -1,5 +1,7 @@
# Feed update interval in minutes # Feed update interval in minutes
update_interval: 60 update_interval: 60
# Maximum backoff in minutes when failing to fetch feeds (defaults to 5 days)
max_backoff: 7200
# The time to sleep between send requests when broadcasting a new feed entry. # The time to sleep between send requests when broadcasting a new feed entry.
# Set to 0 to disable sleep or -1 to run all requests asynchronously at once. # Set to 0 to disable sleep or -1 to run all requests asynchronously at once.
spam_sleep: 2 spam_sleep: 2

View File

@ -1,6 +1,6 @@
maubot: 0.1.0 maubot: 0.1.0
id: xyz.maubot.rss id: xyz.maubot.rss
version: 0.2.3 version: 0.2.4
license: AGPL-3.0-or-later license: AGPL-3.0-or-later
modules: modules:
- rss - rss

View File

@ -1,5 +1,5 @@
# rss - A maubot plugin to subscribe to RSS/Atom feeds. # rss - A maubot plugin to subscribe to RSS/Atom feeds.
# Copyright (C) 2020 Tulir Asokan # Copyright (C) 2021 Tulir Asokan
# #
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by # it under the terms of the GNU Affero General Public License as published by
@ -18,6 +18,7 @@ from datetime import datetime
from time import mktime, time from time import mktime, time
from string import Template from string import Template
import asyncio import asyncio
import time
import aiohttp import aiohttp
import hashlib import hashlib
@ -31,10 +32,13 @@ from maubot.handlers import command, event
from .db import Database, Feed, Entry, Subscription from .db import Database, Feed, Entry, Subscription
rss_change_level = EventType.find("xyz.maubot.rss", t_class=EventType.Class.STATE)
class Config(BaseProxyConfig): class Config(BaseProxyConfig):
def do_update(self, helper: ConfigUpdateHelper) -> None: def do_update(self, helper: ConfigUpdateHelper) -> None:
helper.copy("update_interval") helper.copy("update_interval")
helper.copy("max_backoff")
helper.copy("spam_sleep") helper.copy("spam_sleep")
helper.copy("command_prefix") helper.copy("command_prefix")
helper.copy("admins") helper.copy("admins")
@ -108,14 +112,25 @@ class RSSBot(Plugin):
subs = self.db.get_feeds() subs = self.db.get_feeds()
if not subs: if not subs:
return return
for res in asyncio.as_completed([self.try_parse_feed(feed=feed) for feed in subs]): now = int(time.time())
tasks = [self.try_parse_feed(feed=feed) for feed in subs if feed.next_retry < now]
feed: Feed
entries: Iterable[Entry]
for res in asyncio.as_completed(tasks):
feed, entries = await res feed, entries = await res
if not entries: if not entries:
error_count = feed.error_count + 1
next_retry_delay = self.config["update_interval"] * 60 * error_count
next_retry_delay = min(next_retry_delay, self.config["max_backoff"] * 60)
next_retry = int(time.time() + next_retry_delay)
self.db.set_backoff(feed, error_count, next_retry)
continue continue
elif feed.error_count > 0:
self.db.set_backoff(feed, error_count=0, next_retry=0)
try: try:
new_entries = {entry.id: entry for entry in entries} new_entries = {entry.id: entry for entry in entries}
except Exception: except Exception:
self.log.exception(f"Error items of {feed.url}") self.log.exception(f"Weird error in items of {feed.url}")
continue continue
for old_entry in self.db.get_entries(feed.id): for old_entry in self.db.get_entries(feed.id):
new_entries.pop(old_entry.id, None) new_entries.pop(old_entry.id, None)
@ -137,8 +152,8 @@ class RSSBot(Plugin):
async def try_parse_feed(self, feed: Optional[Feed] = None) -> Tuple[Feed, Iterable[Entry]]:
    """Parse ``feed`` through ``parse_feed`` without letting errors propagate.

    Returns whatever ``parse_feed`` returns. If it raises, a warning containing
    the feed ID, URL and the exception is logged, and the given feed is returned
    together with an empty entry list.
    """
    try:
        outcome = await self.parse_feed(feed=feed)
    except Exception as err:
        # Only the exception message is logged here, not the traceback.
        self.log.warning(f"Failed to parse feed {feed.id} / {feed.url}: {err}")
        outcome = feed, []
    return outcome
async def parse_feed(self, *, feed: Optional[Feed] = None, url: Optional[str] = None async def parse_feed(self, *, feed: Optional[Feed] = None, url: Optional[str] = None
@ -146,7 +161,7 @@ class RSSBot(Plugin):
if feed is None: if feed is None:
if url is None: if url is None:
raise ValueError("Either feed or url must be set") raise ValueError("Either feed or url must be set")
feed = Feed(-1, url, "", "", "", []) feed = Feed(-1, url, "", "", "", 0, 0, [])
elif url is not None: elif url is not None:
raise ValueError("Only one of feed or url must be set") raise ValueError("Only one of feed or url must be set")
resp = await self.http.get(feed.url) resp = await self.http.get(feed.url)
@ -167,7 +182,7 @@ class RSSBot(Plugin):
raise ValueError("Feed is not a valid JSON feed (items is not a list)") raise ValueError("Feed is not a valid JSON feed (items is not a list)")
feed = Feed(id=feed.id, title=content["title"], subtitle=content.get("subtitle", ""), feed = Feed(id=feed.id, title=content["title"], subtitle=content.get("subtitle", ""),
url=feed.url, link=content.get("home_page_url", ""), url=feed.url, link=content.get("home_page_url", ""),
subscriptions=feed.subscriptions) next_retry=0, error_count=0, subscriptions=feed.subscriptions)
return feed, (cls._parse_json_entry(feed.id, entry) for entry in content["items"]) return feed, (cls._parse_json_entry(feed.id, entry) for entry in content["items"])
@classmethod @classmethod
@ -203,7 +218,7 @@ class RSSBot(Plugin):
feed_data = parsed_data.get("feed", {}) feed_data = parsed_data.get("feed", {})
feed = Feed(id=feed.id, url=feed.url, title=feed_data.get("title", feed.url), feed = Feed(id=feed.id, url=feed.url, title=feed_data.get("title", feed.url),
subtitle=feed_data.get("description", ""), link=feed_data.get("link", ""), subtitle=feed_data.get("description", ""), link=feed_data.get("link", ""),
subscriptions=feed.subscriptions) error_count=0, next_retry=0, subscriptions=feed.subscriptions)
return feed, (cls._parse_rss_entry(feed.id, entry) for entry in parsed_data.entries) return feed, (cls._parse_rss_entry(feed.id, entry) for entry in parsed_data.entries)
@classmethod @classmethod
@ -249,8 +264,8 @@ class RSSBot(Plugin):
return True return True
levels = await self.get_power_levels(evt.room_id) levels = await self.get_power_levels(evt.room_id)
user_level = levels.get_user_level(evt.sender) user_level = levels.get_user_level(evt.sender)
state_level = levels.events.get("xyz.maubot.rss", levels.state_default) state_level = levels.get_event_level(rss_change_level)
if type(state_level) != int: if not isinstance(state_level, int):
state_level = 50 state_level = 50
if user_level < state_level: if user_level < state_level:
await evt.reply("You don't have the permission to " await evt.reply("You don't have the permission to "
@ -278,6 +293,8 @@ class RSSBot(Plugin):
return return
feed = self.db.create_feed(info) feed = self.db.create_feed(info)
self.db.add_entries(entries, override_feed_id=feed.id) self.db.add_entries(entries, override_feed_id=feed.id)
elif feed.error_count > 0:
self.db.set_backoff(feed, error_count=feed.error_count, next_retry=0)
self.db.subscribe(feed.id, evt.room_id, evt.sender) self.db.subscribe(feed.id, evt.room_id, evt.sender)
await evt.reply(f"Subscribed to feed ID {feed.id}: [{feed.title}]({feed.url})") await evt.reply(f"Subscribed to feed ID {feed.id}: [{feed.title}]({feed.url})")

View File

@ -26,8 +26,8 @@ from mautrix.types import UserID, RoomID
Subscription = NamedTuple("Subscription", feed_id=int, room_id=RoomID, user_id=UserID, Subscription = NamedTuple("Subscription", feed_id=int, room_id=RoomID, user_id=UserID,
notification_template=Template, send_notice=bool) notification_template=Template, send_notice=bool)
Feed = NamedTuple("Feed", id=int, url=str, title=str, subtitle=str, link=str, Feed = NamedTuple("Feed", id=int, url=str, title=str, subtitle=str, link=str, next_retry=int,
subscriptions=List[Subscription]) error_count=int, subscriptions=List[Subscription])
Entry = NamedTuple("Entry", feed_id=int, id=str, date=datetime, title=str, summary=str, link=str) Entry = NamedTuple("Entry", feed_id=int, id=str, date=datetime, title=str, summary=str, link=str)
@ -46,7 +46,9 @@ class Database:
Column("url", Text, nullable=False, unique=True), Column("url", Text, nullable=False, unique=True),
Column("title", Text, nullable=False), Column("title", Text, nullable=False),
Column("subtitle", Text, nullable=False), Column("subtitle", Text, nullable=False),
Column("link", Text, nullable=False)) Column("link", Text, nullable=False),
Column("next_retry", Integer, nullable=False),
Column("error_count", Integer, nullable=False))
self.subscription = Table("subscription", metadata, self.subscription = Table("subscription", metadata,
Column("feed_id", Integer, ForeignKey("feed.id"), Column("feed_id", Integer, ForeignKey("feed.id"),
primary_key=True), primary_key=True),
@ -104,6 +106,10 @@ class Database:
if version == 1: if version == 1:
self.db.execute("ALTER TABLE subscription ADD COLUMN send_notice BOOLEAN DEFAULT true") self.db.execute("ALTER TABLE subscription ADD COLUMN send_notice BOOLEAN DEFAULT true")
version = 2 version = 2
if version == 2:
self.db.execute("ALTER TABLE feed ADD COLUMN next_retry BIGINT DEFAULT 0")
self.db.execute("ALTER TABLE feed ADD COLUMN error_count BIGINT DEFAULT 0")
version = 3
self.db.execute(self.version.delete()) self.db.execute(self.version.delete())
self.db.execute(self.version.insert().values(version=version)) self.db.execute(self.version.insert().values(version=version))
@ -116,9 +122,10 @@ class Database:
.where(self.subscription.c.feed_id == self.feed.c.id)) .where(self.subscription.c.feed_id == self.feed.c.id))
map: Dict[int, Feed] = {} map: Dict[int, Feed] = {}
for row in rows: for row in rows:
(feed_id, url, title, subtitle, link, (feed_id, url, title, subtitle, link, next_retry, error_count,
room_id, user_id, notification_template, send_notice) = row room_id, user_id, notification_template, send_notice) = row
map.setdefault(feed_id, Feed(feed_id, url, title, subtitle, link, subscriptions=[])) map.setdefault(feed_id, Feed(feed_id, url, title, subtitle, link, next_retry,
error_count, subscriptions=[]))
map[feed_id].subscriptions.append( map[feed_id].subscriptions.append(
Subscription(feed_id=feed_id, room_id=room_id, user_id=user_id, Subscription(feed_id=feed_id, room_id=room_id, user_id=user_id,
notification_template=Template(notification_template), notification_template=Template(notification_template),
@ -126,8 +133,10 @@ class Database:
return map.values() return map.values()
def get_feeds_by_room(self, room_id: RoomID) -> Iterable[Tuple[Feed, UserID]]:
    """Return ``(feed, user_id)`` pairs for the feeds subscribed to in ``room_id``.

    Each pair holds a ``Feed`` built from the feed table columns (with an empty
    subscription list) and the ``user_id`` column of the matching subscription row.
    The query is executed immediately; the rows are converted lazily while the
    returned generator is iterated.
    """
    sub_cols = self.subscription.c
    in_this_room = and_(sub_cols.room_id == room_id,
                        sub_cols.feed_id == self.feed.c.id)
    rows = self.db.execute(select([self.feed, sub_cols.user_id]).where(in_this_room))
    return (
        (Feed(fid, url, title, subtitle, link, retry_at, errors, subscriptions=[]), subscriber)
        for (fid, url, title, subtitle, link, retry_at, errors, subscriber) in rows
    )
@ -174,12 +183,12 @@ class Database:
.where(and_(tbl.c.feed_id == feed_id, tbl.c.room_id == room_id, .where(and_(tbl.c.feed_id == feed_id, tbl.c.room_id == room_id,
self.feed.c.id == feed_id))) self.feed.c.id == feed_id)))
try: try:
(feed_id, url, title, subtitle, link, (feed_id, url, title, subtitle, link, next_retry, error_count,
room_id, user_id, template, send_notice) = next(rows) room_id, user_id, template, send_notice) = next(rows)
notification_template = Template(template) notification_template = Template(template)
return (Subscription(feed_id, room_id, user_id, notification_template, send_notice) return (Subscription(feed_id, room_id, user_id, notification_template, send_notice)
if room_id else None, if room_id else None,
Feed(feed_id, url, title, subtitle, link, [])) Feed(feed_id, url, title, subtitle, link, next_retry, error_count, []))
except (ValueError, StopIteration): except (ValueError, StopIteration):
return None, None return None, None
@ -190,9 +199,16 @@ class Database:
def create_feed(self, info: Feed) -> Feed:
    """Insert a new row into the feed table and return the stored feed.

    The row is filled from the URL, title, subtitle, link, ``next_retry`` and
    ``error_count`` of ``info``. The returned ``Feed`` carries the primary key
    reported by the insert as its ``id`` and starts with an empty subscription list.
    """
    # error_count is declared nullable=False without a default in the table
    # definition, so it has to be written explicitly alongside next_retry;
    # leaving it out makes the insert depend on a database-side default.
    res = self.db.execute(self.feed.insert().values(
        url=info.url, title=info.title, subtitle=info.subtitle, link=info.link,
        next_retry=info.next_retry, error_count=info.error_count))
    return Feed(id=res.inserted_primary_key[0], url=info.url, title=info.title,
                subtitle=info.subtitle, link=info.link, next_retry=info.next_retry,
                error_count=info.error_count, subscriptions=[])
def set_backoff(self, info: Feed, error_count: int, next_retry: int) -> None:
    """Write ``error_count`` and ``next_retry`` to the feed row whose id is ``info.id``."""
    backoff_values = {"error_count": error_count, "next_retry": next_retry}
    statement = self.feed.update().where(self.feed.c.id == info.id).values(**backoff_values)
    self.db.execute(statement)
def subscribe(self, feed_id: int, room_id: RoomID, user_id: UserID) -> None: def subscribe(self, feed_id: int, room_id: RoomID, user_id: UserID) -> None:
self.db.execute(self.subscription.insert().values( self.db.execute(self.subscription.insert().values(