Autodiscover oEmbed endpoint from returned HTML (#10822)

Searches the returned HTML for an oEmbed endpoint using the
autodiscovery mechanism (`<link rel=...>`), and will request it
to generate the preview.
This commit is contained in:
Patrick Cloke 2021-10-08 14:14:42 -04:00 committed by GitHub
parent 593eeac19e
commit 1b112840d2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 222 additions and 53 deletions

View file

@ -22,7 +22,7 @@ import re
import shutil
import sys
import traceback
from typing import TYPE_CHECKING, Dict, Generator, Iterable, Optional, Union
from typing import TYPE_CHECKING, Dict, Generator, Iterable, Optional, Tuple, Union
from urllib import parse as urlparse
import attr
@ -296,22 +296,32 @@ class PreviewUrlResource(DirectServeJsonResource):
body = file.read()
encoding = get_html_media_encoding(body, media_info.media_type)
og = decode_and_calc_og(body, media_info.uri, encoding)
tree = decode_body(body, encoding)
if tree is not None:
# Check if this HTML document points to oEmbed information and
# defer to that.
oembed_url = self._oembed.autodiscover_from_html(tree)
og = {}
if oembed_url:
oembed_info = await self._download_url(oembed_url, user)
og, expiration_ms = await self._handle_oembed_response(
url, oembed_info, expiration_ms
)
await self._precache_image_url(user, media_info, og)
# If there was no oEmbed URL (or oEmbed parsing failed), attempt
# to generate the Open Graph information from the HTML.
if not oembed_url or not og:
og = _calc_og(tree, media_info.uri)
elif oembed_url and _is_json(media_info.media_type):
# Handle an oEmbed response.
with open(media_info.filename, "rb") as file:
body = file.read()
oembed_response = self._oembed.parse_oembed_response(url, body)
og = oembed_response.open_graph_result
# Use the cache age from the oEmbed result, instead of the HTTP response.
if oembed_response.cache_age is not None:
expiration_ms = oembed_response.cache_age
await self._precache_image_url(user, media_info, og)
else:
og = {}
elif oembed_url:
# Handle the oEmbed information.
og, expiration_ms = await self._handle_oembed_response(
url, media_info, expiration_ms
)
await self._precache_image_url(user, media_info, og)
else:
@ -479,6 +489,39 @@ class PreviewUrlResource(DirectServeJsonResource):
else:
del og["og:image"]
async def _handle_oembed_response(
    self, url: str, media_info: MediaInfo, expiration_ms: int
) -> Tuple[JsonDict, int]:
    """
    Convert a downloaded oEmbed payload into Open Graph data.

    Args:
        url: The URL which is being previewed (not the one which was
            requested).
        media_info: The downloaded media; its media type decides whether it
            is parsed, and its file is read to obtain the payload.
        expiration_ms: The length of time, in milliseconds, the media is
            valid for.

    Returns:
        A tuple of:
            The Open Graph dictionary produced by the oEmbed parser, or an
            empty dictionary if the media is not JSON.

            The length of time the media is valid for: the cache age from
            the oEmbed response when an Open Graph result was produced and
            a cache age was given, otherwise ``expiration_ms`` unchanged.
    """
    # Only JSON payloads are parsed; for anything else there is no result.
    if _is_json(media_info.media_type):
        with open(media_info.filename, "rb") as oembed_file:
            raw_payload = oembed_file.read()

        parsed = self._oembed.parse_oembed_response(url, raw_payload)
        og = parsed.open_graph_result

        # Prefer the cache age reported by the oEmbed response, but only
        # when parsing actually yielded a result.
        if og and parsed.cache_age is not None:
            return og, parsed.cache_age
        return og, expiration_ms

    return {}, expiration_ms
def _start_expire_url_cache_data(self) -> Deferred:
return run_as_background_process(
"expire_url_cache_data", self._expire_url_cache_data
@ -631,26 +674,22 @@ def get_html_media_encoding(body: bytes, content_type: str) -> str:
return "utf-8"
def decode_and_calc_og(
body: bytes, media_uri: str, request_encoding: Optional[str] = None
) -> JsonDict:
def decode_body(
body: bytes, request_encoding: Optional[str] = None
) -> Optional["etree.Element"]:
"""
Calculate metadata for an HTML document.
This uses lxml to parse the HTML document into the OG response. If errors
occur during processing of the document, an empty response is returned.
This uses lxml to parse the HTML document.
Args:
body: The HTML document, as bytes.
media_url: The URI used to download the body.
request_encoding: The character encoding of the body, as a string.
Returns:
The OG response as a dictionary.
The parsed HTML body, or None if an error occurred during processing.
"""
# If there's no body, nothing useful is going to be found.
if not body:
return {}
return None
from lxml import etree
@ -662,25 +701,22 @@ def decode_and_calc_og(
parser = etree.HTMLParser(recover=True, encoding="utf-8")
except Exception as e:
logger.warning("Unable to create HTML parser: %s" % (e,))
return {}
return None
def _attempt_calc_og(body_attempt: Union[bytes, str]) -> Dict[str, Optional[str]]:
# Attempt to parse the body. If this fails, log and return no metadata.
tree = etree.fromstring(body_attempt, parser)
# The data was successfully parsed, but no tree was found.
if tree is None:
return {}
return _calc_og(tree, media_uri)
def _attempt_decode_body(
body_attempt: Union[bytes, str]
) -> Optional["etree.Element"]:
# Attempt to parse the body. Returns None if the body was successfully
# parsed, but no tree was found.
return etree.fromstring(body_attempt, parser)
# Attempt to parse the body. If this fails, log and return no metadata.
try:
return _attempt_calc_og(body)
return _attempt_decode_body(body)
except UnicodeDecodeError:
# blindly try decoding the body as utf-8, which seems to fix
# the charset mismatches on https://google.com
return _attempt_calc_og(body.decode("utf-8", "ignore"))
return _attempt_decode_body(body.decode("utf-8", "ignore"))
def _calc_og(tree: "etree.Element", media_uri: str) -> Dict[str, Optional[str]]: