mirror of
https://github.com/internetarchive/brozzler.git
synced 2025-02-23 16:19:49 -05:00
Merge pull request #289 from galgeek/metrics_plus_proxy_retries
prometheus metrics and yt-dlp external proxy retry updates
This commit is contained in:
commit
d987ba2e4b
@ -39,6 +39,10 @@ class PageInterstitialShown(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class VideoExtractorError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class ProxyError(Exception):
|
||||
pass
|
||||
|
||||
|
@ -234,6 +234,25 @@ def brozzle_page(argv=None):
|
||||
action="store_true",
|
||||
help="Try to avoid web bot detection",
|
||||
)
|
||||
arg_parser.add_argument(
|
||||
"--metrics_port",
|
||||
type=int,
|
||||
dest="metrics_port",
|
||||
default=0,
|
||||
help="Port for brozzler's Prometheus scrape endpoint",
|
||||
)
|
||||
arg_parser.add_argument(
|
||||
"--registry_url",
|
||||
dest="registry_url",
|
||||
default=None,
|
||||
help="http-sd-registry url, for Prometheus metrics discovery",
|
||||
)
|
||||
arg_parser.add_argument(
|
||||
"--env",
|
||||
dest="env",
|
||||
default=None,
|
||||
help="deployment environment for this brozzler instance, e.g., prod or qa",
|
||||
)
|
||||
arg_parser.add_argument(
|
||||
"--screenshot-full-page", dest="screenshot_full_page", action="store_true"
|
||||
)
|
||||
@ -279,6 +298,9 @@ def brozzle_page(argv=None):
|
||||
window_height=args.window_height,
|
||||
window_width=args.window_width,
|
||||
stealth=args.stealth,
|
||||
metrics_port=args.metrics_port,
|
||||
registry_url=args.registry_url,
|
||||
env=args.env,
|
||||
)
|
||||
|
||||
def on_screenshot(screenshot_jpeg):
|
||||
@ -517,6 +539,25 @@ def brozzler_worker(argv=None):
|
||||
action="store_true",
|
||||
help="Try to avoid web bot detection",
|
||||
)
|
||||
arg_parser.add_argument(
|
||||
"--metrics_port",
|
||||
type=int,
|
||||
dest="metrics_port",
|
||||
default=0,
|
||||
help="Port for brozzler's Prometheus scrape endpoint",
|
||||
)
|
||||
arg_parser.add_argument(
|
||||
"--registry_url",
|
||||
dest="registry_url",
|
||||
default=None,
|
||||
help="http-sd-registry url, for Prometheus metrics discovery",
|
||||
)
|
||||
arg_parser.add_argument(
|
||||
"--env",
|
||||
dest="env",
|
||||
default=None,
|
||||
help="deployment environment for this brozzler instance, e.g., prod or qa",
|
||||
)
|
||||
add_common_options(arg_parser, argv)
|
||||
|
||||
args = arg_parser.parse_args(args=argv[1:])
|
||||
@ -573,6 +614,9 @@ def brozzler_worker(argv=None):
|
||||
skip_visit_hashtags=args.skip_visit_hashtags,
|
||||
skip_youtube_dl=args.skip_youtube_dl,
|
||||
stealth=args.stealth,
|
||||
metrics_port=args.metrics_port,
|
||||
registry_url=args.registry_url,
|
||||
env=args.env,
|
||||
)
|
||||
|
||||
signal.signal(signal.SIGQUIT, dump_state)
|
||||
|
58
brozzler/metrics.py
Normal file
58
brozzler/metrics.py
Normal file
@ -0,0 +1,58 @@
|
||||
from typing import Optional
|
||||
|
||||
try:
|
||||
from http_sd_registry.client import (
|
||||
Client,
|
||||
Env,
|
||||
Registration,
|
||||
Scheme,
|
||||
format_self_target,
|
||||
)
|
||||
from http_sd_registry.config import ClientConfig
|
||||
except ImportError:
|
||||
# for users without access to http_sd_registry
|
||||
http_sd_registry = None
|
||||
|
||||
|
||||
from prometheus_client import Counter, Gauge, Histogram, start_http_server
|
||||
|
||||
# fmt: off
|
||||
brozzler_pages_crawled = Counter("brozzler_pages_crawled", "number of pages visited by brozzler")
|
||||
brozzler_page_processing_duration_seconds = Histogram("brozzler_page_processing_duration_seconds", "time spent processing a page in brozzler")
|
||||
brozzler_outlinks_found = Counter("brozzler_outlinks_found", "number of outlinks found by brozzler")
|
||||
brozzler_last_page_crawled_time = Gauge("brozzler_last_page_crawled_time", "time of last page visit, in seconds since UNIX epoch")
|
||||
brozzler_in_progress_pages = Gauge("brozzler_in_progress_pages", "number of pages currently processing with brozzler")
|
||||
brozzler_ydl_urls_checked = Counter("brozzler_ydl_urls_checked", "count of urls checked by brozzler yt-dlp")
|
||||
brozzler_ydl_extract_attempts = Counter("brozzler_ydl_extract_attempts", "count of extracts attempted by brozzler yt-dlp", labelnames=["youtube_host"])
|
||||
brozzler_ydl_extract_successes = Counter("brozzler_ydl_extract_successes", "count of extracts completed by brozzler yt-dlp", labelnames=["youtube_host"])
|
||||
brozzler_ydl_download_successes = Counter("brozzler_ydl_download_successes", "count of downloads completed by brozzler yt-dlp", labelnames=["youtube_host"])
|
||||
# fmt: on
|
||||
|
||||
|
||||
def register_prom_metrics(
|
||||
metrics_port: int = 8888,
|
||||
registry_url: Optional[str] = None,
|
||||
env: Optional[str] = None,
|
||||
):
|
||||
# Start metrics endpoint for scraping
|
||||
start_http_server(metrics_port)
|
||||
|
||||
if registry_url is None:
|
||||
return
|
||||
|
||||
if env == "qa":
|
||||
env_for_prom = Env.qa
|
||||
elif env == "prod":
|
||||
env_for_prom = Env.prod
|
||||
else:
|
||||
env_for_prom = Env.qa
|
||||
|
||||
config = ClientConfig(server_url_base=registry_url)
|
||||
client = Client(config)
|
||||
target = format_self_target(scrape_port=metrics_port)
|
||||
registration = Registration(
|
||||
target=target,
|
||||
env=env_for_prom,
|
||||
scheme=Scheme.http,
|
||||
)
|
||||
client.keep_registered_threaded(registration)
|
@ -35,6 +35,7 @@ import tempfile
|
||||
import urlcanon
|
||||
from requests.structures import CaseInsensitiveDict
|
||||
import rethinkdb as rdb
|
||||
from . import metrics
|
||||
from . import ydl
|
||||
|
||||
r = rdb.RethinkDB()
|
||||
@ -71,6 +72,9 @@ class BrozzlerWorker:
|
||||
stealth=False,
|
||||
window_height=900,
|
||||
window_width=1400,
|
||||
metrics_port=0,
|
||||
registry_url=None,
|
||||
env=None,
|
||||
):
|
||||
self._frontier = frontier
|
||||
self._service_registry = service_registry
|
||||
@ -93,6 +97,9 @@ class BrozzlerWorker:
|
||||
self._window_height = window_height
|
||||
self._window_width = window_width
|
||||
self._stealth = stealth
|
||||
self._metrics_port = metrics_port
|
||||
self._registry_url = registry_url
|
||||
self._env = env
|
||||
|
||||
self._browser_pool = brozzler.browser.BrowserPool(
|
||||
max_browsers, chrome_exe=chrome_exe, ignore_cert_errors=True
|
||||
@ -104,6 +111,16 @@ class BrozzlerWorker:
|
||||
self._start_stop_lock = threading.Lock()
|
||||
self._shutdown = threading.Event()
|
||||
|
||||
# set up metrics
|
||||
if self._metrics_port > 0:
|
||||
metrics.register_prom_metrics(
|
||||
self._metrics_port, self._registry_url, self._env
|
||||
)
|
||||
else:
|
||||
logging.warning(
|
||||
"not starting prometheus scrape endpoint: metrics_port is undefined"
|
||||
)
|
||||
|
||||
def _choose_warcprox(self):
|
||||
warcproxes = self._service_registry.available_services("warcprox")
|
||||
if not warcproxes:
|
||||
@ -266,6 +283,7 @@ class BrozzlerWorker:
|
||||
):
|
||||
try:
|
||||
ydl_outlinks = ydl.do_youtube_dl(self, site, page)
|
||||
metrics.brozzler_ydl_urls_checked.inc(1)
|
||||
outlinks.update(ydl_outlinks)
|
||||
except brozzler.ReachedLimit as e:
|
||||
raise
|
||||
@ -273,6 +291,11 @@ class BrozzlerWorker:
|
||||
raise
|
||||
except brozzler.ProxyError:
|
||||
raise
|
||||
except brozzler.VideoExtractorError as e:
|
||||
logging.error(
|
||||
"error extracting video info: %s",
|
||||
e,
|
||||
)
|
||||
except Exception as e:
|
||||
if (
|
||||
hasattr(e, "exc_info")
|
||||
@ -311,7 +334,15 @@ class BrozzlerWorker:
|
||||
return False
|
||||
return True
|
||||
|
||||
@metrics.brozzler_page_processing_duration_seconds.time()
|
||||
@metrics.brozzler_in_progress_pages.track_inprogress()
|
||||
def _browse_page(self, browser, site, page, on_screenshot=None, on_request=None):
|
||||
def update_page_metrics(page, outlinks):
|
||||
"""Update page-level Prometheus metrics."""
|
||||
metrics.brozzler_last_page_crawled_time.set_to_current_time()
|
||||
metrics.brozzler_pages_crawled.inc(1)
|
||||
metrics.brozzler_outlinks_found.inc(len(outlinks))
|
||||
|
||||
def _on_screenshot(screenshot_jpeg):
|
||||
if on_screenshot:
|
||||
on_screenshot(screenshot_jpeg)
|
||||
@ -416,6 +447,7 @@ class BrozzlerWorker:
|
||||
)
|
||||
if final_page_url != page.url:
|
||||
page.note_redirect(final_page_url)
|
||||
update_page_metrics(page, outlinks)
|
||||
return outlinks
|
||||
|
||||
def _fetch_url(self, site, url=None, page=None):
|
||||
|
179
brozzler/ydl.py
179
brozzler/ydl.py
@ -27,10 +27,16 @@ import os
|
||||
import json
|
||||
import doublethink
|
||||
import datetime
|
||||
from . import metrics
|
||||
import threading
|
||||
import time
|
||||
|
||||
thread_local = threading.local()
|
||||
|
||||
YTDLP_PROXY = ""
|
||||
MAX_YTDLP_ATTEMPTS = 4
|
||||
YTDLP_WAIT = 10
|
||||
|
||||
|
||||
def should_ytdlp(site, page, page_status, skip_av_seeds):
|
||||
# called only after we've passed needs_browsing() check
|
||||
@ -65,6 +71,11 @@ def should_ytdlp(site, page, page_status, skip_av_seeds):
|
||||
return True
|
||||
|
||||
|
||||
def isyoutubehost(url):
|
||||
# split 1 splits scheme from url, split 2 splits path from hostname, split 3 splits query string on hostname
|
||||
return "youtube.com" in url.split("//")[-1].split("/")[0].split("?")[0]
|
||||
|
||||
|
||||
class ExtraHeaderAdder(urllib.request.BaseHandler):
|
||||
def __init__(self, extra_headers):
|
||||
self.extra_headers = extra_headers
|
||||
@ -191,15 +202,16 @@ def _build_youtube_dl(worker, destdir, site, page):
|
||||
payload=f,
|
||||
extra_headers=extra_headers,
|
||||
)
|
||||
# consulted by _remember_videos()
|
||||
ydl.pushed_videos.append(
|
||||
{
|
||||
"url": url,
|
||||
"response_code": response.code,
|
||||
"content-type": mimetype,
|
||||
"content-length": size,
|
||||
}
|
||||
)
|
||||
|
||||
# consulted by _remember_videos()
|
||||
ydl.pushed_videos.append(
|
||||
{
|
||||
"url": url,
|
||||
"response_code": response.code,
|
||||
"content-type": mimetype,
|
||||
"content-length": size,
|
||||
}
|
||||
)
|
||||
|
||||
def maybe_heartbeat_site_last_claimed(*args, **kwargs):
|
||||
# in case yt-dlp takes a long time, heartbeat site.last_claimed
|
||||
@ -230,6 +242,9 @@ def _build_youtube_dl(worker, destdir, site, page):
|
||||
worker.logger.info(
|
||||
"[ydl_postprocess_hook] postprocessor: {}".format(d["postprocessor"])
|
||||
)
|
||||
is_youtube_host = isyoutubehost(d["info_dict"]["webpage_url"])
|
||||
|
||||
metrics.brozzler_ydl_download_successes.labels(is_youtube_host).inc(1)
|
||||
if worker._using_warcprox(site):
|
||||
_YoutubeDL._push_video_to_warcprox(
|
||||
_YoutubeDL, site, d["info_dict"], d["postprocessor"]
|
||||
@ -262,9 +277,22 @@ def _build_youtube_dl(worker, destdir, site, page):
|
||||
"logger": logging.getLogger("yt_dlp"),
|
||||
"verbose": False,
|
||||
"quiet": False,
|
||||
# recommended to avoid bot detection
|
||||
"sleep_interval": 25,
|
||||
"max_sleep_interval": 90,
|
||||
}
|
||||
|
||||
# skip proxying yt-dlp v.2023.07.06
|
||||
ytdlp_url = page.redirect_url if page.redirect_url else page.url
|
||||
is_youtube_host = isyoutubehost(ytdlp_url)
|
||||
if is_youtube_host and YTDLP_PROXY:
|
||||
ydl_opts["proxy"] = YTDLP_PROXY
|
||||
# don't log proxy value secrets
|
||||
ytdlp_proxy_for_logs = (
|
||||
YTDLP_PROXY.split("@")[1] if "@" in YTDLP_PROXY else "@@@"
|
||||
)
|
||||
logging.info("using yt-dlp proxy ... %s", ytdlp_proxy_for_logs)
|
||||
|
||||
# skip warcprox proxying yt-dlp v.2023.07.06: youtube extractor using ranges
|
||||
# if worker._proxy_for(site):
|
||||
# ydl_opts["proxy"] = "http://{}".format(worker._proxy_for(site))
|
||||
|
||||
@ -272,6 +300,8 @@ def _build_youtube_dl(worker, destdir, site, page):
|
||||
if site.extra_headers():
|
||||
ydl._opener.add_handler(ExtraHeaderAdder(site.extra_headers(page)))
|
||||
ydl.pushed_videos = []
|
||||
ydl.url = ytdlp_url
|
||||
ydl.is_youtube_host = is_youtube_host
|
||||
|
||||
return ydl
|
||||
|
||||
@ -295,58 +325,85 @@ def _remember_videos(page, pushed_videos=None):
|
||||
|
||||
|
||||
def _try_youtube_dl(worker, ydl, site, page):
|
||||
ytdlp_url = page.redirect_url if page.redirect_url else page.url
|
||||
try:
|
||||
logging.info("trying yt-dlp on %s", ytdlp_url)
|
||||
|
||||
with brozzler.thread_accept_exceptions():
|
||||
# we do whatwg canonicalization here to avoid "<urlopen error
|
||||
# no host given>" resulting in ProxyError
|
||||
# needs automated test
|
||||
# and yt-dlp needs sanitize_info for extract_info
|
||||
ie_result = ydl.sanitize_info(
|
||||
ydl.extract_info(str(urlcanon.whatwg(ytdlp_url)))
|
||||
)
|
||||
_remember_videos(page, ydl.pushed_videos)
|
||||
if worker._using_warcprox(site):
|
||||
info_json = json.dumps(ie_result, sort_keys=True, indent=4)
|
||||
logging.info(
|
||||
"sending WARCPROX_WRITE_RECORD request to warcprox "
|
||||
"with yt-dlp json for %s",
|
||||
ytdlp_url,
|
||||
)
|
||||
worker._warcprox_write_record(
|
||||
warcprox_address=worker._proxy_for(site),
|
||||
url="youtube-dl:%s" % str(urlcanon.semantic(ytdlp_url)),
|
||||
warc_type="metadata",
|
||||
content_type="application/vnd.youtube-dl_formats+json;charset=utf-8",
|
||||
payload=info_json.encode("utf-8"),
|
||||
extra_headers=site.extra_headers(page),
|
||||
)
|
||||
return ie_result
|
||||
except brozzler.ShutdownRequested as e:
|
||||
raise
|
||||
except Exception as e:
|
||||
if hasattr(e, "exc_info") and e.exc_info[0] == yt_dlp.utils.UnsupportedError:
|
||||
return None
|
||||
elif (
|
||||
hasattr(e, "exc_info")
|
||||
and e.exc_info[0] == urllib.error.HTTPError
|
||||
and hasattr(e.exc_info[1], "code")
|
||||
and e.exc_info[1].code == 420
|
||||
):
|
||||
raise brozzler.ReachedLimit(e.exc_info[1])
|
||||
elif (
|
||||
hasattr(e, "exc_info")
|
||||
and e.exc_info[0] == urllib.error.URLError
|
||||
and worker._proxy_for(site)
|
||||
):
|
||||
# connection problem when using a proxy == proxy error (XXX?)
|
||||
raise brozzler.ProxyError(
|
||||
"yt-dlp hit apparent proxy error from " "%s" % ytdlp_url
|
||||
) from e
|
||||
else:
|
||||
attempt = 0
|
||||
while attempt < MAX_YTDLP_ATTEMPTS:
|
||||
try:
|
||||
logging.info("trying yt-dlp on %s", ydl.url)
|
||||
# should_download_vid = not ydl.is_youtube_host
|
||||
# then
|
||||
# ydl.extract_info(str(urlcanon.whatwg(ydl.url)), download=should_download_vid)
|
||||
# if ydl.is_youtube_host and ie_result:
|
||||
# download_url = ie_result.get("url")
|
||||
metrics.brozzler_ydl_extract_attempts.labels(ydl.is_youtube_host).inc(1)
|
||||
with brozzler.thread_accept_exceptions():
|
||||
# we do whatwg canonicalization here to avoid "<urlopen error
|
||||
# no host given>" resulting in ProxyError
|
||||
# needs automated test
|
||||
# and yt-dlp needs sanitize_info for extract_info
|
||||
ie_result = ydl.sanitize_info(
|
||||
ydl.extract_info(str(urlcanon.whatwg(ydl.url)))
|
||||
)
|
||||
metrics.brozzler_ydl_extract_successes.labels(ydl.is_youtube_host).inc(1)
|
||||
break
|
||||
except brozzler.ShutdownRequested as e:
|
||||
raise
|
||||
except Exception as e:
|
||||
if (
|
||||
hasattr(e, "exc_info")
|
||||
and e.exc_info[0] == yt_dlp.utils.UnsupportedError
|
||||
):
|
||||
return None
|
||||
elif (
|
||||
hasattr(e, "exc_info")
|
||||
and e.exc_info[0] == urllib.error.HTTPError
|
||||
and hasattr(e.exc_info[1], "code")
|
||||
and e.exc_info[1].code == 420
|
||||
):
|
||||
raise brozzler.ReachedLimit(e.exc_info[1])
|
||||
else:
|
||||
# todo: other errors to handle separately?
|
||||
# OSError('Tunnel connection failed: 464 Host Not Allowed') (caused by ProxyError...)
|
||||
# and others...
|
||||
attempt += 1
|
||||
if attempt == MAX_YTDLP_ATTEMPTS:
|
||||
logging.warning(
|
||||
"Failed after %s attempts. Error: %s", MAX_YTDLP_ATTEMPTS, e
|
||||
)
|
||||
raise brozzler.VideoExtractorError(
|
||||
"yt-dlp hit error extracting info for %s" % ydl.url
|
||||
)
|
||||
else:
|
||||
retry_wait = min(60, YTDLP_WAIT * (1.5 ** (attempt - 1)))
|
||||
logging.info(
|
||||
"Attempt %s failed. Retrying in %s seconds...",
|
||||
attempt,
|
||||
retry_wait,
|
||||
)
|
||||
time.sleep(retry_wait)
|
||||
else:
|
||||
raise brozzler.VideoExtractorError(
|
||||
"yt-dlp hit unknown error extracting info for %s" % ydl.url
|
||||
)
|
||||
|
||||
logging.info("ytdlp completed successfully")
|
||||
|
||||
_remember_videos(page, ydl.pushed_videos)
|
||||
if worker._using_warcprox(site):
|
||||
info_json = json.dumps(ie_result, sort_keys=True, indent=4)
|
||||
logging.info(
|
||||
"sending WARCPROX_WRITE_RECORD request to warcprox "
|
||||
"with yt-dlp json for %s",
|
||||
ydl.url,
|
||||
)
|
||||
worker._warcprox_write_record(
|
||||
warcprox_address=worker._proxy_for(site),
|
||||
url="youtube-dl:%s" % str(urlcanon.semantic(ydl.url)),
|
||||
warc_type="metadata",
|
||||
content_type="application/vnd.youtube-dl_formats+json;charset=utf-8",
|
||||
payload=info_json.encode("utf-8"),
|
||||
extra_headers=site.extra_headers(page),
|
||||
)
|
||||
return ie_result
|
||||
|
||||
|
||||
def do_youtube_dl(worker, site, page):
|
||||
|
5
setup.py
5
setup.py
@ -34,7 +34,7 @@ def find_package_data(package):
|
||||
|
||||
setuptools.setup(
|
||||
name="brozzler",
|
||||
version="1.5.54",
|
||||
version="1.5.55",
|
||||
description="Distributed web crawling with browsers",
|
||||
url="https://github.com/internetarchive/brozzler",
|
||||
author="Noah Levitt",
|
||||
@ -77,9 +77,10 @@ setuptools.setup(
|
||||
"jinja2>=2.10",
|
||||
"cryptography>=2.3",
|
||||
"python-magic>=0.4.15",
|
||||
"prometheus-client>=0.20.0",
|
||||
],
|
||||
extras_require={
|
||||
"yt-dlp": ["yt-dlp==2024.7.25"],
|
||||
"yt-dlp": ["yt-dlp>=2024.7.25"],
|
||||
"dashboard": ["flask>=1.0", "gunicorn>=19.8.1"],
|
||||
"easy": [
|
||||
"warcprox>=2.4.31",
|
||||
|
Loading…
x
Reference in New Issue
Block a user