self._video_data in worker

This commit is contained in:
Barbara Miller 2025-06-30 17:27:14 -07:00
parent db17335ffb
commit b4b950c0fc
2 changed files with 17 additions and 9 deletions

View file

@ -21,6 +21,7 @@ limitations under the License.
import datetime import datetime
import io import io
import json import json
import os
import socket import socket
import threading import threading
import time import time
@ -39,6 +40,7 @@ from urllib3.exceptions import ProxyError, TimeoutError
import brozzler import brozzler
import brozzler.browser import brozzler.browser
from brozzler.model import VideoCaptureOptions from brozzler.model import VideoCaptureOptions
from brozzler.ydl import VideoDataClient
from . import metrics from . import metrics
@ -56,6 +58,7 @@ class BrozzlerWorker:
SITE_SESSION_MINUTES = 15 SITE_SESSION_MINUTES = 15
HEADER_REQUEST_TIMEOUT = 30 HEADER_REQUEST_TIMEOUT = 30
FETCH_URL_TIMEOUT = 60 FETCH_URL_TIMEOUT = 60
VIDEO_DATA_SOURCE = os.getenv("VIDEO_DATA_SOURCE")
def __init__( def __init__(
self, self,
@ -88,6 +91,8 @@ class BrozzlerWorker:
self._service_registry = service_registry self._service_registry = service_registry
self._ytdlp_proxy_endpoints = ytdlp_proxy_endpoints self._ytdlp_proxy_endpoints = ytdlp_proxy_endpoints
self._max_browsers = max_browsers self._max_browsers = max_browsers
if VIDEO_DATA_SOURCE and VIDEO_DATA_SOURCE.startswith("postgresql"):
self._video_data = VideoDataClient()
self._warcprox_auto = warcprox_auto self._warcprox_auto = warcprox_auto
self._proxy = proxy self._proxy = proxy

View file

@ -51,14 +51,16 @@ logger = structlog.get_logger(logger_name=__name__)
class VideoDataClient: class VideoDataClient:
def __init__(self): import psycopg
if VIDEO_DATA_SOURCE and VIDEO_DATA_SOURCE.startswith("postgresql"): from psycopg_pool import ConnectionPool, PoolTimeout
pool = ConnectionPool(VIDEO_DATA_SOURCE, min_size=1, max_size=9)
pool.wait()
logger.info("pg pool ready")
# atexit.register(pool.close)
self.pool = pool def __init__(self):
pool = ConnectionPool(VIDEO_DATA_SOURCE, min_size=1, max_size=9)
pool.wait()
logger.info("pg pool ready")
# atexit.register(pool.close)
self.pool = pool
def _execute_pg_query( def _execute_pg_query(
self, query: str, row_factory=None, fetchone=False, fetchall=False self, query: str, row_factory=None, fetchone=False, fetchall=False
@ -512,10 +514,11 @@ def do_youtube_dl(worker, site, page, ytdlp_proxy_endpoints):
or ie_result.get("extractor") == "youtube:tab" or ie_result.get("extractor") == "youtube:tab"
): ):
if VIDEO_DATA_SOURCE and VIDEO_DATA_SOURCE.startswith("postgresql"): if VIDEO_DATA_SOURCE and VIDEO_DATA_SOURCE.startswith("postgresql"):
video_data = VideoDataClient()
captured_youtube_watch_pages = set() captured_youtube_watch_pages = set()
captured_youtube_watch_pages.add( captured_youtube_watch_pages.add(
video_data.get_pg_video_captures_by_source(site, source="youtube") worker._video_data.get_pg_video_captures_by_source(
site, source="youtube"
)
) )
uncaptured_youtube_watch_pages = [] uncaptured_youtube_watch_pages = []
for e in ie_result.get("entries_no_dl", []): for e in ie_result.get("entries_no_dl", []):