use new rethinkstuff ORM

This commit is contained in:
Noah Levitt 2017-02-28 16:12:50 -08:00
parent a1f1681cad
commit 700b08b7d7
8 changed files with 134 additions and 247 deletions

View file

@ -50,20 +50,6 @@ class ReachedLimit(Exception):
def __str__(self): def __str__(self):
return self.__repr__() return self.__repr__()
class BaseDictable:
def to_dict(self):
d = dict(vars(self))
for k in vars(self):
if k.startswith("_") or d[k] is None:
del d[k]
return d
def to_json(self):
return json.dumps(self.to_dict(), separators=(',', ':'))
def __repr__(self):
return "{}(**{})".format(self.__class__.__name__, self.to_dict())
def fixup(url, hash_strip=False): def fixup(url, hash_strip=False):
''' '''
Does rudimentary canonicalization, such as converting IDN to punycode. Does rudimentary canonicalization, such as converting IDN to punycode.
@ -81,8 +67,6 @@ def fixup(url, hash_strip=False):
TRACE = 5 TRACE = 5
import logging as _logging import logging as _logging
def _logging_trace(msg, *args, **kwargs): def _logging_trace(msg, *args, **kwargs):
if len(_logging.root.handlers) == 0:
basicConfig()
_logging.root.trace(msg, *args, **kwargs) _logging.root.trace(msg, *args, **kwargs)
def _logger_trace(self, msg, *args, **kwargs): def _logger_trace(self, msg, *args, **kwargs):
if self.isEnabledFor(TRACE): if self.isEnabledFor(TRACE):

View file

@ -97,7 +97,6 @@ class BrowserPool:
with self._lock: with self._lock:
for browser in self._in_use: for browser in self._in_use:
browser.stop() browser.stop()
self._in_use.clear()
def num_available(self): def num_available(self):
return self.size - len(self._in_use) return self.size - len(self._in_use)

View file

@ -174,12 +174,12 @@ def brozzle_page():
behavior_parameters = {} behavior_parameters = {}
if args.behavior_parameters: if args.behavior_parameters:
behavior_parameters = json.loads(args.behavior_parameters) behavior_parameters = json.loads(args.behavior_parameters)
site = brozzler.Site( site = brozzler.Site(None, {
id=-1, seed=args.url, proxy=args.proxy, 'id': -1, 'seed': args.url, 'proxy': args.proxy,
enable_warcprox_features=args.enable_warcprox_features, 'enable_warcprox_features': args.enable_warcprox_features,
behavior_parameters=behavior_parameters, username=args.username, 'behavior_parameters': behavior_parameters,
password=args.password) 'username': args.username, 'password': args.password})
page = brozzler.Page(url=args.url, site_id=site.id) page = brozzler.Page(None, {'url': args.url, 'site_id': site.id})
worker = brozzler.BrozzlerWorker(frontier=None) worker = brozzler.BrozzlerWorker(frontier=None)
def on_screenshot(screenshot_png): def on_screenshot(screenshot_png):
@ -273,24 +273,26 @@ def brozzler_new_site():
args = arg_parser.parse_args(args=sys.argv[1:]) args = arg_parser.parse_args(args=sys.argv[1:])
configure_logging(args) configure_logging(args)
site = brozzler.Site(
seed=args.seed, proxy=args.proxy,
time_limit=int(args.time_limit) if args.time_limit else None,
ignore_robots=args.ignore_robots,
enable_warcprox_features=args.enable_warcprox_features,
warcprox_meta=json.loads(
args.warcprox_meta) if args.warcprox_meta else None,
behavior_parameters=json.loads(
args.behavior_parameters) if args.behavior_parameters else None,
username=args.username, password=args.password)
r = rethinker(args) r = rethinker(args)
site = brozzler.Site(r, {
'seed': args.seed,
'proxy': args.proxy,
'time_limit': int(args.time_limit) if args.time_limit else None,
'ignore_robots': args.ignore_robots,
'enable_warcprox_features': args.enable_warcprox_features,
'warcprox_meta': json.loads(
args.warcprox_meta) if args.warcprox_meta else None,
'behavior_parameters': json.loads(
args.behavior_parameters) if args.behavior_parameters else None,
'username': args.username,
'password': args.password})
frontier = brozzler.RethinkDbFrontier(r) frontier = brozzler.RethinkDbFrontier(r)
brozzler.new_site(frontier, site) brozzler.new_site(frontier, site)
def brozzler_worker(): def brozzler_worker():
''' '''
Main entrypoint for brozzler, gets sites and pages to brozzle from Main entry point for brozzler, gets sites and pages to brozzle from
rethinkdb, brozzles them. rethinkdb, brozzles them.
''' '''
arg_parser = argparse.ArgumentParser( arg_parser = argparse.ArgumentParser(

View file

@ -92,43 +92,6 @@ class RethinkDbFrontier:
if result.get(k) != expected: if result.get(k) != expected:
raise UnexpectedDbResult("expected {} to be {} in {}".format(repr(k), expected, result)) raise UnexpectedDbResult("expected {} to be {} in {}".format(repr(k), expected, result))
def new_job(self, job):
self.logger.info("inserting into 'jobs' table %s", repr(job))
result = self.r.table("jobs").insert(job.to_dict()).run()
self._vet_result(result, inserted=1)
if not job.id:
# only if "id" has not already been set
job.id = result["generated_keys"][0]
return job
def new_site(self, site):
self.logger.info("inserting into 'sites' table %s", site)
result = self.r.table("sites").insert(site.to_dict()).run()
self._vet_result(result, inserted=1)
if not site.id:
# only if "id" has not already been set
site.id = result["generated_keys"][0]
def update_job(self, job):
self.logger.debug("updating 'jobs' table entry %s", job)
result = self.r.table("jobs").get(job.id).replace(job.to_dict()).run()
self._vet_result(result, replaced=[0,1], unchanged=[0,1])
def update_site(self, site):
self.logger.debug("updating 'sites' table entry %s", site)
result = self.r.table("sites").get(site.id).replace(site.to_dict()).run()
self._vet_result(result, replaced=[0,1], unchanged=[0,1])
def update_page(self, page):
self.logger.trace("updating 'pages' table entry %s", page)
result = self.r.table("pages").get(page.id).replace(page.to_dict()).run()
self._vet_result(result, replaced=[0,1], unchanged=[0,1])
def new_page(self, page):
self.logger.trace("inserting into 'pages' table %s", page)
result = self.r.table("pages").insert(page.to_dict()).run()
self._vet_result(result, inserted=1)
def claim_site(self, worker_id): def claim_site(self, worker_id):
# XXX keep track of aggregate priority and prioritize sites accordingly? # XXX keep track of aggregate priority and prioritize sites accordingly?
while True: while True:
@ -165,7 +128,7 @@ class RethinkDbFrontier:
"at %s, and presumably some error stopped it from " "at %s, and presumably some error stopped it from "
"being disclaimed", "being disclaimed",
result["changes"][0]["old_val"]["last_claimed"]) result["changes"][0]["old_val"]["last_claimed"])
site = brozzler.Site(**result["changes"][0]["new_val"]) site = brozzler.Site(self.r, result["changes"][0]["new_val"])
else: else:
raise brozzler.NothingToClaim raise brozzler.NothingToClaim
# XXX This is the only place we enforce time limit for now. Worker # XXX This is the only place we enforce time limit for now. Worker
@ -204,7 +167,7 @@ class RethinkDbFrontier:
if result["unchanged"] == 0 and result["replaced"] == 0: if result["unchanged"] == 0 and result["replaced"] == 0:
raise brozzler.NothingToClaim raise brozzler.NothingToClaim
else: else:
return brozzler.Page(**result["changes"][0]["new_val"]) return brozzler.Page(self.r, result["changes"][0]["new_val"])
def has_outstanding_pages(self, site): def has_outstanding_pages(self, site):
results_iter = self.r.table("pages").between( results_iter = self.r.table("pages").between(
@ -213,55 +176,30 @@ class RethinkDbFrontier:
index="priority_by_site").limit(1).run() index="priority_by_site").limit(1).run()
return len(list(results_iter)) > 0 return len(list(results_iter)) > 0
def page(self, id):
result = self.r.table("pages").get(id).run()
if result:
return brozzler.Page(**result)
else:
return None
def completed_page(self, site, page): def completed_page(self, site, page):
page.brozzle_count += 1 page.brozzle_count += 1
page.claimed = False page.claimed = False
# XXX set priority? # XXX set priority?
self.update_page(page) page.save()
if page.redirect_url and page.hops_from_seed == 0: if page.redirect_url and page.hops_from_seed == 0:
site.note_seed_redirect(page.redirect_url) site.note_seed_redirect(page.redirect_url)
self.update_site(site) site.save()
def active_jobs(self): def active_jobs(self):
results = self.r.table("jobs").filter({"status":"ACTIVE"}).run() results = self.r.table("jobs").filter({"status":"ACTIVE"}).run()
for result in results: for result in results:
yield brozzler.Job(**result) yield brozzler.Job(self.r, result)
def job(self, id):
if id is None:
return None
result = self.r.table("jobs").get(id).run()
if result:
return brozzler.Job(**result)
else:
return None
def site(self, id):
if id is None:
return None
result = self.r.table("sites").get(id).run()
if result:
return brozzler.Site(**result)
else:
return None
def honor_stop_request(self, job_id): def honor_stop_request(self, job_id):
"""Raises brozzler.CrawlJobStopped if stop has been requested.""" """Raises brozzler.CrawlJobStopped if stop has been requested."""
job = self.job(job_id) job = brozzler.Job.load(self.r, job_id)
if job and job.stop_requested: if job and job.get('stop_requested'):
self.logger.info("stop requested for job %s", job_id) self.logger.info("stop requested for job %s", job_id)
raise brozzler.CrawlJobStopped raise brozzler.CrawlJobStopped
def _maybe_finish_job(self, job_id): def _maybe_finish_job(self, job_id):
"""Returns True if job is finished.""" """Returns True if job is finished."""
job = self.job(job_id) job = brozzler.Job.load(self.r, job_id)
if not job: if not job:
return False return False
if job.status.startswith("FINISH"): if job.status.startswith("FINISH"):
@ -271,7 +209,7 @@ class RethinkDbFrontier:
results = self.r.table("sites").get_all(job_id, index="job_id").run() results = self.r.table("sites").get_all(job_id, index="job_id").run()
n = 0 n = 0
for result in results: for result in results:
site = brozzler.Site(**result) site = brozzler.Site(self.r, result)
if not site.status.startswith("FINISH"): if not site.status.startswith("FINISH"):
results.close() results.close()
return False return False
@ -279,7 +217,7 @@ class RethinkDbFrontier:
self.logger.info("all %s sites finished, job %s is FINISHED!", n, job.id) self.logger.info("all %s sites finished, job %s is FINISHED!", n, job.id)
job.finish() job.finish()
self.update_job(job) job.save()
return True return True
def finished(self, site, status): def finished(self, site, status):
@ -288,7 +226,7 @@ class RethinkDbFrontier:
site.claimed = False site.claimed = False
site.last_disclaimed = rethinkstuff.utcnow() site.last_disclaimed = rethinkstuff.utcnow()
site.starts_and_stops[-1]["stop"] = rethinkstuff.utcnow() site.starts_and_stops[-1]["stop"] = rethinkstuff.utcnow()
self.update_site(site) site.save()
if site.job_id: if site.job_id:
self._maybe_finish_job(site.job_id) self._maybe_finish_job(site.job_id)
@ -299,34 +237,34 @@ class RethinkDbFrontier:
if not page and not self.has_outstanding_pages(site): if not page and not self.has_outstanding_pages(site):
self.finished(site, "FINISHED") self.finished(site, "FINISHED")
else: else:
self.update_site(site) site.save()
if page: if page:
page.claimed = False page.claimed = False
self.update_page(page) page.save()
def resume_job(self, job): def resume_job(self, job):
job.status = "ACTIVE" job.status = "ACTIVE"
job.starts_and_stops.append( job.starts_and_stops.append(
{"start":rethinkstuff.utcnow(), "stop":None}) {"start":rethinkstuff.utcnow(), "stop":None})
self.update_job(job) job.save()
for site in self.job_sites(job.id): for site in self.job_sites(job.id):
site.status = "ACTIVE" site.status = "ACTIVE"
site.starts_and_stops.append( site.starts_and_stops.append(
{"start":rethinkstuff.utcnow(), "stop":None}) {"start":rethinkstuff.utcnow(), "stop":None})
self.update_site(site) site.save()
def resume_site(self, site): def resume_site(self, site):
if site.job_id: if site.job_id:
# can't call resume_job since that would resume jobs's other sites # can't call resume_job since that would resume jobs's other sites
job = self.job(site.job_id) job = brozzler.job.load(self.r, site.job_id)
job.status = "ACTIVE" job.status = "ACTIVE"
job.starts_and_stops.append( job.starts_and_stops.append(
{"start":rethinkstuff.utcnow(), "stop":None}) {"start":rethinkstuff.utcnow(), "stop":None})
self.update_job(job) job.save()
site.status = "ACTIVE" site.status = "ACTIVE"
site.starts_and_stops.append( site.starts_and_stops.append(
{"start":rethinkstuff.utcnow(), "stop":None}) {"start":rethinkstuff.utcnow(), "stop":None})
self.update_site(site) site.save()
def scope_and_schedule_outlinks(self, site, parent_page, outlinks): def scope_and_schedule_outlinks(self, site, parent_page, outlinks):
if site.remember_outlinks: if site.remember_outlinks:
@ -340,18 +278,18 @@ class RethinkDbFrontier:
hops_off_surt = parent_page.hops_off_surt + 1 hops_off_surt = parent_page.hops_off_surt + 1
else: else:
hops_off_surt = 0 hops_off_surt = 0
new_child_page = brozzler.Page( new_child_page = brozzler.Page(self.r, {
url, site_id=site.id, job_id=site.job_id, 'url': url, 'site_id': site.id, 'job_id': site.job_id,
hops_from_seed=parent_page.hops_from_seed+1, 'hops_from_seed': parent_page.hops_from_seed+1,
via_page_id=parent_page.id, 'via_page_id': parent_page.id,
hops_off_surt=hops_off_surt) 'hops_off_surt': hops_off_surt})
existing_child_page = self.page(new_child_page.id) existing_child_page = brozzler.Page.load(new_child_page.id)
if existing_child_page: if existing_child_page:
existing_child_page.priority += new_child_page.priority existing_child_page.priority += new_child_page.priority
self.update_page(existing_child_page) existing_child_page.save()
counts["updated"] += 1 counts["updated"] += 1
else: else:
self.new_page(new_child_page) new_child_page.save()
counts["added"] += 1 counts["added"] += 1
if site.remember_outlinks: if site.remember_outlinks:
parent_page.outlinks["accepted"].append(url) parent_page.outlinks["accepted"].append(url)
@ -365,7 +303,7 @@ class RethinkDbFrontier:
parent_page.outlinks["rejected"].append(url) parent_page.outlinks["rejected"].append(url)
if site.remember_outlinks: if site.remember_outlinks:
self.update_page(parent_page) parent_page.save()
self.logger.info( self.logger.info(
"%s new links added, %s existing links updated, %s links " "%s new links added, %s existing links updated, %s links "
@ -386,7 +324,7 @@ class RethinkDbFrontier:
def job_sites(self, job_id): def job_sites(self, job_id):
results = self.r.table('sites').get_all(job_id, index="job_id").run() results = self.r.table('sites').get_all(job_id, index="job_id").run()
for result in results: for result in results:
yield brozzler.Site(**result) yield brozzler.Site(self.r, result)
def seed_page(self, site_id): def seed_page(self, site_id):
results = self.r.table("pages").between( results = self.r.table("pages").between(
@ -399,7 +337,7 @@ class RethinkDbFrontier:
"more than one seed page for site_id %s ?", site_id) "more than one seed page for site_id %s ?", site_id)
if len(pages) < 1: if len(pages) < 1:
return None return None
return brozzler.Page(**pages[0]) return brozzler.Page(self.r, pages[0])
def site_pages(self, site_id, unbrozzled_only=False): def site_pages(self, site_id, unbrozzled_only=False):
results = self.r.table("pages").between( results = self.r.table("pages").between(
@ -409,5 +347,5 @@ class RethinkDbFrontier:
self.r.maxval, self.r.maxval], self.r.maxval, self.r.maxval],
index="priority_by_site").run() index="priority_by_site").run()
for result in results: for result in results:
yield brozzler.Page(**result) yield brozzler.Page(self.r, result)

View file

@ -70,31 +70,32 @@ def new_job_file(frontier, job_conf_file):
def new_job(frontier, job_conf): def new_job(frontier, job_conf):
'''Returns new Job.''' '''Returns new Job.'''
validate_conf(job_conf) validate_conf(job_conf)
job = Job( job = Job(frontier.r, {
id=job_conf.get("id"), conf=job_conf, status="ACTIVE", "conf": job_conf,
started=rethinkstuff.utcnow()) "status": "ACTIVE", "started": rethinkstuff.utcnow()})
if "id" in job_conf:
# insert the job now to make sure it has an id job.id = job_conf["id"]
job = frontier.new_job(job) job.save()
sites = [] sites = []
for seed_conf in job_conf["seeds"]: for seed_conf in job_conf["seeds"]:
merged_conf = merge(seed_conf, job_conf) merged_conf = merge(seed_conf, job_conf)
site = brozzler.Site( site = brozzler.Site(frontier.r, {
job_id=job.id, seed=merged_conf["url"], "job_id": job.id,
scope=merged_conf.get("scope"), "seed": merged_conf["url"],
time_limit=merged_conf.get("time_limit"), "scope": merged_conf.get("scope"),
proxy=merged_conf.get("proxy"), "time_limit": merged_conf.get("time_limit"),
ignore_robots=merged_conf.get("ignore_robots"), "proxy": merged_conf.get("proxy"),
enable_warcprox_features=merged_conf.get( "ignore_robots": merged_conf.get("ignore_robots"),
"enable_warcprox_features": merged_conf.get(
"enable_warcprox_features"), "enable_warcprox_features"),
warcprox_meta=merged_conf.get("warcprox_meta"), "warcprox_meta": merged_conf.get("warcprox_meta"),
metadata=merged_conf.get("metadata"), "metadata": merged_conf.get("metadata"),
remember_outlinks=merged_conf.get("remember_outlinks"), "remember_outlinks": merged_conf.get("remember_outlinks"),
user_agent=merged_conf.get("user_agent"), "user_agent": merged_conf.get("user_agent"),
behavior_parameters=merged_conf.get("behavior_parameters"), "behavior_parameters": merged_conf.get("behavior_parameters"),
username=merged_conf.get("username"), "username": merged_conf.get("username"),
password=merged_conf.get("password")) "password": merged_conf.get("password")})
sites.append(site) sites.append(site)
for site in sites: for site in sites:
@ -110,31 +111,31 @@ def new_site(frontier, site):
# where a brozzler worker immediately claims the site, finds no pages # where a brozzler worker immediately claims the site, finds no pages
# to crawl, and decides the site is finished # to crawl, and decides the site is finished
try: try:
page = brozzler.Page( page = brozzler.Page(frontier.r, {
site.seed, site_id=site.id, job_id=site.job_id, "url": site.seed, "site_id": site.get("id"),
hops_from_seed=0, priority=1000, needs_robots_check=True) "job_id": site.get("job_id"), "hops_from_seed": 0,
frontier.new_page(page) "priority": 1000, "needs_robots_check": True})
page.save()
logging.info("queued page %s", page) logging.info("queued page %s", page)
finally: finally:
# finally block because we want to insert the Site no matter what # finally block because we want to insert the Site no matter what
frontier.new_site(site) site.save()
except brozzler.ReachedLimit as e: except brozzler.ReachedLimit as e:
frontier.reached_limit(site, e) frontier.reached_limit(site, e)
class Job(brozzler.BaseDictable): class Job(rethinkstuff.Document):
logger = logging.getLogger(__module__ + "." + __qualname__) logger = logging.getLogger(__module__ + "." + __qualname__)
table = "jobs"
def __init__( def __init__(self, rethinker, d={}):
self, id=None, conf=None, status="ACTIVE", started=None, rethinkstuff.Document.__init__(self, rethinker, d)
finished=None, stop_requested=None, starts_and_stops=None): self.status = self.get("status", "ACTIVE")
self.id = id if not "starts_and_stops" in self:
self.conf = conf if self.get("started"): # backward compatibility
self.status = status self.starts_and_stops = [{
self.stop_requested = stop_requested "start": self.get("started"),
self.starts_and_stops = starts_and_stops "stop": self.get("finished")}]
if not self.starts_and_stops: del self["started"]
if started: # backward compatibility
self.starts_and_stops = [{"start":started,"stop":finished}]
else: else:
self.starts_and_stops = [ self.starts_and_stops = [
{"start":rethinkstuff.utcnow(),"stop":None}] {"start":rethinkstuff.utcnow(),"stop":None}]
@ -148,6 +149,3 @@ class Job(brozzler.BaseDictable):
self.status = "FINISHED" self.status = "FINISHED"
self.starts_and_stops[-1]["stop"] = rethinkstuff.utcnow() self.starts_and_stops[-1]["stop"] = rethinkstuff.utcnow()
def __str__(self):
return 'Job(id=%s)' % self.id

View file

@ -88,53 +88,29 @@ class Url:
return host_parts[-len(domain_parts):] == domain_parts return host_parts[-len(domain_parts):] == domain_parts
class Site(brozzler.BaseDictable): class Site(rethinkstuff.Document):
logger = logging.getLogger(__module__ + "." + __qualname__) logger = logging.getLogger(__module__ + "." + __qualname__)
table = 'sites'
def __init__( def __init__(self, rethinker, d={}):
self, seed, id=None, job_id=None, scope=None, proxy=None, rethinkstuff.Document.__init__(self, rethinker, d)
ignore_robots=False, time_limit=None, warcprox_meta=None, if not self.get('status'):
enable_warcprox_features=False, reached_limit=None, self.status = 'ACTIVE'
status="ACTIVE", claimed=False, start_time=None, self.enable_warcprox_features = bool(self.get('enable_warcprox_features'))
last_disclaimed=_EPOCH_UTC, last_claimed_by=None, self.claimed = bool(self.get('claimed'))
last_claimed=_EPOCH_UTC, metadata={}, remember_outlinks=None, self.last_disclaimed = self.get('last_disclaimed', _EPOCH_UTC)
cookie_db=None, user_agent=None, behavior_parameters=None, self.last_claimed = self.get('last_claimed', _EPOCH_UTC)
username=None, password=None, starts_and_stops=None): if not self.get('starts_and_stops'):
if self.get('start_time'): # backward compatibility
self.seed = seed
self.id = id
self.job_id = job_id
self.proxy = proxy
self.ignore_robots = ignore_robots
self.enable_warcprox_features = bool(enable_warcprox_features)
self.warcprox_meta = warcprox_meta
self.time_limit = time_limit
self.reached_limit = reached_limit
self.status = status
self.claimed = bool(claimed)
self.last_claimed_by = last_claimed_by
self.last_disclaimed = last_disclaimed
self.last_claimed = last_claimed
self.metadata = metadata
self.remember_outlinks = remember_outlinks
self.cookie_db = cookie_db
self.user_agent = user_agent
self.behavior_parameters = behavior_parameters
self.username = username
self.password = password
self.starts_and_stops = starts_and_stops
if not self.starts_and_stops:
if start_time: # backward compatibility
self.starts_and_stops = [{"start":start_time,"stop":None}] self.starts_and_stops = [{"start":start_time,"stop":None}]
if self.status != "ACTIVE": if self.get('status') != "ACTIVE":
self.starts_and_stops[0]["stop"] = self.last_disclaimed self.starts_and_stops[0]["stop"] = self.last_disclaimed
else: else:
self.starts_and_stops = [ self.starts_and_stops = [
{"start":rethinkstuff.utcnow(),"stop":None}] {"start":rethinkstuff.utcnow(),"stop":None}]
self.scope = scope or {} def __str__(self):
if not "surt" in self.scope: return 'Site({"id":"%s","seed":"%s",...})' % (self.id, self.seed)
self.scope["surt"] = Url(seed).surt
def elapsed(self): def elapsed(self):
'''Returns elapsed crawl time as a float in seconds.''' '''Returns elapsed crawl time as a float in seconds.'''
@ -148,9 +124,6 @@ class Site(brozzler.BaseDictable):
dt += (rethinkstuff.utcnow() - ss['start']).total_seconds() dt += (rethinkstuff.utcnow() - ss['start']).total_seconds()
return dt return dt
def __str__(self):
return "Site-%s-%s" % (self.id, self.seed)
def note_seed_redirect(self, url): def note_seed_redirect(self, url):
new_scope_surt = Url(url).surt new_scope_surt = Url(url).surt
if not new_scope_surt.startswith(self.scope["surt"]): if not new_scope_surt.startswith(self.scope["surt"]):
@ -305,41 +278,26 @@ class Site(brozzler.BaseDictable):
return True return True
class Page(brozzler.BaseDictable): class Page(rethinkstuff.Document):
def __init__( logger = logging.getLogger(__module__ + "." + __qualname__)
self, url, id=None, site_id=None, job_id=None, hops_from_seed=0, table = 'pages'
redirect_url=None, priority=None, claimed=False, brozzle_count=0,
via_page_id=None, last_claimed_by=None, hops_off_surt=0, def __init__(self, rethinker, d={}):
outlinks=None, needs_robots_check=False, blocked_by_robots=None): rethinkstuff.Document.__init__(self, rethinker, d)
self.site_id = site_id self.hops_from_seed = self.get('hops_from_seed', 0)
self.job_id = job_id self.brozzle_count = self.get('brozzle_count', 0)
self.url = url self.claimed = self.get('claimed', False)
self.hops_from_seed = hops_from_seed self.hops_off_surt = self.get('hops_off_surt', 0)
self.redirect_url = redirect_url self.needs_robots_check = self.get('needs_robots_check', False)
self.claimed = bool(claimed)
self.last_claimed_by = last_claimed_by
self.brozzle_count = brozzle_count
self.via_page_id = via_page_id
self.hops_off_surt = hops_off_surt
self.outlinks = outlinks
self.needs_robots_check = needs_robots_check
self.blocked_by_robots = blocked_by_robots
self._canon_hurl = None self._canon_hurl = None
if priority is not None: self.priority = self.get('priority', self._calc_priority())
self.priority = priority if self.get('id') is None:
else: digest_this = "site_id:%s,url:%s" % (self.site_id, self.url)
self.priority = self._calc_priority()
if id is not None:
self.id = id
else:
digest_this = "site_id:{},url:{}".format(self.site_id, self.url)
self.id = hashlib.sha1(digest_this.encode("utf-8")).hexdigest() self.id = hashlib.sha1(digest_this.encode("utf-8")).hexdigest()
def __repr__(self): def __str__(self):
return """Page(url={},job_id={},site_id={},hops_from_seed={})""".format( return 'Page({"id":"%s","url":"%s",...})' % (self.id, self.url)
repr(self.url), self.job_id, self.site_id, self.hops_from_seed)
def note_redirect(self, url): def note_redirect(self, url):
self.redirect_url = url self.redirect_url = url

View file

@ -126,6 +126,10 @@ class BrozzlerWorker:
site.enable_warcprox_features site.enable_warcprox_features
or self._default_enable_warcprox_features): or self._default_enable_warcprox_features):
svc = self._service_registry.available_service('warcprox') svc = self._service_registry.available_service('warcprox')
if svc is None:
raise Exception(
'no available instances of warcprox in the service '
'registry')
site.proxy = '%s:%s' % (svc['host'], svc['port']) site.proxy = '%s:%s' % (svc['host'], svc['port'])
self._frontier.update_site(site) self._frontier.update_site(site)
self.logger.info( self.logger.info(
@ -284,12 +288,15 @@ class BrozzlerWorker:
if self._needs_browsing(page, ydl_spy): if self._needs_browsing(page, ydl_spy):
self.logger.info('needs browsing: %s', page) self.logger.info('needs browsing: %s', page)
if not browser.is_running(): if not browser.is_running():
browser.start(proxy=self._proxy(site), cookie_db=site.cookie_db) browser.start(
proxy=self._proxy(site),
cookie_db=site.get('cookie_db'))
final_page_url, outlinks = browser.browse_page( final_page_url, outlinks = browser.browse_page(
page.url, extra_headers=site.extra_headers(), page.url, extra_headers=site.extra_headers(),
behavior_parameters=site.behavior_parameters, behavior_parameters=site.get('behavior_parameters'),
username=site.username, password=site.password, username=site.get('username'),
user_agent=site.user_agent, password=site.get('password'),
user_agent=site.get('user_agent'),
on_screenshot=_on_screenshot) on_screenshot=_on_screenshot)
if final_page_url != page.url: if final_page_url != page.url:
page.note_redirect(final_page_url) page.note_redirect(final_page_url)
@ -408,6 +415,7 @@ class BrozzlerWorker:
self._service_heartbeat() self._service_heartbeat()
def run(self): def run(self):
self.logger.info("brozzler worker starting")
try: try:
latest_state = None latest_state = None
while True: while True:

View file

@ -32,7 +32,7 @@ def find_package_data(package):
setuptools.setup( setuptools.setup(
name='brozzler', name='brozzler',
version='1.1b9.dev196', version='1.1b9.dev197',
description='Distributed web crawling with browsers', description='Distributed web crawling with browsers',
url='https://github.com/internetarchive/brozzler', url='https://github.com/internetarchive/brozzler',
author='Noah Levitt', author='Noah Levitt',
@ -69,7 +69,7 @@ setuptools.setup(
'websocket-client!=0.39.0', 'websocket-client!=0.39.0',
'pillow==3.3.0', 'pillow==3.3.0',
'surt>=0.3.0', 'surt>=0.3.0',
'rethinkstuff>=0.1.5', 'rethinkstuff>=0.2.0.dev62',
'rethinkdb>=2.3,<2.4', 'rethinkdb>=2.3,<2.4',
'cerberus==1.0.1', 'cerberus==1.0.1',
'jinja2', 'jinja2',