Merge pull request #285 from internetarchive/adam/connection_failure_retry

feat: Detect connection failures forwarded from warcprox and retry th…
commit 18d3c8a697
Author: Adam Miller
Date: 2024-12-04 15:38:28 -08:00 (committed by GitHub)
4 changed files with 46 additions and 6 deletions

brozzler/__init__.py

@@ -47,6 +47,10 @@ class ProxyError(Exception):
     pass


+class PageConnectionError(Exception):
+    pass
+
+
 class ReachedTimeLimit(Exception):
     pass

brozzler/frontier.py

@@ -138,7 +138,14 @@ class RethinkDbFrontier:
             emit=lambda acc, site, new_acc: r.branch(
                 r.and_(
                     r.or_(
-                        site["claimed"].not_(),
+                        # Avoid tight loop when unclaimed site was recently disclaimed
+                        r.and_(
+                            site["claimed"].not_(),
+                            r.or_(
+                                site.has_fields("last_disclaimed").not_(),
+                                site["last_disclaimed"].lt(r.now().sub(20)),
+                            ),
+                        ),
                         site["last_claimed"].lt(r.now().sub(60 * 60)),
                     ),
                     r.or_(
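The new r.and_ clause gives a just-disclaimed site a 20-second cooling-off period before another worker can claim it, closing a tight claim/disclaim loop. A minimal plain-Python sketch of the predicate the ReQL branch expresses, assuming site is a dict and timestamps are Unix seconds (names here are illustrative, not brozzler API):

import time

def site_claimable(site: dict, now: float | None = None) -> bool:
    # Sketch: claimable if unclaimed and not disclaimed within the last
    # 20 seconds, or if an existing claim has gone stale (over an hour old).
    now = time.time() if now is None else now
    recently_disclaimed = (
        "last_disclaimed" in site and site["last_disclaimed"] >= now - 20
    )
    stale_claim = site.get("last_claimed", 0) < now - 60 * 60
    return (not site["claimed"] and not recently_disclaimed) or stale_claim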
@@ -218,6 +225,11 @@ class RethinkDbFrontier:
                 index="priority_by_site",
             )
             .order_by(index=r.desc("priority_by_site"))
+            .filter(
+                lambda page: r.or_(
+                    page.has_fields("retry_after").not_(), r.now() > page["retry_after"]
+                )
+            )
             .limit(1)
             .update(
                 {"claimed": True, "last_claimed_by": worker_id}, return_changes="always"

brozzler/model.py

@@ -386,6 +386,10 @@ class Page(doublethink.Document):
         return hashlib.sha1(digest_this.encode("utf-8")).hexdigest()

     def populate_defaults(self):
+        if not "retry_after" in self:
+            self.retry_after = None
+        if not "failed_attempts" in self:
+            self.failed_attempts = 0
         if not "hops_from_seed" in self:
             self.hops_from_seed = 0
         if not "hop_path" in self:

brozzler/worker.py

@@ -21,6 +21,7 @@ limitations under the License.
 import logging
 import brozzler
 import brozzler.browser
+import datetime
 import threading
 import time
 import urllib.request
@@ -277,11 +278,14 @@ class BrozzlerWorker:
                     browser, site, page, on_screenshot, on_request
                 )
                 outlinks.update(browser_outlinks)
+                status_code = browser.websock_thread.page_status
+                if status_code in [502, 504]:
+                    raise brozzler.PageConnectionError()
             except brozzler.PageInterstitialShown:
                 self.logger.info("page interstitial shown (http auth): %s", page)

         if enable_youtube_dl and ydl.should_ytdlp(
-            site, page, browser.websock_thread.page_status, self._skip_av_seeds
+            site, page, status_code, self._skip_av_seeds
         ):
             try:
                 ydl_outlinks = ydl.do_youtube_dl(self, site, page)
@@ -535,11 +539,25 @@ class BrozzlerWorker:
                 # using brozzler-worker --proxy, nothing to do but try the
                 # same proxy again next time
                 logging.error("proxy error (self._proxy=%r)", self._proxy, exc_info=1)
-        except:
-            self.logger.error(
-                "unexpected exception site=%r page=%r", site, page, exc_info=True
-            )
+        except (brozzler.PageConnectionError, Exception) as e:
+            if isinstance(e, brozzler.PageConnectionError):
+                self.logger.error(
+                    "Page status code possibly indicates connection failure between host and warcprox: site=%r page=%r",
+                    site,
+                    page,
+                    exc_info=True,
+                )
+            else:
+                self.logger.error(
+                    "unexpected exception site=%r page=%r", site, page, exc_info=True
+                )
             if page:
+                # Calculate backoff in seconds based on number of failed attempts.
+                # Minimum of 60, max of 135 giving delays of 60, 90, 135, 135...
+                retry_delay = min(135, 60 * (1.5**page.failed_attempts))
+                page.retry_after = doublethink.utcnow() + datetime.timedelta(
+                    seconds=retry_delay
+                )
                 page.failed_attempts = (page.failed_attempts or 0) + 1
                 if page.failed_attempts >= brozzler.MAX_PAGE_FAILURES:
                     self.logger.info(
@@ -550,6 +568,8 @@ class BrozzlerWorker:
                     )
                     self._frontier.completed_page(site, page)
                     page = None
+                else:
+                    page.save()
         finally:
             if start:
                 site.active_brozzling_time = (
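The backoff schedule in the except handler above is easy to check by hand; a quick standalone sketch of min(135, 60 * 1.5**n), where n is the number of prior failed attempts:

def retry_delay(failed_attempts: int):
    # 60 * 1.5**n seconds, capped at 135.
    return min(135, 60 * (1.5 ** failed_attempts))

for n in range(5):
    print(n, retry_delay(n))
# 0 60.0
# 1 90.0
# 2 135
# 3 135
# 4 135

With the new else: page.save() branch, a deferred page is written back to RethinkDB instead of being completed, so it stays in the frontier until the claim filter lets a worker retry it.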