From 610f9c8cf40962272f7749335eb883d2500e2aa9 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Sat, 11 Jul 2015 13:09:45 -0700 Subject: [PATCH] add missing file hq.py, improve some logging, fix little race condition bug --- bin/brozzler-hq | 9 ++++++++- bin/brozzler-worker | 6 ++++-- umbra/hq.py | 29 +++++++++++++++++++++++++++++ 3 files changed, 41 insertions(+), 3 deletions(-) create mode 100644 umbra/hq.py diff --git a/bin/brozzler-hq b/bin/brozzler-hq index d83242a..d42f1f2 100644 --- a/bin/brozzler-hq +++ b/bin/brozzler-hq @@ -173,15 +173,22 @@ class BrozzlerHQ: completed_url = umbra.CrawlUrl(**msg.payload) msg.ack() - self.logger.info("adding outlinks from {} outlinks={}".format(completed_url, completed_url.outlinks)) + counts = {"added":0,"updated":0,"rejected":0} if completed_url.outlinks: for url in completed_url.outlinks: if site.is_in_scope(url): crawl_url = umbra.CrawlUrl(url, site_id=site.id, hops_from_seed=completed_url.hops_from_seed+1) try: self._db.update_crawl_url(crawl_url) + counts["updated"] += 1 except KeyError: self._db.schedule_url(crawl_url, priority=crawl_url.calc_priority()) + counts["added"] += 1 + else: + counts["rejected"] += 1 + + self.logger.info("{} new links added, {} existing links updated, {} links rejected from {}".format( + counts["added"], counts["updated"], counts["rejected"], completed_url)) except kombu.simple.Empty: pass diff --git a/bin/brozzler-worker b/bin/brozzler-worker index f9e59ee..33cc3dc 100755 --- a/bin/brozzler-worker +++ b/bin/brozzler-worker @@ -33,6 +33,7 @@ logging.basicConfig(stream=sys.stdout, level=args.log_level, browsers = set() browsers_lock = threading.Lock() +num_browsers = 0 shutdown_requested = threading.Event() @@ -64,7 +65,7 @@ def brozzle_site(site, chrome_port): crawl_url.outlinks = browser.browse_page(crawl_url.url) completed(site, crawl_url) except kombu.simple.Empty: - # if some timeout reached, raise + # if some timeout reached, re-raise? pass # except kombu.simple.Empty: # logging.info("finished {} (queue is empty)".format(site)) @@ -89,13 +90,14 @@ latest_state = None chrome_port = 9200 try: while True: - if len(browsers) < int(args.max_browsers): + if num_browsers < int(args.max_browsers): with kombu.Connection(args.amqp_url) as conn: q = conn.SimpleQueue("brozzler.sites.unclaimed") try: msg = q.get(block=True, timeout=0.5) site = hq.Site(**msg.payload) logging.info("browsing site {}".format(site)) + num_browsers += 1 msg.ack() th = threading.Thread(target=lambda: brozzle_site(site, chrome_port), name="BrowsingThread-{}".format(site.scope_surt)) diff --git a/umbra/hq.py b/umbra/hq.py new file mode 100644 index 0000000..fc11db6 --- /dev/null +++ b/umbra/hq.py @@ -0,0 +1,29 @@ +# vim: set sw=4 et: + +import surt +import kombu +import json +import logging + +class Site: + logger = logging.getLogger(__module__ + "." + __qualname__) + + def __init__(self, seed, id=None): + self.seed = seed + self.id = id + self.scope_surt = surt.surt(seed, canonicalizer=surt.GoogleURLCanonicalizer, trailing_comma=True) + + def is_in_scope(self, url): + try: + surtt = surt.surt(url, canonicalizer=surt.GoogleURLCanonicalizer, trailing_comma=True) + return surtt.startswith(self.scope_surt) + except: + self.logger.warn("""problem parsing url "{}" """.format(url), exc_info=True) + return False + + def to_dict(self): + return dict(id=self.id, seed=self.seed) + + def to_json(self): + return json.dumps(self.to_dict(), separators=(',', ':')) +