initial stab at worker

This commit is contained in:
Misty De Méo 2025-02-13 15:39:10 -08:00
parent 69d682beb9
commit 715a1471c0
2 changed files with 67 additions and 70 deletions

View File

@ -18,7 +18,6 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
""" """
import logging
import brozzler import brozzler
import brozzler.browser import brozzler.browser
import datetime import datetime
@ -31,6 +30,7 @@ import io
import socket import socket
import random import random
import requests import requests
import structlog
import urllib3 import urllib3
from urllib3.exceptions import TimeoutError, ProxyError from urllib3.exceptions import TimeoutError, ProxyError
import doublethink import doublethink
@ -45,7 +45,7 @@ r = rdb.RethinkDB()
class BrozzlerWorker: class BrozzlerWorker:
logger = logging.getLogger(__module__ + "." + __qualname__) logger = structlog.get_logger()
# 3⅓ min heartbeat interval => 10 min ttl # 3⅓ min heartbeat interval => 10 min ttl
# This is kind of a long time, because `frontier.claim_sites()`, which runs # This is kind of a long time, because `frontier.claim_sites()`, which runs
@ -174,9 +174,9 @@ class BrozzlerWorker:
site.proxy = "%s:%s" % (svc["host"], svc["port"]) site.proxy = "%s:%s" % (svc["host"], svc["port"])
site.save() site.save()
self.logger.info( self.logger.info(
"chose warcprox instance %r from service registry for %r", "chose warcprox instance from service registry",
site.proxy, instance=site.proxy,
site, registry=site,
) )
return site.proxy return site.proxy
return None return None
@ -223,22 +223,24 @@ class BrozzlerWorker:
request.type = "http" request.type = "http"
request.set_proxy(warcprox_address, "http") request.set_proxy(warcprox_address, "http")
try: try:
with urllib.request.urlopen(request, timeout=600) as response: with urllib.request.urlopen(request, timeout=600) as response:
if response.getcode() != 204: if response.getcode() != 204:
self.logger.warning( self.logger.warning(
'got "%s %s" response on warcprox ' 'got unexpected response on warcprox '
"WARCPROX_WRITE_RECORD request (expected 204)", "WARCPROX_WRITE_RECORD request (expected 204)",
response.getcode(), code=response.getcode(),
response.reason, reason=response.reason,
) )
return request, response return request, response
except urllib.error.HTTPError as e: except urllib.error.HTTPError as e:
self.logger.warning( self.logger.warning(
'got "%s %s" response on warcprox ' 'got unexpected response on warcprox '
"WARCPROX_WRITE_RECORD request (expected 204)", "WARCPROX_WRITE_RECORD request (expected 204)",
e.getcode(), code=e.getcode(),
e.info(), reason=e.info(),
) )
return request, None return request, None
except urllib.error.URLError as e: except urllib.error.URLError as e:
@ -271,16 +273,17 @@ class BrozzlerWorker:
on_request=None, on_request=None,
enable_youtube_dl=True, enable_youtube_dl=True,
): ):
self.logger.info("brozzling {}".format(page)) page_logger = self.logger.bind(page=page)
page_logger.info("brozzling")
outlinks = set() outlinks = set()
page_headers = self._get_page_headers(site, page) page_headers = self._get_page_headers(site, page)
if not self._needs_browsing(page_headers): if not self._needs_browsing(page_headers):
self.logger.info("needs fetch: %s", page) page_logger.info("needs fetch")
self._fetch_url(site, page=page) self._fetch_url(site, page=page)
else: else:
self.logger.info("needs browsing: %s", page) page_logger.info("needs browsing")
try: try:
browser_outlinks = self._browse_page( browser_outlinks = self._browse_page(
browser, site, page, on_screenshot, on_request browser, site, page, on_screenshot, on_request
@ -290,7 +293,7 @@ class BrozzlerWorker:
if status_code in [502, 504]: if status_code in [502, 504]:
raise brozzler.PageConnectionError() raise brozzler.PageConnectionError()
except brozzler.PageInterstitialShown: except brozzler.PageInterstitialShown:
self.logger.info("page interstitial shown (http auth): %s", page) page_logger.info("page interstitial shown (http auth)")
if enable_youtube_dl and ydl.should_ytdlp( if enable_youtube_dl and ydl.should_ytdlp(
site, page, status_code, self._skip_av_seeds site, page, status_code, self._skip_av_seeds
@ -308,10 +311,7 @@ class BrozzlerWorker:
except brozzler.ProxyError: except brozzler.ProxyError:
raise raise
except brozzler.VideoExtractorError as e: except brozzler.VideoExtractorError as e:
logging.error( self.logger.exception("error extracting video info")
"error extracting video info: %s",
e,
)
except Exception as e: except Exception as e:
if ( if (
hasattr(e, "exc_info") hasattr(e, "exc_info")
@ -320,26 +320,27 @@ class BrozzlerWorker:
and e.exc_info[1].code == 430 and e.exc_info[1].code == 430
): ):
self.logger.info( self.logger.info(
"youtube-dl got %s %s processing %s", "youtube-dl encountered an error",
e.exc_info[1].code, code=e.exc_info[1].code,
e.exc_info[1].msg, message=e.exc_info[1].msg,
page.url, url=page.url,
) )
else: else:
self.logger.error( self.logger.exception(
"youtube_dl raised exception on %s", page, exc_info=True "youtube_dl raised exception", page=page
) )
return outlinks return outlinks
@metrics.brozzler_header_processing_duration_seconds.time() @metrics.brozzler_header_processing_duration_seconds.time()
@metrics.brozzler_in_progress_headers.track_inprogress() @metrics.brozzler_in_progress_headers.track_inprogress()
def _get_page_headers(self, site, page): def _get_page_headers(self, site, page):
url_logger = self.logger.bind(url=page.url)
# bypassing warcprox, requests' stream=True defers downloading the body of the response # bypassing warcprox, requests' stream=True defers downloading the body of the response
# see https://docs.python-requests.org/en/latest/user/advanced/#body-content-workflow # see https://docs.python-requests.org/en/latest/user/advanced/#body-content-workflow
try: try:
user_agent = site.get("user_agent") user_agent = site.get("user_agent")
headers = {"User-Agent": user_agent} if user_agent else {} headers = {"User-Agent": user_agent} if user_agent else {}
self.logger.info("getting page headers for %s", page.url) url_logger.info("getting page headers")
with requests.get( with requests.get(
page.url, page.url,
stream=True, stream=True,
@ -349,11 +350,9 @@ class BrozzlerWorker:
) as r: ) as r:
return r.headers return r.headers
except requests.exceptions.Timeout as e: except requests.exceptions.Timeout as e:
self.logger.warning( url_logger.warning("Timed out trying to get headers", exc_info=True)
"Timed out trying to get headers for %s: %s", page.url, e
)
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
self.logger.warning("Failed to get headers for %s: %s", page.url, e) url_logger.warning("Failed to get headers", exc_info=True)
return {} return {}
def _needs_browsing(self, page_headers): def _needs_browsing(self, page_headers):
@ -378,10 +377,9 @@ class BrozzlerWorker:
on_screenshot(screenshot_jpeg) on_screenshot(screenshot_jpeg)
if self._using_warcprox(site): if self._using_warcprox(site):
self.logger.info( self.logger.info(
"sending WARCPROX_WRITE_RECORD request to %s with " "sending WARCPROX_WRITE_RECORD request",
"screenshot for %s", proxy=self._proxy_for(site),
self._proxy_for(site), screenshot_for_page=page,
page,
) )
thumbnail_jpeg = self.thumb_jpeg(screenshot_jpeg) thumbnail_jpeg = self.thumb_jpeg(screenshot_jpeg)
self._warcprox_write_record( self._warcprox_write_record(
@ -437,11 +435,13 @@ class BrozzlerWorker:
def _on_service_worker_version_updated(chrome_msg): def _on_service_worker_version_updated(chrome_msg):
# https://github.com/internetarchive/brozzler/issues/140 # https://github.com/internetarchive/brozzler/issues/140
# FIXME: `trace` is a custom logging level, need to port
# this over to structlog.
self.logger.trace("%r", chrome_msg) self.logger.trace("%r", chrome_msg)
if chrome_msg.get("params", {}).get("versions"): if chrome_msg.get("params", {}).get("versions"):
url = chrome_msg.get("params", {}).get("versions")[0].get("scriptURL") url = chrome_msg.get("params", {}).get("versions")[0].get("scriptURL")
if url and url.startswith("http") and url not in sw_fetched: if url and url.startswith("http") and url not in sw_fetched:
self.logger.info("fetching service worker script %s", url) self.logger.info("fetching service worker script", url=url)
self._fetch_url(site, url=url) self._fetch_url(site, url=url)
sw_fetched.add(url) sw_fetched.add(url)
@ -496,7 +496,7 @@ class BrozzlerWorker:
headers = {"User-Agent": user_agent} if user_agent else {} headers = {"User-Agent": user_agent} if user_agent else {}
headers.update(site.extra_headers(page)) headers.update(site.extra_headers(page))
self.logger.info("fetching url %s", url) self.logger.info("fetching url", url=url)
try: try:
# response is ignored # response is ignored
http.request( http.request(
@ -506,17 +506,18 @@ class BrozzlerWorker:
timeout=self.FETCH_URL_TIMEOUT, timeout=self.FETCH_URL_TIMEOUT,
retries=False, retries=False,
) )
self.logger.info("Completed fetching url %s", url) self.logger.info("Completed fetching url", url=url)
except TimeoutError as e: except TimeoutError as e:
self.logger.warning("Timed out fetching %s", url) self.logger.warning("Timed out fetching url", url=url)
raise brozzler.PageConnectionError() from e raise brozzler.PageConnectionError() from e
except ProxyError as e: except ProxyError as e:
raise brozzler.ProxyError("proxy error fetching %s" % url) from e raise brozzler.ProxyError("proxy error fetching %s" % url) from e
except urllib3.exceptions.RequestError as e: except urllib3.exceptions.RequestError as e:
self.logger.warning("Failed to fetch url %s: %s", url, e) self.logger.warning("Failed to fetch url", url=url, exc_info=True)
raise brozzler.PageConnectionError() from e raise brozzler.PageConnectionError() from e
def brozzle_site(self, browser, site): def brozzle_site(self, browser, site):
site_logger = self.logger.bind(site=site)
try: try:
site.last_claimed_by = "%s:%s" % (socket.gethostname(), browser.chrome.port) site.last_claimed_by = "%s:%s" % (socket.gethostname(), browser.chrome.port)
site.save() site.save()
@ -526,9 +527,7 @@ class BrozzlerWorker:
self._frontier.honor_stop_request(site) self._frontier.honor_stop_request(site)
# _proxy_for() call in log statement can raise brozzler.ProxyError # _proxy_for() call in log statement can raise brozzler.ProxyError
# which is why we honor time limit and stop request first☝🏻 # which is why we honor time limit and stop request first☝🏻
self.logger.info( site_logger.info("brozzling site", proxy=self._proxy_for(site))
"brozzling site (proxy=%r) %s", self._proxy_for(site), site
)
while time.time() - start < self.SITE_SESSION_MINUTES * 60: while time.time() - start < self.SITE_SESSION_MINUTES * 60:
site.refresh() site.refresh()
self._frontier.enforce_time_limit(site) self._frontier.enforce_time_limit(site)
@ -556,7 +555,7 @@ class BrozzlerWorker:
except brozzler.ShutdownRequested: except brozzler.ShutdownRequested:
self.logger.info("shutdown requested") self.logger.info("shutdown requested")
except brozzler.NothingToClaim: except brozzler.NothingToClaim:
self.logger.info("no pages left for site %s", site) site_logger.info("no pages left for site")
except brozzler.ReachedLimit as e: except brozzler.ReachedLimit as e:
self._frontier.reached_limit(site, e) self._frontier.reached_limit(site, e)
except brozzler.ReachedTimeLimit as e: except brozzler.ReachedTimeLimit as e:
@ -567,28 +566,25 @@ class BrozzlerWorker:
# self.logger.info("{} shut down".format(browser)) # self.logger.info("{} shut down".format(browser))
except brozzler.ProxyError as e: except brozzler.ProxyError as e:
if self._warcprox_auto: if self._warcprox_auto:
logging.error( self.logger.exception(
"proxy error (site.proxy=%s), will try to choose a " "proxy error, will try to choose a "
"healthy instance next time site is brozzled: %s", "healthy instance next time site is brozzled",
site.proxy, site_proxy=site.proxy,
e,
) )
site.proxy = None site.proxy = None
else: else:
# using brozzler-worker --proxy, nothing to do but try the # using brozzler-worker --proxy, nothing to do but try the
# same proxy again next time # same proxy again next time
logging.error("proxy error (self._proxy=%r)", self._proxy, exc_info=1) self.logger.exception("proxy error", self_proxy=self._proxy)
except (brozzler.PageConnectionError, Exception) as e: except (brozzler.PageConnectionError, Exception) as e:
if isinstance(e, brozzler.PageConnectionError): if isinstance(e, brozzler.PageConnectionError):
self.logger.error( site_logger.exception(
"Page status code possibly indicates connection failure between host and warcprox: site=%r page=%r", "Page status code possibly indicates connection failure between host and warcprox",
site, page=page,
page,
exc_info=True,
) )
else: else:
self.logger.error( site_logger.exception(
"unexpected exception site=%r page=%r", site, page, exc_info=True "unexpected exception", page=page
) )
if page: if page:
# Calculate backoff in seconds based on number of failed attempts. # Calculate backoff in seconds based on number of failed attempts.
@ -600,10 +596,10 @@ class BrozzlerWorker:
page.failed_attempts = (page.failed_attempts or 0) + 1 page.failed_attempts = (page.failed_attempts or 0) + 1
if page.failed_attempts >= brozzler.MAX_PAGE_FAILURES: if page.failed_attempts >= brozzler.MAX_PAGE_FAILURES:
self.logger.info( self.logger.info(
'marking page "completed" after %s unexpected ' 'marking page "completed" after several unexpected '
"exceptions attempting to brozzle %s", "exceptions attempting to brozzle",
page.failed_attempts, failed_attempts=page.failed_attempts,
page, page=page,
) )
self._frontier.completed_page(site, page) self._frontier.completed_page(site, page)
page = None page = None
@ -641,13 +637,12 @@ class BrozzlerWorker:
try: try:
self.status_info = self._service_registry.heartbeat(status_info) self.status_info = self._service_registry.heartbeat(status_info)
# FIXME: need to implement trace
self.logger.trace("status in service registry: %s", self.status_info) self.logger.trace("status in service registry: %s", self.status_info)
except r.ReqlError as e: except r.ReqlError as e:
self.logger.error( self.logger.exception(
"failed to send heartbeat and update service registry " "failed to send heartbeat and update service registry",
"with info %s: %s", info=status_info,
status_info,
e,
) )
def _service_heartbeat_if_due(self): def _service_heartbeat_if_due(self):
@ -695,6 +690,7 @@ class BrozzlerWorker:
self._browser_pool.release(browsers[i]) self._browser_pool.release(browsers[i])
def run(self): def run(self):
# FIXME: need to implement notice
self.logger.notice( self.logger.notice(
"brozzler %s - brozzler-worker starting", brozzler.__version__ "brozzler %s - brozzler-worker starting", brozzler.__version__
) )
@ -717,8 +713,8 @@ class BrozzlerWorker:
self.logger.notice("shutdown requested") self.logger.notice("shutdown requested")
except r.ReqlError as e: except r.ReqlError as e:
self.logger.error( self.logger.exception(
"caught rethinkdb exception, will try to proceed", exc_info=True "caught rethinkdb exception, will try to proceed"
) )
except brozzler.ShutdownRequested: except brozzler.ShutdownRequested:
self.logger.info("shutdown requested") self.logger.info("shutdown requested")
@ -731,12 +727,12 @@ class BrozzlerWorker:
try: try:
self._service_registry.unregister(self.status_info["id"]) self._service_registry.unregister(self.status_info["id"])
except: except:
self.logger.error( self.logger.exception(
"failed to unregister from service registry", exc_info=True "failed to unregister from service registry"
) )
self.logger.info( self.logger.info(
"shutting down %s brozzling threads", len(self._browsing_threads) "shutting down brozzling threads", thread_count=len(self._browsing_threads)
) )
with self._browsing_threads_lock: with self._browsing_threads_lock:
for th in self._browsing_threads: for th in self._browsing_threads:

View File

@ -76,6 +76,7 @@ setuptools.setup(
"cryptography>=2.3", "cryptography>=2.3",
"python-magic>=0.4.15", "python-magic>=0.4.15",
"prometheus-client>=0.20.0", "prometheus-client>=0.20.0",
"structlog>=25.1.0"
], ],
extras_require={ extras_require={
"yt-dlp": ["yt-dlp>=2024.7.25"], "yt-dlp": ["yt-dlp>=2024.7.25"],