From 18a976f82c493f98a011eb89e9e7442225411679 Mon Sep 17 00:00:00 2001 From: Adam Miller Date: Fri, 30 Aug 2024 21:11:02 +0000 Subject: [PATCH] feat: Detect connection failures forwarded from warcprox and retry them with backoff --- brozzler/__init__.py | 4 ++++ brozzler/browser.py | 2 +- brozzler/frontier.py | 9 ++++++++- brozzler/model.py | 4 ++++ brozzler/worker.py | 27 +++++++++++++++++++-------- 5 files changed, 36 insertions(+), 10 deletions(-) diff --git a/brozzler/__init__.py b/brozzler/__init__.py index 5040e69..3c2944d 100644 --- a/brozzler/__init__.py +++ b/brozzler/__init__.py @@ -43,6 +43,10 @@ class ProxyError(Exception): pass +class PageConnectionError(Exception): + pass + + class ReachedTimeLimit(Exception): pass diff --git a/brozzler/browser.py b/brozzler/browser.py index ca92ffa..3dfc530 100644 --- a/brozzler/browser.py +++ b/brozzler/browser.py @@ -596,7 +596,7 @@ class Browser: outlinks = self.extract_outlinks(timeout=extract_outlinks_timeout) if run_behaviors and not skip_visit_hashtags: self.visit_hashtags(final_page_url, hashtags, outlinks) - return final_page_url, outlinks + return final_page_url, outlinks, self.websock_thread.page_status except brozzler.ReachedLimit: # websock_thread has stashed the ReachedLimit exception with # more information, raise that one diff --git a/brozzler/frontier.py b/brozzler/frontier.py index afb2a57..6035967 100644 --- a/brozzler/frontier.py +++ b/brozzler/frontier.py @@ -138,7 +138,13 @@ class RethinkDbFrontier: emit=lambda acc, site, new_acc: r.branch( r.and_( r.or_( - site["claimed"].not_(), + r.and_( + site["claimed"].not_(), + r.or_( + site.has_fields("last_disclaimed").not_(), + site["last_disclaimed"].lt(r.now().sub(20)), + ), + ), site["last_claimed"].lt(r.now().sub(60 * 60)), ), r.or_( @@ -218,6 +224,7 @@ class RethinkDbFrontier: index="priority_by_site", ) .order_by(index=r.desc("priority_by_site")) + .filter(lambda page: r.or_(page.has_fields("retry_after").not_(), r.now() > page["retry_after"])) .limit(1) .update( {"claimed": True, "last_claimed_by": worker_id}, return_changes="always" diff --git a/brozzler/model.py b/brozzler/model.py index fe9f8c0..585d69d 100644 --- a/brozzler/model.py +++ b/brozzler/model.py @@ -386,6 +386,10 @@ class Page(doublethink.Document): return hashlib.sha1(digest_this.encode("utf-8")).hexdigest() def populate_defaults(self): + if not "retry_after" in self: + self.retry_after = None + if not "failed_attempts" in self: + self.failed_attempts = 0 if not "hops_from_seed" in self: self.hops_from_seed = 0 if not "hop_path" in self: diff --git a/brozzler/worker.py b/brozzler/worker.py index 479dfa7..05f5bbe 100644 --- a/brozzler/worker.py +++ b/brozzler/worker.py @@ -21,6 +21,7 @@ limitations under the License. import logging import brozzler import brozzler.browser +import datetime import threading import time import urllib.request @@ -254,10 +255,12 @@ class BrozzlerWorker: else: self.logger.info("needs browsing: %s", page) try: - browser_outlinks = self._browse_page( + browser_outlinks, status_code = self._browse_page( browser, site, page, on_screenshot, on_request ) outlinks.update(browser_outlinks) + if status_code in [502, 504]: + raise brozzler.PageConnectionError() except brozzler.PageInterstitialShown: self.logger.info("page interstitial shown (http auth): %s", page) @@ -391,7 +394,7 @@ class BrozzlerWorker: window_height=self._window_height, window_width=self._window_width, ) - final_page_url, outlinks = browser.browse_page( + final_page_url, outlinks, status_code = browser.browse_page( page.url, extra_headers=site.extra_headers(page), behavior_parameters=site.get("behavior_parameters"), @@ -416,7 +419,7 @@ class BrozzlerWorker: ) if final_page_url != page.url: page.note_redirect(final_page_url) - return outlinks + return outlinks, status_code def _fetch_url(self, site, url=None, page=None): proxies = None @@ -499,11 +502,18 @@ class BrozzlerWorker: # using brozzler-worker --proxy, nothing to do but try the # same proxy again next time logging.error("proxy error (self._proxy=%r)", self._proxy, exc_info=1) - except: - self.logger.error( - "unexpected exception site=%r page=%r", site, page, exc_info=True - ) + except (brozzler.PageConnectionError, Exception) as e: + if isinstance(e, brozzler.PageConnectionError): + self.logger.error( + "Page status code possibly indicates connection failure between host and warcprox: site=%r page=%r", site, page, exc_info=True + ) + else: + self.logger.error( + "unexpected exception site=%r page=%r", site, page, exc_info=True + ) if page: + retry_delay = min(60, 60 * (1.5 ** page.failed_attempts)) + page.retry_after = doublethink.utcnow() + datetime.timedelta(seconds=retry_delay) page.failed_attempts = (page.failed_attempts or 0) + 1 if page.failed_attempts >= brozzler.MAX_PAGE_FAILURES: self.logger.info( @@ -513,7 +523,8 @@ class BrozzlerWorker: page, ) self._frontier.completed_page(site, page) - page = None + else: + page.save() finally: if start: site.active_brozzling_time = (