From 1fb336cb2ee375155af270bbb43c19a92930203c Mon Sep 17 00:00:00 2001
From: Noah Levitt
Date: Sat, 11 Jul 2015 02:29:19 -0700
Subject: [PATCH] crawling outlinks not totally working

---
 bin/brozzler-hq     | 65 ++++++++++++++++++++++++++++++++++++---------
 bin/brozzler-worker | 58 +++++++++++++++++++---------------------
 umbra/url.py        | 16 ++++++++---
 3 files changed, 92 insertions(+), 47 deletions(-)

diff --git a/bin/brozzler-hq b/bin/brozzler-hq
index 398e736..13899e1 100644
--- a/bin/brozzler-hq
+++ b/bin/brozzler-hq
@@ -12,6 +12,7 @@ import time
 import kombu
 import kombu.simple
 import json
+import umbra.hq
 
 arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),
         description="brozzler-hq - headquarters of distributed brozzler crawl",
@@ -73,10 +74,9 @@ class BrozzlerHQDb:
         else:
             return None
 
-    def new_site(self, site_dict):
-        site_json = json.dumps(site_dict, separators=(',', ':'))
+    def new_site(self, site):
         cursor = self._conn.cursor()
-        cursor.execute("insert into brozzler_sites (site_json) values (?)", (site_json,))
+        cursor.execute("insert into brozzler_sites (site_json) values (?)", (site.to_json(),))
         self._conn.commit()
         return cursor.lastrowid
 
@@ -95,7 +95,23 @@ class BrozzlerHQDb:
                 break
             site_dict = json.loads(row[1])
             site_dict["id"] = row[0]
-            yield site_dict
+            yield umbra.hq.Site(**site_dict)
+
+    def update_crawl_url(self, crawl_url):
+        cursor = self._conn.cursor()
+        # CREATE TABLE brozzler_urls (id integer primary key, site_id integer, priority integer, in_progress boolean, canon_url varchar(4000), crawl_url_json text)
+        cursor.execute("select id, priority, crawl_url_json from brozzler_urls where site_id=? and canon_url=?", (crawl_url.site_id, crawl_url.canonical()))
+        row = cursor.fetchone()
+        if row:
+            # (id, priority, existing_crawl_url) = row
+            new_priority = crawl_url.calc_priority() + row[1]
+            existing_crawl_url = umbra.CrawlUrl(**json.loads(row[2]))
+            existing_crawl_url.hops_from_seed = min(crawl_url.hops_from_seed, existing_crawl_url.hops_from_seed)
+
+            cursor.execute("update brozzler_urls set priority=?, crawl_url_json=? where id=?", (new_priority, existing_crawl_url.to_json(), row[0]))
+            self._conn.commit()
+        else:
+            raise KeyError("crawl url not in brozzler_urls site_id={} url={}".format(crawl_url.site_id, crawl_url.canonical()))
 
 class BrozzlerHQ:
     logger = logging.getLogger(__module__ + "." + __qualname__)
@@ -114,7 +130,7 @@ class BrozzlerHQ:
         try:
             while True:
                 self._new_site()
-                # self._consume_completed_url()
+                self._consume_completed_url()
                 self._feed_crawl_urls()
                 time.sleep(0.5)
         finally:
@@ -123,13 +139,16 @@ class BrozzlerHQ:
     def _new_site(self):
         try:
             msg = self._new_sites_q.get(block=False)
-            new_site_dict = msg.payload
-            site_id = self._db.new_site(new_site_dict)
-            new_site_dict["id"] = site_id
-            self._schedule_seed(site_id, new_site_dict["seed"])
-            self._unclaimed_sites_q.put(new_site_dict)
+            new_site = umbra.hq.Site(**msg.payload)
             msg.ack()
-            self.logger.info("new site {}".format(new_site_dict))
+
+            site_id = self._db.new_site(new_site)
+            new_site.id = site_id
+
+            self._schedule_seed(site_id, new_site.seed)
+
+            self._unclaimed_sites_q.put(new_site.to_dict())
+            self.logger.info("new site {}".format(new_site))
         except kombu.simple.Empty:
             pass
 
@@ -139,13 +158,33 @@ class BrozzlerHQ:
     def _feed_crawl_urls(self):
         for site in self._db.sites():
-            q = self._conn.SimpleQueue("brozzler.sites.{}.crawl_urls".format(site["id"]))
+            q = self._conn.SimpleQueue("brozzler.sites.{}.crawl_urls".format(site.id))
             if len(q) == 0:
-                url = self._db.pop_url(site["id"])
+                url = self._db.pop_url(site.id)
                 if url:
                     self.logger.info("feeding {} to {}".format(url, q.queue.name))
                     q.put(url)
 
+    def _consume_completed_url(self):
+        for site in self._db.sites():
+            q = self._conn.SimpleQueue("brozzler.sites.{}.completed_urls".format(site.id))
+            try:
+                msg = q.get(block=False)
+                completed_url = umbra.CrawlUrl(**msg.payload)
+                msg.ack()
+
+                self.logger.info("adding outlinks from {} outlinks={}".format(completed_url, completed_url.outlinks))
+                if completed_url.outlinks:
+                    for url in completed_url.outlinks:
+                        crawl_url = umbra.CrawlUrl(url, site_id=site.id, hops_from_seed=completed_url.hops_from_seed+1)
+                        try:
+                            self._db.update_crawl_url(crawl_url)
+                        except KeyError:
+                            self._db.schedule_url(crawl_url, priority=crawl_url.calc_priority())
+            except kombu.simple.Empty:
+                pass
+
 logging.info("brozzler-hq starting")
 
 db = BrozzlerHQDb(db_file=args.db_file)
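The interesting new logic in brozzler-hq is the update-or-schedule pattern in _consume_completed_url(): each outlink is looked up by canonical url, a url seen again accumulates priority and keeps the smallest hops_from_seed, and a brand-new url is scheduled fresh. Below is a minimal, self-contained sketch of that pattern against an in-memory sqlite table mirroring the brozzler_urls schema; TinyUrl and update_or_schedule are illustrative stand-ins for umbra.CrawlUrl and the BrozzlerHQDb methods, not brozzler APIs.

import json
import sqlite3

class TinyUrl:
    """Illustrative stand-in for umbra.CrawlUrl."""
    def __init__(self, url, site_id=None, hops_from_seed=0):
        self.url = url
        self.site_id = site_id
        self.hops_from_seed = hops_from_seed

    def canonical(self):
        return self.url.lower()  # stand-in for the surt-based canonical()

    def calc_priority(self):
        # same scoring as the new CrawlUrl.calc_priority() in umbra/url.py
        return max(0, 10 - self.hops_from_seed) + max(0, 6 - self.canonical().count("/"))

    def to_json(self):
        return json.dumps(dict(url=self.url, site_id=self.site_id,
                hops_from_seed=self.hops_from_seed), separators=(',', ':'))

conn = sqlite3.connect(":memory:")
conn.execute("create table brozzler_urls (id integer primary key, site_id integer, "
        "priority integer, in_progress boolean, canon_url varchar(4000), crawl_url_json text)")

def update_or_schedule(u):
    row = conn.execute("select id, priority from brozzler_urls where site_id=? and canon_url=?",
            (u.site_id, u.canonical())).fetchone()
    if row:
        # re-discovered url: accumulate priority so often-linked pages bubble up
        conn.execute("update brozzler_urls set priority=? where id=?",
                (row[1] + u.calc_priority(), row[0]))
    else:
        # first sighting: schedule it with its own score
        conn.execute("insert into brozzler_urls (site_id, priority, in_progress, canon_url, "
                "crawl_url_json) values (?, ?, 0, ?, ?)",
                (u.site_id, u.calc_priority(), u.canonical(), u.to_json()))

update_or_schedule(TinyUrl("http://example.com/a", site_id=1, hops_from_seed=1))
update_or_schedule(TinyUrl("http://example.com/a", site_id=1, hops_from_seed=2))
print(conn.execute("select canon_url, priority from brozzler_urls").fetchall())
# [('http://example.com/a', 23)]  -- 12 from the first sighting plus 11 from the second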
diff --git a/bin/brozzler-worker b/bin/brozzler-worker
index 081e901..f9e59ee 100755
--- a/bin/brozzler-worker
+++ b/bin/brozzler-worker
@@ -11,6 +11,7 @@ import time
 import surt
 import signal
 import kombu
+from umbra import hq
 
 arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),
         description='crawl-url - browse urls, follow links',
@@ -30,36 +31,27 @@ args = arg_parser.parse_args(args=sys.argv[1:])
 logging.basicConfig(stream=sys.stdout, level=args.log_level,
         format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
 
-class Site:
-    def __init__(self, id, seed):
-        self.id = id
-        self.seed = seed
-        self.scope_surt = surt.surt(seed, canonicalizer=surt.GoogleURLCanonicalizer, trailing_comma=True)
-
-    def is_in_scope(self, url):
-        surtt = surt.surt(url, canonicalizer=surt.GoogleURLCanonicalizer, trailing_comma=True)
-        return surtt.startswith(self.scope_surt)
-
-    def next_url(self):
-        """Raises kombu.simple.Empty if queue is empty"""
-        with kombu.Connection(args.amqp_url) as conn:
-            q = conn.SimpleQueue("brozzler.sites.{}.crawl_urls".format(self.id))
-            msg = q.get(block=True, timeout=0.5)
-            crawl_url_dict = msg.payload
-            crawl_url = umbra.CrawlUrl(**crawl_url_dict)
-            msg.ack()
-            return crawl_url
-
-    def completed(self, crawl_url):
-        with kombu.Connection(args.amqp_url) as conn:
-            q = conn.SimpleQueue("brozzler.sites.{}.completed_urls".format(self.id))
-            q.put(crawl_url.to_json())
-
 browsers = set()
 browsers_lock = threading.Lock()
 
 shutdown_requested = threading.Event()
 
+def next_url(site):
+    """Raises kombu.simple.Empty if queue is empty"""
+    with kombu.Connection(args.amqp_url) as conn:
+        q = conn.SimpleQueue("brozzler.sites.{}.crawl_urls".format(site.id))
+        msg = q.get(block=True, timeout=0.5)
+        crawl_url_dict = msg.payload
+        crawl_url = umbra.CrawlUrl(**crawl_url_dict)
+        msg.ack()
+        return crawl_url
+
+def completed(site, crawl_url):
+    with kombu.Connection(args.amqp_url) as conn:
+        q = conn.SimpleQueue("brozzler.sites.{}.completed_urls".format(site.id))
+        logging.info("putting {} on queue {}".format(crawl_url, q.queue.name))
+        q.put(crawl_url.to_dict())
+
 def brozzle_site(site, chrome_port):
     with umbra.Browser(chrome_port=chrome_port, chrome_exe=args.chrome_exe) as browser:
         with browsers_lock:
@@ -67,10 +59,10 @@ def brozzle_site(site, chrome_port):
         try:
             while not shutdown_requested.is_set():
                 try:
-                    crawl_url = site.next_url()
+                    crawl_url = next_url(site)
                     logging.info("crawling {}".format(crawl_url))
                     crawl_url.outlinks = browser.browse_page(crawl_url.url)
-                    site.completed(crawl_url)
+                    completed(site, crawl_url)
                 except kombu.simple.Empty:
                     # if some timeout reached, raise
                     pass
@@ -93,6 +85,7 @@ def sigint(signum, frame):
 signal.signal(signal.SIGTERM, sigterm)
 signal.signal(signal.SIGINT, sigint)
 
+latest_state = None
 chrome_port = 9200
 try:
     while True:
@@ -101,16 +94,21 @@ try:
             q = conn.SimpleQueue("brozzler.sites.unclaimed")
             try:
                 msg = q.get(block=True, timeout=0.5)
-                site_dict = msg.payload
-                site = Site(**site_dict)
+                site = hq.Site(**msg.payload)
+                logging.info("browsing site {}".format(site))
                 msg.ack()
                 th = threading.Thread(target=lambda: brozzle_site(site, chrome_port),
                         name="BrowsingThread-{}".format(site.scope_surt))
                 th.start()
                 chrome_port += 1
             except kombu.simple.Empty:
-                pass
+                if latest_state != "no-unclaimed-sites":
+                    logging.info("no unclaimed sites to browse")
+                    latest_state = "no-unclaimed-sites"
         else:
+            if latest_state != "browsers-busy":
+                logging.info("all {} browsers are busy, not looking for unclaimed sites".format(args.max_browsers))
+                latest_state = "browsers-busy"
             time.sleep(0.5)
 except ShutdownRequested as e:
     logging.info("shutting down browsers")
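The worker's queue protocol is easiest to see end to end: hq feeds brozzler.sites.<id>.crawl_urls, the worker claims a message, browses the page, and reports on brozzler.sites.<id>.completed_urls. A toy round-trip under those naming conventions, using kombu's in-memory transport so it runs without an AMQP broker; the payload shape approximates CrawlUrl.to_dict() and this is a sketch, not the exact brozzler wire format:

import kombu

with kombu.Connection("memory://") as conn:
    site_id = 1
    crawl_q = conn.SimpleQueue("brozzler.sites.{}.crawl_urls".format(site_id))
    completed_q = conn.SimpleQueue("brozzler.sites.{}.completed_urls".format(site_id))

    # hq side: _feed_crawl_urls() puts one url on the per-site queue
    crawl_q.put({"id": None, "site_id": site_id, "url": "http://example.com/",
            "hops_from_seed": 0, "outlinks": None})

    # worker side: next_url() claims it, the browser "finds" outlinks,
    # completed() reports back
    msg = crawl_q.get(block=True, timeout=0.5)
    crawl_url = msg.payload
    msg.ack()
    crawl_url["outlinks"] = ["http://example.com/about"]
    completed_q.put(crawl_url)

    # hq side: _consume_completed_url() picks up the outlinks
    msg = completed_q.get(block=False)
    print(msg.payload["outlinks"])  # ['http://example.com/about']
    msg.ack()

Note that completed() now puts crawl_url.to_dict() rather than to_json() on the queue, so kombu's own serializer handles encoding and hq can reconstruct the object with umbra.CrawlUrl(**msg.payload).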
diff --git a/umbra/url.py b/umbra/url.py
index d955285..81ae4a6 100644
--- a/umbra/url.py
+++ b/umbra/url.py
@@ -10,26 +10,34 @@ class CrawlUrl:
         self.url = url
         self.hops_from_seed = hops_from_seed
         self._canon_hurl = None
-        self.outlinks = None
+        self.outlinks = outlinks
 
     def __repr__(self):
         return """CrawlUrl(url="{}",site_id={},hops_from_seed={})""".format(
                 self.url, self.site_id, self.hops_from_seed)
 
+    def calc_priority(self):
+        priority = 0
+        priority += max(0, 10 - self.hops_from_seed)
+        priority += max(0, 6 - self.canonical().count("/"))
+        return priority
+
     def canonical(self):
         if self._canon_hurl is None:
             self._canon_hurl = surt.handyurl.parse(self.url)
             surt.GoogleURLCanonicalizer.canonicalize(self._canon_hurl)
         return self._canon_hurl.geturl()
 
-    def to_json(self):
+    def to_dict(self):
         if self.outlinks is not None and not isinstance(self.outlinks, list):
             outlinks = []
             outlinks.extend(self.outlinks)
         else:
             outlinks = self.outlinks
 
-        d = dict(id=self.id, site_id=self.site_id, url=self.url,
+        return dict(id=self.id, site_id=self.site_id, url=self.url,
                 hops_from_seed=self.hops_from_seed, outlinks=outlinks)
-        return json.dumps(d, separators=(',', ':'))
+
+    def to_json(self):
+        return json.dumps(self.to_dict(), separators=(',', ':'))
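The new calc_priority() biases the frontier toward seeds and shallow pages: up to 10 points for being few hops from the seed, up to 6 for a short path. A worked example with canonical() stubbed out as the raw url so it runs without the surt package (the real method canonicalizes with surt.GoogleURLCanonicalizer first):

def calc_priority(canon_url, hops_from_seed):
    priority = 0
    priority += max(0, 10 - hops_from_seed)        # up to 10 points for seed proximity
    priority += max(0, 6 - canon_url.count("/"))   # up to 6 points for a shallow path
    return priority

print(calc_priority("http://example.com/", 0))            # 10 + 3 = 13, a seed
print(calc_priority("http://example.com/a/b/c", 2))       # 8 + 1 = 9
print(calc_priority("http://example.com/a/b/c/d/e", 12))  # 0 + 0 = 0, deep and distant

Since any single score is capped, this is presumably why update_crawl_url() adds the new score to the stored priority instead of replacing it: a page linked from many crawled pages accumulates weight across sightings.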