Add support for JSON feeds

This commit is contained in:
Tulir Asokan 2020-08-03 03:03:19 +03:00
parent c07beb23be
commit 44927f2cf5
3 changed files with 100 additions and 48 deletions

View File

@ -1,5 +1,6 @@
maubot: 0.1.0
id: xyz.maubot.rss
version: 0.1.0
version: 0.2.0
license: AGPL-3.0-or-later
modules:
- rss

View File

@ -13,11 +13,12 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import Type, List, Any, Dict, Tuple, Awaitable
from typing import Type, List, Any, Dict, Tuple, Awaitable, Iterable, Optional
from datetime import datetime
from time import mktime, time
from string import Template
import asyncio
import json
import aiohttp
import hashlib
@ -108,11 +109,15 @@ class RSSBot(Plugin):
subs = self.db.get_feeds()
if not subs:
return
datas = await asyncio.gather(*[self.read_feed(feed.url) for feed in subs])
for feed, (data, headers) in zip(subs, datas):
parsed_data = feedparser.parse(data, response_headers=headers)
entries = parsed_data.entries
new_entries = {entry.id: entry for entry in self.find_entries(feed.id, entries)}
for res in asyncio.as_completed([self.parse_feed(feed=feed) for feed in subs]):
feed, entries = await res
if not entries:
continue
try:
new_entries = {entry.id: entry for entry in entries}
except Exception:
self.log.exception(f"Error items of {feed.url}")
continue
for old_entry in self.db.get_entries(feed.id):
new_entries.pop(old_entry.id, None)
self.db.add_entries(new_entries.values())
@ -130,23 +135,82 @@ class RSSBot(Plugin):
self.log.exception("Error while polling feeds")
await asyncio.sleep(self.config["update_interval"] * 60, loop=self.loop)
async def read_feed(self, url: str) -> Tuple[str, Dict[str, str]]:
async def parse_feed(self, *, feed: Optional[Feed] = None, url: Optional[str] = None
) -> Tuple[Feed, Iterable[Entry]]:
if feed is None:
if url is None:
raise ValueError("Either feed or url must be set")
feed = Feed(-1, url, "", "", "", [])
elif url is not None:
raise ValueError("Only one of feed or url must be set")
resp = await self.http.get(feed.url)
if resp.headers["Content-Type"] == "application/json":
return await self._parse_json(feed, resp)
else:
return await self._parse_rss(feed, resp)
@classmethod
async def _parse_json(cls, feed: Feed, resp: aiohttp.ClientResponse
) -> Tuple[Feed, Iterable[Entry]]:
content = await resp.json()
if not isinstance(content["items"], list):
raise ValueError("Feed is not a valid JSON feed (items is not a list)")
feed = Feed(id=feed.id, title=content["title"], subtitle=content.get("subtitle", ""),
url=feed.url, link=content.get("home_page_url", ""),
subscriptions=feed.subscriptions)
return feed, (cls._parse_json_entry(feed.id, entry) for entry in content["items"])
@classmethod
def _parse_json_entry(cls, feed_id: int, entry: Dict[str, Any]) -> Entry:
try:
resp = await self.http.get(url)
except Exception:
self.log.exception(f"Error fetching {url}")
return "", {}
date = datetime.fromisoformat(entry["date_published"])
except (ValueError, KeyError):
date = datetime.now()
title = entry.get("title", "")
summary = (entry.get("summary")
or entry.get("content_html")
or entry.get("content_text")
or "")
id = str(entry["id"])
link = entry.get("url") or id
return Entry(feed_id=feed_id, id=id, date=date, title=title, summary=summary, link=link)
@classmethod
async def _parse_rss(cls, feed: Feed, resp: aiohttp.ClientResponse
) -> Tuple[Feed, Iterable[Entry]]:
try:
content = await resp.text()
except UnicodeDecodeError:
try:
content = await resp.text(encoding="utf-8")
except:
content = await resp.text(encoding="utf-8", errors="ignore")
except UnicodeDecodeError:
content = str(await resp.read())[2:-1]
return content, {"Content-Location": url, **resp.headers, "Content-Encoding": "identity"}
headers = {"Content-Location": feed.url, **resp.headers, "Content-Encoding": "identity"}
parsed_data = feedparser.parse(content, response_headers=headers)
if parsed_data.bozo:
raise ValueError("Feed is not valid")
feed = Feed(id=feed.id, url=feed.url, title=parsed_data.get("title", feed.url),
subtitle=parsed_data.get("description", ""), link=parsed_data.get("link", ""),
subscriptions=feed.subscriptions)
return feed, (cls._parse_rss_entry(feed.id, entry) for entry in parsed_data.entries)
@classmethod
def _parse_rss_entry(cls, feed_id: int, entry: Any) -> Entry:
return Entry(
feed_id=feed_id,
id=(getattr(entry, "id", None) or
hashlib.sha1(" ".join([getattr(entry, "title", ""),
getattr(entry, "description", ""),
getattr(entry, "link", "")]).encode("utf-8")
).hexdigest()),
date=cls._parse_rss_date(entry),
title=getattr(entry, "title", ""),
summary=getattr(entry, "description", ""),
link=getattr(entry, "link", ""),
)
@staticmethod
def get_date(entry: Any) -> datetime:
def _parse_rss_date(entry: Any) -> datetime:
try:
return datetime.fromtimestamp(mktime(entry["published_parsed"]))
except (KeyError, TypeError, ValueError):
@ -157,21 +221,6 @@ class RSSBot(Plugin):
pass
return datetime.now()
@classmethod
def find_entries(cls, feed_id: int, entries: List[Any]) -> List[Entry]:
return [Entry(
feed_id=feed_id,
id=(getattr(entry, "id", None) or
hashlib.sha1(" ".join([getattr(entry, "title", ""),
getattr(entry, "description", ""),
getattr(entry, "link", "")]).encode("utf-8")
).hexdigest()),
date=cls.get_date(entry),
title=getattr(entry, "title", ""),
summary=getattr(entry, "description", ""),
link=getattr(entry, "link", ""),
) for entry in entries]
async def get_power_levels(self, room_id: RoomID) -> PowerLevelStateEventContent:
try:
expiry, levels = self.power_level_cache[room_id]
@ -210,16 +259,13 @@ class RSSBot(Plugin):
return
feed = self.db.get_feed_by_url(url)
if not feed:
data, headers = await self.read_feed(url)
metadata = feedparser.parse(data, response_headers=headers)
if metadata.bozo:
await evt.reply("That doesn't look like a valid feed.")
try:
info, entries = await self.parse_feed(url=url)
except Exception as e:
await evt.reply(f"Failed to load feed: {e}")
return
channel = metadata.get("channel", {})
feed = self.db.create_feed(url, channel.get("title", url),
channel.get("description", ""),
channel.get("link", ""))
self.db.add_entries(self.find_entries(feed.id, metadata.entries))
feed = self.db.create_feed(info)
self.db.add_entries(entries, override_feed_id=feed.id)
self.db.subscribe(feed.id, evt.room_id, evt.sender)
await evt.reply(f"Subscribed to feed ID {feed.id}: [{feed.title}]({feed.url})")
@ -248,11 +294,12 @@ class RSSBot(Plugin):
await evt.reply("This room is not subscribed to that feed")
return
self.db.update_template(feed.id, evt.room_id, template)
sub.notification_template = Template(template)
sample_entry = Entry(feed.id, "SAMPLE", datetime.now(), "Sample entry",
"This is a sample entry to demonstrate your new template",
"http://example.com")
await evt.reply(f"Template for feed ID {feed.id} updated. Sample notification:")
await self._send(feed, sample_entry, Template(template), sub.room_id)
await self._send(feed, sample_entry, sub)
@rss.subcommand("notice", aliases=("n",),
help="Set whether or not the bot should send updates as m.notice")

View File

@ -144,10 +144,14 @@ class Database:
return (Entry(*row) for row in
self.db.execute(select([self.entry]).where(self.entry.c.feed_id == feed_id)))
def add_entries(self, entries: Iterable[Entry]) -> None:
def add_entries(self, entries: Iterable[Entry], override_feed_id: Optional[int] = None) -> None:
if not entries:
return
self.db.execute(self.entry.insert(), [entry._asdict() for entry in entries])
entries = [entry._asdict() for entry in entries]
if override_feed_id is not None:
for entry in entries:
entry["feed_id"] = override_feed_id
self.db.execute(self.entry.insert(), entries)
def get_feed_by_url(self, url: str) -> Optional[Feed]:
rows = self.db.execute(select([self.feed]).where(self.feed.c.url == url))
@ -187,11 +191,11 @@ class Database:
.where(self.subscription.c.room_id == old)
.values(room_id=new))
def create_feed(self, url: str, title: str, subtitle: str, link: str) -> Feed:
res = self.db.execute(self.feed.insert().values(url=url, title=title, subtitle=subtitle,
link=link))
return Feed(id=res.inserted_primary_key[0], url=url, title=title, subtitle=subtitle,
link=link, subscriptions=[])
def create_feed(self, info: Feed) -> Feed:
res = self.db.execute(self.feed.insert().values(url=info.url, title=info.title,
subtitle=info.subtitle, link=info.link))
return Feed(id=res.inserted_primary_key[0], url=info.url, title=info.title,
subtitle=info.subtitle, link=info.link, subscriptions=[])
def subscribe(self, feed_id: int, room_id: RoomID, user_id: UserID) -> None:
self.db.execute(self.subscription.insert().values(