From eb39da6782b57c939450839097f32a14cba3ebfc Mon Sep 17 00:00:00 2001
From: Patrick Cloke
Date: Mon, 13 Dec 2021 12:55:07 -0500
Subject: [PATCH] Move HTML parsing to a separate file for URL previews.
(#11566)
* Splits the logic for parsing HTML from the resource handling code.
* Fix a circular import in the oEmbed code (which uses the HTML parsing code).
* Renames some of the HTML parsing methods to:
* Make it clear which methods are "internal" to the module.
* Clarify what the methods do.
---
changelog.d/11566.misc | 1 +
synapse/rest/media/v1/oembed.py | 5 +-
synapse/rest/media/v1/preview_html.py | 397 ++++++++++++++++++
synapse/rest/media/v1/preview_url_resource.py | 383 +----------------
tests/rest/media/v1/test_url_preview.py | 1 +
tests/test_preview.py | 46 +-
6 files changed, 432 insertions(+), 401 deletions(-)
create mode 100644 changelog.d/11566.misc
create mode 100644 synapse/rest/media/v1/preview_html.py
diff --git a/changelog.d/11566.misc b/changelog.d/11566.misc
new file mode 100644
index 000000000..c48e73cd4
--- /dev/null
+++ b/changelog.d/11566.misc
@@ -0,0 +1 @@
+Split the HTML parsing code from the URL preview resource code.
diff --git a/synapse/rest/media/v1/oembed.py b/synapse/rest/media/v1/oembed.py
index 2a59552c2..cce1527ed 100644
--- a/synapse/rest/media/v1/oembed.py
+++ b/synapse/rest/media/v1/oembed.py
@@ -17,6 +17,7 @@ from typing import TYPE_CHECKING, List, Optional
import attr
+from synapse.rest.media.v1.preview_html import parse_html_description
from synapse.types import JsonDict
from synapse.util import json_decoder
@@ -245,8 +246,6 @@ def calc_description_and_urls(open_graph_response: JsonDict, html_body: str) ->
if video_urls:
open_graph_response["og:video"] = video_urls[0]
- from synapse.rest.media.v1.preview_url_resource import _calc_description
-
- description = _calc_description(tree)
+ description = parse_html_description(tree)
if description:
open_graph_response["og:description"] = description
diff --git a/synapse/rest/media/v1/preview_html.py b/synapse/rest/media/v1/preview_html.py
new file mode 100644
index 000000000..30b067dd4
--- /dev/null
+++ b/synapse/rest/media/v1/preview_html.py
@@ -0,0 +1,397 @@
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import codecs
+import itertools
+import logging
+import re
+from typing import TYPE_CHECKING, Dict, Generator, Iterable, Optional, Set, Union
+from urllib import parse as urlparse
+
+if TYPE_CHECKING:
+ from lxml import etree
+
+logger = logging.getLogger(__name__)
+
+_charset_match = re.compile(
+ br'<\s*meta[^>]*charset\s*=\s*"?([a-z0-9_-]+)"?', flags=re.I
+)
+_xml_encoding_match = re.compile(
+ br'\s*<\s*\?\s*xml[^>]*encoding="([a-z0-9_-]+)"', flags=re.I
+)
+_content_type_match = re.compile(r'.*; *charset="?(.*?)"?(;|$)', flags=re.I)
+
+
+def _normalise_encoding(encoding: str) -> Optional[str]:
+ """Use the Python codec's name as the normalised entry."""
+ try:
+ return codecs.lookup(encoding).name
+ except LookupError:
+ return None
+
+
+def _get_html_media_encodings(
+ body: bytes, content_type: Optional[str]
+) -> Iterable[str]:
+ """
+ Get potential encoding of the body based on the (presumably) HTML body or the content-type header.
+
+ The precedence used for finding a character encoding is:
+
+ 1. tag with a charset declared.
+ 2. The XML document's character encoding attribute.
+ 3. The Content-Type header.
+ 4. Fallback to utf-8.
+ 5. Fallback to windows-1252.
+
+ This roughly follows the algorithm used by BeautifulSoup's bs4.dammit.EncodingDetector.
+
+ Args:
+ body: The HTML document, as bytes.
+ content_type: The Content-Type header.
+
+ Returns:
+ The character encoding of the body, as a string.
+ """
+ # There's no point in returning an encoding more than once.
+ attempted_encodings: Set[str] = set()
+
+ # Limit searches to the first 1kb, since it ought to be at the top.
+ body_start = body[:1024]
+
+ # Check if it has an encoding set in a meta tag.
+ match = _charset_match.search(body_start)
+ if match:
+ encoding = _normalise_encoding(match.group(1).decode("ascii"))
+ if encoding:
+ attempted_encodings.add(encoding)
+ yield encoding
+
+ # TODO Support
+
+ # Check if it has an XML document with an encoding.
+ match = _xml_encoding_match.match(body_start)
+ if match:
+ encoding = _normalise_encoding(match.group(1).decode("ascii"))
+ if encoding and encoding not in attempted_encodings:
+ attempted_encodings.add(encoding)
+ yield encoding
+
+ # Check the HTTP Content-Type header for a character set.
+ if content_type:
+ content_match = _content_type_match.match(content_type)
+ if content_match:
+ encoding = _normalise_encoding(content_match.group(1))
+ if encoding and encoding not in attempted_encodings:
+ attempted_encodings.add(encoding)
+ yield encoding
+
+ # Finally, fallback to UTF-8, then windows-1252.
+ for fallback in ("utf-8", "cp1252"):
+ if fallback not in attempted_encodings:
+ yield fallback
+
+
+def decode_body(
+ body: bytes, uri: str, content_type: Optional[str] = None
+) -> Optional["etree.Element"]:
+ """
+ This uses lxml to parse the HTML document.
+
+ Args:
+ body: The HTML document, as bytes.
+ uri: The URI used to download the body.
+ content_type: The Content-Type header.
+
+ Returns:
+ The parsed HTML body, or None if an error occurred during processed.
+ """
+ # If there's no body, nothing useful is going to be found.
+ if not body:
+ return None
+
+ # The idea here is that multiple encodings are tried until one works.
+ # Unfortunately the result is never used and then LXML will decode the string
+ # again with the found encoding.
+ for encoding in _get_html_media_encodings(body, content_type):
+ try:
+ body.decode(encoding)
+ except Exception:
+ pass
+ else:
+ break
+ else:
+ logger.warning("Unable to decode HTML body for %s", uri)
+ return None
+
+ from lxml import etree
+
+ # Create an HTML parser.
+ parser = etree.HTMLParser(recover=True, encoding=encoding)
+
+ # Attempt to parse the body. Returns None if the body was successfully
+ # parsed, but no tree was found.
+ return etree.fromstring(body, parser)
+
+
+def parse_html_to_open_graph(
+ tree: "etree.Element", media_uri: str
+) -> Dict[str, Optional[str]]:
+ """
+ Parse the HTML document into an Open Graph response.
+
+ This uses lxml to search the HTML document for Open Graph data (or
+ synthesizes it from the document).
+
+ Args:
+ tree: The parsed HTML document.
+ media_url: The URI used to download the body.
+
+ Returns:
+ The Open Graph response as a dictionary.
+ """
+
+ # if we see any image URLs in the OG response, then spider them
+ # (although the client could choose to do this by asking for previews of those
+ # URLs to avoid DoSing the server)
+
+ # "og:type" : "video",
+ # "og:url" : "https://www.youtube.com/watch?v=LXDBoHyjmtw",
+ # "og:site_name" : "YouTube",
+ # "og:video:type" : "application/x-shockwave-flash",
+ # "og:description" : "Fun stuff happening here",
+ # "og:title" : "RemoteJam - Matrix team hack for Disrupt Europe Hackathon",
+ # "og:image" : "https://i.ytimg.com/vi/LXDBoHyjmtw/maxresdefault.jpg",
+ # "og:video:url" : "http://www.youtube.com/v/LXDBoHyjmtw?version=3&autohide=1",
+ # "og:video:width" : "1280"
+ # "og:video:height" : "720",
+ # "og:video:secure_url": "https://www.youtube.com/v/LXDBoHyjmtw?version=3",
+
+ og: Dict[str, Optional[str]] = {}
+ for tag in tree.xpath("//*/meta[starts-with(@property, 'og:')]"):
+ if "content" in tag.attrib:
+ # if we've got more than 50 tags, someone is taking the piss
+ if len(og) >= 50:
+ logger.warning("Skipping OG for page with too many 'og:' tags")
+ return {}
+ og[tag.attrib["property"]] = tag.attrib["content"]
+
+ # TODO: grab article: meta tags too, e.g.:
+
+ # "article:publisher" : "https://www.facebook.com/thethudonline" />
+ # "article:author" content="https://www.facebook.com/thethudonline" />
+ # "article:tag" content="baby" />
+ # "article:section" content="Breaking News" />
+ # "article:published_time" content="2016-03-31T19:58:24+00:00" />
+ # "article:modified_time" content="2016-04-01T18:31:53+00:00" />
+
+ if "og:title" not in og:
+ # do some basic spidering of the HTML
+ title = tree.xpath("(//title)[1] | (//h1)[1] | (//h2)[1] | (//h3)[1]")
+ if title and title[0].text is not None:
+ og["og:title"] = title[0].text.strip()
+ else:
+ og["og:title"] = None
+
+ if "og:image" not in og:
+ # TODO: extract a favicon failing all else
+ meta_image = tree.xpath(
+ "//*/meta[translate(@itemprop, 'IMAGE', 'image')='image']/@content"
+ )
+ if meta_image:
+ og["og:image"] = rebase_url(meta_image[0], media_uri)
+ else:
+ # TODO: consider inlined CSS styles as well as width & height attribs
+ images = tree.xpath("//img[@src][number(@width)>10][number(@height)>10]")
+ images = sorted(
+ images,
+ key=lambda i: (
+ -1 * float(i.attrib["width"]) * float(i.attrib["height"])
+ ),
+ )
+ if not images:
+ images = tree.xpath("//img[@src]")
+ if images:
+ og["og:image"] = images[0].attrib["src"]
+
+ if "og:description" not in og:
+ meta_description = tree.xpath(
+ "//*/meta"
+ "[translate(@name, 'DESCRIPTION', 'description')='description']"
+ "/@content"
+ )
+ if meta_description:
+ og["og:description"] = meta_description[0]
+ else:
+ og["og:description"] = parse_html_description(tree)
+ elif og["og:description"]:
+ # This must be a non-empty string at this point.
+ assert isinstance(og["og:description"], str)
+ og["og:description"] = summarize_paragraphs([og["og:description"]])
+
+ # TODO: delete the url downloads to stop diskfilling,
+ # as we only ever cared about its OG
+ return og
+
+
+def parse_html_description(tree: "etree.Element") -> Optional[str]:
+ """
+ Calculate a text description based on an HTML document.
+
+ Grabs any text nodes which are inside the tag, unless they are within
+ an HTML5 semantic markup tag (, , , ), or
+ if they are within a or tag.
+
+ This is a very very very coarse approximation to a plain text render of the page.
+
+ Args:
+ tree: The parsed HTML document.
+
+ Returns:
+ The plain text description, or None if one cannot be generated.
+ """
+ # We don't just use XPATH here as that is slow on some machines.
+
+ from lxml import etree
+
+ TAGS_TO_REMOVE = (
+ "header",
+ "nav",
+ "aside",
+ "footer",
+ "script",
+ "noscript",
+ "style",
+ etree.Comment,
+ )
+
+ # Split all the text nodes into paragraphs (by splitting on new
+ # lines)
+ text_nodes = (
+ re.sub(r"\s+", "\n", el).strip()
+ for el in _iterate_over_text(tree.find("body"), *TAGS_TO_REMOVE)
+ )
+ return summarize_paragraphs(text_nodes)
+
+
+def _iterate_over_text(
+ tree: "etree.Element", *tags_to_ignore: Iterable[Union[str, "etree.Comment"]]
+) -> Generator[str, None, None]:
+ """Iterate over the tree returning text nodes in a depth first fashion,
+ skipping text nodes inside certain tags.
+ """
+ # This is basically a stack that we extend using itertools.chain.
+ # This will either consist of an element to iterate over *or* a string
+ # to be returned.
+ elements = iter([tree])
+ while True:
+ el = next(elements, None)
+ if el is None:
+ return
+
+ if isinstance(el, str):
+ yield el
+ elif el.tag not in tags_to_ignore:
+ # el.text is the text before the first child, so we can immediately
+ # return it if the text exists.
+ if el.text:
+ yield el.text
+
+ # We add to the stack all the elements children, interspersed with
+ # each child's tail text (if it exists). The tail text of a node
+ # is text that comes *after* the node, so we always include it even
+ # if we ignore the child node.
+ elements = itertools.chain(
+ itertools.chain.from_iterable( # Basically a flatmap
+ [child, child.tail] if child.tail else [child]
+ for child in el.iterchildren()
+ ),
+ elements,
+ )
+
+
+def rebase_url(url: str, base: str) -> str:
+ base_parts = list(urlparse.urlparse(base))
+ url_parts = list(urlparse.urlparse(url))
+ if not url_parts[0]: # fix up schema
+ url_parts[0] = base_parts[0] or "http"
+ if not url_parts[1]: # fix up hostname
+ url_parts[1] = base_parts[1]
+ if not url_parts[2].startswith("/"):
+ url_parts[2] = re.sub(r"/[^/]+$", "/", base_parts[2]) + url_parts[2]
+ return urlparse.urlunparse(url_parts)
+
+
+def summarize_paragraphs(
+ text_nodes: Iterable[str], min_size: int = 200, max_size: int = 500
+) -> Optional[str]:
+ """
+ Try to get a summary respecting first paragraph and then word boundaries.
+
+ Args:
+ text_nodes: The paragraphs to summarize.
+ min_size: The minimum number of words to include.
+ max_size: The maximum number of words to include.
+
+ Returns:
+ A summary of the text nodes, or None if that was not possible.
+ """
+
+ # TODO: Respect sentences?
+
+ description = ""
+
+ # Keep adding paragraphs until we get to the MIN_SIZE.
+ for text_node in text_nodes:
+ if len(description) < min_size:
+ text_node = re.sub(r"[\t \r\n]+", " ", text_node)
+ description += text_node + "\n\n"
+ else:
+ break
+
+ description = description.strip()
+ description = re.sub(r"[\t ]+", " ", description)
+ description = re.sub(r"[\t \r\n]*[\r\n]+", "\n\n", description)
+
+ # If the concatenation of paragraphs to get above MIN_SIZE
+ # took us over MAX_SIZE, then we need to truncate mid paragraph
+ if len(description) > max_size:
+ new_desc = ""
+
+ # This splits the paragraph into words, but keeping the
+ # (preceding) whitespace intact so we can easily concat
+ # words back together.
+ for match in re.finditer(r"\s*\S+", description):
+ word = match.group()
+
+ # Keep adding words while the total length is less than
+ # MAX_SIZE.
+ if len(word) + len(new_desc) < max_size:
+ new_desc += word
+ else:
+ # At this point the next word *will* take us over
+ # MAX_SIZE, but we also want to ensure that its not
+ # a huge word. If it is add it anyway and we'll
+ # truncate later.
+ if len(new_desc) < min_size:
+ new_desc += word
+ break
+
+ # Double check that we're not over the limit
+ if len(new_desc) > max_size:
+ new_desc = new_desc[:max_size]
+
+ # We always add an ellipsis because at the very least
+ # we chopped mid paragraph.
+ description = new_desc.strip() + "…"
+ return description if description else None
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index 054f3c296..a3829d943 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -12,18 +12,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import codecs
import datetime
import errno
import fnmatch
-import itertools
import logging
import os
import re
import shutil
import sys
import traceback
-from typing import TYPE_CHECKING, Dict, Generator, Iterable, Optional, Set, Tuple, Union
+from typing import TYPE_CHECKING, Iterable, Optional, Tuple
from urllib import parse as urlparse
import attr
@@ -45,6 +43,11 @@ from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.rest.media.v1._base import get_filename_from_headers
from synapse.rest.media.v1.media_storage import MediaStorage
from synapse.rest.media.v1.oembed import OEmbedProvider
+from synapse.rest.media.v1.preview_html import (
+ decode_body,
+ parse_html_to_open_graph,
+ rebase_url,
+)
from synapse.types import JsonDict, UserID
from synapse.util import json_encoder
from synapse.util.async_helpers import ObservableDeferred
@@ -54,21 +57,11 @@ from synapse.util.stringutils import random_string
from ._base import FileInfo
if TYPE_CHECKING:
- from lxml import etree
-
from synapse.rest.media.v1.media_repository import MediaRepository
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
-_charset_match = re.compile(
- br'<\s*meta[^>]*charset\s*=\s*"?([a-z0-9_-]+)"?', flags=re.I
-)
-_xml_encoding_match = re.compile(
- br'\s*<\s*\?\s*xml[^>]*encoding="([a-z0-9_-]+)"', flags=re.I
-)
-_content_type_match = re.compile(r'.*; *charset="?(.*?)"?(;|$)', flags=re.I)
-
OG_TAG_NAME_MAXLEN = 50
OG_TAG_VALUE_MAXLEN = 1000
@@ -311,7 +304,7 @@ class PreviewUrlResource(DirectServeJsonResource):
# If there was no oEmbed URL (or oEmbed parsing failed), attempt
# to generate the Open Graph information from the HTML.
if not oembed_url or not og:
- og = _calc_og(tree, media_info.uri)
+ og = parse_html_to_open_graph(tree, media_info.uri)
await self._precache_image_url(user, media_info, og)
else:
@@ -468,7 +461,7 @@ class PreviewUrlResource(DirectServeJsonResource):
# request itself and benefit from the same caching etc. But for now we
# just rely on the caching on the master request to speed things up.
image_info = await self._download_url(
- _rebase_url(og["og:image"], media_info.uri), user
+ rebase_url(og["og:image"], media_info.uri), user
)
if _is_media(image_info.media_type):
@@ -632,301 +625,6 @@ class PreviewUrlResource(DirectServeJsonResource):
logger.debug("No media removed from url cache")
-def _normalise_encoding(encoding: str) -> Optional[str]:
- """Use the Python codec's name as the normalised entry."""
- try:
- return codecs.lookup(encoding).name
- except LookupError:
- return None
-
-
-def get_html_media_encodings(body: bytes, content_type: Optional[str]) -> Iterable[str]:
- """
- Get potential encoding of the body based on the (presumably) HTML body or the content-type header.
-
- The precedence used for finding a character encoding is:
-
- 1. tag with a charset declared.
- 2. The XML document's character encoding attribute.
- 3. The Content-Type header.
- 4. Fallback to utf-8.
- 5. Fallback to windows-1252.
-
- This roughly follows the algorithm used by BeautifulSoup's bs4.dammit.EncodingDetector.
-
- Args:
- body: The HTML document, as bytes.
- content_type: The Content-Type header.
-
- Returns:
- The character encoding of the body, as a string.
- """
- # There's no point in returning an encoding more than once.
- attempted_encodings: Set[str] = set()
-
- # Limit searches to the first 1kb, since it ought to be at the top.
- body_start = body[:1024]
-
- # Check if it has an encoding set in a meta tag.
- match = _charset_match.search(body_start)
- if match:
- encoding = _normalise_encoding(match.group(1).decode("ascii"))
- if encoding:
- attempted_encodings.add(encoding)
- yield encoding
-
- # TODO Support
-
- # Check if it has an XML document with an encoding.
- match = _xml_encoding_match.match(body_start)
- if match:
- encoding = _normalise_encoding(match.group(1).decode("ascii"))
- if encoding and encoding not in attempted_encodings:
- attempted_encodings.add(encoding)
- yield encoding
-
- # Check the HTTP Content-Type header for a character set.
- if content_type:
- content_match = _content_type_match.match(content_type)
- if content_match:
- encoding = _normalise_encoding(content_match.group(1))
- if encoding and encoding not in attempted_encodings:
- attempted_encodings.add(encoding)
- yield encoding
-
- # Finally, fallback to UTF-8, then windows-1252.
- for fallback in ("utf-8", "cp1252"):
- if fallback not in attempted_encodings:
- yield fallback
-
-
-def decode_body(
- body: bytes, uri: str, content_type: Optional[str] = None
-) -> Optional["etree.Element"]:
- """
- This uses lxml to parse the HTML document.
-
- Args:
- body: The HTML document, as bytes.
- uri: The URI used to download the body.
- content_type: The Content-Type header.
-
- Returns:
- The parsed HTML body, or None if an error occurred during processed.
- """
- # If there's no body, nothing useful is going to be found.
- if not body:
- return None
-
- # The idea here is that multiple encodings are tried until one works.
- # Unfortunately the result is never used and then LXML will decode the string
- # again with the found encoding.
- for encoding in get_html_media_encodings(body, content_type):
- try:
- body.decode(encoding)
- except Exception:
- pass
- else:
- break
- else:
- logger.warning("Unable to decode HTML body for %s", uri)
- return None
-
- from lxml import etree
-
- # Create an HTML parser.
- parser = etree.HTMLParser(recover=True, encoding=encoding)
-
- # Attempt to parse the body. Returns None if the body was successfully
- # parsed, but no tree was found.
- return etree.fromstring(body, parser)
-
-
-def _calc_og(tree: "etree.Element", media_uri: str) -> Dict[str, Optional[str]]:
- """
- Calculate metadata for an HTML document.
-
- This uses lxml to search the HTML document for Open Graph data.
-
- Args:
- tree: The parsed HTML document.
- media_url: The URI used to download the body.
-
- Returns:
- The Open Graph response as a dictionary.
- """
-
- # if we see any image URLs in the OG response, then spider them
- # (although the client could choose to do this by asking for previews of those
- # URLs to avoid DoSing the server)
-
- # "og:type" : "video",
- # "og:url" : "https://www.youtube.com/watch?v=LXDBoHyjmtw",
- # "og:site_name" : "YouTube",
- # "og:video:type" : "application/x-shockwave-flash",
- # "og:description" : "Fun stuff happening here",
- # "og:title" : "RemoteJam - Matrix team hack for Disrupt Europe Hackathon",
- # "og:image" : "https://i.ytimg.com/vi/LXDBoHyjmtw/maxresdefault.jpg",
- # "og:video:url" : "http://www.youtube.com/v/LXDBoHyjmtw?version=3&autohide=1",
- # "og:video:width" : "1280"
- # "og:video:height" : "720",
- # "og:video:secure_url": "https://www.youtube.com/v/LXDBoHyjmtw?version=3",
-
- og: Dict[str, Optional[str]] = {}
- for tag in tree.xpath("//*/meta[starts-with(@property, 'og:')]"):
- if "content" in tag.attrib:
- # if we've got more than 50 tags, someone is taking the piss
- if len(og) >= 50:
- logger.warning("Skipping OG for page with too many 'og:' tags")
- return {}
- og[tag.attrib["property"]] = tag.attrib["content"]
-
- # TODO: grab article: meta tags too, e.g.:
-
- # "article:publisher" : "https://www.facebook.com/thethudonline" />
- # "article:author" content="https://www.facebook.com/thethudonline" />
- # "article:tag" content="baby" />
- # "article:section" content="Breaking News" />
- # "article:published_time" content="2016-03-31T19:58:24+00:00" />
- # "article:modified_time" content="2016-04-01T18:31:53+00:00" />
-
- if "og:title" not in og:
- # do some basic spidering of the HTML
- title = tree.xpath("(//title)[1] | (//h1)[1] | (//h2)[1] | (//h3)[1]")
- if title and title[0].text is not None:
- og["og:title"] = title[0].text.strip()
- else:
- og["og:title"] = None
-
- if "og:image" not in og:
- # TODO: extract a favicon failing all else
- meta_image = tree.xpath(
- "//*/meta[translate(@itemprop, 'IMAGE', 'image')='image']/@content"
- )
- if meta_image:
- og["og:image"] = _rebase_url(meta_image[0], media_uri)
- else:
- # TODO: consider inlined CSS styles as well as width & height attribs
- images = tree.xpath("//img[@src][number(@width)>10][number(@height)>10]")
- images = sorted(
- images,
- key=lambda i: (
- -1 * float(i.attrib["width"]) * float(i.attrib["height"])
- ),
- )
- if not images:
- images = tree.xpath("//img[@src]")
- if images:
- og["og:image"] = images[0].attrib["src"]
-
- if "og:description" not in og:
- meta_description = tree.xpath(
- "//*/meta"
- "[translate(@name, 'DESCRIPTION', 'description')='description']"
- "/@content"
- )
- if meta_description:
- og["og:description"] = meta_description[0]
- else:
- og["og:description"] = _calc_description(tree)
- elif og["og:description"]:
- # This must be a non-empty string at this point.
- assert isinstance(og["og:description"], str)
- og["og:description"] = summarize_paragraphs([og["og:description"]])
-
- # TODO: delete the url downloads to stop diskfilling,
- # as we only ever cared about its OG
- return og
-
-
-def _calc_description(tree: "etree.Element") -> Optional[str]:
- """
- Calculate a text description based on an HTML document.
-
- Grabs any text nodes which are inside the
Some text.
tag, unless they are within
- an HTML5 semantic markup tag (, , , ), or
- if they are within a or tag.
-
- This is a very very very coarse approximation to a plain text render of the page.
-
- Args:
- tree: The parsed HTML document.
-
- Returns:
- The plain text description, or None if one cannot be generated.
- """
- # We don't just use XPATH here as that is slow on some machines.
-
- from lxml import etree
-
- TAGS_TO_REMOVE = (
- "header",
- "nav",
- "aside",
- "footer",
- "script",
- "noscript",
- "style",
- etree.Comment,
- )
-
- # Split all the text nodes into paragraphs (by splitting on new
- # lines)
- text_nodes = (
- re.sub(r"\s+", "\n", el).strip()
- for el in _iterate_over_text(tree.find("body"), *TAGS_TO_REMOVE)
- )
- return summarize_paragraphs(text_nodes)
-
-
-def _iterate_over_text(
- tree: "etree.Element", *tags_to_ignore: Iterable[Union[str, "etree.Comment"]]
-) -> Generator[str, None, None]:
- """Iterate over the tree returning text nodes in a depth first fashion,
- skipping text nodes inside certain tags.
- """
- # This is basically a stack that we extend using itertools.chain.
- # This will either consist of an element to iterate over *or* a string
- # to be returned.
- elements = iter([tree])
- while True:
- el = next(elements, None)
- if el is None:
- return
-
- if isinstance(el, str):
- yield el
- elif el.tag not in tags_to_ignore:
- # el.text is the text before the first child, so we can immediately
- # return it if the text exists.
- if el.text:
- yield el.text
-
- # We add to the stack all the elements children, interspersed with
- # each child's tail text (if it exists). The tail text of a node
- # is text that comes *after* the node, so we always include it even
- # if we ignore the child node.
- elements = itertools.chain(
- itertools.chain.from_iterable( # Basically a flatmap
- [child, child.tail] if child.tail else [child]
- for child in el.iterchildren()
- ),
- elements,
- )
-
-
-def _rebase_url(url: str, base: str) -> str:
- base_parts = list(urlparse.urlparse(base))
- url_parts = list(urlparse.urlparse(url))
- if not url_parts[0]: # fix up schema
- url_parts[0] = base_parts[0] or "http"
- if not url_parts[1]: # fix up hostname
- url_parts[1] = base_parts[1]
- if not url_parts[2].startswith("/"):
- url_parts[2] = re.sub(r"/[^/]+$", "/", base_parts[2]) + url_parts[2]
- return urlparse.urlunparse(url_parts)
-
-
def _is_media(content_type: str) -> bool:
return content_type.lower().startswith("image/")
@@ -940,68 +638,3 @@ def _is_html(content_type: str) -> bool:
def _is_json(content_type: str) -> bool:
return content_type.lower().startswith("application/json")
-
-
-def summarize_paragraphs(
- text_nodes: Iterable[str], min_size: int = 200, max_size: int = 500
-) -> Optional[str]:
- """
- Try to get a summary respecting first paragraph and then word boundaries.
-
- Args:
- text_nodes: The paragraphs to summarize.
- min_size: The minimum number of words to include.
- max_size: The maximum number of words to include.
-
- Returns:
- A summary of the text nodes, or None if that was not possible.
- """
-
- # TODO: Respect sentences?
-
- description = ""
-
- # Keep adding paragraphs until we get to the MIN_SIZE.
- for text_node in text_nodes:
- if len(description) < min_size:
- text_node = re.sub(r"[\t \r\n]+", " ", text_node)
- description += text_node + "\n\n"
- else:
- break
-
- description = description.strip()
- description = re.sub(r"[\t ]+", " ", description)
- description = re.sub(r"[\t \r\n]*[\r\n]+", "\n\n", description)
-
- # If the concatenation of paragraphs to get above MIN_SIZE
- # took us over MAX_SIZE, then we need to truncate mid paragraph
- if len(description) > max_size:
- new_desc = ""
-
- # This splits the paragraph into words, but keeping the
- # (preceding) whitespace intact so we can easily concat
- # words back together.
- for match in re.finditer(r"\s*\S+", description):
- word = match.group()
-
- # Keep adding words while the total length is less than
- # MAX_SIZE.
- if len(word) + len(new_desc) < max_size:
- new_desc += word
- else:
- # At this point the next word *will* take us over
- # MAX_SIZE, but we also want to ensure that its not
- # a huge word. If it is add it anyway and we'll
- # truncate later.
- if len(new_desc) < min_size:
- new_desc += word
- break
-
- # Double check that we're not over the limit
- if len(new_desc) > max_size:
- new_desc = new_desc[:max_size]
-
- # We always add an ellipsis because at the very least
- # we chopped mid paragraph.
- description = new_desc.strip() + "…"
- return description if description else None
diff --git a/tests/rest/media/v1/test_url_preview.py b/tests/rest/media/v1/test_url_preview.py
index 8698135a7..16e904f15 100644
--- a/tests/rest/media/v1/test_url_preview.py
+++ b/tests/rest/media/v1/test_url_preview.py
@@ -1,4 +1,5 @@
# Copyright 2018 New Vector Ltd
+# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
diff --git a/tests/test_preview.py b/tests/test_preview.py
index 40b89fb2e..46e02f483 100644
--- a/tests/test_preview.py
+++ b/tests/test_preview.py
@@ -12,10 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.rest.media.v1.preview_url_resource import (
- _calc_og,
+from synapse.rest.media.v1.preview_html import (
+ _get_html_media_encodings,
decode_body,
- get_html_media_encodings,
+ parse_html_to_open_graph,
summarize_paragraphs,
)
@@ -160,7 +160,7 @@ class CalcOgTestCase(unittest.TestCase):
"""
tree = decode_body(html, "http://example.com/test.html")
- og = _calc_og(tree, "http://example.com/test.html")
+ og = parse_html_to_open_graph(tree, "http://example.com/test.html")
self.assertEqual(og, {"og:title": "Foo", "og:description": "Some text."})
@@ -176,7 +176,7 @@ class CalcOgTestCase(unittest.TestCase):
"""
tree = decode_body(html, "http://example.com/test.html")
- og = _calc_og(tree, "http://example.com/test.html")
+ og = parse_html_to_open_graph(tree, "http://example.com/test.html")
self.assertEqual(og, {"og:title": "Foo", "og:description": "Some text."})
@@ -195,7 +195,7 @@ class CalcOgTestCase(unittest.TestCase):
"""
tree = decode_body(html, "http://example.com/test.html")
- og = _calc_og(tree, "http://example.com/test.html")
+ og = parse_html_to_open_graph(tree, "http://example.com/test.html")
self.assertEqual(
og,
@@ -217,7 +217,7 @@ class CalcOgTestCase(unittest.TestCase):
"""
tree = decode_body(html, "http://example.com/test.html")
- og = _calc_og(tree, "http://example.com/test.html")
+ og = parse_html_to_open_graph(tree, "http://example.com/test.html")
self.assertEqual(og, {"og:title": "Foo", "og:description": "Some text."})
@@ -231,7 +231,7 @@ class CalcOgTestCase(unittest.TestCase):
"""
tree = decode_body(html, "http://example.com/test.html")
- og = _calc_og(tree, "http://example.com/test.html")
+ og = parse_html_to_open_graph(tree, "http://example.com/test.html")
self.assertEqual(og, {"og:title": None, "og:description": "Some text."})
@@ -246,7 +246,7 @@ class CalcOgTestCase(unittest.TestCase):
"""
tree = decode_body(html, "http://example.com/test.html")
- og = _calc_og(tree, "http://example.com/test.html")
+ og = parse_html_to_open_graph(tree, "http://example.com/test.html")
self.assertEqual(og, {"og:title": "Title", "og:description": "Some text."})
@@ -261,7 +261,7 @@ class CalcOgTestCase(unittest.TestCase):
"""
tree = decode_body(html, "http://example.com/test.html")
- og = _calc_og(tree, "http://example.com/test.html")
+ og = parse_html_to_open_graph(tree, "http://example.com/test.html")
self.assertEqual(og, {"og:title": None, "og:description": "Some text."})
@@ -289,7 +289,7 @@ class CalcOgTestCase(unittest.TestCase):