mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-01-19 01:31:28 -05:00
796a4b7482
* Perfer `type(x) is int` to `isinstance(x, int)` This covered all additional instances I could see where `x` was user-controlled. The remaining cases are ``` $ rg -s 'isinstance.*[^_]int' tests/replication/_base.py 576: if isinstance(obj, int): synapse/util/caches/stream_change_cache.py 136: assert isinstance(stream_pos, int) 214: assert isinstance(stream_pos, int) 246: assert isinstance(stream_pos, int) 267: assert isinstance(stream_pos, int) synapse/replication/tcp/external_cache.py 133: if isinstance(result, int): synapse/metrics/__init__.py 100: if isinstance(calls, (int, float)): synapse/handlers/appservice.py 262: assert isinstance(new_token, int) synapse/config/_util.py 62: if isinstance(p, int): ``` which cover metrics, logic related to `jsonschema`, and replication and data streams. AFAICS these are all internal to Synapse * Changelog
266 lines
9.4 KiB
Python
266 lines
9.4 KiB
Python
# Copyright 2021 The Matrix.org Foundation C.I.C.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
import html
|
|
import logging
|
|
import urllib.parse
|
|
from typing import TYPE_CHECKING, List, Optional
|
|
|
|
import attr
|
|
|
|
from synapse.rest.media.v1.preview_html import parse_html_description
|
|
from synapse.types import JsonDict
|
|
from synapse.util import json_decoder
|
|
|
|
if TYPE_CHECKING:
|
|
from lxml import etree
|
|
|
|
from synapse.server import HomeServer
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
|
class OEmbedResult:
|
|
# The Open Graph result (converted from the oEmbed result).
|
|
open_graph_result: JsonDict
|
|
# The author_name of the oEmbed result
|
|
author_name: Optional[str]
|
|
# Number of milliseconds to cache the content, according to the oEmbed response.
|
|
#
|
|
# This will be None if no cache-age is provided in the oEmbed response (or
|
|
# if the oEmbed response cannot be turned into an Open Graph response).
|
|
cache_age: Optional[int]
|
|
|
|
|
|
class OEmbedProvider:
|
|
"""
|
|
A helper for accessing oEmbed content.
|
|
|
|
It can be used to check if a URL should be accessed via oEmbed and for
|
|
requesting/parsing oEmbed content.
|
|
"""
|
|
|
|
def __init__(self, hs: "HomeServer"):
|
|
self._oembed_patterns = {}
|
|
for oembed_endpoint in hs.config.oembed.oembed_patterns:
|
|
api_endpoint = oembed_endpoint.api_endpoint
|
|
|
|
# Only JSON is supported at the moment. This could be declared in
|
|
# the formats field. Otherwise, if the endpoint ends in .xml assume
|
|
# it doesn't support JSON.
|
|
if (
|
|
oembed_endpoint.formats is not None
|
|
and "json" not in oembed_endpoint.formats
|
|
) or api_endpoint.endswith(".xml"):
|
|
logger.info(
|
|
"Ignoring oEmbed endpoint due to not supporting JSON: %s",
|
|
api_endpoint,
|
|
)
|
|
continue
|
|
|
|
# Iterate through each URL pattern and point it to the endpoint.
|
|
for pattern in oembed_endpoint.url_patterns:
|
|
self._oembed_patterns[pattern] = api_endpoint
|
|
|
|
def get_oembed_url(self, url: str) -> Optional[str]:
|
|
"""
|
|
Check whether the URL should be downloaded as oEmbed content instead.
|
|
|
|
Args:
|
|
url: The URL to check.
|
|
|
|
Returns:
|
|
A URL to use instead or None if the original URL should be used.
|
|
"""
|
|
for url_pattern, endpoint in self._oembed_patterns.items():
|
|
if url_pattern.fullmatch(url):
|
|
# TODO Specify max height / width.
|
|
|
|
# Note that only the JSON format is supported, some endpoints want
|
|
# this in the URL, others want it as an argument.
|
|
endpoint = endpoint.replace("{format}", "json")
|
|
|
|
args = {"url": url, "format": "json"}
|
|
query_str = urllib.parse.urlencode(args, True)
|
|
return f"{endpoint}?{query_str}"
|
|
|
|
# No match.
|
|
return None
|
|
|
|
def autodiscover_from_html(self, tree: "etree.Element") -> Optional[str]:
|
|
"""
|
|
Search an HTML document for oEmbed autodiscovery information.
|
|
|
|
Args:
|
|
tree: The parsed HTML body.
|
|
|
|
Returns:
|
|
The URL to use for oEmbed information, or None if no URL was found.
|
|
"""
|
|
# Search for link elements with the proper rel and type attributes.
|
|
for tag in tree.xpath(
|
|
"//link[@rel='alternate'][@type='application/json+oembed']"
|
|
):
|
|
if "href" in tag.attrib:
|
|
return tag.attrib["href"]
|
|
|
|
# Some providers (e.g. Flickr) use alternative instead of alternate.
|
|
for tag in tree.xpath(
|
|
"//link[@rel='alternative'][@type='application/json+oembed']"
|
|
):
|
|
if "href" in tag.attrib:
|
|
return tag.attrib["href"]
|
|
|
|
return None
|
|
|
|
def parse_oembed_response(self, url: str, raw_body: bytes) -> OEmbedResult:
|
|
"""
|
|
Parse the oEmbed response into an Open Graph response.
|
|
|
|
Args:
|
|
url: The URL which is being previewed (not the one which was
|
|
requested).
|
|
raw_body: The oEmbed response as JSON encoded as bytes.
|
|
|
|
Returns:
|
|
json-encoded Open Graph data
|
|
"""
|
|
|
|
try:
|
|
# oEmbed responses *must* be UTF-8 according to the spec.
|
|
oembed = json_decoder.decode(raw_body.decode("utf-8"))
|
|
except ValueError:
|
|
return OEmbedResult({}, None, None)
|
|
|
|
# The version is a required string field, but not always provided,
|
|
# or sometimes provided as a float. Be lenient.
|
|
oembed_version = oembed.get("version", "1.0")
|
|
if oembed_version != "1.0" and oembed_version != 1:
|
|
return OEmbedResult({}, None, None)
|
|
|
|
# Attempt to parse the cache age, if possible.
|
|
try:
|
|
cache_age = int(oembed.get("cache_age")) * 1000
|
|
except (TypeError, ValueError):
|
|
# If the cache age cannot be parsed (e.g. wrong type or invalid
|
|
# string), ignore it.
|
|
cache_age = None
|
|
|
|
# The oEmbed response converted to Open Graph.
|
|
open_graph_response: JsonDict = {"og:url": url}
|
|
|
|
title = oembed.get("title")
|
|
if title and isinstance(title, str):
|
|
# A common WordPress plug-in seems to incorrectly escape entities
|
|
# in the oEmbed response.
|
|
open_graph_response["og:title"] = html.unescape(title)
|
|
|
|
author_name = oembed.get("author_name")
|
|
if not isinstance(author_name, str):
|
|
author_name = None
|
|
|
|
# Use the provider name and as the site.
|
|
provider_name = oembed.get("provider_name")
|
|
if provider_name and isinstance(provider_name, str):
|
|
open_graph_response["og:site_name"] = provider_name
|
|
|
|
# If a thumbnail exists, use it. Note that dimensions will be calculated later.
|
|
thumbnail_url = oembed.get("thumbnail_url")
|
|
if thumbnail_url and isinstance(thumbnail_url, str):
|
|
open_graph_response["og:image"] = thumbnail_url
|
|
|
|
# Process each type separately.
|
|
oembed_type = oembed.get("type")
|
|
if oembed_type == "rich":
|
|
html_str = oembed.get("html")
|
|
if isinstance(html_str, str):
|
|
calc_description_and_urls(open_graph_response, html_str)
|
|
|
|
elif oembed_type == "photo":
|
|
# If this is a photo, use the full image, not the thumbnail.
|
|
url = oembed.get("url")
|
|
if url and isinstance(url, str):
|
|
open_graph_response["og:image"] = url
|
|
|
|
elif oembed_type == "video":
|
|
open_graph_response["og:type"] = "video.other"
|
|
html_str = oembed.get("html")
|
|
if html_str and isinstance(html_str, str):
|
|
calc_description_and_urls(open_graph_response, oembed["html"])
|
|
for size in ("width", "height"):
|
|
val = oembed.get(size)
|
|
if type(val) is int:
|
|
open_graph_response[f"og:video:{size}"] = val
|
|
|
|
elif oembed_type == "link":
|
|
open_graph_response["og:type"] = "website"
|
|
|
|
else:
|
|
logger.warning("Unknown oEmbed type: %s", oembed_type)
|
|
|
|
return OEmbedResult(open_graph_response, author_name, cache_age)
|
|
|
|
|
|
def _fetch_urls(tree: "etree.Element", tag_name: str) -> List[str]:
|
|
results = []
|
|
for tag in tree.xpath("//*/" + tag_name):
|
|
if "src" in tag.attrib:
|
|
results.append(tag.attrib["src"])
|
|
return results
|
|
|
|
|
|
def calc_description_and_urls(open_graph_response: JsonDict, html_body: str) -> None:
|
|
"""
|
|
Calculate description for an HTML document.
|
|
|
|
This uses lxml to convert the HTML document into plaintext. If errors
|
|
occur during processing of the document, an empty response is returned.
|
|
|
|
Args:
|
|
open_graph_response: The current Open Graph summary. This is updated with additional fields.
|
|
html_body: The HTML document, as bytes.
|
|
|
|
Returns:
|
|
The summary
|
|
"""
|
|
# If there's no body, nothing useful is going to be found.
|
|
if not html_body:
|
|
return
|
|
|
|
from lxml import etree
|
|
|
|
# Create an HTML parser. If this fails, log and return no metadata.
|
|
parser = etree.HTMLParser(recover=True, encoding="utf-8")
|
|
|
|
# Attempt to parse the body. If this fails, log and return no metadata.
|
|
tree = etree.fromstring(html_body, parser)
|
|
|
|
# The data was successfully parsed, but no tree was found.
|
|
if tree is None:
|
|
return
|
|
|
|
# Attempt to find interesting URLs (images, videos, embeds).
|
|
if "og:image" not in open_graph_response:
|
|
image_urls = _fetch_urls(tree, "img")
|
|
if image_urls:
|
|
open_graph_response["og:image"] = image_urls[0]
|
|
|
|
video_urls = _fetch_urls(tree, "video") + _fetch_urls(tree, "embed")
|
|
if video_urls:
|
|
open_graph_response["og:video"] = video_urls[0]
|
|
|
|
description = parse_html_description(tree)
|
|
if description:
|
|
open_graph_response["og:description"] = description
|