mirror of
https://github.com/internetarchive/brozzler.git
synced 2025-06-09 07:33:14 -04:00
handle reached limit response from warcprox
This commit is contained in:
parent
11fbbc9d49
commit
7b98af7d9f
5 changed files with 56 additions and 12 deletions
|
@ -43,4 +43,7 @@ worker = brozzler.BrozzlerWorker()
|
||||||
ydl = worker._youtube_dl(site)
|
ydl = worker._youtube_dl(site)
|
||||||
|
|
||||||
with brozzler.Browser(chrome_exe=args.chrome_exe) as browser:
|
with brozzler.Browser(chrome_exe=args.chrome_exe) as browser:
|
||||||
worker.brozzle_page(browser, ydl, site, page)
|
try:
|
||||||
|
worker.brozzle_page(browser, ydl, site, page)
|
||||||
|
except brozzler.ReachedLimit as e:
|
||||||
|
logging.error("reached limit %s", e)
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
import json as _json
|
||||||
from brozzler.browser import Browser, BrowserPool
|
from brozzler.browser import Browser, BrowserPool
|
||||||
from brozzler.site import Page, Site
|
from brozzler.site import Page, Site
|
||||||
from brozzler.hq import BrozzlerHQ
|
from brozzler.hq import BrozzlerHQ
|
||||||
|
@ -15,4 +16,18 @@ version = _read_version()
|
||||||
class ShutdownRequested(Exception):
|
class ShutdownRequested(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
class ReachedLimit(Exception):
|
||||||
|
def __init__(self, http_error):
|
||||||
|
if "warcprox-meta" in http_error.headers:
|
||||||
|
self.warcprox_meta = _json.loads(http_error.headers["warcprox-meta"])
|
||||||
|
else:
|
||||||
|
self.warcprox_meta = None
|
||||||
|
self.http_payload = http_error.read()
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return "ReachedLimit(warcprox_meta={},http_payload={})".format(repr(self.warcprox_meta), repr(self.http_payload))
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return self.__repr__()
|
||||||
|
|
||||||
# vim: set sw=4 et:
|
# vim: set sw=4 et:
|
||||||
|
|
|
@ -69,7 +69,11 @@ class BrozzlerHQDb:
|
||||||
|
|
||||||
def update_site(self, site):
|
def update_site(self, site):
|
||||||
cursor = self._conn.cursor()
|
cursor = self._conn.cursor()
|
||||||
cursor.execute("update brozzler_sites set site_json=? where id=?", (site.to_json(), site.id))
|
if site.reached_limit:
|
||||||
|
self.logger.info("setting status=FINISHED_REACHED_LIMIT because site.reached_limit=%s", site.reached_limit)
|
||||||
|
cursor.execute("update brozzler_sites set status=?, site_json=? where id=?", ("FINISHED_REACHED_LIMIT", site.to_json(), site.id))
|
||||||
|
else:
|
||||||
|
cursor.execute("update brozzler_sites set site_json=? where id=?", (site.to_json(), site.id))
|
||||||
self._conn.commit()
|
self._conn.commit()
|
||||||
|
|
||||||
def schedule_page(self, page, priority=0):
|
def schedule_page(self, page, priority=0):
|
||||||
|
@ -80,7 +84,7 @@ class BrozzlerHQDb:
|
||||||
|
|
||||||
def sites(self):
|
def sites(self):
|
||||||
cursor = self._conn.cursor()
|
cursor = self._conn.cursor()
|
||||||
cursor.execute("select id, site_json from brozzler_sites where status != 'FINISHED'")
|
cursor.execute("select id, site_json from brozzler_sites where status not like 'FINISHED%'")
|
||||||
while True:
|
while True:
|
||||||
row = cursor.fetchone()
|
row = cursor.fetchone()
|
||||||
if row is None:
|
if row is None:
|
||||||
|
@ -186,16 +190,19 @@ class BrozzlerHQ:
|
||||||
def _disclaimed_site(self):
|
def _disclaimed_site(self):
|
||||||
try:
|
try:
|
||||||
msg = self._disclaimed_sites_q.get(block=False)
|
msg = self._disclaimed_sites_q.get(block=False)
|
||||||
|
self.logger.info("msg.payload=%s", msg.payload)
|
||||||
site = brozzler.Site(**msg.payload)
|
site = brozzler.Site(**msg.payload)
|
||||||
|
self.logger.info("site=%s", site)
|
||||||
|
self._db.update_site(site)
|
||||||
msg.ack()
|
msg.ack()
|
||||||
self.logger.info("received disclaimed site {}".format(site))
|
self.logger.info("received disclaimed site {}".format(site))
|
||||||
|
|
||||||
status = self._db.get_status(site)
|
status = self._db.get_status(site)
|
||||||
if status != "FINISHED":
|
if not status.startswith("FINISHED"):
|
||||||
self.logger.info("feeding disclaimed site {} back to {}".format(site, self._unclaimed_sites_q.queue.name))
|
self.logger.info("feeding disclaimed site {} back to {}".format(site, self._unclaimed_sites_q.queue.name))
|
||||||
self._unclaimed_sites_q.put(site.to_dict())
|
self._unclaimed_sites_q.put(site.to_dict())
|
||||||
else:
|
else:
|
||||||
self.logger.info("disclaimed site is FINISHED {}".format(site))
|
self.logger.info("disclaimed site is %s %s", status, site)
|
||||||
except kombu.simple.Empty:
|
except kombu.simple.Empty:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
|
@ -3,13 +3,14 @@
|
||||||
import surt
|
import surt
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
|
import brozzler
|
||||||
|
|
||||||
class Site:
|
class Site:
|
||||||
logger = logging.getLogger(__module__ + "." + __qualname__)
|
logger = logging.getLogger(__module__ + "." + __qualname__)
|
||||||
|
|
||||||
def __init__(self, seed, id=None, scope_surt=None, proxy=None,
|
def __init__(self, seed, id=None, scope_surt=None, proxy=None,
|
||||||
ignore_robots=False, time_limit=None, extra_headers=None,
|
ignore_robots=False, time_limit=None, extra_headers=None,
|
||||||
enable_warcprox_features=False):
|
enable_warcprox_features=False, reached_limit=None):
|
||||||
self.seed = seed
|
self.seed = seed
|
||||||
self.id = id
|
self.id = id
|
||||||
self.proxy = proxy
|
self.proxy = proxy
|
||||||
|
@ -17,6 +18,7 @@ class Site:
|
||||||
self.enable_warcprox_features = enable_warcprox_features
|
self.enable_warcprox_features = enable_warcprox_features
|
||||||
self.time_limit = time_limit
|
self.time_limit = time_limit
|
||||||
self.extra_headers = extra_headers
|
self.extra_headers = extra_headers
|
||||||
|
self.reached_limit = reached_limit
|
||||||
|
|
||||||
if scope_surt:
|
if scope_surt:
|
||||||
self.scope_surt = scope_surt
|
self.scope_surt = scope_surt
|
||||||
|
@ -24,8 +26,10 @@ class Site:
|
||||||
self.scope_surt = surt.GoogleURLCanonicalizer.canonicalize(surt.handyurl.parse(seed)).getURLString(surt=True, trailing_comma=True)
|
self.scope_surt = surt.GoogleURLCanonicalizer.canonicalize(surt.handyurl.parse(seed)).getURLString(surt=True, trailing_comma=True)
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return """Site(id={},seed={},scope_surt={},proxy={},enable_warcprox_features={},ignore_robots={},extra_headers={})""".format(
|
return """Site(id={},seed={},scope_surt={},proxy={},enable_warcprox_features={},ignore_robots={},extra_headers={},reached_limit={})""".format(
|
||||||
self.id, repr(self.seed), repr(self.scope_surt), repr(self.proxy), self.enable_warcprox_features, self.ignore_robots, self.extra_headers)
|
self.id, repr(self.seed), repr(self.scope_surt),
|
||||||
|
repr(self.proxy), self.enable_warcprox_features,
|
||||||
|
self.ignore_robots, self.extra_headers, self.reached_limit)
|
||||||
|
|
||||||
def note_seed_redirect(self, url):
|
def note_seed_redirect(self, url):
|
||||||
new_scope_surt = surt.GoogleURLCanonicalizer.canonicalize(surt.handyurl.parse(url)).getURLString(surt=True, trailing_comma=True)
|
new_scope_surt = surt.GoogleURLCanonicalizer.canonicalize(surt.handyurl.parse(url)).getURLString(surt=True, trailing_comma=True)
|
||||||
|
@ -33,6 +37,15 @@ class Site:
|
||||||
self.logger.info("changing site scope surt from {} to {}".format(self.scope_surt, new_scope_surt))
|
self.logger.info("changing site scope surt from {} to {}".format(self.scope_surt, new_scope_surt))
|
||||||
self.scope_surt = new_scope_surt
|
self.scope_surt = new_scope_surt
|
||||||
|
|
||||||
|
def note_limit_reached(self, e):
|
||||||
|
self.logger.info("reached_limit e=%s", e)
|
||||||
|
assert isinstance(e, brozzler.ReachedLimit)
|
||||||
|
if self.reached_limit and self.reached_limit != e.warcprox_meta["reached-limit"]:
|
||||||
|
self.logger.warn("reached limit %s but site had already reached limit %s",
|
||||||
|
e.warcprox_meta["reached-limit"], self.reached_limit)
|
||||||
|
else:
|
||||||
|
self.reached_limit = e.warcprox_meta["reached-limit"]
|
||||||
|
|
||||||
def is_in_scope(self, url):
|
def is_in_scope(self, url):
|
||||||
try:
|
try:
|
||||||
hurl = surt.handyurl.parse(url)
|
hurl = surt.handyurl.parse(url)
|
||||||
|
|
|
@ -7,7 +7,6 @@ import threading
|
||||||
import time
|
import time
|
||||||
import signal
|
import signal
|
||||||
import kombu
|
import kombu
|
||||||
import brozzler.hq
|
|
||||||
import youtube_dl
|
import youtube_dl
|
||||||
import urllib.request
|
import urllib.request
|
||||||
import json
|
import json
|
||||||
|
@ -102,9 +101,12 @@ class BrozzlerWorker:
|
||||||
payload=info_json.encode("utf-8"),
|
payload=info_json.encode("utf-8"),
|
||||||
extra_headers=site.extra_headers)
|
extra_headers=site.extra_headers)
|
||||||
except BaseException as e:
|
except BaseException as e:
|
||||||
if hasattr(e, "exc_info") and youtube_dl.utils.UnsupportedError in e.exc_info:
|
if hasattr(e, "exc_info") and e.exc_info[0] == youtube_dl.utils.UnsupportedError:
|
||||||
pass
|
pass
|
||||||
# elif hasattr(e, "exc_info") and youtube_dl.utils.UnsupportedError in e.exc_info:
|
elif (hasattr(e, "exc_info") and e.exc_info[0] ==
|
||||||
|
urllib.error.HTTPError and hasattr(e.exc_info[1], "code")
|
||||||
|
and e.exc_info[1].code == 420):
|
||||||
|
raise brozzler.ReachedLimit(e.exc_info[1])
|
||||||
else:
|
else:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
@ -119,8 +121,10 @@ class BrozzlerWorker:
|
||||||
self.logger.info("brozzling {}".format(page))
|
self.logger.info("brozzling {}".format(page))
|
||||||
try:
|
try:
|
||||||
self._try_youtube_dl(ydl, site, page)
|
self._try_youtube_dl(ydl, site, page)
|
||||||
|
except brozzler.ReachedLimit as e:
|
||||||
|
raise
|
||||||
except:
|
except:
|
||||||
self.logger.error("youtube_dl raised unexpected exception on {}".format(page), exc_info=True)
|
self.logger.error("youtube_dl raised exception on {}".format(page), exc_info=True)
|
||||||
|
|
||||||
page.outlinks = browser.browse_page(page.url,
|
page.outlinks = browser.browse_page(page.url,
|
||||||
extra_headers=site.extra_headers,
|
extra_headers=site.extra_headers,
|
||||||
|
@ -143,6 +147,8 @@ class BrozzlerWorker:
|
||||||
pass
|
pass
|
||||||
# except kombu.simple.Empty:
|
# except kombu.simple.Empty:
|
||||||
# self.logger.info("finished {} (queue is empty)".format(site))
|
# self.logger.info("finished {} (queue is empty)".format(site))
|
||||||
|
except brozzler.ReachedLimit as e:
|
||||||
|
site.note_limit_reached(e)
|
||||||
except brozzler.browser.BrowsingAborted:
|
except brozzler.browser.BrowsingAborted:
|
||||||
self.logger.info("{} shut down".format(browser))
|
self.logger.info("{} shut down".format(browser))
|
||||||
finally:
|
finally:
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue