diff --git a/brozzler/video_data.py b/brozzler/video_data.py
new file mode 100644
index 0000000..5dc94f9
--- /dev/null
+++ b/brozzler/video_data.py
@@ -0,0 +1,184 @@
+"""
+brozzler/video_data.py - video data support for brozzler predup
+
+Copyright (C) 2025 Internet Archive
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import datetime
+import os
+from dataclasses import dataclass
+# NOTE(review): typing has no `Bool` -- importing it raises ImportError at
+# module import time; the builtin `bool` is used for annotations instead
+from typing import Any, List, Optional
+
+import structlog
+import urlcanon
+
+logger = structlog.get_logger(logger_name=__name__)
+
+
+# video_title, video_display_id, video_resolution, video_capture_status are new fields, mostly from yt-dlp metadata
+@dataclass(frozen=True)
+class VideoCaptureRecord:
+    crawl_job_id: int
+    is_test_crawl: bool
+    seed_id: int
+    collection_id: int
+    containing_page_timestamp: str
+    containing_page_digest: str
+    containing_page_media_index: int
+    containing_page_media_count: int
+    video_digest: str
+    video_timestamp: str
+    video_mimetype: str
+    video_http_status: int
+    video_size: int
+    containing_page_url: str
+    video_url: str
+    video_title: str
+    video_display_id: (
+        str  # aka yt-dlp metadata as display_id, e.g., youtube watch page v param
+    )
+    video_resolution: str
+    video_capture_status: str  # recrawl? what else?
+
+
+class VideoDataClient:
+    """Client for the external video-capture postgres database (predup).
+
+    psycopg_pool is imported lazily inside methods so that merely importing
+    this module does not require the dependency to be installed.
+    """
+
+    VIDEO_DATA_SOURCE = os.getenv("VIDEO_DATA_SOURCE")
+
+    def __init__(self):
+        from psycopg_pool import ConnectionPool
+
+        pool = ConnectionPool(self.VIDEO_DATA_SOURCE, min_size=1, max_size=9)
+        pool.wait()
+        logger.info("pg pool ready")
+        # atexit.register(pool.close)
+
+        self.pool = pool
+
+    def _execute_pg_query(self, query_tuple, fetchall=False) -> Optional[Any]:
+        from psycopg_pool import PoolTimeout
+
+        query_str, params = query_tuple
+        try:
+            with self.pool.connection() as conn:
+                with conn.cursor() as cur:
+                    cur.execute(query_str, params)
+                    return cur.fetchall() if fetchall else cur.fetchone()
+        except PoolTimeout as e:
+            logger.warning("hit PoolTimeout: %s", e)
+            self.pool.check()
+        except Exception as e:
+            logger.warning("postgres query failed: %s", e)
+        return None
+
+    def _timestamp4datetime(self, timestamp):
+        """split `timestamp` into a tuple of 6 integers.
+
+        :param timestamp: full-length timestamp
+        """
+        timestamp = timestamp[:14]
+        return (
+            int(timestamp[:-10]),
+            int(timestamp[-10:-8]),
+            int(timestamp[-8:-6]),
+            int(timestamp[-6:-4]),
+            int(timestamp[-4:-2]),
+            int(timestamp[-2:]),
+        )
+
+    def recent_video_capture_exists(
+        self, site=None, containing_page_url=None, recent=30
+    ) -> bool:
+        # using ait_account_id as postgres partition id
+        partition_id = (
+            site["metadata"]["ait_account_id"]
+            if site["metadata"]["ait_account_id"]
+            else None
+        )
+        seed_id = (
+            site["metadata"]["ait_seed_id"] if site["metadata"]["ait_seed_id"] else None
+        )
+        result = False
+
+        if partition_id and seed_id and containing_page_url:
+            # check for postgres query for most recent record
+            pg_query = (
+                "SELECT containing_page_timestamp from video where account_id = %s and seed_id = %s and containing_page_url = %s ORDER BY containing_page_timestamp DESC LIMIT 1",
+                (partition_id, seed_id, str(urlcanon.aggressive(containing_page_url))),
+            )
+            try:
+                result_tuple = self._execute_pg_query(pg_query)
+                if result_tuple:
+                    # keep the timestamp in its own variable: `result` must
+                    # stay False unless the capture is actually recent
+                    latest_timestamp = result_tuple[0]
+                    logger.info(
+                        "found most recent capture timestamp: %s", latest_timestamp
+                    )
+                    capture_timestamp = datetime.datetime(
+                        *self._timestamp4datetime(latest_timestamp)
+                    )
+                    # capture timestamps are assumed to be UTC (14-digit
+                    # wayback-style) -- compare against a naive UTC "now"
+                    # since capture_timestamp is naive
+                    time_diff = (
+                        datetime.datetime.now(datetime.timezone.utc).replace(
+                            tzinfo=None
+                        )
+                        - capture_timestamp
+                    )
+                    if time_diff < datetime.timedelta(days=recent):
+                        logger.info(
+                            "recent video capture from %s exists",
+                            containing_page_url,
+                        )
+                        result = True
+
+            except Exception as e:
+                logger.warning("postgres query failed: %s", e)
+        else:
+            logger.warning(
+                "missing partition_id/account_id, seed_id, or containing_page_url"
+            )
+
+        return result
+
+    def get_video_captures(self, site=None, source=None) -> List[str]:
+        # using ait_account_id as postgres partition id
+        partition_id = (
+            site["metadata"]["ait_account_id"]
+            if site["metadata"]["ait_account_id"]
+            else None
+        )
+        seed_id = (
+            site["metadata"]["ait_seed_id"] if site["metadata"]["ait_seed_id"] else None
+        )
+        results = []
+        # default to None so an unsupported source is skipped instead of
+        # raising UnboundLocalError below
+        containing_page_url_pattern = None
+
+        if source == "youtube":
+            containing_page_url_pattern = "http://youtube.com/watch%"  # yes, video data canonicalization uses "http"
+        # support other media sources here
+
+        if partition_id and seed_id and containing_page_url_pattern:
+            pg_query = (
+                "SELECT containing_page_url from video where account_id = %s and seed_id = %s and containing_page_url like %s",
+                (
+                    partition_id,
+                    seed_id,
+                    containing_page_url_pattern,
+                ),
+            )
+            try:
+                result = self._execute_pg_query(pg_query, fetchall=True)
+                if result:
+                    results = [row[0] for row in result]
+            except Exception as e:
+                logger.warning("postgres query failed: %s", e)
+        else:
+            logger.warning("missing partition_id/account_id, seed_id, or source")
+
+        return results
diff --git a/brozzler/worker.py b/brozzler/worker.py
index 4538572..a75461a 100644
--- a/brozzler/worker.py
+++ b/brozzler/worker.py
@@ -42,7 +42,6 @@ from urllib3.exceptions import ProxyError, TimeoutError
 import brozzler
 import brozzler.browser
 from brozzler.model import VideoCaptureOptions
-from 
brozzler.ydl import VideoDataClient
 
 from . import metrics
 
@@ -94,7 +93,12 @@ class BrozzlerWorker:
         self._service_registry = service_registry
         self._ytdlp_proxy_endpoints = ytdlp_proxy_endpoints
         self._max_browsers = max_browsers
+        # always define the attribute so should_ytdlp() can test it safely
+        self._video_data = None
+        # see video_data.py for more info
         if self.VIDEO_DATA_SOURCE and self.VIDEO_DATA_SOURCE.startswith("postgresql"):
+            from brozzler.video_data import VideoDataClient
+
             self._video_data = VideoDataClient()
 
         self._warcprox_auto = warcprox_auto
@@ -280,21 +284,6 @@ class BrozzlerWorker:
             img.save(out, "jpeg", quality=95)
             return out.getbuffer()
 
-    def _timestamp4datetime(timestamp):
-        """split `timestamp` into a tuple of 6 integers.
-
-        :param timestamp: full-length timestamp
-        """
-        timestamp = timestamp[:14]
-        return (
-            int(timestamp[:-10]),
-            int(timestamp[-10:-8]),
-            int(timestamp[-8:-6]),
-            int(timestamp[-6:-4]),
-            int(timestamp[-4:-2]),
-            int(timestamp[-2:]),
-        )
-
     def should_ytdlp(self, logger, site, page, page_status):
         # called only after we've passed needs_browsing() check
 
@@ -316,29 +305,29 @@ class BrozzlerWorker:
             # predup...
             logger.info("checking for recent previous captures of %s", ytdlp_url)
             if "youtube.com/watch" in ytdlp_url:
-                try:
-                    previous_capture = self._video_data.get_recent_video_capture(
-                        site, ytdlp_url
-                    )
-                    if previous_capture:
-                        capture_timestamp = datetime.datetime(
-                            *self._timestamp4datetime(previous_capture)
-                        )
-                        logger.info("capture_timestamp: %s", capture_timestamp)
-                        time_diff = datetime.datetime.now() - capture_timestamp
-                        # TODO: make variable for timedelta
-                        if time_diff < datetime.timedelta(days=90):
-                            logger.info(
-                                "skipping ytdlp for %s since there's a recent capture",
-                                ytdlp_url,
-                            )
-                            return False
-                except Exception as e:
-                    logger.warning(
-                        "exception querying for previous capture for %s: %s",
+                recent = 90  # 90 days
+            else:
+                recent = 30  # 30 days, current default
+            try:
+                # short-circuit when no video data source is configured
+                recent_capture_exists = (
+                    self._video_data
+                    and self._video_data.recent_video_capture_exists(
+                        site, ytdlp_url, recent
+                    )
+                )
+                if recent_capture_exists:
+                    logger.info(
+                        "recent previous capture of %s found, skipping ytdlp",
                         ytdlp_url,
-                        str(e),
                     )
+                    return False
+            except Exception as e:
+                logger.warning(
+                    "exception querying for previous capture for %s: %s",
+                    ytdlp_url,
+                    str(e),
+                )
+            return True
 
     @metrics.brozzler_page_processing_duration_seconds.time()
diff --git a/brozzler/ydl.py b/brozzler/ydl.py
index 56d03d0..057d2d2 100644
--- a/brozzler/ydl.py
+++ b/brozzler/ydl.py
@@ -24,8 +24,6 @@ import tempfile
 import threading
 import time
 import urllib.request
-from dataclasses import dataclass
-from typing import Any, List, Optional
 
 import doublethink
 import structlog
@@ -48,132 +46,6 @@ YTDLP_MAX_REDIRECTS = 5
 logger = structlog.get_logger(logger_name=__name__)
 
 
-# video_title, video_display_id, video_resolution, video_capture_status are new fields, mostly from yt-dlp metadata
-@dataclass(frozen=True)
-class VideoCaptureRecord:
-    crawl_job_id: int
-    is_test_crawl: bool
-    seed_id: int
-    collection_id: int
-    containing_page_timestamp: str
-    containing_page_digest: str
-    containing_page_media_index: int
- 
containing_page_media_count: int - video_digest: str - video_timestamp: str - video_mimetype: str - video_http_status: int - video_size: int - containing_page_url: str - video_url: str - video_title: str - video_display_id: ( - str # aka yt-dlp metadata as display_id, e.g., youtube watch page v param - ) - video_resolution: str - video_capture_status: str # recrawl? what else? - - -class VideoDataClient: - - VIDEO_DATA_SOURCE = os.getenv("VIDEO_DATA_SOURCE") - - def __init__(self): - from psycopg_pool import ConnectionPool - - pool = ConnectionPool(self.VIDEO_DATA_SOURCE, min_size=1, max_size=9) - pool.wait() - logger.info("pg pool ready") - # atexit.register(pool.close) - - self.pool = pool - - def _execute_pg_query(self, query_tuple, fetchall=False) -> Optional[Any]: - from psycopg_pool import PoolTimeout - - query_str, params = query_tuple - try: - with self.pool.connection() as conn: - with conn.cursor() as cur: - cur.execute(query_str, params) - return cur.fetchall() if fetchall else cur.fetchone() - except PoolTimeout as e: - logger.warn("hit PoolTimeout: %s", e) - self.pool.check() - except Exception as e: - logger.warn("postgres query failed: %s", e) - return None - - def get_recent_video_capture(self, site=None, containing_page_url=None) -> List: - # using ait_account_id as postgres partition id - partition_id = ( - site["metadata"]["ait_account_id"] - if site["metadata"]["ait_account_id"] - else None - ) - seed_id = ( - site["metadata"]["ait_seed_id"] if site["metadata"]["ait_seed_id"] else None - ) - result = None - - if partition_id and seed_id and containing_page_url: - # check for postgres query for most recent record - pg_query = ( - "SELECT containing_page_timestamp from video where account_id = %s and seed_id = %s and containing_page_url = %s ORDER BY containing_page_timestamp DESC LIMIT 1", - (partition_id, seed_id, str(urlcanon.aggressive(containing_page_url))), - ) - try: - result_tuple = self._execute_pg_query(pg_query) - if result_tuple: - 
result = result_tuple[0] - logger.info("found most recent video capture record: %s", result) - - except Exception as e: - logger.warn("postgres query failed: %s", e) - else: - logger.warn( - "missing partition_id/account_id, seed_id, or containing_page_url" - ) - - return result - - def get_video_captures(self, site=None, source=None) -> List[str]: - # using ait_account_id as postgres partition id - partition_id = ( - site["metadata"]["ait_account_id"] - if site["metadata"]["ait_account_id"] - else None - ) - seed_id = ( - site["metadata"]["ait_seed_id"] if site["metadata"]["ait_seed_id"] else None - ) - results = [] - - if source == "youtube": - containing_page_url_pattern = "http://youtube.com/watch%" # yes, video data canonicalization uses "http" - # support other media sources here - - if partition_id and seed_id and source: - pg_query = ( - "SELECT containing_page_url from video where account_id = %s and seed_id = %s and containing_page_url like %s", - ( - partition_id, - seed_id, - containing_page_url_pattern, - ), - ) - try: - result = self._execute_pg_query(pg_query, fetchall=True) - if result: - results = [row[0] for row in result] - except Exception as e: - logger.warn("postgres query failed: %s", e) - else: - logger.warn("missing partition_id/account_id, seed_id, or source") - - return results - - def isyoutubehost(url): # split 1 splits scheme from url, split 2 splits path from hostname return "youtube.com" in url.split("//")[-1].split("/")[0]