update scope if seed redirects

Noah Levitt 2015-07-16 18:27:47 -07:00
parent 140a441eb5
commit d2650a2547
4 changed files with 61 additions and 25 deletions

View File

@@ -110,7 +110,7 @@ class Browser:
     def abort_browse_page(self):
         self._abort_browse_page = True
 
-    def browse_page(self, url, on_request=None, on_screenshot=None):
+    def browse_page(self, url, on_request=None, on_screenshot=None, on_url_change=None):
         """Synchronously loads a page, takes a screenshot, and runs behaviors.
 
         Raises BrowsingException if browsing the page fails in a non-critical
@@ -127,6 +127,9 @@ class Browser:
         self._waiting_on_outlinks_msg_id = None
         self._outlinks = None
+        self.on_url_change = on_url_change
+        self._waiting_on_document_url_msg_id = None
+
         self._websock = websocket.WebSocketApp(self._websocket_url,
                 on_open=self._visit_page, on_message=self._handle_message)
@@ -214,6 +217,7 @@ class Browser:
         elif "method" in message and message["method"] == "Page.loadEventFired":
             self.logger.info("Page.loadEventFired, requesting screenshot url={} message={}".format(self.url, message))
             self._waiting_on_screenshot_msg_id = self.send_to_chrome(method="Page.captureScreenshot")
+            self._waiting_on_document_url_msg_id = self.send_to_chrome(method="Runtime.evaluate", params={"expression":"document.URL"})
         elif "method" in message and message["method"] == "Console.messageAdded":
             self.logger.debug("{} console.{} {}".format(websock.url,
                     message["params"]["message"]["level"],
@@ -242,6 +246,11 @@ class Browser:
                 self.logger.debug("got outlinks message={}".format(message))
                 # {'result': {'wasThrown': False, 'result': {'value': 'https://archive-it.org/cgi-bin/dedup-test/change_every_second https://archive-it.org/cgi-bin/dedup-test/change_every_minute https://archive-it.org/cgi-bin/dedup-test/change_every_10minutes https://archive-it.org/cgi-bin/dedup-test/change_every_hour https://archive-it.org/cgi-bin/dedup-test/change_every_day https://archive-it.org/cgi-bin/dedup-test/change_every_month https://archive-it.org/cgi-bin/dedup-test/change_every_year https://archive-it.org/cgi-bin/dedup-test/change_never http://validator.w3.org/check?uri=referer', 'type': 'string'}}, 'id': 32}
                 self._outlinks = frozenset(message["result"]["result"]["value"].split(" "))
+            elif message["id"] == self._waiting_on_document_url_msg_id:
+                if message["result"]["result"]["value"] != self.url:
+                    if self.on_url_change:
+                        self.on_url_change(message["result"]["result"]["value"])
+                self._waiting_on_document_url_msg_id = None
             elif self._behavior and self._behavior.is_waiting_on_result(message["id"]):
                 self._behavior.notify_of_result(message)
         # elif "method" in message and message["method"] in ("Network.dataReceived", "Network.responseReceived", "Network.loadingFinished"):

View File

@@ -63,6 +63,11 @@ class BrozzlerHQDb:
         self._conn.commit()
         return cursor.lastrowid
 
+    def update_site(self, site):
+        cursor = self._conn.cursor()
+        cursor.execute("update brozzler_sites set site_json=? where id=?", (site.to_json(), site.id))
+        self._conn.commit()
+
     def schedule_url(self, crawl_url, priority=0):
         cursor = self._conn.cursor()
         cursor.execute("insert into brozzler_urls (site_id, priority, canon_url, crawl_url_json, in_progress) values (?, ?, ?, ?, 0)",
@@ -176,6 +181,9 @@ class BrozzlerHQ:
             completed_url = brozzler.CrawlUrl(**msg.payload)
             msg.ack()
             self._db.completed(completed_url)
+            if completed_url.redirect_url and completed_url.hops_from_seed == 0:
+                site.note_seed_redirect(completed_url.redirect_url)
+                self._db.update_site(site)
             self._scope_and_schedule_outlinks(site, completed_url)
         except kombu.simple.Empty:
             pass
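update_site follows the same shape as the other BrozzlerHQDb helpers: serialize the Site back to JSON and overwrite the row in place, so a scope widened by a seed redirect survives hq restarts. The SQL pattern in isolation (table name and statement from the diff; the sqlite setup and json blob are illustrative):

    import json, sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute("create table brozzler_sites (id integer primary key, site_json text)")
    conn.execute("insert into brozzler_sites (site_json) values (?)",
            (json.dumps({"seed": "http://example.com/"}),))

    # the update_site pattern: rewrite the whole json blob keyed by id
    new_json = json.dumps({"seed": "http://example.com/",
                           "scope_surt": "com,example,www,)/"})
    conn.execute("update brozzler_sites set site_json=? where id=?", (new_json, 1))
    conn.commit()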

View File

@@ -31,6 +31,16 @@ class Site:
             req_sesh.proxies = {"http":proxie,"https":proxie}
         self._robots_cache = reppy.cache.RobotsCache(session=req_sesh)
 
+    def __repr__(self):
+        return """Site(seed="{}",scope_surt="{}",proxy="{}",enable_warcprox_features={},ignore_robots={})""".format(
+                self.seed, self.scope_surt, self.proxy, self.enable_warcprox_features, self.ignore_robots)
+
+    def note_seed_redirect(self, url):
+        new_scope_surt = surt.surt(url, canonicalizer=surt.GoogleURLCanonicalizer, trailing_comma=True)
+        if not new_scope_surt.startswith(self.scope_surt):
+            self.logger.info("changing site scope surt from {} to {}".format(self.scope_surt, new_scope_surt))
+            self.scope_surt = new_scope_surt
+
     def is_permitted_by_robots(self, url):
         return self.ignore_robots or self._robots_cache.allowed(url, "brozzler")
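The startswith() check is what makes note_seed_redirect conservative: a redirect that stays inside the existing scope (http://example.com/ to http://example.com/home, say) leaves the surt alone, while a redirect to a different host replaces it. A rough illustration of the surts involved (the surt() call mirrors the diff; the outputs in the comments are approximate, not taken from this commit):

    import surt

    old = surt.surt("http://example.com/",
            canonicalizer=surt.GoogleURLCanonicalizer, trailing_comma=True)
    new = surt.surt("https://www.example.com/welcome",
            canonicalizer=surt.GoogleURLCanonicalizer, trailing_comma=True)

    # old comes out something like "com,example,)/" and new something like
    # "com,example,www,)/welcome"; new.startswith(old) is False, so the
    # site's scope_surt would be rewritten to follow the redirect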
@@ -53,18 +63,22 @@ class Site:
         return json.dumps(self.to_dict(), separators=(',', ':'))
 
 class CrawlUrl:
-    def __init__(self, url, id=None, site_id=None, hops_from_seed=0, outlinks=None):
+    def __init__(self, url, id=None, site_id=None, hops_from_seed=0, outlinks=None, redirect_url=None):
         self.id = id
         self.site_id = site_id
         self.url = url
         self.hops_from_seed = hops_from_seed
         self._canon_hurl = None
         self.outlinks = outlinks
+        self.redirect_url = redirect_url
 
     def __repr__(self):
         return """CrawlUrl(url="{}",site_id={},hops_from_seed={})""".format(
                 self.url, self.site_id, self.hops_from_seed)
 
+    def note_redirect(self, url):
+        self.redirect_url = url
+
     def calc_priority(self):
         priority = 0
         priority += max(0, 10 - self.hops_from_seed)
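note_redirect is deliberately trivial: it records the final URL as plain state so it serializes with the rest of the CrawlUrl and survives the AMQP round trip back to hq (this assumes to_dict(), used elsewhere in this commit, picks up the new attribute, which the hq-side check requires). A quick sketch of that round trip:

    crawl_url = CrawlUrl("http://example.com/", site_id=1, hops_from_seed=0)
    crawl_url.note_redirect("https://www.example.com/welcome")

    payload = crawl_url.to_dict()    # what the worker puts on the completed queue
    restored = CrawlUrl(**payload)   # what hq rebuilds from msg.payload
    assert restored.redirect_url == "https://www.example.com/welcome"
    assert restored.hops_from_seed == 0   # seed url, so hq will update the scope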

View File

@@ -28,7 +28,7 @@ class BrozzlerWorker:
                 "outtmpl": "/dev/null",
                 "verbose": False,
                 "retries": 1,
-                "logger": logging,
+                "logger": self.logger,
                 "nocheckcertificate": True,
                 "hls_prefer_native": True,
                 "noprogress": True,
@@ -56,18 +56,18 @@ class BrozzlerWorker:
     def _completed_url(self, site, crawl_url):
         with kombu.Connection(self._amqp_url) as conn:
             q = conn.SimpleQueue("brozzler.sites.{}.completed_urls".format(site.id))
-            logging.info("putting {} on queue {}".format(crawl_url, q.queue.name))
+            self.logger.info("putting {} on queue {}".format(crawl_url, q.queue.name))
             q.put(crawl_url.to_dict())
 
     def _disclaim_site(self, site, crawl_url=None):
         # XXX maybe should put on "disclaimed" queue and hq should put back on "unclaimed"
         with kombu.Connection(self._amqp_url) as conn:
             q = conn.SimpleQueue("brozzler.sites.unclaimed")
-            logging.info("putting {} on queue {}".format(site, q.queue.name))
+            self.logger.info("putting {} on queue {}".format(site, q.queue.name))
             q.put(site.to_dict())
             if crawl_url:
                 q = conn.SimpleQueue("brozzler.sites.{}.crawl_urls".format(site.id))
-                logging.info("putting unfinished url {} on queue {}".format(crawl_url, q.queue.name))
+                self.logger.info("putting unfinished url {} on queue {}".format(crawl_url, q.queue.name))
                 q.put(crawl_url.to_dict())
 
     def _putmeta(self, warcprox_address, url, content_type, payload):
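These helpers all talk to hq through per-site kombu SimpleQueues, with the site id encoded in the queue name. The convention, reduced to a standalone sketch (the amqp url and payload are illustrative):

    import kombu

    with kombu.Connection("amqp://guest:guest@localhost:5672/%2f") as conn:
        q = conn.SimpleQueue("brozzler.sites.1.completed_urls")
        q.put({"url": "http://example.com/", "hops_from_seed": 0})
        q.close()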
@@ -82,17 +82,17 @@ class BrozzlerWorker:
         try:
             with urllib.request.urlopen(request) as response:
                 if response.status != 204:
-                    logging.warn("""got "{} {}" response on warcprox PUTMETA request (expected 204)""".format(response.status, response.reason))
+                    self.logger.warn("""got "{} {}" response on warcprox PUTMETA request (expected 204)""".format(response.status, response.reason))
         except urllib.error.HTTPError as e:
-            logging.warn("""got "{} {}" response on warcprox PUTMETA request (expected 204)""".format(e.getcode(), e.info()))
+            self.logger.warn("""got "{} {}" response on warcprox PUTMETA request (expected 204)""".format(e.getcode(), e.info()))
 
     def _try_youtube_dl(self, ydl, site, crawl_url):
         try:
-            logging.info("trying youtube-dl on {}".format(crawl_url))
+            self.logger.info("trying youtube-dl on {}".format(crawl_url))
             info = ydl.extract_info(crawl_url.url)
             if site.proxy and site.enable_warcprox_features:
                 info_json = json.dumps(info, sort_keys=True, indent=4)
-                logging.info("sending PUTMETA request to warcprox with youtube-dl json for {}".format(crawl_url))
+                self.logger.info("sending PUTMETA request to warcprox with youtube-dl json for {}".format(crawl_url))
                 self._putmeta(warcprox_address=site.proxy, url=crawl_url.url,
                         content_type="application/vnd.youtube-dl_formats+json;charset=utf-8",
                         payload=info_json.encode("utf-8"))
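_putmeta's body is unchanged by this commit and not shown here; judging from the call sites, it presumably builds a request along these lines, using warcprox's custom PUTMETA verb and the site's proxy address (a sketch reconstructed from the calls above, not the actual implementation):

    import urllib.request

    def putmeta(warcprox_address, url, content_type, payload):
        # PUTMETA is a warcprox extension method; the request goes *through*
        # warcprox as an http proxy so the metadata record lands in the warc
        request = urllib.request.Request(url, method="PUTMETA",
                headers={"Content-Type": content_type}, data=payload)
        request.set_proxy(warcprox_address, "http")
        return urllib.request.urlopen(request)   # 204 expected on success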
@@ -102,11 +102,19 @@ class BrozzlerWorker:
             else:
                 raise
 
-    def _on_screenshot(self, site, crawl_url, screenshot_png):
-        if site.proxy and site.enable_warcprox_features:
-            logging.info("sending PUTMETA request to warcprox with screenshot for {}".format(crawl_url))
-            self._putmeta(warcprox_address=site.proxy, url=crawl_url.url,
-                    content_type="image/png", payload=screenshot_png)
+    def _brozzle_page(self, browser, ydl, site, crawl_url):
+        def on_screenshot(screenshot_png):
+            if site.proxy and site.enable_warcprox_features:
+                self.logger.info("sending PUTMETA request to warcprox with screenshot for {}".format(crawl_url))
+                self._putmeta(warcprox_address=site.proxy, url=crawl_url.url,
+                        content_type="image/png", payload=screenshot_png)
+
+        self.logger.info("brozzling {}".format(crawl_url))
+        self._try_youtube_dl(ydl, site, crawl_url)
+        crawl_url.outlinks = browser.browse_page(crawl_url.url,
+                on_screenshot=on_screenshot,
+                on_url_change=crawl_url.note_redirect)
 
     def _brozzle_site(self, browser, ydl, site):
         start = time.time()
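Besides replacing the _on_screenshot method and its lambda with a closure, _brozzle_page is where the redirect plumbing meets the browser: passing on_url_change=crawl_url.note_redirect means any post-load URL change is recorded on the very CrawlUrl that gets reported back to hq. The whole chain added by this commit, compressed into one trace (names from the diff, glue elided):

    # worker: browse, recording any url change on the crawl url
    crawl_url.outlinks = browser.browse_page(crawl_url.url,
            on_url_change=crawl_url.note_redirect)

    # browser: after Page.loadEventFired, document.URL != requested url
    #   -> self.on_url_change(final_url) -> crawl_url.note_redirect(final_url)

    # worker: report completion over amqp
    self._completed_url(site, crawl_url)   # q.put(crawl_url.to_dict())

    # hq: widen the scope only if the *seed* itself redirected
    if completed_url.redirect_url and completed_url.hops_from_seed == 0:
        site.note_seed_redirect(completed_url.redirect_url)
        self._db.update_site(site)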
@@ -116,19 +124,16 @@ class BrozzlerWorker:
         while not self._shutdown_requested.is_set() and time.time() - start < 60:
             try:
                 crawl_url = self._next_url(site)
-                logging.info("crawling {}".format(crawl_url))
-                self._try_youtube_dl(ydl, site, crawl_url)
-                crawl_url.outlinks = browser.browse_page(crawl_url.url,
-                        on_screenshot=lambda screenshot_png: self._on_screenshot(site, crawl_url, screenshot_png))
+                self._brozzle_page(browser, ydl, site, crawl_url)
                 self._completed_url(site, crawl_url)
                 crawl_url = None
             except kombu.simple.Empty:
                 # if some timeout reached, re-raise?
                 pass
             # except kombu.simple.Empty:
-            #     logging.info("finished {} (queue is empty)".format(site))
+            #     self.logger.info("finished {} (queue is empty)".format(site))
             except brozzler.browser.BrowsingAborted:
-                logging.info("{} shut down".format(browser))
+                self.logger.info("{} shut down".format(browser))
             finally:
                 browser.stop()
                 self._disclaim_site(site, crawl_url)
@@ -147,7 +152,7 @@ class BrozzlerWorker:
                 msg = q.get(block=True, timeout=0.5)
                 site = brozzler.Site(**msg.payload)
                 msg.ack() # XXX ack only after browsing finished? kinda complicated
-                logging.info("browsing site {}".format(site))
+                self.logger.info("browsing site {}".format(site))
                 ydl = self._youtube_dl(site)
                 th = threading.Thread(target=lambda: self._brozzle_site(browser, ydl, site),
                         name="BrowsingThread-{}".format(site.scope_surt))
@@ -156,14 +161,14 @@ class BrozzlerWorker:
                 q_empty = True
             except KeyError:
                 if latest_state != "browsers-busy":
-                    logging.info("all {} browsers are busy".format(self._max_browsers))
+                    self.logger.info("all {} browsers are busy".format(self._max_browsers))
                     latest_state = "browsers-busy"
         else:
             q_empty = True
 
         if q_empty:
             if latest_state != "no-unclaimed-sites":
-                logging.info("no unclaimed sites to browse")
+                self.logger.info("no unclaimed sites to browse")
                 latest_state = "no-unclaimed-sites"
 
         time.sleep(0.5)
@@ -172,7 +177,7 @@ class BrozzlerWorker:
             th.start()
 
     def shutdown_now(self):
-        logging.info("brozzler worker shutting down")
+        self.logger.info("brozzler worker shutting down")
         self._shutdown_requested.set()
         self._browser_pool.shutdown_now()