From ad543e6134943becc91d7ecc6ad32a3a175177ec Mon Sep 17 00:00:00 2001
From: Noah Levitt
Date: Wed, 19 Aug 2015 20:16:25 +0000
Subject: [PATCH] enforce time limits; move scope_and_schedule_outlinks into frontier.py; fix bugs around scoping on seed redirect

---
 brozzler/frontier.py | 59 +++++++++++++++++++++++++++++++++++++-------
 brozzler/site.py     | 22 +++++++++++++----
 brozzler/worker.py   | 30 +++-------------------
 3 files changed, 71 insertions(+), 40 deletions(-)

diff --git a/brozzler/frontier.py b/brozzler/frontier.py
index 8128b5f..10b572e 100644
--- a/brozzler/frontier.py
+++ b/brozzler/frontier.py
@@ -91,15 +91,33 @@ class RethinkDbFrontier:
 
     def claim_site(self):
         # XXX keep track of aggregate priority and prioritize sites accordingly?
-        with self._random_server_connection() as conn:
-            result = (r.db(self.db).table("sites")
-                    .between(["ACTIVE",False,0], ["ACTIVE",False,250000000000], index="sites_last_disclaimed")
-                    .order_by(index="sites_last_disclaimed").limit(1).update({"claimed":True},return_changes=True).run(conn))
-            self._vet_result(result, replaced=[0,1])
-            if result["replaced"] == 1:
-                return brozzler.Site(**result["changes"][0]["new_val"])
-            else:
-                raise brozzler.NothingToClaim
+        while True:
+            with self._random_server_connection() as conn:
+                result = (r.db(self.db).table("sites")
+                        .between(["ACTIVE",False,0], ["ACTIVE",False,250000000000], index="sites_last_disclaimed")
+                        .order_by(index="sites_last_disclaimed").limit(1).update({"claimed":True},return_changes=True).run(conn))
+                self._vet_result(result, replaced=[0,1])
+                if result["replaced"] == 1:
+                    site = brozzler.Site(**result["changes"][0]["new_val"])
+                else:
+                    raise brozzler.NothingToClaim
+            # XXX This is the only place we enforce time limit for now. Worker
+            # loop should probably check time limit. Maybe frontier needs a
+            # housekeeping thread to ensure that time limits get enforced in a
+            # timely fashion.
+            if not self._enforce_time_limit(site):
+                return site
+
+    def _enforce_time_limit(self, site):
+        if (site.time_limit and site.time_limit > 0
+                and time.time() - site.start_time > site.time_limit):
+            self.logger.info("site FINISHED_TIME_LIMIT! time_limit=%s start_time=%s elapsed=%s %s",
+                    site.time_limit, site.start_time, time.time() - site.start_time, site)
+            site.status = "FINISHED_TIME_LIMIT"
+            self.update_site(site)
+            return True
+        else:
+            return False
 
     def claim_page(self, site):
         with self._random_server_connection() as conn:
@@ -147,3 +165,26 @@ class RethinkDbFrontier:
             page.claimed = False
             self.update_page(page)
 
+    def scope_and_schedule_outlinks(self, site, parent_page, outlinks):
+        counts = {"added":0,"updated":0,"rejected":0,"blocked":0}
+        if outlinks:
+            for url in outlinks:
+                if site.is_in_scope(url, parent_page):
+                    if brozzler.is_permitted_by_robots(site, url):
+                        new_child_page = brozzler.Page(url, site_id=site.id, hops_from_seed=parent_page.hops_from_seed+1)
+                        existing_child_page = self.get_page(new_child_page)
+                        if existing_child_page:
+                            existing_child_page.priority += new_child_page.priority
+                            self.update_page(existing_child_page)
+                            counts["updated"] += 1
+                        else:
+                            self.new_page(new_child_page)
+                            counts["added"] += 1
+                    else:
+                        counts["blocked"] += 1
+                else:
+                    counts["rejected"] += 1
+
+        self.logger.info("%s new links added, %s existing links updated, %s links rejected, %s links blocked by robots from %s",
+                counts["added"], counts["updated"], counts["rejected"], counts["blocked"], parent_page)
+
diff --git a/brozzler/site.py b/brozzler/site.py
index dbcf45e..b76bc5a 100644
--- a/brozzler/site.py
+++ b/brozzler/site.py
@@ -5,6 +5,7 @@ import json
 import logging
 import brozzler
 import hashlib
+import time
 
 __all__ = ["Site", "Page"]
 
@@ -25,7 +26,8 @@ class Site(BaseDictable):
     def __init__(self, seed, id=None, scope=None, proxy=None,
             ignore_robots=False, time_limit=None, extra_headers=None,
             enable_warcprox_features=False, reached_limit=None, status="ACTIVE",
-            claimed=False, last_disclaimed=0):
+            claimed=False, start_time=time.time(), last_disclaimed=0):
+
         self.seed = seed
         self.id = id
         self.proxy = proxy
@@ -36,11 +38,13 @@ class Site(BaseDictable):
         self.reached_limit = reached_limit
         self.status = status
         self.claimed = bool(claimed)
-        self.last_disclaimed = last_disclaimed # time as seconds since epoch
+        # times as seconds since epoch
+        self.start_time = start_time
+        self.last_disclaimed = last_disclaimed
 
         self.scope = scope or {}
         if not "surt" in self.scope:
-            self.scope["surt"] = surt.GoogleURLCanonicalizer.canonicalize(surt.handyurl.parse(seed)).getURLString(surt=True, trailing_comma=True)
+            self.scope["surt"] = self._to_surt(seed)
 
     def __repr__(self):
         return """Site(id={},seed={},scope={},proxy={},enable_warcprox_features={},ignore_robots={},extra_headers={},reached_limit={})""".format(
@@ -48,8 +52,16 @@
             repr(self.proxy), self.enable_warcprox_features, self.ignore_robots,
             self.extra_headers, self.reached_limit)
 
+    def _to_surt(self, url):
+        hurl = surt.handyurl.parse(url)
+        surt.GoogleURLCanonicalizer.canonicalize(hurl)
+        hurl.query = None
+        hurl.hash = None
+        # XXX chop off path after last slash??
+        return hurl.getURLString(surt=True, trailing_comma=True)
+
     def note_seed_redirect(self, url):
-        new_scope_surt = surt.GoogleURLCanonicalizer.canonicalize(surt.handyurl.parse(url)).getURLString(surt=True, trailing_comma=True)
+        new_scope_surt = self._to_surt(url)
         if not new_scope_surt.startswith(self.scope["surt"]):
             self.logger.info("changing site scope surt from {} to {}".format(self.scope["surt"], new_scope_surt))
             self.scope["surt"] = new_scope_surt
@@ -78,7 +90,7 @@ class Site(BaseDictable):
             surtt = surt.GoogleURLCanonicalizer.canonicalize(hurl).getURLString(surt=True, trailing_comma=True)
             return surtt.startswith(self.scope["surt"])
         except:
-            self.logger.warn("""problem parsing url "{}" """.format(url))
+            self.logger.warn("problem parsing url %s", repr(url))
             return False
 
 class Page(BaseDictable):
diff --git a/brozzler/worker.py b/brozzler/worker.py
index 06a6ac9..0123bf6 100644
--- a/brozzler/worker.py
+++ b/brozzler/worker.py
@@ -84,29 +84,6 @@ class BrozzlerWorker:
         else:
             raise
 
-    def _scope_and_schedule_outlinks(self, site, parent_page, outlinks):
-        counts = {"added":0,"updated":0,"rejected":0,"blocked":0}
-        if outlinks:
-            for url in outlinks:
-                if site.is_in_scope(url, parent_page):
-                    if brozzler.is_permitted_by_robots(site, url):
-                        new_child_page = brozzler.Page(url, site_id=site.id, hops_from_seed=parent_page.hops_from_seed+1)
-                        existing_child_page = self._frontier.get_page(new_child_page)
-                        if existing_child_page:
-                            existing_child_page.priority += new_child_page.priority
-                            self._frontier.update_page(existing_child_page)
-                            counts["updated"] += 1
-                        else:
-                            self._frontier.new_page(new_child_page)
-                            counts["added"] += 1
-                    else:
-                        counts["blocked"] += 1
-                else:
-                    counts["rejected"] += 1
-
-        self.logger.info("%s new links added, %s existing links updated, %s links rejected, %s links blocked by robots from %s",
-                counts["added"], counts["updated"], counts["rejected"], counts["blocked"], parent_page)
-
     def brozzle_page(self, browser, ydl, site, page):
         def on_screenshot(screenshot_png):
             if site.proxy and site.enable_warcprox_features:
@@ -126,17 +103,18 @@ class BrozzlerWorker:
 
         outlinks = browser.browse_page(page.url, extra_headers=site.extra_headers,
                 on_screenshot=on_screenshot, on_url_change=page.note_redirect)
-        self._scope_and_schedule_outlinks(site, page, outlinks)
+        return outlinks
 
     def _brozzle_site(self, browser, ydl, site):
        start = time.time()
        page = None
        try:
            browser.start(proxy=site.proxy)
-            while not self._shutdown_requested.is_set() and time.time() - start < 300:
+            while not self._shutdown_requested.is_set() and time.time() - start < 60:
                page = self._frontier.claim_page(site)
-                self.brozzle_page(browser, ydl, site, page)
+                outlinks = self.brozzle_page(browser, ydl, site, page)
                self._frontier.completed_page(site, page)
+                self._frontier.scope_and_schedule_outlinks(site, page, outlinks)
                page = None
        except brozzler.NothingToClaim:
            self.logger.info("no pages left for site %s", site)
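For reference, a minimal standalone sketch of the scoping rule the patch arrives at: a seed (or seed redirect) is reduced to a SURT with its query and fragment dropped, and a candidate URL is in scope when its canonicalized SURT starts with that prefix. It mirrors the _to_surt() and is_in_scope() code above; the example.com URLs are placeholders, and the only assumed dependency is the surt package the patch already uses.

import surt

def to_surt(url):
    # same steps as Site._to_surt() in the patch: canonicalize, then drop
    # query and fragment, so a seed redirect to e.g. /?sessionid=1234
    # yields the same scope surt as the bare path
    hurl = surt.handyurl.parse(url)
    surt.GoogleURLCanonicalizer.canonicalize(hurl)
    hurl.query = None
    hurl.hash = None
    return hurl.getURLString(surt=True, trailing_comma=True)

def in_scope(scope_surt, url):
    # same prefix test as Site.is_in_scope() in the patch
    hurl = surt.handyurl.parse(url)
    surtt = surt.GoogleURLCanonicalizer.canonicalize(hurl).getURLString(surt=True, trailing_comma=True)
    return surtt.startswith(scope_surt)

if __name__ == "__main__":
    # placeholder URLs, for illustration only
    scope_surt = to_surt("http://example.com/blog/?sessionid=1234")
    for url in ("http://example.com/blog/post-1", "http://other.example.org/"):
        print(url, "in scope" if in_scope(scope_surt, url) else "rejected")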