Move HTML parsing to a separate file for URL previews. (#11566)

* Splits the logic for parsing HTML from the resource handling code.
* Fix a circular import in the oEmbed code (which uses the HTML parsing code).
* Renames some of the HTML parsing methods to:
  * Make it clear which methods are "internal" to the module.
  * Clarify what the methods do.
This commit is contained in:
Patrick Cloke 2021-12-13 12:55:07 -05:00 committed by GitHub
parent 5305a5e881
commit eb39da6782
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 432 additions and 401 deletions

1
changelog.d/11566.misc Normal file
View File

@ -0,0 +1 @@
Split the HTML parsing code from the URL preview resource code.

View File

@ -17,6 +17,7 @@ from typing import TYPE_CHECKING, List, Optional
import attr
from synapse.rest.media.v1.preview_html import parse_html_description
from synapse.types import JsonDict
from synapse.util import json_decoder
@ -245,8 +246,6 @@ def calc_description_and_urls(open_graph_response: JsonDict, html_body: str) ->
if video_urls:
open_graph_response["og:video"] = video_urls[0]
from synapse.rest.media.v1.preview_url_resource import _calc_description
description = _calc_description(tree)
description = parse_html_description(tree)
if description:
open_graph_response["og:description"] = description

View File

@ -0,0 +1,397 @@
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import codecs
import itertools
import logging
import re
from typing import TYPE_CHECKING, Dict, Generator, Iterable, Optional, Set, Union
from urllib import parse as urlparse
if TYPE_CHECKING:
from lxml import etree
logger = logging.getLogger(__name__)
_charset_match = re.compile(
br'<\s*meta[^>]*charset\s*=\s*"?([a-z0-9_-]+)"?', flags=re.I
)
_xml_encoding_match = re.compile(
br'\s*<\s*\?\s*xml[^>]*encoding="([a-z0-9_-]+)"', flags=re.I
)
_content_type_match = re.compile(r'.*; *charset="?(.*?)"?(;|$)', flags=re.I)
def _normalise_encoding(encoding: str) -> Optional[str]:
"""Use the Python codec's name as the normalised entry."""
try:
return codecs.lookup(encoding).name
except LookupError:
return None
def _get_html_media_encodings(
body: bytes, content_type: Optional[str]
) -> Iterable[str]:
"""
Get potential encoding of the body based on the (presumably) HTML body or the content-type header.
The precedence used for finding a character encoding is:
1. <meta> tag with a charset declared.
2. The XML document's character encoding attribute.
3. The Content-Type header.
4. Fallback to utf-8.
5. Fallback to windows-1252.
This roughly follows the algorithm used by BeautifulSoup's bs4.dammit.EncodingDetector.
Args:
body: The HTML document, as bytes.
content_type: The Content-Type header.
Returns:
The character encoding of the body, as a string.
"""
# There's no point in returning an encoding more than once.
attempted_encodings: Set[str] = set()
# Limit searches to the first 1kb, since it ought to be at the top.
body_start = body[:1024]
# Check if it has an encoding set in a meta tag.
match = _charset_match.search(body_start)
if match:
encoding = _normalise_encoding(match.group(1).decode("ascii"))
if encoding:
attempted_encodings.add(encoding)
yield encoding
# TODO Support <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
# Check if it has an XML document with an encoding.
match = _xml_encoding_match.match(body_start)
if match:
encoding = _normalise_encoding(match.group(1).decode("ascii"))
if encoding and encoding not in attempted_encodings:
attempted_encodings.add(encoding)
yield encoding
# Check the HTTP Content-Type header for a character set.
if content_type:
content_match = _content_type_match.match(content_type)
if content_match:
encoding = _normalise_encoding(content_match.group(1))
if encoding and encoding not in attempted_encodings:
attempted_encodings.add(encoding)
yield encoding
# Finally, fallback to UTF-8, then windows-1252.
for fallback in ("utf-8", "cp1252"):
if fallback not in attempted_encodings:
yield fallback
def decode_body(
body: bytes, uri: str, content_type: Optional[str] = None
) -> Optional["etree.Element"]:
"""
This uses lxml to parse the HTML document.
Args:
body: The HTML document, as bytes.
uri: The URI used to download the body.
content_type: The Content-Type header.
Returns:
The parsed HTML body, or None if an error occurred during processed.
"""
# If there's no body, nothing useful is going to be found.
if not body:
return None
# The idea here is that multiple encodings are tried until one works.
# Unfortunately the result is never used and then LXML will decode the string
# again with the found encoding.
for encoding in _get_html_media_encodings(body, content_type):
try:
body.decode(encoding)
except Exception:
pass
else:
break
else:
logger.warning("Unable to decode HTML body for %s", uri)
return None
from lxml import etree
# Create an HTML parser.
parser = etree.HTMLParser(recover=True, encoding=encoding)
# Attempt to parse the body. Returns None if the body was successfully
# parsed, but no tree was found.
return etree.fromstring(body, parser)
def parse_html_to_open_graph(
tree: "etree.Element", media_uri: str
) -> Dict[str, Optional[str]]:
"""
Parse the HTML document into an Open Graph response.
This uses lxml to search the HTML document for Open Graph data (or
synthesizes it from the document).
Args:
tree: The parsed HTML document.
media_url: The URI used to download the body.
Returns:
The Open Graph response as a dictionary.
"""
# if we see any image URLs in the OG response, then spider them
# (although the client could choose to do this by asking for previews of those
# URLs to avoid DoSing the server)
# "og:type" : "video",
# "og:url" : "https://www.youtube.com/watch?v=LXDBoHyjmtw",
# "og:site_name" : "YouTube",
# "og:video:type" : "application/x-shockwave-flash",
# "og:description" : "Fun stuff happening here",
# "og:title" : "RemoteJam - Matrix team hack for Disrupt Europe Hackathon",
# "og:image" : "https://i.ytimg.com/vi/LXDBoHyjmtw/maxresdefault.jpg",
# "og:video:url" : "http://www.youtube.com/v/LXDBoHyjmtw?version=3&autohide=1",
# "og:video:width" : "1280"
# "og:video:height" : "720",
# "og:video:secure_url": "https://www.youtube.com/v/LXDBoHyjmtw?version=3",
og: Dict[str, Optional[str]] = {}
for tag in tree.xpath("//*/meta[starts-with(@property, 'og:')]"):
if "content" in tag.attrib:
# if we've got more than 50 tags, someone is taking the piss
if len(og) >= 50:
logger.warning("Skipping OG for page with too many 'og:' tags")
return {}
og[tag.attrib["property"]] = tag.attrib["content"]
# TODO: grab article: meta tags too, e.g.:
# "article:publisher" : "https://www.facebook.com/thethudonline" />
# "article:author" content="https://www.facebook.com/thethudonline" />
# "article:tag" content="baby" />
# "article:section" content="Breaking News" />
# "article:published_time" content="2016-03-31T19:58:24+00:00" />
# "article:modified_time" content="2016-04-01T18:31:53+00:00" />
if "og:title" not in og:
# do some basic spidering of the HTML
title = tree.xpath("(//title)[1] | (//h1)[1] | (//h2)[1] | (//h3)[1]")
if title and title[0].text is not None:
og["og:title"] = title[0].text.strip()
else:
og["og:title"] = None
if "og:image" not in og:
# TODO: extract a favicon failing all else
meta_image = tree.xpath(
"//*/meta[translate(@itemprop, 'IMAGE', 'image')='image']/@content"
)
if meta_image:
og["og:image"] = rebase_url(meta_image[0], media_uri)
else:
# TODO: consider inlined CSS styles as well as width & height attribs
images = tree.xpath("//img[@src][number(@width)>10][number(@height)>10]")
images = sorted(
images,
key=lambda i: (
-1 * float(i.attrib["width"]) * float(i.attrib["height"])
),
)
if not images:
images = tree.xpath("//img[@src]")
if images:
og["og:image"] = images[0].attrib["src"]
if "og:description" not in og:
meta_description = tree.xpath(
"//*/meta"
"[translate(@name, 'DESCRIPTION', 'description')='description']"
"/@content"
)
if meta_description:
og["og:description"] = meta_description[0]
else:
og["og:description"] = parse_html_description(tree)
elif og["og:description"]:
# This must be a non-empty string at this point.
assert isinstance(og["og:description"], str)
og["og:description"] = summarize_paragraphs([og["og:description"]])
# TODO: delete the url downloads to stop diskfilling,
# as we only ever cared about its OG
return og
def parse_html_description(tree: "etree.Element") -> Optional[str]:
"""
Calculate a text description based on an HTML document.
Grabs any text nodes which are inside the <body/> tag, unless they are within
an HTML5 semantic markup tag (<header/>, <nav/>, <aside/>, <footer/>), or
if they are within a <script/> or <style/> tag.
This is a very very very coarse approximation to a plain text render of the page.
Args:
tree: The parsed HTML document.
Returns:
The plain text description, or None if one cannot be generated.
"""
# We don't just use XPATH here as that is slow on some machines.
from lxml import etree
TAGS_TO_REMOVE = (
"header",
"nav",
"aside",
"footer",
"script",
"noscript",
"style",
etree.Comment,
)
# Split all the text nodes into paragraphs (by splitting on new
# lines)
text_nodes = (
re.sub(r"\s+", "\n", el).strip()
for el in _iterate_over_text(tree.find("body"), *TAGS_TO_REMOVE)
)
return summarize_paragraphs(text_nodes)
def _iterate_over_text(
tree: "etree.Element", *tags_to_ignore: Iterable[Union[str, "etree.Comment"]]
) -> Generator[str, None, None]:
"""Iterate over the tree returning text nodes in a depth first fashion,
skipping text nodes inside certain tags.
"""
# This is basically a stack that we extend using itertools.chain.
# This will either consist of an element to iterate over *or* a string
# to be returned.
elements = iter([tree])
while True:
el = next(elements, None)
if el is None:
return
if isinstance(el, str):
yield el
elif el.tag not in tags_to_ignore:
# el.text is the text before the first child, so we can immediately
# return it if the text exists.
if el.text:
yield el.text
# We add to the stack all the elements children, interspersed with
# each child's tail text (if it exists). The tail text of a node
# is text that comes *after* the node, so we always include it even
# if we ignore the child node.
elements = itertools.chain(
itertools.chain.from_iterable( # Basically a flatmap
[child, child.tail] if child.tail else [child]
for child in el.iterchildren()
),
elements,
)
def rebase_url(url: str, base: str) -> str:
base_parts = list(urlparse.urlparse(base))
url_parts = list(urlparse.urlparse(url))
if not url_parts[0]: # fix up schema
url_parts[0] = base_parts[0] or "http"
if not url_parts[1]: # fix up hostname
url_parts[1] = base_parts[1]
if not url_parts[2].startswith("/"):
url_parts[2] = re.sub(r"/[^/]+$", "/", base_parts[2]) + url_parts[2]
return urlparse.urlunparse(url_parts)
def summarize_paragraphs(
text_nodes: Iterable[str], min_size: int = 200, max_size: int = 500
) -> Optional[str]:
"""
Try to get a summary respecting first paragraph and then word boundaries.
Args:
text_nodes: The paragraphs to summarize.
min_size: The minimum number of words to include.
max_size: The maximum number of words to include.
Returns:
A summary of the text nodes, or None if that was not possible.
"""
# TODO: Respect sentences?
description = ""
# Keep adding paragraphs until we get to the MIN_SIZE.
for text_node in text_nodes:
if len(description) < min_size:
text_node = re.sub(r"[\t \r\n]+", " ", text_node)
description += text_node + "\n\n"
else:
break
description = description.strip()
description = re.sub(r"[\t ]+", " ", description)
description = re.sub(r"[\t \r\n]*[\r\n]+", "\n\n", description)
# If the concatenation of paragraphs to get above MIN_SIZE
# took us over MAX_SIZE, then we need to truncate mid paragraph
if len(description) > max_size:
new_desc = ""
# This splits the paragraph into words, but keeping the
# (preceding) whitespace intact so we can easily concat
# words back together.
for match in re.finditer(r"\s*\S+", description):
word = match.group()
# Keep adding words while the total length is less than
# MAX_SIZE.
if len(word) + len(new_desc) < max_size:
new_desc += word
else:
# At this point the next word *will* take us over
# MAX_SIZE, but we also want to ensure that its not
# a huge word. If it is add it anyway and we'll
# truncate later.
if len(new_desc) < min_size:
new_desc += word
break
# Double check that we're not over the limit
if len(new_desc) > max_size:
new_desc = new_desc[:max_size]
# We always add an ellipsis because at the very least
# we chopped mid paragraph.
description = new_desc.strip() + ""
return description if description else None

View File

@ -12,18 +12,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import codecs
import datetime
import errno
import fnmatch
import itertools
import logging
import os
import re
import shutil
import sys
import traceback
from typing import TYPE_CHECKING, Dict, Generator, Iterable, Optional, Set, Tuple, Union
from typing import TYPE_CHECKING, Iterable, Optional, Tuple
from urllib import parse as urlparse
import attr
@ -45,6 +43,11 @@ from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.rest.media.v1._base import get_filename_from_headers
from synapse.rest.media.v1.media_storage import MediaStorage
from synapse.rest.media.v1.oembed import OEmbedProvider
from synapse.rest.media.v1.preview_html import (
decode_body,
parse_html_to_open_graph,
rebase_url,
)
from synapse.types import JsonDict, UserID
from synapse.util import json_encoder
from synapse.util.async_helpers import ObservableDeferred
@ -54,21 +57,11 @@ from synapse.util.stringutils import random_string
from ._base import FileInfo
if TYPE_CHECKING:
from lxml import etree
from synapse.rest.media.v1.media_repository import MediaRepository
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
_charset_match = re.compile(
br'<\s*meta[^>]*charset\s*=\s*"?([a-z0-9_-]+)"?', flags=re.I
)
_xml_encoding_match = re.compile(
br'\s*<\s*\?\s*xml[^>]*encoding="([a-z0-9_-]+)"', flags=re.I
)
_content_type_match = re.compile(r'.*; *charset="?(.*?)"?(;|$)', flags=re.I)
OG_TAG_NAME_MAXLEN = 50
OG_TAG_VALUE_MAXLEN = 1000
@ -311,7 +304,7 @@ class PreviewUrlResource(DirectServeJsonResource):
# If there was no oEmbed URL (or oEmbed parsing failed), attempt
# to generate the Open Graph information from the HTML.
if not oembed_url or not og:
og = _calc_og(tree, media_info.uri)
og = parse_html_to_open_graph(tree, media_info.uri)
await self._precache_image_url(user, media_info, og)
else:
@ -468,7 +461,7 @@ class PreviewUrlResource(DirectServeJsonResource):
# request itself and benefit from the same caching etc. But for now we
# just rely on the caching on the master request to speed things up.
image_info = await self._download_url(
_rebase_url(og["og:image"], media_info.uri), user
rebase_url(og["og:image"], media_info.uri), user
)
if _is_media(image_info.media_type):
@ -632,301 +625,6 @@ class PreviewUrlResource(DirectServeJsonResource):
logger.debug("No media removed from url cache")
def _normalise_encoding(encoding: str) -> Optional[str]:
"""Use the Python codec's name as the normalised entry."""
try:
return codecs.lookup(encoding).name
except LookupError:
return None
def get_html_media_encodings(body: bytes, content_type: Optional[str]) -> Iterable[str]:
"""
Get potential encoding of the body based on the (presumably) HTML body or the content-type header.
The precedence used for finding a character encoding is:
1. <meta> tag with a charset declared.
2. The XML document's character encoding attribute.
3. The Content-Type header.
4. Fallback to utf-8.
5. Fallback to windows-1252.
This roughly follows the algorithm used by BeautifulSoup's bs4.dammit.EncodingDetector.
Args:
body: The HTML document, as bytes.
content_type: The Content-Type header.
Returns:
The character encoding of the body, as a string.
"""
# There's no point in returning an encoding more than once.
attempted_encodings: Set[str] = set()
# Limit searches to the first 1kb, since it ought to be at the top.
body_start = body[:1024]
# Check if it has an encoding set in a meta tag.
match = _charset_match.search(body_start)
if match:
encoding = _normalise_encoding(match.group(1).decode("ascii"))
if encoding:
attempted_encodings.add(encoding)
yield encoding
# TODO Support <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
# Check if it has an XML document with an encoding.
match = _xml_encoding_match.match(body_start)
if match:
encoding = _normalise_encoding(match.group(1).decode("ascii"))
if encoding and encoding not in attempted_encodings:
attempted_encodings.add(encoding)
yield encoding
# Check the HTTP Content-Type header for a character set.
if content_type:
content_match = _content_type_match.match(content_type)
if content_match:
encoding = _normalise_encoding(content_match.group(1))
if encoding and encoding not in attempted_encodings:
attempted_encodings.add(encoding)
yield encoding
# Finally, fallback to UTF-8, then windows-1252.
for fallback in ("utf-8", "cp1252"):
if fallback not in attempted_encodings:
yield fallback
def decode_body(
body: bytes, uri: str, content_type: Optional[str] = None
) -> Optional["etree.Element"]:
"""
This uses lxml to parse the HTML document.
Args:
body: The HTML document, as bytes.
uri: The URI used to download the body.
content_type: The Content-Type header.
Returns:
The parsed HTML body, or None if an error occurred during processed.
"""
# If there's no body, nothing useful is going to be found.
if not body:
return None
# The idea here is that multiple encodings are tried until one works.
# Unfortunately the result is never used and then LXML will decode the string
# again with the found encoding.
for encoding in get_html_media_encodings(body, content_type):
try:
body.decode(encoding)
except Exception:
pass
else:
break
else:
logger.warning("Unable to decode HTML body for %s", uri)
return None
from lxml import etree
# Create an HTML parser.
parser = etree.HTMLParser(recover=True, encoding=encoding)
# Attempt to parse the body. Returns None if the body was successfully
# parsed, but no tree was found.
return etree.fromstring(body, parser)
def _calc_og(tree: "etree.Element", media_uri: str) -> Dict[str, Optional[str]]:
"""
Calculate metadata for an HTML document.
This uses lxml to search the HTML document for Open Graph data.
Args:
tree: The parsed HTML document.
media_url: The URI used to download the body.
Returns:
The Open Graph response as a dictionary.
"""
# if we see any image URLs in the OG response, then spider them
# (although the client could choose to do this by asking for previews of those
# URLs to avoid DoSing the server)
# "og:type" : "video",
# "og:url" : "https://www.youtube.com/watch?v=LXDBoHyjmtw",
# "og:site_name" : "YouTube",
# "og:video:type" : "application/x-shockwave-flash",
# "og:description" : "Fun stuff happening here",
# "og:title" : "RemoteJam - Matrix team hack for Disrupt Europe Hackathon",
# "og:image" : "https://i.ytimg.com/vi/LXDBoHyjmtw/maxresdefault.jpg",
# "og:video:url" : "http://www.youtube.com/v/LXDBoHyjmtw?version=3&autohide=1",
# "og:video:width" : "1280"
# "og:video:height" : "720",
# "og:video:secure_url": "https://www.youtube.com/v/LXDBoHyjmtw?version=3",
og: Dict[str, Optional[str]] = {}
for tag in tree.xpath("//*/meta[starts-with(@property, 'og:')]"):
if "content" in tag.attrib:
# if we've got more than 50 tags, someone is taking the piss
if len(og) >= 50:
logger.warning("Skipping OG for page with too many 'og:' tags")
return {}
og[tag.attrib["property"]] = tag.attrib["content"]
# TODO: grab article: meta tags too, e.g.:
# "article:publisher" : "https://www.facebook.com/thethudonline" />
# "article:author" content="https://www.facebook.com/thethudonline" />
# "article:tag" content="baby" />
# "article:section" content="Breaking News" />
# "article:published_time" content="2016-03-31T19:58:24+00:00" />
# "article:modified_time" content="2016-04-01T18:31:53+00:00" />
if "og:title" not in og:
# do some basic spidering of the HTML
title = tree.xpath("(//title)[1] | (//h1)[1] | (//h2)[1] | (//h3)[1]")
if title and title[0].text is not None:
og["og:title"] = title[0].text.strip()
else:
og["og:title"] = None
if "og:image" not in og:
# TODO: extract a favicon failing all else
meta_image = tree.xpath(
"//*/meta[translate(@itemprop, 'IMAGE', 'image')='image']/@content"
)
if meta_image:
og["og:image"] = _rebase_url(meta_image[0], media_uri)
else:
# TODO: consider inlined CSS styles as well as width & height attribs
images = tree.xpath("//img[@src][number(@width)>10][number(@height)>10]")
images = sorted(
images,
key=lambda i: (
-1 * float(i.attrib["width"]) * float(i.attrib["height"])
),
)
if not images:
images = tree.xpath("//img[@src]")
if images:
og["og:image"] = images[0].attrib["src"]
if "og:description" not in og:
meta_description = tree.xpath(
"//*/meta"
"[translate(@name, 'DESCRIPTION', 'description')='description']"
"/@content"
)
if meta_description:
og["og:description"] = meta_description[0]
else:
og["og:description"] = _calc_description(tree)
elif og["og:description"]:
# This must be a non-empty string at this point.
assert isinstance(og["og:description"], str)
og["og:description"] = summarize_paragraphs([og["og:description"]])
# TODO: delete the url downloads to stop diskfilling,
# as we only ever cared about its OG
return og
def _calc_description(tree: "etree.Element") -> Optional[str]:
"""
Calculate a text description based on an HTML document.
Grabs any text nodes which are inside the <body/> tag, unless they are within
an HTML5 semantic markup tag (<header/>, <nav/>, <aside/>, <footer/>), or
if they are within a <script/> or <style/> tag.
This is a very very very coarse approximation to a plain text render of the page.
Args:
tree: The parsed HTML document.
Returns:
The plain text description, or None if one cannot be generated.
"""
# We don't just use XPATH here as that is slow on some machines.
from lxml import etree
TAGS_TO_REMOVE = (
"header",
"nav",
"aside",
"footer",
"script",
"noscript",
"style",
etree.Comment,
)
# Split all the text nodes into paragraphs (by splitting on new
# lines)
text_nodes = (
re.sub(r"\s+", "\n", el).strip()
for el in _iterate_over_text(tree.find("body"), *TAGS_TO_REMOVE)
)
return summarize_paragraphs(text_nodes)
def _iterate_over_text(
tree: "etree.Element", *tags_to_ignore: Iterable[Union[str, "etree.Comment"]]
) -> Generator[str, None, None]:
"""Iterate over the tree returning text nodes in a depth first fashion,
skipping text nodes inside certain tags.
"""
# This is basically a stack that we extend using itertools.chain.
# This will either consist of an element to iterate over *or* a string
# to be returned.
elements = iter([tree])
while True:
el = next(elements, None)
if el is None:
return
if isinstance(el, str):
yield el
elif el.tag not in tags_to_ignore:
# el.text is the text before the first child, so we can immediately
# return it if the text exists.
if el.text:
yield el.text
# We add to the stack all the elements children, interspersed with
# each child's tail text (if it exists). The tail text of a node
# is text that comes *after* the node, so we always include it even
# if we ignore the child node.
elements = itertools.chain(
itertools.chain.from_iterable( # Basically a flatmap
[child, child.tail] if child.tail else [child]
for child in el.iterchildren()
),
elements,
)
def _rebase_url(url: str, base: str) -> str:
base_parts = list(urlparse.urlparse(base))
url_parts = list(urlparse.urlparse(url))
if not url_parts[0]: # fix up schema
url_parts[0] = base_parts[0] or "http"
if not url_parts[1]: # fix up hostname
url_parts[1] = base_parts[1]
if not url_parts[2].startswith("/"):
url_parts[2] = re.sub(r"/[^/]+$", "/", base_parts[2]) + url_parts[2]
return urlparse.urlunparse(url_parts)
def _is_media(content_type: str) -> bool:
return content_type.lower().startswith("image/")
@ -940,68 +638,3 @@ def _is_html(content_type: str) -> bool:
def _is_json(content_type: str) -> bool:
return content_type.lower().startswith("application/json")
def summarize_paragraphs(
text_nodes: Iterable[str], min_size: int = 200, max_size: int = 500
) -> Optional[str]:
"""
Try to get a summary respecting first paragraph and then word boundaries.
Args:
text_nodes: The paragraphs to summarize.
min_size: The minimum number of words to include.
max_size: The maximum number of words to include.
Returns:
A summary of the text nodes, or None if that was not possible.
"""
# TODO: Respect sentences?
description = ""
# Keep adding paragraphs until we get to the MIN_SIZE.
for text_node in text_nodes:
if len(description) < min_size:
text_node = re.sub(r"[\t \r\n]+", " ", text_node)
description += text_node + "\n\n"
else:
break
description = description.strip()
description = re.sub(r"[\t ]+", " ", description)
description = re.sub(r"[\t \r\n]*[\r\n]+", "\n\n", description)
# If the concatenation of paragraphs to get above MIN_SIZE
# took us over MAX_SIZE, then we need to truncate mid paragraph
if len(description) > max_size:
new_desc = ""
# This splits the paragraph into words, but keeping the
# (preceding) whitespace intact so we can easily concat
# words back together.
for match in re.finditer(r"\s*\S+", description):
word = match.group()
# Keep adding words while the total length is less than
# MAX_SIZE.
if len(word) + len(new_desc) < max_size:
new_desc += word
else:
# At this point the next word *will* take us over
# MAX_SIZE, but we also want to ensure that its not
# a huge word. If it is add it anyway and we'll
# truncate later.
if len(new_desc) < min_size:
new_desc += word
break
# Double check that we're not over the limit
if len(new_desc) > max_size:
new_desc = new_desc[:max_size]
# We always add an ellipsis because at the very least
# we chopped mid paragraph.
description = new_desc.strip() + ""
return description if description else None

View File

@ -1,4 +1,5 @@
# Copyright 2018 New Vector Ltd
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.

View File

@ -12,10 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.rest.media.v1.preview_url_resource import (
_calc_og,
from synapse.rest.media.v1.preview_html import (
_get_html_media_encodings,
decode_body,
get_html_media_encodings,
parse_html_to_open_graph,
summarize_paragraphs,
)
@ -160,7 +160,7 @@ class CalcOgTestCase(unittest.TestCase):
"""
tree = decode_body(html, "http://example.com/test.html")
og = _calc_og(tree, "http://example.com/test.html")
og = parse_html_to_open_graph(tree, "http://example.com/test.html")
self.assertEqual(og, {"og:title": "Foo", "og:description": "Some text."})
@ -176,7 +176,7 @@ class CalcOgTestCase(unittest.TestCase):
"""
tree = decode_body(html, "http://example.com/test.html")
og = _calc_og(tree, "http://example.com/test.html")
og = parse_html_to_open_graph(tree, "http://example.com/test.html")
self.assertEqual(og, {"og:title": "Foo", "og:description": "Some text."})
@ -195,7 +195,7 @@ class CalcOgTestCase(unittest.TestCase):
"""
tree = decode_body(html, "http://example.com/test.html")
og = _calc_og(tree, "http://example.com/test.html")
og = parse_html_to_open_graph(tree, "http://example.com/test.html")
self.assertEqual(
og,
@ -217,7 +217,7 @@ class CalcOgTestCase(unittest.TestCase):
"""
tree = decode_body(html, "http://example.com/test.html")
og = _calc_og(tree, "http://example.com/test.html")
og = parse_html_to_open_graph(tree, "http://example.com/test.html")
self.assertEqual(og, {"og:title": "Foo", "og:description": "Some text."})
@ -231,7 +231,7 @@ class CalcOgTestCase(unittest.TestCase):
"""
tree = decode_body(html, "http://example.com/test.html")
og = _calc_og(tree, "http://example.com/test.html")
og = parse_html_to_open_graph(tree, "http://example.com/test.html")
self.assertEqual(og, {"og:title": None, "og:description": "Some text."})
@ -246,7 +246,7 @@ class CalcOgTestCase(unittest.TestCase):
"""
tree = decode_body(html, "http://example.com/test.html")
og = _calc_og(tree, "http://example.com/test.html")
og = parse_html_to_open_graph(tree, "http://example.com/test.html")
self.assertEqual(og, {"og:title": "Title", "og:description": "Some text."})
@ -261,7 +261,7 @@ class CalcOgTestCase(unittest.TestCase):
"""
tree = decode_body(html, "http://example.com/test.html")
og = _calc_og(tree, "http://example.com/test.html")
og = parse_html_to_open_graph(tree, "http://example.com/test.html")
self.assertEqual(og, {"og:title": None, "og:description": "Some text."})
@ -289,7 +289,7 @@ class CalcOgTestCase(unittest.TestCase):
<head><title>Foo</title></head><body>Some text.</body></html>
""".strip()
tree = decode_body(html, "http://example.com/test.html")
og = _calc_og(tree, "http://example.com/test.html")
og = parse_html_to_open_graph(tree, "http://example.com/test.html")
self.assertEqual(og, {"og:title": "Foo", "og:description": "Some text."})
def test_invalid_encoding(self):
@ -303,7 +303,7 @@ class CalcOgTestCase(unittest.TestCase):
</html>
"""
tree = decode_body(html, "http://example.com/test.html", "invalid-encoding")
og = _calc_og(tree, "http://example.com/test.html")
og = parse_html_to_open_graph(tree, "http://example.com/test.html")
self.assertEqual(og, {"og:title": "Foo", "og:description": "Some text."})
def test_invalid_encoding2(self):
@ -318,7 +318,7 @@ class CalcOgTestCase(unittest.TestCase):
</html>
"""
tree = decode_body(html, "http://example.com/test.html")
og = _calc_og(tree, "http://example.com/test.html")
og = parse_html_to_open_graph(tree, "http://example.com/test.html")
self.assertEqual(og, {"og:title": "ÿÿ Foo", "og:description": "Some text."})
def test_windows_1252(self):
@ -332,14 +332,14 @@ class CalcOgTestCase(unittest.TestCase):
</html>
"""
tree = decode_body(html, "http://example.com/test.html")
og = _calc_og(tree, "http://example.com/test.html")
og = parse_html_to_open_graph(tree, "http://example.com/test.html")
self.assertEqual(og, {"og:title": "ó", "og:description": "Some text."})
class MediaEncodingTestCase(unittest.TestCase):
def test_meta_charset(self):
"""A character encoding is found via the meta tag."""
encodings = get_html_media_encodings(
encodings = _get_html_media_encodings(
b"""
<html>
<head><meta charset="ascii">
@ -351,7 +351,7 @@ class MediaEncodingTestCase(unittest.TestCase):
self.assertEqual(list(encodings), ["ascii", "utf-8", "cp1252"])
# A less well-formed version.
encodings = get_html_media_encodings(
encodings = _get_html_media_encodings(
b"""
<html>
<head>< meta charset = ascii>
@ -364,7 +364,7 @@ class MediaEncodingTestCase(unittest.TestCase):
def test_meta_charset_underscores(self):
"""A character encoding contains underscore."""
encodings = get_html_media_encodings(
encodings = _get_html_media_encodings(
b"""
<html>
<head><meta charset="Shift_JIS">
@ -377,7 +377,7 @@ class MediaEncodingTestCase(unittest.TestCase):
def test_xml_encoding(self):
"""A character encoding is found via the meta tag."""
encodings = get_html_media_encodings(
encodings = _get_html_media_encodings(
b"""
<?xml version="1.0" encoding="ascii"?>
<html>
@ -389,7 +389,7 @@ class MediaEncodingTestCase(unittest.TestCase):
def test_meta_xml_encoding(self):
"""Meta tags take precedence over XML encoding."""
encodings = get_html_media_encodings(
encodings = _get_html_media_encodings(
b"""
<?xml version="1.0" encoding="ascii"?>
<html>
@ -413,17 +413,17 @@ class MediaEncodingTestCase(unittest.TestCase):
'text/html; charset=ascii";',
)
for header in headers:
encodings = get_html_media_encodings(b"", header)
encodings = _get_html_media_encodings(b"", header)
self.assertEqual(list(encodings), ["ascii", "utf-8", "cp1252"])
def test_fallback(self):
"""A character encoding cannot be found in the body or header."""
encodings = get_html_media_encodings(b"", "text/html")
encodings = _get_html_media_encodings(b"", "text/html")
self.assertEqual(list(encodings), ["utf-8", "cp1252"])
def test_duplicates(self):
"""Ensure each encoding is only attempted once."""
encodings = get_html_media_encodings(
encodings = _get_html_media_encodings(
b"""
<?xml version="1.0" encoding="utf8"?>
<html>
@ -437,7 +437,7 @@ class MediaEncodingTestCase(unittest.TestCase):
def test_unknown_invalid(self):
"""A character encoding should be ignored if it is unknown or invalid."""
encodings = get_html_media_encodings(
encodings = _get_html_media_encodings(
b"""
<html>
<head><meta charset="invalid">