From b7df0a1f3716c87e42c196bfb56fb5cfc6c16f97 Mon Sep 17 00:00:00 2001
From: Noah Levitt
Date: Wed, 19 Aug 2015 18:45:19 +0000
Subject: [PATCH] make frontier prioritize least recently brozzled site; move
 disclaim_site() and completed_page() into frontier.py

---
 bin/brozzler-new-site |  2 +-
 brozzler/frontier.py  | 34 ++++++++++++++++++++++++++++++----
 brozzler/robots.py    |  2 +-
 brozzler/site.py      |  3 ++-
 brozzler/worker.py    | 24 ++----------------------
 5 files changed, 36 insertions(+), 29 deletions(-)

diff --git a/bin/brozzler-new-site b/bin/brozzler-new-site
index ca97b9a..3f3f0c9 100755
--- a/bin/brozzler-new-site
+++ b/bin/brozzler-new-site
@@ -13,7 +13,7 @@ arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),
         formatter_class=argparse.ArgumentDefaultsHelpFormatter)
 arg_parser.add_argument('seed', metavar='SEED', help='seed url')
 arg_parser.add_argument("--db", dest="db", default="localhost",
-        help="comma-separated list of RethinkDB server addresses, e.g. db0.example.com,db0.example.com:39015,db1.example.com")
+        help="comma-separated list of RethinkDB server addresses, e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org")
 arg_parser.add_argument("--proxy", dest="proxy", default=None, help="http proxy for this site")
 arg_parser.add_argument("--time-limit", dest="time_limit", default=None, help="time limit in seconds for this site")
 arg_parser.add_argument("-H", "--extra-header", action="append",
diff --git a/brozzler/frontier.py b/brozzler/frontier.py
index 100784a..137687a 100644
--- a/brozzler/frontier.py
+++ b/brozzler/frontier.py
@@ -5,6 +5,7 @@ import brozzler
 import rethinkdb
 r = rethinkdb
 import random
+import time
 
 class UnexpectedDbResult(Exception):
     pass
@@ -41,6 +42,7 @@
             r.db_create(self.db).run(conn)
             # r.db("test").table_create("jobs", shards=self.shards, replicas=self.replicas).run(conn)
             r.db(self.db).table_create("sites", shards=self.shards, replicas=self.replicas).run(conn)
+            r.db(self.db).table("sites").index_create("sites_last_disclaimed", [r.row["status"], r.row["claimed"], r.row["last_disclaimed"]]).run(conn)
             r.db(self.db).table_create("pages", shards=self.shards, replicas=self.replicas).run(conn)
             r.db(self.db).table("pages").index_create("priority_by_site", [r.row["site_id"], r.row["brozzle_count"], r.row["claimed"], r.row["priority"]]).run(conn)
             self.logger.info("created database %s with tables 'sites' and 'pages'", self.db)
@@ -70,13 +72,13 @@
     def update_site(self, site):
         self.logger.debug("updating 'sites' table entry %s", site)
         with self._random_server_connection() as conn:
-            result = r.db(self.db).table("sites").get(site.id).update(site.to_dict()).run(conn)
-            self._vet_result(result, replaced=1)
+            result = r.db(self.db).table("sites").get(site.id).replace(site.to_dict()).run(conn)
+            self._vet_result(result, replaced=[0,1], unchanged=[0,1])
 
     def update_page(self, page):
         self.logger.debug("updating 'pages' table entry %s", page)
         with self._random_server_connection() as conn:
-            result = r.db(self.db).table("pages").get(page.id).update(page.to_dict()).run(conn)
+            result = r.db(self.db).table("pages").get(page.id).replace(page.to_dict()).run(conn)
             self._vet_result(result, replaced=[0,1], unchanged=[0,1])
 
     def new_page(self, page):
@@ -88,7 +90,9 @@
     def claim_site(self):
         # XXX keep track of aggregate priority and prioritize sites accordingly?
         with self._random_server_connection() as conn:
-            result = r.db(self.db).table("sites").filter({"claimed":False,"status":"ACTIVE"}).limit(1).update({"claimed":True},return_changes=True).run(conn)
+            result = (r.db(self.db).table("sites")
+                    .between(["ACTIVE",False,0], ["ACTIVE",False,250000000000], index="sites_last_disclaimed")
+                    .order_by(index="sites_last_disclaimed").limit(1).update({"claimed":True},return_changes=True).run(conn))
             self._vet_result(result, replaced=[0,1])
             if result["replaced"] == 1:
                 return brozzler.Site(**result["changes"][0]["new_val"])
@@ -119,3 +123,25 @@
             return brozzler.Page(**result)
         else:
             return None
+
+    def completed_page(self, site, page):
+        page.brozzle_count += 1
+        page.claimed = False
+        # XXX set priority?
+        self.update_page(page)
+        if page.redirect_url and page.hops_from_seed == 0:
+            site.note_seed_redirect(page.redirect_url)
+        self.update_site(site)
+
+    def disclaim_site(self, site, page=None):
+        self.logger.info("disclaiming %s", site)
+        site.claimed = False
+        site.last_disclaimed = time.time()
+        if not page and not self.has_outstanding_pages(site):
+            self.logger.info("site FINISHED! %s", site)
+            site.status = "FINISHED"
+        self.update_site(site)
+        if page:
+            page.claimed = False
+            self.update_page(page)
+
diff --git a/brozzler/robots.py b/brozzler/robots.py
index 84daab7..1296400 100644
--- a/brozzler/robots.py
+++ b/brozzler/robots.py
@@ -12,7 +12,7 @@ def _robots_cache(site):
         def get(self, url, *args, **kwargs):
             res = super().get(url, *args, **kwargs)
             if res.status_code == 420 and 'warcprox-meta' in res.headers:
-                raise ReachedLimit(warcprox_meta=json.loads(res.headers['warcprox-meta']), http_payload=res.text)
+                raise brozzler.ReachedLimit(warcprox_meta=json.loads(res.headers['warcprox-meta']), http_payload=res.text)
             else:
                 return res
 
diff --git a/brozzler/site.py b/brozzler/site.py
index 9f062dc..6246431 100644
--- a/brozzler/site.py
+++ b/brozzler/site.py
@@ -23,7 +23,7 @@ class Site(BaseDictable):
     def __init__(self, seed, id=None, scope=None, proxy=None,
             ignore_robots=False, time_limit=None, extra_headers=None,
             enable_warcprox_features=False, reached_limit=None, status="ACTIVE",
-            claimed=False):
+            claimed=False, last_disclaimed=0):
         self.seed = seed
         self.id = id
         self.proxy = proxy
@@ -34,6 +34,7 @@ class Site(BaseDictable):
         self.reached_limit = reached_limit
         self.status = status
         self.claimed = bool(claimed)
+        self.last_disclaimed = last_disclaimed # time as seconds since epoch
 
         self.scope = scope or {}
         if not "surt" in self.scope:
diff --git a/brozzler/worker.py b/brozzler/worker.py
index 9c0c39e..55fbd9b 100644
--- a/brozzler/worker.py
+++ b/brozzler/worker.py
@@ -42,26 +42,6 @@ class BrozzlerWorker:
         ## os.environ["http_proxy"] = "http://{}".format(site.proxy)
         return youtube_dl.YoutubeDL(ydl_opts)
 
-    def _completed_page(self, site, page):
-        page.brozzle_count += 1
-        page.claimed = False
-        # XXX set priority?
-        self._frontier.update_page(page)
-        if page.redirect_url and page.hops_from_seed == 0:
-            site.note_seed_redirect(page.redirect_url)
-        self._frontier.update_site(site)
-
-    def _disclaim_site(self, site, page=None):
-        self.logger.info("disclaiming %s", site)
-        site.claimed = False
-        if not page and not self._frontier.has_outstanding_pages(site):
-            self.logger.info("site FINISHED! %s", site)
-            site.status = "FINISHED"
-        self._frontier.update_site(site)
-        if page:
-            page.claimed = False
-            self._frontier.update_page(page)
-
     def _putmeta(self, warcprox_address, url, content_type, payload, extra_headers=None):
         headers = {"Content-Type":content_type}
         if extra_headers:
@@ -154,7 +134,7 @@ class BrozzlerWorker:
             while not self._shutdown_requested.is_set() and time.time() - start < 300:
                 page = self._frontier.claim_page(site)
                 self.brozzle_page(browser, ydl, site, page)
-                self._completed_page(site, page)
+                self._frontier.completed_page(site, page)
                 page = None
         except brozzler.NothingToClaim:
             self.logger.info("no pages left for site %s", site)
@@ -167,7 +147,7 @@ class BrozzlerWorker:
         finally:
             self.logger.info("finished session brozzling site, stopping browser and disclaiming site")
             browser.stop()
-            self._disclaim_site(site, page)
+            self._frontier.disclaim_site(site, page)
             self._browser_pool.release(browser)
 
     def run(self):