From fd99764baa536010f1c58ac01e74cb425c2056b6 Mon Sep 17 00:00:00 2001
From: Noah Levitt
Date: Fri, 10 Jul 2015 21:07:47 -0700
Subject: [PATCH] brozzler-worker partially working

---
 bin/brozzler-hq     |  25 +--------
 bin/brozzler-worker | 118 ++++++++++++++++++++++++++++++++++++++++++++
 umbra/__init__.py   |   1 +
 umbra/url.py        |  27 ++++++++++
 4 files changed, 148 insertions(+), 23 deletions(-)
 create mode 100755 bin/brozzler-worker
 create mode 100644 umbra/url.py

diff --git a/bin/brozzler-hq b/bin/brozzler-hq
index 974c312..398e736 100644
--- a/bin/brozzler-hq
+++ b/bin/brozzler-hq
@@ -29,28 +29,6 @@ args = arg_parser.parse_args(args=sys.argv[1:])
 logging.basicConfig(stream=sys.stdout, level=args.log_level,
         format="%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s")
 
-class CrawlUrl:
-    def __init__(self, url, id=None, site_id=None, hops_from_seed=0):
-        self.id = id
-        self.site_id = site_id
-        self.url = url
-        self.hops_from_seed = hops_from_seed
-        self._canon_hurl = None
-
-    def __repr__(self):
-        return """CrawlUrl(url="{}",site_id={},hops_from_seed={})""".format(
-                self.url, self.site_id, self.hops_from_seed)
-
-    def canonical(self):
-        if self._canon_hurl is None:
-            self._canon_hurl = surt.handyurl.parse(self.url)
-            surt.GoogleURLCanonicalizer.canonicalize(self._canon_hurl)
-        return self._canon_hurl.geturl()
-
-    def to_json(self):
-        d = dict(id=self.id, site_id=self.site_id, url=self.url, hops_from_seed=self.hops_from_seed)
-        return json.dumps(d, separators=(',', ':'))
-
 class BrozzlerHQDb:
     logger = logging.getLogger(__module__ + "." + __qualname__)
 
@@ -156,7 +134,7 @@ class BrozzlerHQ:
         pass
 
     def _schedule_seed(self, site_id, seed_url):
-        crawl_url = CrawlUrl(seed_url, site_id=site_id, hops_from_seed=0)
+        crawl_url = umbra.CrawlUrl(seed_url, site_id=site_id, hops_from_seed=0)
         self._db.schedule_url(crawl_url, priority=1000)
 
     def _feed_crawl_urls(self):
@@ -165,6 +143,7 @@
                 if len(q) == 0:
                     url = self._db.pop_url(site["id"])
                     if url:
+                        self.logger.info("feeding {} to {}".format(url, q.queue.name))
                         q.put(url)
 
 logging.info("brozzler-hq starting")
diff --git a/bin/brozzler-worker b/bin/brozzler-worker
new file mode 100755
index 0000000..be027cc
--- /dev/null
+++ b/bin/brozzler-worker
@@ -0,0 +1,118 @@
+#!/usr/bin/env python
+# vim: set sw=4 et:
+
+import argparse
+import os
+import sys
+import logging
+import umbra
+import threading
+import time
+import surt
+import signal
+import kombu
+
+arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),
+        description='crawl-url - browse urls, follow links',
+        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f',
+        help='URL identifying the amqp server to talk to')
+arg_parser.add_argument('-e', '--executable', dest='chrome_exe', default='chromium-browser',
+        help='executable to use to invoke chrome')
+arg_parser.add_argument('-n', '--max-browsers', dest='max_browsers', default='1',
+        help='max number of chrome instances simultaneously browsing pages')
+arg_parser.add_argument('-v', '--verbose', dest='log_level',
+        action="store_const", default=logging.INFO, const=logging.DEBUG)
+arg_parser.add_argument('--version', action='version',
+        version="umbra {} - {}".format(umbra.version, os.path.basename(__file__)))
+args = arg_parser.parse_args(args=sys.argv[1:])
+
+logging.basicConfig(stream=sys.stdout, level=args.log_level,
+        format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
+
+class Site:
+    def __init__(self, id, seed):
+        self.id = id
+        self.seed = seed
+        self.scope_surt = surt.surt(seed, canonicalizer=surt.GoogleURLCanonicalizer, trailing_comma=True)
+
+    def is_in_scope(self, url):
+        surtt = surt.surt(url, canonicalizer=surt.GoogleURLCanonicalizer, trailing_comma=True)
+        return surtt.startswith(self.scope_surt)
+
+    # should this be a generator function?
+    def next_url(self):
+        """Raises kombu.simple.Empty if queue is empty"""
+        with kombu.Connection(args.amqp_url) as conn:
+            q = conn.SimpleQueue("brozzler.sites.{}.crawl_urls".format(self.id))
+            msg = q.get(block=True, timeout=0.5)
+            crawl_url_dict = msg.payload
+            crawl_url = umbra.CrawlUrl(**crawl_url_dict)
+            msg.ack()
+            return crawl_url
+
+browsers = set()
+browsers_lock = threading.Lock()
+
+# "browse" + "crawl" = "brozzle"
+def brozzle_site(site, chrome_port):
+    with umbra.Browser(chrome_port=chrome_port, chrome_exe=args.chrome_exe) as browser:
+        with browsers_lock:
+            browsers.add(browser)
+        try:
+            while True:
+                crawl_url = site.next_url()
+                logging.info("crawling {}".format(crawl_url))
+                outlinks = browser.browse_page(crawl_url.url)
+                # site.submit(outlinks, hops_from_seed=crawl_url.hops_from_seed+1)
+        except kombu.simple.Empty:
+            logging.info("finished {} (queue is empty)".format(site))
+        except umbra.browser.BrowsingAborted:
+            logging.info("{} shut down")
+        finally:
+            with browsers_lock:
+                browsers.remove(browser)
+
+class ShutdownRequested(Exception):
+    pass
+
+def sigterm(signum, frame):
+    raise ShutdownRequested('shutdown requested (caught SIGTERM)')
+def sigint(signum, frame):
+    raise ShutdownRequested('shutdown requested (caught SIGINT)')
+
+signal.signal(signal.SIGTERM, sigterm)
+signal.signal(signal.SIGINT, sigint)
+
+chrome_port = 9200
+try:
+    while True:
+        if len(browsers) < int(args.max_browsers):
+            with kombu.Connection(args.amqp_url) as conn:
+                q = conn.SimpleQueue("brozzler.sites.unclaimed")
+                try:
+                    msg = q.get(block=True, timeout=0.5)
+                    site_dict = msg.payload
+                    site = Site(**site_dict)
+                    msg.ack()
+                    th = threading.Thread(target=lambda: brozzle_site(site, chrome_port),
+                            name="BrowsingThread-{}".format(site.scope_surt))
+                    th.start()
+                    chrome_port += 1
+                except kombu.simple.Empty:
+                    pass
+        else:
+            time.sleep(0.5)
+except ShutdownRequested as e:
+    logging.info("shutting down browsers")
+
+    with browsers_lock:
+        for browser in browsers:
+            browser.abort_browse_page()
+
+    for th in threading.enumerate():
+        if th != threading.current_thread():
+            th.join()
+
+    logging.info("all done, exiting")
+
diff --git a/umbra/__init__.py b/umbra/__init__.py
index 7372e40..be84f77 100644
--- a/umbra/__init__.py
+++ b/umbra/__init__.py
@@ -1,5 +1,6 @@
 from umbra.browser import Browser
 from umbra.controller import AmqpBrowserController
+from umbra.url import CrawlUrl
 Umbra = AmqpBrowserController
 
 def _read_version():
diff --git a/umbra/url.py b/umbra/url.py
new file mode 100644
index 0000000..a99b831
--- /dev/null
+++ b/umbra/url.py
@@ -0,0 +1,27 @@
+# vim: set sw=4 et:
+
+import surt
+import json
+
+class CrawlUrl:
+    def __init__(self, url, id=None, site_id=None, hops_from_seed=0):
+        self.id = id
+        self.site_id = site_id
+        self.url = url
+        self.hops_from_seed = hops_from_seed
+        self._canon_hurl = None
+
+    def __repr__(self):
+        return """CrawlUrl(url="{}",site_id={},hops_from_seed={})""".format(
+                self.url, self.site_id, self.hops_from_seed)
+
+    def canonical(self):
+        if self._canon_hurl is None:
+            self._canon_hurl = surt.handyurl.parse(self.url)
+            surt.GoogleURLCanonicalizer.canonicalize(self._canon_hurl)
+        return self._canon_hurl.geturl()
+
+    def to_json(self):
+        d = dict(id=self.id, site_id=self.site_id, url=self.url, hops_from_seed=self.hops_from_seed)
+        return json.dumps(d, separators=(',', ':'))
+
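
For reference, below is a minimal sketch (not part of the patch) of the json round trip the new umbra.url.CrawlUrl is meant to support: to_json() emits exactly the keyword arguments that brozzler-worker's Site.next_url() passes back into CrawlUrl(**crawl_url_dict) after pulling msg.payload off the per-site AMQP queue. It assumes the umbra package from this patch and the surt library are installed; the example url is made up.

import json
from umbra.url import CrawlUrl

# a hypothetical url, scheduled for site 1, zero hops from the seed
crawl_url = CrawlUrl("http://example.com/foo", site_id=1, hops_from_seed=0)

payload = json.loads(crawl_url.to_json())   # the dict form a queue message would carry
rehydrated = CrawlUrl(**payload)            # what Site.next_url() does with msg.payload
print(rehydrated)              # CrawlUrl(url="http://example.com/foo",site_id=1,hops_from_seed=0)
print(rehydrated.canonical())  # canonicalized url via surt's GoogleURLCanonicalizer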