enforce time limits; move scope_and_schedule_outlinks into frontier.py; fix bugs around scoping on seed redirect

This commit is contained in:
Noah Levitt 2015-08-19 20:16:25 +00:00
parent ddce1cdc71
commit ad543e6134
3 changed files with 71 additions and 40 deletions

View File

@ -91,15 +91,33 @@ class RethinkDbFrontier:
# NOTE(review): this span is a rendered diff hunk -- it contains BOTH the
# removed pre-change body of claim_site and its replacement, so it is not
# valid standalone Python as shown.
def claim_site(self):
# XXX keep track of aggregate priority and prioritize sites accordingly?
# --- removed (pre-change) body: atomically claim one unclaimed ACTIVE site
# ordered by last_disclaimed, or raise NothingToClaim ---
with self._random_server_connection() as conn:
result = (r.db(self.db).table("sites")
.between(["ACTIVE",False,0], ["ACTIVE",False,250000000000], index="sites_last_disclaimed")
.order_by(index="sites_last_disclaimed").limit(1).update({"claimed":True},return_changes=True).run(conn))
self._vet_result(result, replaced=[0,1])
if result["replaced"] == 1:
return brozzler.Site(**result["changes"][0]["new_val"])
else:
raise brozzler.NothingToClaim
# --- added (post-change) body: same claim query, but wrapped in a loop so a
# claimed site whose time limit has expired is finished and skipped rather
# than returned to the worker ---
while True:
with self._random_server_connection() as conn:
result = (r.db(self.db).table("sites")
.between(["ACTIVE",False,0], ["ACTIVE",False,250000000000], index="sites_last_disclaimed")
.order_by(index="sites_last_disclaimed").limit(1).update({"claimed":True},return_changes=True).run(conn))
self._vet_result(result, replaced=[0,1])
if result["replaced"] == 1:
site = brozzler.Site(**result["changes"][0]["new_val"])
else:
raise brozzler.NothingToClaim
# XXX This is the only place we enforce time limit for now. Worker
# loop should probably check time limit. Maybe frontier needs a
# housekeeping thread to ensure that time limits get enforced in a
# timely fashion.
if not self._enforce_time_limit(site):
return site
def _enforce_time_limit(self, site):
    """Finish the site if it has been crawling longer than its time limit.

    Returns True when the limit was exceeded (the site's status is set to
    FINISHED_TIME_LIMIT and persisted via update_site), False otherwise.
    A missing, zero, or negative time_limit means "no limit".
    """
    has_limit = bool(site.time_limit) and site.time_limit > 0
    if not has_limit or time.time() - site.start_time <= site.time_limit:
        return False
    self.logger.info(
            "site FINISHED_TIME_LIMIT! time_limit=%s start_time=%s elapsed=%s %s",
            site.time_limit, site.start_time, time.time() - site.start_time, site)
    site.status = "FINISHED_TIME_LIMIT"
    self.update_site(site)
    return True
def claim_page(self, site):
with self._random_server_connection() as conn:
@ -147,3 +165,26 @@ class RethinkDbFrontier:
page.claimed = False
self.update_page(page)
def scope_and_schedule_outlinks(self, site, parent_page, outlinks):
    """Filter outlinks and schedule the surviving ones in the frontier.

    Each candidate url is scope-checked against the site, then checked
    against robots.txt. Accepted urls become new Page entries one hop
    deeper than parent_page; urls already known to the frontier have
    their priority bumped instead. Logs a summary of the counts.
    """
    counts = {"added":0,"updated":0,"rejected":0,"blocked":0}
    for url in (outlinks or []):
        if not site.is_in_scope(url, parent_page):
            counts["rejected"] += 1
            continue
        if not brozzler.is_permitted_by_robots(site, url):
            counts["blocked"] += 1
            continue
        fresh_page = brozzler.Page(url, site_id=site.id,
                hops_from_seed=parent_page.hops_from_seed+1)
        known_page = self.get_page(fresh_page)
        if known_page:
            # already scheduled: accumulate priority rather than duplicate
            known_page.priority += fresh_page.priority
            self.update_page(known_page)
            counts["updated"] += 1
        else:
            self.new_page(fresh_page)
            counts["added"] += 1
    self.logger.info("%s new links added, %s existing links updated, %s links rejected, %s links blocked by robots from %s",
            counts["added"], counts["updated"], counts["rejected"], counts["blocked"], parent_page)

View File

@ -5,6 +5,7 @@ import json
import logging
import brozzler
import hashlib
import time
__all__ = ["Site", "Page"]
@ -25,7 +26,8 @@ class Site(BaseDictable):
def __init__(self, seed, id=None, scope=None, proxy=None,
ignore_robots=False, time_limit=None, extra_headers=None,
enable_warcprox_features=False, reached_limit=None, status="ACTIVE",
claimed=False, last_disclaimed=0):
claimed=False, start_time=time.time(), last_disclaimed=0):
self.seed = seed
self.id = id
self.proxy = proxy
@ -36,11 +38,13 @@ class Site(BaseDictable):
self.reached_limit = reached_limit
self.status = status
self.claimed = bool(claimed)
self.last_disclaimed = last_disclaimed # time as seconds since epoch
# times as seconds since epoch
self.start_time = start_time
self.last_disclaimed = last_disclaimed
self.scope = scope or {}
if not "surt" in self.scope:
self.scope["surt"] = surt.GoogleURLCanonicalizer.canonicalize(surt.handyurl.parse(seed)).getURLString(surt=True, trailing_comma=True)
self.scope["surt"] = self._to_surt(seed)
def __repr__(self):
return """Site(id={},seed={},scope={},proxy={},enable_warcprox_features={},ignore_robots={},extra_headers={},reached_limit={})""".format(
@ -48,8 +52,16 @@ class Site(BaseDictable):
repr(self.proxy), self.enable_warcprox_features,
self.ignore_robots, self.extra_headers, self.reached_limit)
def _to_surt(self, url):
    """Return the canonicalized SURT form of url for scope matching.

    The query string and fragment are dropped so that they never
    participate in scope-prefix comparisons.
    """
    handy = surt.handyurl.parse(url)
    surt.GoogleURLCanonicalizer.canonicalize(handy)
    handy.query = None
    handy.hash = None
    # XXX chop off path after last slash??
    return handy.getURLString(surt=True, trailing_comma=True)
# Widen/replace the site's scope surt when the seed redirects off-prefix,
# so pages under the redirect target stay in scope.
def note_seed_redirect(self, url):
# NOTE(review): diff residue -- the next line is the removed pre-change
# computation; the line after it is its replacement via _to_surt().
new_scope_surt = surt.GoogleURLCanonicalizer.canonicalize(surt.handyurl.parse(url)).getURLString(surt=True, trailing_comma=True)
new_scope_surt = self._to_surt(url)
# only replace the scope surt if the redirect target falls outside it
if not new_scope_surt.startswith(self.scope["surt"]):
self.logger.info("changing site scope surt from {} to {}".format(self.scope["surt"], new_scope_surt))
self.scope["surt"] = new_scope_surt
@ -78,7 +90,7 @@ class Site(BaseDictable):
surtt = surt.GoogleURLCanonicalizer.canonicalize(hurl).getURLString(surt=True, trailing_comma=True)
return surtt.startswith(self.scope["surt"])
except:
self.logger.warn("""problem parsing url "{}" """.format(url))
self.logger.warn("problem parsing url %s", repr(url))
return False
class Page(BaseDictable):

View File

@ -84,29 +84,6 @@ class BrozzlerWorker:
else:
raise
# NOTE(review): this is the REMOVED worker.py copy of this method; the commit
# moves it (minus the self._frontier indirection) into frontier.py as
# scope_and_schedule_outlinks.
def _scope_and_schedule_outlinks(self, site, parent_page, outlinks):
# tallies for the summary log line below
counts = {"added":0,"updated":0,"rejected":0,"blocked":0}
if outlinks:
for url in outlinks:
# scope check first, then robots.txt
if site.is_in_scope(url, parent_page):
if brozzler.is_permitted_by_robots(site, url):
# candidate page is one hop deeper than its parent
new_child_page = brozzler.Page(url, site_id=site.id, hops_from_seed=parent_page.hops_from_seed+1)
existing_child_page = self._frontier.get_page(new_child_page)
if existing_child_page:
# already scheduled: accumulate priority rather than duplicate
existing_child_page.priority += new_child_page.priority
self._frontier.update_page(existing_child_page)
counts["updated"] += 1
else:
self._frontier.new_page(new_child_page)
counts["added"] += 1
else:
counts["blocked"] += 1
else:
counts["rejected"] += 1
self.logger.info("%s new links added, %s existing links updated, %s links rejected, %s links blocked by robots from %s",
counts["added"], counts["updated"], counts["rejected"], counts["blocked"], parent_page)
def brozzle_page(self, browser, ydl, site, page):
def on_screenshot(screenshot_png):
if site.proxy and site.enable_warcprox_features:
@ -126,17 +103,18 @@ class BrozzlerWorker:
outlinks = browser.browse_page(page.url,
extra_headers=site.extra_headers, on_screenshot=on_screenshot,
on_url_change=page.note_redirect)
self._scope_and_schedule_outlinks(site, page, outlinks)
return outlinks
def _brozzle_site(self, browser, ydl, site):
start = time.time()
page = None
try:
browser.start(proxy=site.proxy)
while not self._shutdown_requested.is_set() and time.time() - start < 300:
while not self._shutdown_requested.is_set() and time.time() - start < 60:
page = self._frontier.claim_page(site)
self.brozzle_page(browser, ydl, site, page)
outlinks = self.brozzle_page(browser, ydl, site, page)
self._frontier.completed_page(site, page)
self._frontier.scope_and_schedule_outlinks(site, page, outlinks)
page = None
except brozzler.NothingToClaim:
self.logger.info("no pages left for site %s", site)