brozzler/brozzler/ydl.py
2025-07-21 18:15:36 -07:00

656 lines
26 KiB
Python

"""
brozzler/ydl.py - youtube-dl / yt-dlp support for brozzler
Copyright (C) 2024-2025 Internet Archive
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import datetime
import json
import os
import random
import tempfile
import threading
import time
import urllib.request
from typing import Any, List, Optional
import doublethink
import psycopg
import structlog
import urlcanon
import yt_dlp
from dataclasses import dataclass
from psycopg_pool import ConnectionPool, PoolTimeout
from yt_dlp.utils import ExtractorError, match_filter_func
import brozzler
from . import metrics
thread_local = threading.local()
PROXY_ATTEMPTS = 4
YTDLP_WAIT = 10
YTDLP_MAX_REDIRECTS = 5
logger = structlog.get_logger(logger_name=__name__)
# video_title, video_display_id, video_resolution, video_capture_status are new fields, mostly from yt-dlp metadata
@dataclass(frozen=True)
class VideoCaptureRecord:
    """
    Immutable record describing one captured video and the page it was
    found on.

    `video_title`, `video_display_id`, `video_resolution` and
    `video_capture_status` are the newer fields, mostly taken from yt-dlp
    metadata.
    """

    # crawl / seed identification
    crawl_job_id: int
    is_test_crawl: bool
    seed_id: int
    collection_id: int

    # the page that contained the video
    containing_page_timestamp: str
    containing_page_digest: str
    containing_page_media_index: int
    containing_page_media_count: int

    # the captured video itself
    video_digest: str
    video_timestamp: str
    video_mimetype: str
    video_http_status: int
    video_size: int
    containing_page_url: str
    video_url: str

    # newer, mostly yt-dlp derived fields
    video_title: str
    # aka yt-dlp metadata as display_id, e.g., youtube watch page v param
    video_display_id: str
    video_resolution: str
    video_capture_status: str  # recrawl? what else?
class VideoDataClient:
    """
    Small client for the postgres `video` table that records video captures.

    The connection string is read from the `VIDEO_DATA_SOURCE` environment
    variable when the class is defined. `psycopg`, `ConnectionPool` and
    `PoolTimeout` are the names imported at the top of this module.
    """

    VIDEO_DATA_SOURCE = os.getenv("VIDEO_DATA_SOURCE")

    def __init__(self):
        """Opens a connection pool (1-9 connections) and waits until it is ready."""
        pool = ConnectionPool(self.VIDEO_DATA_SOURCE, min_size=1, max_size=9)
        pool.wait()
        logger.info("pg pool ready")
        # atexit.register(pool.close)
        self.pool = pool

    @staticmethod
    def _site_ids(site):
        """
        Returns `(account_id, seed_id)` for `site`; anything missing or
        falsy comes back as None instead of raising.
        """
        try:
            account_id = site["account_id"] or None
        except (KeyError, TypeError):
            account_id = None
        try:
            seed_id = site["metadata"]["ait_seed_id"] or None
        except (KeyError, TypeError):
            seed_id = None
        return account_id, seed_id

    def _execute_pg_query(
        self, query, row_factory=None, fetchone=False, fetchall=False
    ) -> Optional[Any]:
        """
        Runs one query on a pooled connection.

        Args:
            query: either a plain SQL string, or a `(sql, params)` pair whose
                params are bound by the driver (never interpolated here)
            row_factory: optional psycopg row factory for the cursor
            fetchone: return the first row
            fetchall: return all rows

        Returns:
            the fetched row(s); None when neither fetch flag is set or when
            the query failed (failures are logged, not raised)
        """
        # Callers in this class build (sql, params) pairs; passing the pair
        # straight to cursor.execute() as the statement can never work.
        if isinstance(query, (tuple, list)):
            sql, params = query
        else:
            sql, params = query, None
        try:
            with self.pool.connection() as conn:
                with conn.cursor(row_factory=row_factory) as cur:
                    cur.execute(sql, params)
                    if fetchone:
                        return cur.fetchone()
                    if fetchall:
                        return cur.fetchall()
        except PoolTimeout as e:
            logger.warning("hit PoolTimeout: %s", e)
            self.pool.check()
        except Exception as e:
            logger.warning("postgres query failed: %s", e)
        return None

    def get_recent_video_capture(self, site=None, containing_page_url=None) -> List:
        """
        Looks up the most recent `video` row for this site's account/seed and
        the (aggressively canonicalized) `containing_page_url`.

        Returns:
            a list with at most one row; empty when ids are missing or the
            query failed
        """
        account_id, seed_id = self._site_ids(site)
        if not (account_id and seed_id and containing_page_url):
            logger.warning("missing account_id, seed_id, or containing_page_url")
            return []
        # DESC so that LIMIT 1 yields the newest capture rather than the oldest
        pg_query = (
            "SELECT * from video where account_id = %s and seed_id = %s and containing_page_url = %s ORDER BY video_timestamp DESC LIMIT 1",
            (account_id, seed_id, str(urlcanon.aggressive(containing_page_url))),
        )
        return self._execute_pg_query(pg_query, fetchall=True) or []

    def get_video_captures(self, site=None, source=None) -> List[str]:
        """
        Lists `containing_page_url` values already recorded for this site's
        account/seed and the given media `source`.

        Only `source="youtube"` is supported so far; any other source logs a
        warning and yields an empty list.

        Returns:
            list of containing page urls; empty when ids are missing, the
            source is unsupported, or the query failed
        """
        account_id, seed_id = self._site_ids(site)
        if not (account_id and seed_id and source):
            logger.warning("missing account_id, seed_id, or source")
            return []
        # TODO: generalize, maybe make variable?
        # containing_page_timestamp_pattern = "2025%" # for future pre-dup additions
        if source == "youtube":
            # yes, video data canonicalization uses "http"
            containing_page_url_pattern = "http://youtube.com/watch%"
        else:
            # support other media sources here
            logger.warning("unsupported video source: %s", source)
            return []
        pg_query = (
            "SELECT containing_page_url from video where account_id = %s and seed_id = %s and containing_page_url like %s",
            (
                account_id,
                seed_id,
                containing_page_url_pattern,
            ),
        )
        results = self._execute_pg_query(
            pg_query, row_factory=psycopg.rows.scalar_row, fetchall=True
        )
        return results or []

    def create_video_capture_record(self, video_capture_record):
        """
        Inserts `video_capture_record` (a dataclass instance such as
        `VideoCaptureRecord`) into the `video` table, one column per field.

        Returns:
            None; failures are logged by `_execute_pg_query`
        """
        # NOTE: we want to do this in brozzler postcrawl for now
        # WIP
        # TODO: needs added fields added to postgres table, refinement
        # NOTE(review): the SELECTs above filter on account_id, which is not a
        # field of VideoCaptureRecord -- confirm how that column is populated.
        import dataclasses

        if not dataclasses.is_dataclass(video_capture_record) or isinstance(
            video_capture_record, type
        ):
            logger.warning("video capture record is not a dataclass instance")
            return None
        # column names come from the dataclass definition (trusted); values
        # are always bound as parameters
        columns = [f.name for f in dataclasses.fields(video_capture_record)]
        values = tuple(getattr(video_capture_record, c) for c in columns)
        placeholders = ", ".join(["%s"] * len(columns))
        pg_query = (
            f"INSERT INTO video ({', '.join(columns)}) VALUES ({placeholders})",
            values,
        )
        return self._execute_pg_query(pg_query)
def isyoutubehost(url):
    """
    Returns True when the host part of `url` is youtube.com or one of its
    subdomains (www., m., music., ...).

    Only the authority of the url is examined: urls embedded in the path or
    query string do not count, and look-alike hosts such as
    `notyoutube.com` or `youtube.com.example.org` are rejected. The url may
    omit its scheme. Matching is case-insensitive.
    """
    from urllib.parse import urlsplit

    if not url:
        return False
    try:
        parts = urlsplit(url)
        if not parts.netloc:
            # scheme-less input such as "www.youtube.com/watch?v=..."
            parts = urlsplit("//" + url)
        # .hostname drops userinfo and port, and lowercases
        host = (parts.hostname or "").rstrip(".")
    except ValueError:
        # malformed authority (e.g. unbalanced IPv6 brackets)
        return False
    return host == "youtube.com" or host.endswith(".youtube.com")
class ExtraHeaderAdder(urllib.request.BaseHandler):
    """
    urllib handler that adds a fixed set of extra headers to each outgoing
    http/https request, without overriding headers the request already has.
    """

    def __init__(self, extra_headers):
        self.extra_headers = extra_headers
        # one pre-processor serves both schemes
        add_headers = self._http_request
        self.http_request = add_headers
        self.https_request = add_headers

    def _http_request(self, req):
        """Adds every configured header that `req` does not carry yet; returns `req`."""
        for name, value in self.extra_headers.items():
            # req.headers is checked against the capitalized header name
            if name.capitalize() in req.headers:
                continue
            req.add_header(name, value)
        return req
def _build_youtube_dl(worker, destdir, site, page, ytdlp_proxy_endpoints):
    """
    Builds a yt-dlp `yt_dlp.YoutubeDL` for brozzling `site` with `worker`.

    The `YoutubeDL` instance does a few special brozzler-specific things:

    - periodically updates `site.last_claimed` in rethinkdb
    - pushes captured video to warcprox using a WARCPROX_WRITE_RECORD request
    - some logging

    The url handed to yt-dlp is `page.redirect_url` when set, otherwise
    `page.url`. The returned instance additionally carries `url`,
    `pushed_videos` (filled in as videos are pushed to warcprox) and
    `is_youtube_host`.

    Args:
        worker (brozzler.BrozzlerWorker): the calling brozzler worker
        destdir (str): where to save downloaded videos
        site (brozzler.Site): the site we are brozzling
        page (brozzler.Page): the page we are brozzling
        ytdlp_proxy_endpoints: proxy urls, only consulted for youtube hosts;
            may be empty/None. Non-watch youtube urls use index 4, watch
            pages pick randomly among indexes 0-3.

    Returns:
        a yt-dlp `yt_dlp.YoutubeDL` instance
    """

    class _YoutubeDL(yt_dlp.YoutubeDL):
        # class-level logger; __init__ rebinds a per-instance copy with the url
        logger = structlog.get_logger(__module__ + "." + __qualname__)

        def __init__(self, url, params=None, auto_init=True):
            super().__init__(params, auto_init)
            self.url = url
            self.logger = self.logger.bind(url=url)

        def process_ie_result(self, ie_result, download=True, extra_info=None):
            # Tracks how many "url"/"url_transparent" hops have been followed
            # in extra_info["extraction_depth"] and aborts once
            # YTDLP_MAX_REDIRECTS is reached.
            if extra_info is None:
                extra_info = {}
            result_type = ie_result.get("_type", "video")
            if result_type in ("url", "url_transparent"):
                if "extraction_depth" in extra_info:
                    self.logger.info(
                        "Following redirect",
                        redirect_url=ie_result["url"],
                        extraction_depth=extra_info["extraction_depth"],
                    )
                    extra_info["extraction_depth"] = 1 + extra_info.get(
                        "extraction_depth", 0
                    )
                else:
                    # first hop seen for this extraction
                    extra_info["extraction_depth"] = 0
                if extra_info["extraction_depth"] >= YTDLP_MAX_REDIRECTS:
                    raise ExtractorError(
                        f"Too many hops for URL: {ie_result['url']}",
                        expected=True,
                    )
            return super().process_ie_result(ie_result, download, extra_info)

        def add_default_extra_info(self, ie_result, ie, url):
            # hook in some logging
            super().add_default_extra_info(ie_result, ie, url)
            extract_context = self.logger.bind(extractor=ie.IE_NAME)
            if ie_result.get("_type") == "playlist":
                extract_context.info("found playlist")
                if ie.IE_NAME in {
                    "youtube:playlist",
                    "youtube:tab",
                    "soundcloud:user",
                    "instagram:user",
                }:
                    # At this point ie_result['entries'] is an iterator that
                    # will fetch more metadata from youtube to list all the
                    # videos. We unroll that iterator here partly because
                    # otherwise `process_ie_result()` will clobber it, and we
                    # use it later to extract the watch pages as outlinks.
                    try:
                        ie_result["entries_no_dl"] = list(ie_result["entries"])
                    except Exception:
                        extract_context.warning(
                            "failed to unroll entries ie_result['entries']?",
                            exc_info=True,
                        )
                        ie_result["entries_no_dl"] = []
                    # emptied so that nothing from the playlist is downloaded
                    ie_result["entries"] = []
                    self.logger.info(
                        "not downloading media files from this "
                        "playlist because we expect to capture them from "
                        "individual watch/track/detail pages",
                        media_file_count=len(ie_result["entries_no_dl"]),
                    )
            else:
                extract_context.info("found a download")

        def _push_video_to_warcprox(self, site, info_dict, postprocessor):
            # NOTE(review): ydl_postprocess_hook below calls this with the
            # class itself as `self`, so `self.logger` is the class-level
            # logger (no url bound), and results are appended to the
            # enclosing function's `ydl`, which is assigned further down.

            # 220211 update: does yt-dlp supply content-type? no, not as such
            # XXX Don't know how to get the right content-type. Youtube-dl
            # doesn't supply it. Sometimes (with --hls-prefer-native)
            # youtube-dl produces a stitched-up video that /usr/bin/file fails
            # to identify (says "application/octet-stream"). `ffprobe` doesn't
            # give us a mimetype.
            if info_dict.get("ext") == "mp4":
                mimetype = "video/mp4"
            else:
                try:
                    import magic

                    mimetype = magic.from_file(info_dict["filepath"], mime=True)
                except ImportError:
                    # python-magic not installed: guess from the extension
                    mimetype = "video/%s" % info_dict["ext"]
                    self.logger.warning(
                        "guessing mimetype due to error",
                        mimetype=mimetype,
                        exc_info=True,
                    )
            # youtube watch page postprocessor is MoveFiles
            if postprocessor == "FixupM3u8" or postprocessor == "Merger":
                # synthetic url: zero-padded playlist index plus the page url
                url = "youtube-dl:%05d:%s" % (
                    info_dict.get("playlist_index") or 1,
                    info_dict["webpage_url"],
                )
            else:
                url = info_dict.get("url", "")
            # skip urls containing .m3u8, to avoid duplicates handled by FixupM3u8
            if url == "" or ".m3u8" in url:
                return
            size = os.path.getsize(info_dict["filepath"])
            self.logger.info(
                "pushing video to warcprox",
                format=info_dict["format"],
                mimetype=mimetype,
                size=size,
                warcprox=worker._proxy_for(site),
            )
            with open(info_dict["filepath"], "rb") as f:
                # include content-length header to avoid chunked
                # transfer, which warcprox currently rejects
                extra_headers = dict(site.extra_headers())
                extra_headers["content-length"] = size
                request, response = worker._warcprox_write_record(
                    warcprox_address=worker._proxy_for(site),
                    url=url,
                    warc_type="resource",
                    content_type=mimetype,
                    payload=f,
                    extra_headers=extra_headers,
                )
            # consulted by _remember_videos()
            ydl.pushed_videos.append(
                {
                    "url": url,
                    "response_code": response.code,
                    "content-type": mimetype,
                    "content-length": size,
                }
            )

    def maybe_heartbeat_site_last_claimed(*args, **kwargs):
        # in case yt-dlp takes a long time, heartbeat site.last_claimed
        # to prevent another brozzler-worker from claiming the site
        try:
            if (
                site.rr
                and doublethink.utcnow() - site.last_claimed
                > datetime.timedelta(minutes=worker.SITE_SESSION_MINUTES)
            ):
                worker.logger.debug(
                    "heartbeating site.last_claimed to prevent another "
                    "brozzler-worker claiming this site",
                    id=site.id,
                )
                site.last_claimed = doublethink.utcnow()
                site.save()
        except:  # noqa: E722
            # best effort: a failed heartbeat must not abort the download
            worker.logger.debug(
                "problem heartbeating site.last_claimed site",
                id=site.id,
                exc_info=True,
            )

    def ydl_postprocess_hook(d):
        # Runs after each yt-dlp postprocessor; on "finished" it counts the
        # download and, when warcprox is in use, pushes the file to it.
        if d["status"] == "finished":
            worker.logger.info(
                "[ydl_postprocess_hook] Finished postprocessing",
                postprocessor=d["postprocessor"],
            )
            is_youtube_host = isyoutubehost(d["info_dict"]["webpage_url"])
            metrics.brozzler_ydl_download_successes.labels(is_youtube_host).inc(1)
            if worker._using_warcprox(site):
                # called on the class, passing the class as `self`
                _YoutubeDL._push_video_to_warcprox(
                    _YoutubeDL, site, d["info_dict"], d["postprocessor"]
                )

    # default socket_timeout is 20 -- we hit it often when cluster is busy
    ydl_opts = {
        "outtmpl": "{}/ydl%(autonumber)s.out".format(destdir),
        "retries": 1,
        "nocheckcertificate": True,
        "noplaylist": True,
        "noprogress": True,
        "nopart": True,
        "no_color": True,
        "socket_timeout": 40,
        "progress_hooks": [maybe_heartbeat_site_last_claimed],
        "postprocessor_hooks": [ydl_postprocess_hook],
        # https://github.com/yt-dlp/yt-dlp#format-selection
        # "By default, yt-dlp tries to download the best available quality..."
        # v.2023.07.06 https://www.reddit.com/r/youtubedl/wiki/h264/?rdt=63577
        # recommended: convert working cli to api call with
        # https://github.com/yt-dlp/yt-dlp/blob/master/devscripts/cli_to_api.py
        "format_sort": ["res:720", "vcodec:h264", "acodec:aac"],
        # skip live streams
        "match_filter": match_filter_func("!is_live"),
        "extractor_args": {"generic": {"impersonate": [""]}},
        # --cache-dir local or..
        # this looked like a problem with nsf-mounted homedir, maybe not a problem for brozzler on focal?
        "cache_dir": "/home/archiveit",
        "logger": logger,
        "verbose": False,
        "quiet": False,
        # recommended to avoid bot detection
        "sleep_interval": 7,
        "max_sleep_interval": 27,
        # preserve pre-2025.07.21 mtime handling
        "updatetime": True,
    }
    ytdlp_url = page.redirect_url if page.redirect_url else page.url
    is_youtube_host = isyoutubehost(ytdlp_url)
    if is_youtube_host and ytdlp_proxy_endpoints:
        # NOTE(review): assumes at least five endpoints are configured;
        # index 4 raises IndexError on a shorter list -- confirm with config.
        if 'com/watch' not in ytdlp_url:
            ydl_opts["proxy"] = ytdlp_proxy_endpoints[4]
        else:
            ydl_opts["proxy"] = random.choice(ytdlp_proxy_endpoints[0:4])
        # don't log proxy value secrets
        ytdlp_proxy_for_logs = (
            ydl_opts["proxy"].split("@")[1] if "@" in ydl_opts["proxy"] else "@@@"
        )
        logger.info("using yt-dlp proxy ...", proxy=ytdlp_proxy_for_logs)
    # skip warcprox proxying yt-dlp v.2023.07.06: youtube extractor using ranges
    # if worker._proxy_for(site):
    # ydl_opts["proxy"] = "http://{}".format(worker._proxy_for(site))
    ydl = _YoutubeDL(ytdlp_url, params=ydl_opts)
    if site.extra_headers():
        ydl._opener.add_handler(ExtraHeaderAdder(site.extra_headers(page)))
    # filled by _push_video_to_warcprox via the closure over `ydl`
    ydl.pushed_videos = []
    ydl.is_youtube_host = is_youtube_host
    return ydl
# new maybe? def _remember_videos(page, site, worker, ydl, ie_result, pushed_videos=None):
def _remember_videos(page, pushed_videos=None):
"""
Saves info about videos captured by yt-dlp in `page.videos`
"""
if "videos" not in page:
page.videos = []
for pushed_video in pushed_videos or []:
video = {
"blame": "youtube-dl",
"url": pushed_video["url"],
"response_code": pushed_video["response_code"],
"content-type": pushed_video["content-type"],
"content-length": pushed_video["content-length"],
}
"""
# WIP: add new video record to QA postgres here, or in postcrawl only?
warc_prefix_items = site.warcprox_meta["warc-prefix"].split("-")
video_record = worker._video_data.VideoCaptureRecord()
video_record.crawl_job_id = site.job_id
video_record.is_test_crawl = True if warc_prefix_items[2] == "TEST" else False
video_record.seed_id = site["metadata"]["ait_seed_id"]
video_record.collection_id = int(warc_prefix_items[1])
video_record.containing_page_timestamp = None
video_record.containing_page_digest = None
video_record.containing_page_media_index = None
video_record.containing_page_media_count = None
video_record.video_digest = None
video_record.video_timestamp = None
video_record.video_mimetype = pushed_video["content-type"]
video_record.video_http_status = pushed_video["response_code"]
video_record.video_size = pushed_video["content-length"] # probably?
video_record.containing_page_url = str(
urlcanon.aggressive(ydl.url)
) # probably?
video_record.video_url = pushed_video["url"]
# note: NEW! ie_result may not be correct when multiple videos present
video_record.video_title = ie_result.get("title")
video_record.video_display_id = ie_result.get("display_id")
video_record.video_resolution = ie_result.get("resolution")
video_record.video_capture_status = None # "recrawl" maybe
"""
logger.debug("embedded video", video=video)
page.videos.append(video)
def _try_youtube_dl(worker, ydl, site, page):
    """
    Runs `ydl.extract_info()` on `ydl.url`, retrying generic failures, then
    records the results.

    Youtube hosts get up to PROXY_ATTEMPTS attempts with a growing wait
    (YTDLP_WAIT * 1.5 ** n, capped at 60 seconds) between them; every other
    host gets a single attempt. On success the pushed videos are saved on
    `page` and, when warcprox is in use, the yt-dlp info json is written to
    warcprox as a metadata record.

    Args:
        worker (brozzler.BrozzlerWorker): the calling brozzler worker
        ydl: the instance built by `_build_youtube_dl`
        site (brozzler.Site): the site we are brozzling
        page (brozzler.Page): the page we are brozzling

    Returns:
        the sanitized yt-dlp info dict, or None if yt-dlp does not support
        the url

    Raises:
        brozzler.ShutdownRequested: passed through untouched
        brozzler.ReachedLimit: the underlying failure was an HTTP 420
        brozzler.VideoExtractorError: redirect loop / too many redirects, or
            all attempts failed
    """
    max_attempts = PROXY_ATTEMPTS if ydl.is_youtube_host else 1
    attempt = 0
    while attempt < max_attempts:
        try:
            logger.info("trying yt-dlp", url=ydl.url)
            # should_download_vid = not ydl.is_youtube_host
            # then
            # ydl.extract_info(str(urlcanon.whatwg(ydl.url)), download=should_download_vid)
            # if ydl.is_youtube_host and ie_result:
            # download_url = ie_result.get("url")
            with brozzler.thread_accept_exceptions():
                # we do whatwg canonicalization here to avoid "<urlopen error
                # no host given>" resulting in ProxyError
                # needs automated test
                # and yt-dlp needs sanitize_info for extract_info
                ie_result = ydl.sanitize_info(
                    ydl.extract_info(str(urlcanon.whatwg(ydl.url)))
                )
            metrics.brozzler_ydl_extract_successes.labels(ydl.is_youtube_host).inc(1)
            break
        except brozzler.ShutdownRequested:
            raise
        except Exception as e:
            # yt-dlp errors may carry the sys.exc_info() triple of the
            # underlying failure; the attribute can be missing or None, so
            # never subscript it blindly.
            exc_info = getattr(e, "exc_info", None) or (None, None, None)
            cause_type, cause = exc_info[0], exc_info[1]
            if cause_type == yt_dlp.utils.UnsupportedError:
                return None
            elif (
                cause_type == urllib.error.HTTPError
                and getattr(cause, "code", None) == 420
            ):
                raise brozzler.ReachedLimit(cause)
            elif isinstance(e, yt_dlp.utils.DownloadError) and (
                "Redirect loop detected" in e.msg or "Too many redirects" in e.msg
            ):
                raise brozzler.VideoExtractorError(e.msg) from e
            else:
                # todo: other errors to handle separately?
                # OSError('Tunnel connection failed: 464 Host Not Allowed') (caused by ProxyError...)
                # and others...
                attempt += 1
                if attempt == max_attempts:
                    logger.warning(
                        "Failed after %s attempt(s)",
                        max_attempts,
                        attempts=max_attempts,
                        exc_info=True,
                    )
                    raise brozzler.VideoExtractorError(
                        "yt-dlp hit error extracting info for %s" % ydl.url
                    ) from e
                else:
                    retry_wait = min(60, YTDLP_WAIT * (1.5 ** (attempt - 1)))
                    logger.info(
                        "Attempt %s failed. Retrying in %s seconds...",
                        attempt,
                        retry_wait,
                    )
                    time.sleep(retry_wait)
    else:
        # only reached when the loop ends without `break`
        raise brozzler.VideoExtractorError(
            "yt-dlp hit unknown error extracting info for %s" % ydl.url
        )
    logger.info("ytdlp completed successfully")
    info_json = json.dumps(ie_result, sort_keys=True, indent=4)
    _remember_videos(page, ydl.pushed_videos)
    if worker._using_warcprox(site):
        logger.info(
            "sending WARCPROX_WRITE_RECORD request to warcprox with yt-dlp json",
            url=ydl.url,
        )
        worker._warcprox_write_record(
            warcprox_address=worker._proxy_for(site),
            url="youtube-dl:%s" % str(urlcanon.semantic(ydl.url)),
            warc_type="metadata",
            content_type="application/vnd.youtube-dl_formats+json;charset=utf-8",
            payload=info_json.encode("utf-8"),
            extra_headers=site.extra_headers(page),
        )
    return ie_result
@metrics.brozzler_ytdlp_duration_seconds.time()
@metrics.brozzler_in_progress_ytdlps.track_inprogress()
def do_youtube_dl(worker, site, page, ytdlp_proxy_endpoints):
    """
    Runs yt-dlp configured for `worker` and `site` to download videos from
    `page`.

    For youtube playlists/tabs the individual watch pages are returned as
    outlinks. When `worker._video_data` is available, watch pages it reports
    as already captured are left out; if that lookup fails, no watch page is
    treated as captured.

    Args:
        worker (brozzler.BrozzlerWorker): the calling brozzler worker
        site (brozzler.Site): the site we are brozzling
        page (brozzler.Page): the page we are brozzling
        ytdlp_proxy_endpoints: proxy urls passed on to `_build_youtube_dl`

    Returns:
        `set` of `str`: outlink urls
    """
    with tempfile.TemporaryDirectory(
        prefix="brzl-ydl-", dir=worker._ytdlp_tmpdir
    ) as tempdir:
        logger.info("tempdir for yt-dlp", tempdir=tempdir)
        ydl = _build_youtube_dl(worker, tempdir, site, page, ytdlp_proxy_endpoints)
        ie_result = _try_youtube_dl(worker, ydl, site, page)
        # print(ie_result)
        outlinks = set()
        if ie_result and ie_result.get("extractor") in (
            "youtube:playlist",
            "youtube:tab",
        ):
            if worker._video_data:
                logger.info(
                    "checking for previously captured youtube watch pages for account %s, seed_id %s",
                    site["account_id"],
                    site["metadata"]["ait_seed_id"],
                )
                # bound before the try so a failing lookup cannot leave them
                # undefined further down
                captured_youtube_watch_pages = set()
                uncaptured_youtube_watch_pages = []
                try:
                    # the lookup may hand back None on a database failure
                    captured_youtube_watch_pages.update(
                        worker._video_data.get_video_captures(site, source="youtube")
                        or []
                    )
                    for entry in ie_result.get("entries_no_dl", []):
                        # note: http needed for match
                        youtube_watch_url = str(
                            urlcanon.aggressive(
                                f"http://www.youtube.com/watch?v={entry['id']}"
                            )
                        )
                        if youtube_watch_url in captured_youtube_watch_pages:
                            logger.info(
                                "skipping adding %s to yt-dlp outlinks",
                                youtube_watch_url,
                            )
                            continue
                        uncaptured_youtube_watch_pages.append(
                            f"https://www.youtube.com/watch?v={entry['id']}"
                        )
                except Exception as exc:
                    logger.warning(
                        "hit exception processing worker._video_data: %s", exc
                    )
                if uncaptured_youtube_watch_pages:
                    outlinks.update(uncaptured_youtube_watch_pages)
            else:
                outlinks = {
                    "https://www.youtube.com/watch?v=%s" % entry["id"]
                    for entry in ie_result.get("entries_no_dl", [])
                }
        # todo: handle outlinks for instagram and soundcloud, other media source, here (if anywhere)
        return outlinks