feat: Detect connection failures forwarded from warcprox and retry them with backoff

This commit is contained in:
Adam Miller 2024-08-30 21:11:02 +00:00
parent 0d8721a4d3
commit 18a976f82c
5 changed files with 36 additions and 10 deletions

View File

@@ -43,6 +43,10 @@ class ProxyError(Exception):
pass
class PageConnectionError(Exception):
    """Raised when a page's HTTP status code (502/504 forwarded by warcprox)
    indicates a connection failure between the crawl host and warcprox,
    so the page can be scheduled for retry with backoff."""

    pass
class ReachedTimeLimit(Exception):
    """Presumably raised when a crawl's configured time limit is reached.

    NOTE(review): no raise site for this exception is visible in this diff —
    confirm its intended usage against the rest of the codebase.
    """

    pass

View File

@@ -596,7 +596,7 @@ class Browser:
outlinks = self.extract_outlinks(timeout=extract_outlinks_timeout)
if run_behaviors and not skip_visit_hashtags:
self.visit_hashtags(final_page_url, hashtags, outlinks)
return final_page_url, outlinks
return final_page_url, outlinks, self.websock_thread.page_status
except brozzler.ReachedLimit:
# websock_thread has stashed the ReachedLimit exception with
# more information, raise that one

View File

@@ -138,7 +138,13 @@ class RethinkDbFrontier:
emit=lambda acc, site, new_acc: r.branch(
r.and_(
r.or_(
site["claimed"].not_(),
r.and_(
site["claimed"].not_(),
r.or_(
site.has_fields("last_disclaimed").not_(),
site["last_disclaimed"].lt(r.now().sub(20)),
),
),
site["last_claimed"].lt(r.now().sub(60 * 60)),
),
r.or_(
@@ -218,6 +224,7 @@ class RethinkDbFrontier:
index="priority_by_site",
)
.order_by(index=r.desc("priority_by_site"))
.filter(lambda page: r.or_(page.has_fields("retry_after").not_(), r.now() > page["retry_after"]))
.limit(1)
.update(
{"claimed": True, "last_claimed_by": worker_id}, return_changes="always"

View File

@@ -386,6 +386,10 @@ class Page(doublethink.Document):
return hashlib.sha1(digest_this.encode("utf-8")).hexdigest()
def populate_defaults(self):
if not "retry_after" in self:
self.retry_after = None
if not "failed_attempts" in self:
self.failed_attempts = 0
if not "hops_from_seed" in self:
self.hops_from_seed = 0
if not "hop_path" in self:

View File

@@ -21,6 +21,7 @@ limitations under the License.
import logging
import brozzler
import brozzler.browser
import datetime
import threading
import time
import urllib.request
@@ -254,10 +255,12 @@ class BrozzlerWorker:
else:
self.logger.info("needs browsing: %s", page)
try:
browser_outlinks = self._browse_page(
browser_outlinks, status_code = self._browse_page(
browser, site, page, on_screenshot, on_request
)
outlinks.update(browser_outlinks)
if status_code in [502, 504]:
raise brozzler.PageConnectionError()
except brozzler.PageInterstitialShown:
self.logger.info("page interstitial shown (http auth): %s", page)
@@ -391,7 +394,7 @@ class BrozzlerWorker:
window_height=self._window_height,
window_width=self._window_width,
)
final_page_url, outlinks = browser.browse_page(
final_page_url, outlinks, status_code = browser.browse_page(
page.url,
extra_headers=site.extra_headers(page),
behavior_parameters=site.get("behavior_parameters"),
@@ -416,7 +419,7 @@ class BrozzlerWorker:
)
if final_page_url != page.url:
page.note_redirect(final_page_url)
return outlinks
return outlinks, status_code
def _fetch_url(self, site, url=None, page=None):
proxies = None
@@ -499,11 +502,18 @@ class BrozzlerWorker:
# using brozzler-worker --proxy, nothing to do but try the
# same proxy again next time
logging.error("proxy error (self._proxy=%r)", self._proxy, exc_info=1)
except:
self.logger.error(
"unexpected exception site=%r page=%r", site, page, exc_info=True
)
except (brozzler.PageConnectionError, Exception) as e:
if isinstance(e, brozzler.PageConnectionError):
self.logger.error(
"Page status code possibly indicates connection failure between host and warcprox: site=%r page=%r", site, page, exc_info=True
)
else:
self.logger.error(
"unexpected exception site=%r page=%r", site, page, exc_info=True
)
if page:
retry_delay = min(60, 60 * (1.5 ** page.failed_attempts))
page.retry_after = doublethink.utcnow() + datetime.timedelta(seconds=retry_delay)
page.failed_attempts = (page.failed_attempts or 0) + 1
if page.failed_attempts >= brozzler.MAX_PAGE_FAILURES:
self.logger.info(
@@ -513,7 +523,8 @@ class BrozzlerWorker:
page,
)
self._frontier.completed_page(site, page)
page = None
else:
page.save()
finally:
if start:
site.active_brozzling_time = (