Merge branch 'predup_type_playlist' into qa

This commit is contained in:
Barbara Miller 2025-08-15 13:49:07 -07:00
commit 16d97ccb5c
3 changed files with 205 additions and 165 deletions

184
brozzler/video_data.py Normal file
View file

@@ -0,0 +1,184 @@
"""
brozzler/video_data.py - video data support for brozzler predup
Copyright (C) 2025 Internet Archive
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import datetime
import os
from dataclasses import dataclass

# NOTE: `Bool` does not exist in `typing` and raised ImportError at module
# import time; annotations use the builtin `bool` instead.
from typing import Any, List, Optional

import structlog
import urlcanon

logger = structlog.get_logger(logger_name=__name__)
# video_title, video_display_id, video_resolution, video_capture_status are new fields, mostly from yt-dlp metadata
@dataclass(frozen=True)
class VideoCaptureRecord:
    """Immutable record of one captured video and the page containing it.

    Frozen dataclass: instances are hashable and safe to pass between
    threads. Field order is positional-constructor interface — do not
    reorder.
    """

    # crawl / account bookkeeping ids
    crawl_job_id: int
    is_test_crawl: bool
    seed_id: int
    collection_id: int
    # the containing page capture and this video's position on it
    containing_page_timestamp: str
    containing_page_digest: str
    containing_page_media_index: int
    containing_page_media_count: int
    # the captured video payload
    video_digest: str
    video_timestamp: str
    video_mimetype: str
    video_http_status: int
    video_size: int
    containing_page_url: str
    video_url: str
    # fields below come mostly from yt-dlp metadata (see comment above)
    video_title: str
    video_display_id: (
        str  # aka yt-dlp metadata as display_id, e.g., youtube watch page v param
    )
    video_resolution: str
    video_capture_status: str  # recrawl? what else?
class VideoDataClient:
    """Client for the brozzler video capture database (postgres).

    The connection string comes from the VIDEO_DATA_SOURCE environment
    variable (a postgresql:// URL). ``psycopg_pool`` is imported lazily
    inside methods so that merely importing this module does not require
    psycopg to be installed (callers construct this class conditionally).
    """

    # postgres connection string, e.g. postgresql://user:pw@host/dbname
    VIDEO_DATA_SOURCE = os.getenv("VIDEO_DATA_SOURCE")

    def __init__(self):
        from psycopg_pool import ConnectionPool

        pool = ConnectionPool(self.VIDEO_DATA_SOURCE, min_size=1, max_size=9)
        # block until at least min_size connections are established
        pool.wait()
        logger.info("pg pool ready")
        # atexit.register(pool.close)
        self.pool = pool

    def _execute_pg_query(self, query_tuple, fetchall=False) -> Optional[Any]:
        """Execute ``(query_str, params)`` against the pool.

        :param query_tuple: (sql string with %s placeholders, params tuple)
        :param fetchall: if True return ``cur.fetchall()``, else ``cur.fetchone()``
        :returns: query result, or None on timeout/error (errors are logged,
            never raised)
        """
        from psycopg_pool import PoolTimeout

        query_str, params = query_tuple
        try:
            with self.pool.connection() as conn:
                with conn.cursor() as cur:
                    cur.execute(query_str, params)
                    return cur.fetchall() if fetchall else cur.fetchone()
        except PoolTimeout as e:
            logger.warning("hit PoolTimeout: %s", e)
            # prune broken connections so the pool can recover
            self.pool.check()
        except Exception as e:
            logger.warning("postgres query failed: %s", e)
        return None

    @staticmethod
    def _timestamp4datetime(timestamp):
        """Split `timestamp` into a tuple of 6 integers.

        Was missing @staticmethod: calling ``self._timestamp4datetime(x)``
        passed the instance as ``timestamp`` and raised TypeError.

        :param timestamp: full-length timestamp string, YYYYMMDDHHMMSS...
        :returns: (year, month, day, hour, minute, second) ints
        """
        timestamp = timestamp[:14]
        return (
            int(timestamp[:-10]),
            int(timestamp[-10:-8]),
            int(timestamp[-8:-6]),
            int(timestamp[-6:-4]),
            int(timestamp[-4:-2]),
            int(timestamp[-2:]),
        )

    def recent_video_capture_exists(
        self, site=None, containing_page_url=None, recent=30
    ) -> bool:
        """Return True iff a capture of `containing_page_url` newer than
        `recent` days exists for this site's account/seed.

        :param site: site dict; reads site["metadata"]["ait_account_id"]
            and ["ait_seed_id"]
        :param containing_page_url: page url, canonicalized before lookup
        :param recent: recency window in days (default 30)
        """
        # using ait_account_id as postgres partition id; .get() so a
        # missing metadata key falls through to the warning branch
        # instead of raising KeyError
        partition_id = site["metadata"].get("ait_account_id") or None
        seed_id = site["metadata"].get("ait_seed_id") or None
        result = False
        if partition_id and seed_id and containing_page_url:
            # postgres query for most recent record
            pg_query = (
                "SELECT containing_page_timestamp from video where account_id = %s and seed_id = %s and containing_page_url = %s ORDER BY containing_page_timestamp DESC LIMIT 1",
                (partition_id, seed_id, str(urlcanon.aggressive(containing_page_url))),
            )
            try:
                result_tuple = self._execute_pg_query(pg_query)
                if result_tuple:
                    # keep the timestamp in its own variable: previously it
                    # was assigned to `result`, so a stale (non-recent)
                    # capture returned a truthy string instead of False
                    most_recent_timestamp = result_tuple[0]
                    logger.info(
                        "found most recent capture timestamp: %s",
                        most_recent_timestamp,
                    )
                    # capture timestamps are presumed UTC — make the parsed
                    # datetime aware so subtracting from an aware now() is
                    # valid (aware - naive raises TypeError)
                    capture_timestamp = datetime.datetime(
                        *self._timestamp4datetime(most_recent_timestamp),
                        tzinfo=datetime.timezone.utc,
                    )
                    time_diff = (
                        datetime.datetime.now(datetime.timezone.utc)
                        - capture_timestamp
                    )
                    if time_diff < datetime.timedelta(days=recent):
                        logger.info(
                            "recent video capture from %s exists",
                            containing_page_url,
                        )
                        result = True
            except Exception as e:
                logger.warning("postgres query failed: %s", e)
        else:
            logger.warning(
                "missing partition_id/account_id, seed_id, or containing_page_url"
            )
        return result

    def get_video_captures(self, site=None, source=None) -> List[str]:
        """Return containing_page_urls of previous captures for this site's
        account/seed from media `source`.

        :param site: site dict; reads site["metadata"]["ait_account_id"]
            and ["ait_seed_id"]
        :param source: media source; only "youtube" is supported so far
        :returns: list of containing_page_url strings (empty on error or
            unsupported source)
        """
        # using ait_account_id as postgres partition id
        partition_id = site["metadata"].get("ait_account_id") or None
        seed_id = site["metadata"].get("ait_seed_id") or None
        results = []
        containing_page_url_pattern = None
        if source == "youtube":
            containing_page_url_pattern = "http://youtube.com/watch%"  # yes, video data canonicalization uses "http"
        # support other media sources here
        # guard on the pattern, not `source`: an unsupported source used to
        # leave containing_page_url_pattern undefined and raise NameError
        if partition_id and seed_id and containing_page_url_pattern:
            pg_query = (
                "SELECT containing_page_url from video where account_id = %s and seed_id = %s and containing_page_url like %s",
                (
                    partition_id,
                    seed_id,
                    containing_page_url_pattern,
                ),
            )
            try:
                result = self._execute_pg_query(pg_query, fetchall=True)
                if result:
                    results = [row[0] for row in result]
            except Exception as e:
                logger.warning("postgres query failed: %s", e)
        else:
            logger.warning("missing partition_id/account_id, seed_id, or source")
        return results

View file

@@ -42,7 +42,6 @@ from urllib3.exceptions import ProxyError, TimeoutError
import brozzler
import brozzler.browser
from brozzler.model import VideoCaptureOptions
from brozzler.ydl import VideoDataClient
from . import metrics
@@ -94,7 +93,10 @@ class BrozzlerWorker:
self._service_registry = service_registry
self._ytdlp_proxy_endpoints = ytdlp_proxy_endpoints
self._max_browsers = max_browsers
# see video_data.py for more info
if self.VIDEO_DATA_SOURCE and self.VIDEO_DATA_SOURCE.startswith("postgresql"):
from brozzler.video_data import VideoDataClient
self._video_data = VideoDataClient()
self._warcprox_auto = warcprox_auto
@@ -280,21 +282,6 @@ class BrozzlerWorker:
img.save(out, "jpeg", quality=95)
return out.getbuffer()
def _timestamp4datetime(timestamp):
"""split `timestamp` into a tuple of 6 integers.
:param timestamp: full-length timestamp
"""
timestamp = timestamp[:14]
return (
int(timestamp[:-10]),
int(timestamp[-10:-8]),
int(timestamp[-8:-6]),
int(timestamp[-6:-4]),
int(timestamp[-4:-2]),
int(timestamp[-2:]),
)
def should_ytdlp(self, logger, site, page, page_status):
# called only after we've passed needs_browsing() check
@@ -316,29 +303,26 @@
# predup...
logger.info("checking for recent previous captures of %s", ytdlp_url)
if "youtube.com/watch" in ytdlp_url:
try:
previous_capture = self._video_data.get_recent_video_capture(
site, ytdlp_url
)
if previous_capture:
capture_timestamp = datetime.datetime(
*self._timestamp4datetime(previous_capture)
)
logger.info("capture_timestamp: %s", capture_timestamp)
time_diff = datetime.datetime.now() - capture_timestamp
# TODO: make variable for timedelta
if time_diff < datetime.timedelta(days=90):
logger.info(
"skipping ytdlp for %s since there's a recent capture",
ytdlp_url,
)
return False
except Exception as e:
logger.warning(
"exception querying for previous capture for %s: %s",
recent = 90 # 90 days
else:
recent = 30 # 30 days, current default
try:
recent_capture_exists = self._video_data.recent_video_capture_exists(
site, ytdlp_url, recent
)
if recent_capture_exists:
logger.info(
"recent previous capture of %s found, skipping ytdlp",
ytdlp_url,
str(e),
)
return False
except Exception as e:
logger.warning(
"exception querying for previous capture for %s: %s",
ytdlp_url,
str(e),
)
return True
@metrics.brozzler_page_processing_duration_seconds.time()

View file

@@ -24,8 +24,6 @@ import tempfile
import threading
import time
import urllib.request
from dataclasses import dataclass
from typing import Any, List, Optional
import doublethink
import structlog
@@ -48,132 +46,6 @@ YTDLP_MAX_REDIRECTS = 5
logger = structlog.get_logger(logger_name=__name__)
# video_title, video_display_id, video_resolution, video_capture_status are new fields, mostly from yt-dlp metadata
@dataclass(frozen=True)
class VideoCaptureRecord:
crawl_job_id: int
is_test_crawl: bool
seed_id: int
collection_id: int
containing_page_timestamp: str
containing_page_digest: str
containing_page_media_index: int
containing_page_media_count: int
video_digest: str
video_timestamp: str
video_mimetype: str
video_http_status: int
video_size: int
containing_page_url: str
video_url: str
video_title: str
video_display_id: (
str # aka yt-dlp metadata as display_id, e.g., youtube watch page v param
)
video_resolution: str
video_capture_status: str # recrawl? what else?
class VideoDataClient:
VIDEO_DATA_SOURCE = os.getenv("VIDEO_DATA_SOURCE")
def __init__(self):
from psycopg_pool import ConnectionPool
pool = ConnectionPool(self.VIDEO_DATA_SOURCE, min_size=1, max_size=9)
pool.wait()
logger.info("pg pool ready")
# atexit.register(pool.close)
self.pool = pool
def _execute_pg_query(self, query_tuple, fetchall=False) -> Optional[Any]:
from psycopg_pool import PoolTimeout
query_str, params = query_tuple
try:
with self.pool.connection() as conn:
with conn.cursor() as cur:
cur.execute(query_str, params)
return cur.fetchall() if fetchall else cur.fetchone()
except PoolTimeout as e:
logger.warn("hit PoolTimeout: %s", e)
self.pool.check()
except Exception as e:
logger.warn("postgres query failed: %s", e)
return None
def get_recent_video_capture(self, site=None, containing_page_url=None) -> List:
# using ait_account_id as postgres partition id
partition_id = (
site["metadata"]["ait_account_id"]
if site["metadata"]["ait_account_id"]
else None
)
seed_id = (
site["metadata"]["ait_seed_id"] if site["metadata"]["ait_seed_id"] else None
)
result = None
if partition_id and seed_id and containing_page_url:
# check for postgres query for most recent record
pg_query = (
"SELECT containing_page_timestamp from video where account_id = %s and seed_id = %s and containing_page_url = %s ORDER BY containing_page_timestamp DESC LIMIT 1",
(partition_id, seed_id, str(urlcanon.aggressive(containing_page_url))),
)
try:
result_tuple = self._execute_pg_query(pg_query)
if result_tuple:
result = result_tuple[0]
logger.info("found most recent video capture record: %s", result)
except Exception as e:
logger.warn("postgres query failed: %s", e)
else:
logger.warn(
"missing partition_id/account_id, seed_id, or containing_page_url"
)
return result
def get_video_captures(self, site=None, source=None) -> List[str]:
# using ait_account_id as postgres partition id
partition_id = (
site["metadata"]["ait_account_id"]
if site["metadata"]["ait_account_id"]
else None
)
seed_id = (
site["metadata"]["ait_seed_id"] if site["metadata"]["ait_seed_id"] else None
)
results = []
if source == "youtube":
containing_page_url_pattern = "http://youtube.com/watch%" # yes, video data canonicalization uses "http"
# support other media sources here
if partition_id and seed_id and source:
pg_query = (
"SELECT containing_page_url from video where account_id = %s and seed_id = %s and containing_page_url like %s",
(
partition_id,
seed_id,
containing_page_url_pattern,
),
)
try:
result = self._execute_pg_query(pg_query, fetchall=True)
if result:
results = [row[0] for row in result]
except Exception as e:
logger.warn("postgres query failed: %s", e)
else:
logger.warn("missing partition_id/account_id, seed_id, or source")
return results
def isyoutubehost(url):
    """Return True when the hostname portion of *url* contains "youtube.com"."""
    # first split drops the scheme (everything before "//", if any);
    # second split keeps only the hostname (everything before the first "/")
    after_scheme = url.split("//")[-1]
    hostname = after_scheme.split("/")[0]
    return "youtube.com" in hostname