From 7b98af7d9f32e4f852453988527f2fa6f81a659b Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Sat, 1 Aug 2015 00:09:57 +0000 Subject: [PATCH] handle reached limit response from warcprox --- bin/brozzle-page | 5 ++++- brozzler/__init__.py | 15 +++++++++++++++ brozzler/hq.py | 15 +++++++++++---- brozzler/site.py | 19 ++++++++++++++++--- brozzler/worker.py | 14 ++++++++++---- 5 files changed, 56 insertions(+), 12 deletions(-) diff --git a/bin/brozzle-page b/bin/brozzle-page index dd637d8..17bbf3d 100755 --- a/bin/brozzle-page +++ b/bin/brozzle-page @@ -43,4 +43,7 @@ worker = brozzler.BrozzlerWorker() ydl = worker._youtube_dl(site) with brozzler.Browser(chrome_exe=args.chrome_exe) as browser: - worker.brozzle_page(browser, ydl, site, page) + try: + worker.brozzle_page(browser, ydl, site, page) + except brozzler.ReachedLimit as e: + logging.error("reached limit %s", e) diff --git a/brozzler/__init__.py b/brozzler/__init__.py index ffc07cf..bdb52ac 100644 --- a/brozzler/__init__.py +++ b/brozzler/__init__.py @@ -1,3 +1,4 @@ +import json as _json from brozzler.browser import Browser, BrowserPool from brozzler.site import Page, Site from brozzler.hq import BrozzlerHQ @@ -15,4 +16,18 @@ version = _read_version() class ShutdownRequested(Exception): pass +class ReachedLimit(Exception): + def __init__(self, http_error): + if "warcprox-meta" in http_error.headers: + self.warcprox_meta = _json.loads(http_error.headers["warcprox-meta"]) + else: + self.warcprox_meta = None + self.http_payload = http_error.read() + + def __repr__(self): + return "ReachedLimit(warcprox_meta={},http_payload={})".format(repr(self.warcprox_meta), repr(self.http_payload)) + + def __str__(self): + return self.__repr__() + # vim: set sw=4 et: diff --git a/brozzler/hq.py b/brozzler/hq.py index 15af4ac..973b4c0 100644 --- a/brozzler/hq.py +++ b/brozzler/hq.py @@ -69,7 +69,11 @@ class BrozzlerHQDb: def update_site(self, site): cursor = self._conn.cursor() - cursor.execute("update brozzler_sites set site_json=? where id=?", (site.to_json(), site.id)) + if site.reached_limit: + self.logger.info("setting status=FINISHED_REACHED_LIMIT because site.reached_limit=%s", site.reached_limit) + cursor.execute("update brozzler_sites set status=?, site_json=? where id=?", ("FINISHED_REACHED_LIMIT", site.to_json(), site.id)) + else: + cursor.execute("update brozzler_sites set site_json=? where id=?", (site.to_json(), site.id)) self._conn.commit() def schedule_page(self, page, priority=0): @@ -80,7 +84,7 @@ class BrozzlerHQDb: def sites(self): cursor = self._conn.cursor() - cursor.execute("select id, site_json from brozzler_sites where status != 'FINISHED'") + cursor.execute("select id, site_json from brozzler_sites where status not like 'FINISHED%'") while True: row = cursor.fetchone() if row is None: @@ -186,16 +190,19 @@ class BrozzlerHQ: def _disclaimed_site(self): try: msg = self._disclaimed_sites_q.get(block=False) + self.logger.info("msg.payload=%s", msg.payload) site = brozzler.Site(**msg.payload) + self.logger.info("site=%s", site) + self._db.update_site(site) msg.ack() self.logger.info("received disclaimed site {}".format(site)) status = self._db.get_status(site) - if status != "FINISHED": + if not status.startswith("FINISHED"): self.logger.info("feeding disclaimed site {} back to {}".format(site, self._unclaimed_sites_q.queue.name)) self._unclaimed_sites_q.put(site.to_dict()) else: - self.logger.info("disclaimed site is FINISHED {}".format(site)) + self.logger.info("disclaimed site is %s %s", status, site) except kombu.simple.Empty: pass diff --git a/brozzler/site.py b/brozzler/site.py index 016237e..6adde74 100644 --- a/brozzler/site.py +++ b/brozzler/site.py @@ -3,13 +3,14 @@ import surt import json import logging +import brozzler class Site: logger = logging.getLogger(__module__ + "." + __qualname__) def __init__(self, seed, id=None, scope_surt=None, proxy=None, ignore_robots=False, time_limit=None, extra_headers=None, - enable_warcprox_features=False): + enable_warcprox_features=False, reached_limit=None): self.seed = seed self.id = id self.proxy = proxy @@ -17,6 +18,7 @@ class Site: self.enable_warcprox_features = enable_warcprox_features self.time_limit = time_limit self.extra_headers = extra_headers + self.reached_limit = reached_limit if scope_surt: self.scope_surt = scope_surt @@ -24,8 +26,10 @@ class Site: self.scope_surt = surt.GoogleURLCanonicalizer.canonicalize(surt.handyurl.parse(seed)).getURLString(surt=True, trailing_comma=True) def __repr__(self): - return """Site(id={},seed={},scope_surt={},proxy={},enable_warcprox_features={},ignore_robots={},extra_headers={})""".format( - self.id, repr(self.seed), repr(self.scope_surt), repr(self.proxy), self.enable_warcprox_features, self.ignore_robots, self.extra_headers) + return """Site(id={},seed={},scope_surt={},proxy={},enable_warcprox_features={},ignore_robots={},extra_headers={},reached_limit={})""".format( + self.id, repr(self.seed), repr(self.scope_surt), + repr(self.proxy), self.enable_warcprox_features, + self.ignore_robots, self.extra_headers, self.reached_limit) def note_seed_redirect(self, url): new_scope_surt = surt.GoogleURLCanonicalizer.canonicalize(surt.handyurl.parse(url)).getURLString(surt=True, trailing_comma=True) @@ -33,6 +37,15 @@ class Site: self.logger.info("changing site scope surt from {} to {}".format(self.scope_surt, new_scope_surt)) self.scope_surt = new_scope_surt + def note_limit_reached(self, e): + self.logger.info("reached_limit e=%s", e) + assert isinstance(e, brozzler.ReachedLimit) + if self.reached_limit and self.reached_limit != e.warcprox_meta["reached-limit"]: + self.logger.warn("reached limit %s but site had already reached limit %s", + e.warcprox_meta["reached-limit"], self.reached_limit) + else: + self.reached_limit = e.warcprox_meta["reached-limit"] + def is_in_scope(self, url): try: hurl = surt.handyurl.parse(url) diff --git a/brozzler/worker.py b/brozzler/worker.py index 826dc63..203b220 100644 --- a/brozzler/worker.py +++ b/brozzler/worker.py @@ -7,7 +7,6 @@ import threading import time import signal import kombu -import brozzler.hq import youtube_dl import urllib.request import json @@ -102,9 +101,12 @@ class BrozzlerWorker: payload=info_json.encode("utf-8"), extra_headers=site.extra_headers) except BaseException as e: - if hasattr(e, "exc_info") and youtube_dl.utils.UnsupportedError in e.exc_info: + if hasattr(e, "exc_info") and e.exc_info[0] == youtube_dl.utils.UnsupportedError: pass - # elif hasattr(e, "exc_info") and youtube_dl.utils.UnsupportedError in e.exc_info: + elif (hasattr(e, "exc_info") and e.exc_info[0] == + urllib.error.HTTPError and hasattr(e.exc_info[1], "code") + and e.exc_info[1].code == 420): + raise brozzler.ReachedLimit(e.exc_info[1]) else: raise @@ -119,8 +121,10 @@ class BrozzlerWorker: self.logger.info("brozzling {}".format(page)) try: self._try_youtube_dl(ydl, site, page) + except brozzler.ReachedLimit as e: + raise except: - self.logger.error("youtube_dl raised unexpected exception on {}".format(page), exc_info=True) + self.logger.error("youtube_dl raised exception on {}".format(page), exc_info=True) page.outlinks = browser.browse_page(page.url, extra_headers=site.extra_headers, @@ -143,6 +147,8 @@ class BrozzlerWorker: pass # except kombu.simple.Empty: # self.logger.info("finished {} (queue is empty)".format(site)) + except brozzler.ReachedLimit as e: + site.note_limit_reached(e) except brozzler.browser.BrowsingAborted: self.logger.info("{} shut down".format(browser)) finally: