make frontier prioritize least recently brozzled site; move disclaim_site() and completed_page() into frontier.py

Noah Levitt 2015-08-19 18:45:19 +00:00
parent b8506a2ab4
commit b7df0a1f37
5 changed files with 36 additions and 29 deletions
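
The core of the change is a new compound secondary index, sites_last_disclaimed, on (status, claimed, last_disclaimed), which lets claim_site() pick the ACTIVE, unclaimed site with the smallest last_disclaimed timestamp, i.e. the least recently brozzled one. A minimal sketch of that query outside brozzler (the database name "brozzler", the connection details, and the table contents are assumptions for illustration, not part of the commit):

    import rethinkdb
    r = rethinkdb

    conn = r.connect("localhost", 28015)  # assumed local RethinkDB server

    # Atomically claim the least recently disclaimed ACTIVE site. Compound
    # index keys sort element-wise, so among ["ACTIVE", False, ts] keys the
    # smallest ts wins; 250000000000 is just a far-future upper bound.
    result = (r.db("brozzler").table("sites")
            .between(["ACTIVE", False, 0], ["ACTIVE", False, 250000000000],
                index="sites_last_disclaimed")
            .order_by(index="sites_last_disclaimed")
            .limit(1)
            .update({"claimed": True}, return_changes=True)
            .run(conn))
    if result["replaced"] == 1:
        site_doc = result["changes"][0]["new_val"]  # the claimed site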

bin/brozzler-new-site

@@ -13,7 +13,7 @@ arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),
         formatter_class=argparse.ArgumentDefaultsHelpFormatter)
 arg_parser.add_argument('seed', metavar='SEED', help='seed url')
 arg_parser.add_argument("--db", dest="db", default="localhost",
-        help="comma-separated list of RethinkDB server addresses, e.g. db0.example.com,db0.example.com:39015,db1.example.com")
+        help="comma-separated list of RethinkDB server addresses, e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org")
 arg_parser.add_argument("--proxy", dest="proxy", default=None, help="http proxy for this site")
 arg_parser.add_argument("--time-limit", dest="time_limit", default=None, help="time limit in seconds for this site")
 arg_parser.add_argument("-H", "--extra-header", action="append",

brozzler/frontier.py

@@ -5,6 +5,7 @@ import brozzler
 import rethinkdb
 r = rethinkdb
 import random
+import time

 class UnexpectedDbResult(Exception):
     pass
@@ -41,6 +42,7 @@ class RethinkDbFrontier:
             r.db_create(self.db).run(conn)
             # r.db("test").table_create("jobs", shards=self.shards, replicas=self.replicas).run(conn)
             r.db(self.db).table_create("sites", shards=self.shards, replicas=self.replicas).run(conn)
+            r.db(self.db).table("sites").index_create("sites_last_disclaimed", [r.row["status"], r.row["claimed"], r.row["last_disclaimed"]]).run(conn)
             r.db(self.db).table_create("pages", shards=self.shards, replicas=self.replicas).run(conn)
             r.db(self.db).table("pages").index_create("priority_by_site", [r.row["site_id"], r.row["brozzle_count"], r.row["claimed"], r.row["priority"]]).run(conn)
             self.logger.info("created database %s with tables 'sites' and 'pages'", self.db)
@@ -70,13 +72,13 @@ class RethinkDbFrontier:
     def update_site(self, site):
         self.logger.debug("updating 'sites' table entry %s", site)
         with self._random_server_connection() as conn:
-            result = r.db(self.db).table("sites").get(site.id).update(site.to_dict()).run(conn)
-            self._vet_result(result, replaced=1)
+            result = r.db(self.db).table("sites").get(site.id).replace(site.to_dict()).run(conn)
+            self._vet_result(result, replaced=[0,1], unchanged=[0,1])

     def update_page(self, page):
         self.logger.debug("updating 'pages' table entry %s", page)
         with self._random_server_connection() as conn:
-            result = r.db(self.db).table("pages").get(page.id).update(page.to_dict()).run(conn)
+            result = r.db(self.db).table("pages").get(page.id).replace(page.to_dict()).run(conn)
             self._vet_result(result, replaced=[0,1], unchanged=[0,1])

     def new_page(self, page):
@@ -88,7 +90,9 @@ class RethinkDbFrontier:
     def claim_site(self):
         # XXX keep track of aggregate priority and prioritize sites accordingly?
         with self._random_server_connection() as conn:
-            result = r.db(self.db).table("sites").filter({"claimed":False,"status":"ACTIVE"}).limit(1).update({"claimed":True},return_changes=True).run(conn)
+            result = (r.db(self.db).table("sites")
+                    .between(["ACTIVE",False,0], ["ACTIVE",False,250000000000], index="sites_last_disclaimed")
+                    .order_by(index="sites_last_disclaimed").limit(1).update({"claimed":True},return_changes=True).run(conn))
             self._vet_result(result, replaced=[0,1])
             if result["replaced"] == 1:
                 return brozzler.Site(**result["changes"][0]["new_val"])
@@ -119,3 +123,25 @@ class RethinkDbFrontier:
                 return brozzler.Page(**result)
             else:
                 return None
+
+    def completed_page(self, site, page):
+        page.brozzle_count += 1
+        page.claimed = False
+        # XXX set priority?
+        self.update_page(page)
+        if page.redirect_url and page.hops_from_seed == 0:
+            site.note_seed_redirect(page.redirect_url)
+            self.update_site(site)
+
+    def disclaim_site(self, site, page=None):
+        self.logger.info("disclaiming %s", site)
+        site.claimed = False
+        site.last_disclaimed = time.time()
+        if not page and not self.has_outstanding_pages(site):
+            self.logger.info("site FINISHED! %s", site)
+            site.status = "FINISHED"
+        self.update_site(site)
+        if page:
+            page.claimed = False
+            self.update_page(page)
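
With completed_page() and disclaim_site() moved onto the frontier, a worker's per-site loop reduces to claim/brozzle/complete, then disclaim in a finally block. A sketch of that calling pattern (the frontier and the brozzle callable are assumed to be constructed elsewhere; the 300-second session limit comes from worker.py below):

    import time
    import brozzler

    def brozzle_site(frontier, site, brozzle, session_limit=300):
        # work an already-claimed site until it runs out of pages
        # or the session limit expires
        start = time.time()
        page = None
        try:
            while time.time() - start < session_limit:
                page = frontier.claim_page(site)
                brozzle(site, page)                  # fetch/browse the page
                frontier.completed_page(site, page)  # unclaims page, bumps brozzle_count
                page = None
        except brozzler.NothingToClaim:
            pass  # no pages left for this site
        finally:
            # unclaims the site, stamps last_disclaimed, and marks it
            # FINISHED if no outstanding pages remain
            frontier.disclaim_site(site, page)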

brozzler/robots.py

@@ -12,7 +12,7 @@ def _robots_cache(site):
     def get(self, url, *args, **kwargs):
         res = super().get(url, *args, **kwargs)
         if res.status_code == 420 and 'warcprox-meta' in res.headers:
-            raise ReachedLimit(warcprox_meta=json.loads(res.headers['warcprox-meta']), http_payload=res.text)
+            raise brozzler.ReachedLimit(warcprox_meta=json.loads(res.headers['warcprox-meta']), http_payload=res.text)
         else:
             return res

brozzler/site.py

@@ -23,7 +23,7 @@ class Site(BaseDictable):
     def __init__(self, seed, id=None, scope=None, proxy=None,
             ignore_robots=False, time_limit=None, extra_headers=None,
             enable_warcprox_features=False, reached_limit=None, status="ACTIVE",
-            claimed=False):
+            claimed=False, last_disclaimed=0):
         self.seed = seed
         self.id = id
         self.proxy = proxy
@@ -34,6 +34,7 @@ class Site(BaseDictable):
         self.reached_limit = reached_limit
         self.status = status
         self.claimed = bool(claimed)
+        self.last_disclaimed = last_disclaimed # time as seconds since epoch
         self.scope = scope or {}
         if not "surt" in self.scope:

brozzler/worker.py

@@ -42,26 +42,6 @@ class BrozzlerWorker:
         ## os.environ["http_proxy"] = "http://{}".format(site.proxy)
         return youtube_dl.YoutubeDL(ydl_opts)

-    def _completed_page(self, site, page):
-        page.brozzle_count += 1
-        page.claimed = False
-        # XXX set priority?
-        self._frontier.update_page(page)
-        if page.redirect_url and page.hops_from_seed == 0:
-            site.note_seed_redirect(page.redirect_url)
-            self._frontier.update_site(site)
-
-    def _disclaim_site(self, site, page=None):
-        self.logger.info("disclaiming %s", site)
-        site.claimed = False
-        if not page and not self._frontier.has_outstanding_pages(site):
-            self.logger.info("site FINISHED! %s", site)
-            site.status = "FINISHED"
-        self._frontier.update_site(site)
-        if page:
-            page.claimed = False
-            self._frontier.update_page(page)
-
     def _putmeta(self, warcprox_address, url, content_type, payload, extra_headers=None):
         headers = {"Content-Type":content_type}
         if extra_headers:
@@ -154,7 +134,7 @@ class BrozzlerWorker:
             while not self._shutdown_requested.is_set() and time.time() - start < 300:
                 page = self._frontier.claim_page(site)
                 self.brozzle_page(browser, ydl, site, page)
-                self._completed_page(site, page)
+                self._frontier.completed_page(site, page)
                 page = None
         except brozzler.NothingToClaim:
             self.logger.info("no pages left for site %s", site)
@@ -167,7 +147,7 @@ class BrozzlerWorker:
         finally:
             self.logger.info("finished session brozzling site, stopping browser and disclaiming site")
             browser.stop()
-            self._disclaim_site(site, page)
+            self._frontier.disclaim_site(site, page)
             self._browser_pool.release(browser)

     def run(self):