diff --git a/brozzler/worker.py b/brozzler/worker.py index ad1a993..b16a70a 100644 --- a/brozzler/worker.py +++ b/brozzler/worker.py @@ -18,7 +18,6 @@ See the License for the specific language governing permissions and limitations under the License. """ -import logging import brozzler import brozzler.browser import datetime @@ -31,6 +30,7 @@ import io import socket import random import requests +import structlog import urllib3 from urllib3.exceptions import TimeoutError, ProxyError import doublethink @@ -45,7 +45,7 @@ r = rdb.RethinkDB() class BrozzlerWorker: - logger = logging.getLogger(__module__ + "." + __qualname__) + logger = structlog.get_logger() # 3⅓ min heartbeat interval => 10 min ttl # This is kind of a long time, because `frontier.claim_sites()`, which runs @@ -174,9 +174,9 @@ class BrozzlerWorker: site.proxy = "%s:%s" % (svc["host"], svc["port"]) site.save() self.logger.info( - "chose warcprox instance %r from service registry for %r", - site.proxy, - site, + "chose warcprox instance from service registry", + instance=site.proxy, + registry=site, ) return site.proxy return None @@ -223,22 +223,24 @@ class BrozzlerWorker: request.type = "http" request.set_proxy(warcprox_address, "http") + + try: with urllib.request.urlopen(request, timeout=600) as response: if response.getcode() != 204: self.logger.warning( - 'got "%s %s" response on warcprox ' + 'got unexpected response on warcprox ' "WARCPROX_WRITE_RECORD request (expected 204)", - response.getcode(), - response.reason, + code=response.getcode(), + reason=response.reason, ) return request, response except urllib.error.HTTPError as e: self.logger.warning( - 'got "%s %s" response on warcprox ' + 'got unexpected response on warcprox ' "WARCPROX_WRITE_RECORD request (expected 204)", - e.getcode(), - e.info(), + code=e.getcode(), + reason=e.info(), ) return request, None except urllib.error.URLError as e: @@ -271,16 +273,17 @@ class BrozzlerWorker: on_request=None, enable_youtube_dl=True, ): - self.logger.info("brozzling {}".format(page)) + page_logger = self.logger.bind(page=page) + page_logger.info("brozzling") outlinks = set() page_headers = self._get_page_headers(site, page) if not self._needs_browsing(page_headers): - self.logger.info("needs fetch: %s", page) + page_logger.info("needs fetch") self._fetch_url(site, page=page) else: - self.logger.info("needs browsing: %s", page) + page_logger.info("needs browsing") try: browser_outlinks = self._browse_page( browser, site, page, on_screenshot, on_request @@ -290,7 +293,7 @@ class BrozzlerWorker: if status_code in [502, 504]: raise brozzler.PageConnectionError() except brozzler.PageInterstitialShown: - self.logger.info("page interstitial shown (http auth): %s", page) + page_logger.info("page interstitial shown (http auth)") if enable_youtube_dl and ydl.should_ytdlp( site, page, status_code, self._skip_av_seeds @@ -308,10 +311,7 @@ class BrozzlerWorker: except brozzler.ProxyError: raise except brozzler.VideoExtractorError as e: - logging.error( - "error extracting video info: %s", - e, - ) + self.logger.exception("error extracting video info") except Exception as e: if ( hasattr(e, "exc_info") @@ -320,26 +320,27 @@ class BrozzlerWorker: and e.exc_info[1].code == 430 ): self.logger.info( - "youtube-dl got %s %s processing %s", - e.exc_info[1].code, - e.exc_info[1].msg, - page.url, + "youtube-dl encountered an error", + code=e.exc_info[1].code, + message=e.exc_info[1].msg, + url=page.url, ) else: - self.logger.error( - "youtube_dl raised exception on %s", page, exc_info=True + self.logger.exception( + "youtube_dl raised exception", page=page ) return outlinks @metrics.brozzler_header_processing_duration_seconds.time() @metrics.brozzler_in_progress_headers.track_inprogress() def _get_page_headers(self, site, page): + url_logger = self.logger.bind(url=page.url) # bypassing warcprox, requests' stream=True defers downloading the body of the response # see https://docs.python-requests.org/en/latest/user/advanced/#body-content-workflow try: user_agent = site.get("user_agent") headers = {"User-Agent": user_agent} if user_agent else {} - self.logger.info("getting page headers for %s", page.url) + url_logger.info("getting page headers") with requests.get( page.url, stream=True, @@ -349,11 +350,9 @@ class BrozzlerWorker: ) as r: return r.headers except requests.exceptions.Timeout as e: - self.logger.warning( - "Timed out trying to get headers for %s: %s", page.url, e - ) + url_logger.warning("Timed out trying to get headers", exc_info=True) except requests.exceptions.RequestException as e: - self.logger.warning("Failed to get headers for %s: %s", page.url, e) + url_logger.warning("Failed to get headers", exc_info=True) return {} def _needs_browsing(self, page_headers): @@ -378,10 +377,9 @@ class BrozzlerWorker: on_screenshot(screenshot_jpeg) if self._using_warcprox(site): self.logger.info( - "sending WARCPROX_WRITE_RECORD request to %s with " - "screenshot for %s", - self._proxy_for(site), - page, + "sending WARCPROX_WRITE_RECORD request", + proxy=self._proxy_for(site), + screenshot_for_page=page, ) thumbnail_jpeg = self.thumb_jpeg(screenshot_jpeg) self._warcprox_write_record( @@ -437,11 +435,13 @@ class BrozzlerWorker: def _on_service_worker_version_updated(chrome_msg): # https://github.com/internetarchive/brozzler/issues/140 + # FIXME: `trace` is a custom logging level, need to port + # this over to structlog. self.logger.trace("%r", chrome_msg) if chrome_msg.get("params", {}).get("versions"): url = chrome_msg.get("params", {}).get("versions")[0].get("scriptURL") if url and url.startswith("http") and url not in sw_fetched: - self.logger.info("fetching service worker script %s", url) + self.logger.info("fetching service worker script", url=url) self._fetch_url(site, url=url) sw_fetched.add(url) @@ -496,7 +496,7 @@ class BrozzlerWorker: headers = {"User-Agent": user_agent} if user_agent else {} headers.update(site.extra_headers(page)) - self.logger.info("fetching url %s", url) + self.logger.info("fetching url", url=url) try: # response is ignored http.request( @@ -506,17 +506,18 @@ class BrozzlerWorker: timeout=self.FETCH_URL_TIMEOUT, retries=False, ) - self.logger.info("Completed fetching url %s", url) + self.logger.info("Completed fetching url", url=url) except TimeoutError as e: - self.logger.warning("Timed out fetching %s", url) + self.logger.warning("Timed out fetching url", url=url) raise brozzler.PageConnectionError() from e except ProxyError as e: raise brozzler.ProxyError("proxy error fetching %s" % url) from e except urllib3.exceptions.RequestError as e: - self.logger.warning("Failed to fetch url %s: %s", url, e) + self.logger.warning("Failed to fetch url", url=url, exc_info=True) raise brozzler.PageConnectionError() from e def brozzle_site(self, browser, site): + site_logger = self.logger.bind(site=site) try: site.last_claimed_by = "%s:%s" % (socket.gethostname(), browser.chrome.port) site.save() @@ -526,9 +527,7 @@ class BrozzlerWorker: self._frontier.honor_stop_request(site) # _proxy_for() call in log statement can raise brozzler.ProxyError # which is why we honor time limit and stop request first☝🏻 - self.logger.info( - "brozzling site (proxy=%r) %s", self._proxy_for(site), site - ) + site_logger.info("brozzling site", proxy=self._proxy_for(site)) while time.time() - start < self.SITE_SESSION_MINUTES * 60: site.refresh() self._frontier.enforce_time_limit(site) @@ -556,7 +555,7 @@ class BrozzlerWorker: except brozzler.ShutdownRequested: self.logger.info("shutdown requested") except brozzler.NothingToClaim: - self.logger.info("no pages left for site %s", site) + site_logger.info("no pages left for site") except brozzler.ReachedLimit as e: self._frontier.reached_limit(site, e) except brozzler.ReachedTimeLimit as e: @@ -567,28 +566,25 @@ class BrozzlerWorker: # self.logger.info("{} shut down".format(browser)) except brozzler.ProxyError as e: if self._warcprox_auto: - logging.error( - "proxy error (site.proxy=%s), will try to choose a " - "healthy instance next time site is brozzled: %s", - site.proxy, - e, + self.logger.exception( + "proxy error, will try to choose a " + "healthy instance next time site is brozzled", + site_proxy=site.proxy, ) site.proxy = None else: # using brozzler-worker --proxy, nothing to do but try the # same proxy again next time - logging.error("proxy error (self._proxy=%r)", self._proxy, exc_info=1) + self.logger.exception("proxy error", self_proxy=self._proxy) except (brozzler.PageConnectionError, Exception) as e: if isinstance(e, brozzler.PageConnectionError): - self.logger.error( - "Page status code possibly indicates connection failure between host and warcprox: site=%r page=%r", - site, - page, - exc_info=True, + site_logger.exception( + "Page status code possibly indicates connection failure between host and warcprox", + page=page, ) else: - self.logger.error( - "unexpected exception site=%r page=%r", site, page, exc_info=True + site_logger.exception( + "unexpected exception", page=page ) if page: # Calculate backoff in seconds based on number of failed attempts. @@ -600,10 +596,10 @@ class BrozzlerWorker: page.failed_attempts = (page.failed_attempts or 0) + 1 if page.failed_attempts >= brozzler.MAX_PAGE_FAILURES: self.logger.info( - 'marking page "completed" after %s unexpected ' - "exceptions attempting to brozzle %s", - page.failed_attempts, - page, + 'marking page "completed" after several unexpected ' + "exceptions attempting to brozzle", + failed_attempts=page.failed_attempts, + page=page, ) self._frontier.completed_page(site, page) page = None @@ -641,13 +637,12 @@ class BrozzlerWorker: try: self.status_info = self._service_registry.heartbeat(status_info) + # FIXME: need to implement trace self.logger.trace("status in service registry: %s", self.status_info) except r.ReqlError as e: - self.logger.error( - "failed to send heartbeat and update service registry " - "with info %s: %s", - status_info, - e, + self.logger.exception( + "failed to send heartbeat and update service registry", + info=status_info, ) def _service_heartbeat_if_due(self): @@ -695,6 +690,7 @@ class BrozzlerWorker: self._browser_pool.release(browsers[i]) def run(self): + # FIXME: need to implement notice self.logger.notice( "brozzler %s - brozzler-worker starting", brozzler.__version__ ) @@ -717,8 +713,8 @@ class BrozzlerWorker: self.logger.notice("shutdown requested") except r.ReqlError as e: - self.logger.error( - "caught rethinkdb exception, will try to proceed", exc_info=True + self.logger.exception( + "caught rethinkdb exception, will try to proceed" ) except brozzler.ShutdownRequested: self.logger.info("shutdown requested") @@ -731,12 +727,12 @@ class BrozzlerWorker: try: self._service_registry.unregister(self.status_info["id"]) except: - self.logger.error( - "failed to unregister from service registry", exc_info=True + self.logger.exception( + "failed to unregister from service registry" ) self.logger.info( - "shutting down %s brozzling threads", len(self._browsing_threads) + "shutting down brozzling threads", thread_count=len(self._browsing_threads) ) with self._browsing_threads_lock: for th in self._browsing_threads: diff --git a/setup.py b/setup.py index 808ba87..e58d7b7 100644 --- a/setup.py +++ b/setup.py @@ -76,6 +76,7 @@ setuptools.setup( "cryptography>=2.3", "python-magic>=0.4.15", "prometheus-client>=0.20.0", + "structlog>=25.1.0" ], extras_require={ "yt-dlp": ["yt-dlp>=2024.7.25"],