From 5aea76ab6dd39eb962f9951a0265d2da9805481e Mon Sep 17 00:00:00 2001
From: Noah Levitt
Date: Wed, 15 Jul 2015 15:42:40 -0700
Subject: [PATCH] refactor worker code into worker module

---
 bin/brozzler-worker | 162 ++++-------------------------------------
 brozzler/worker.py  | 174 ++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 189 insertions(+), 147 deletions(-)
 create mode 100755 brozzler/worker.py

diff --git a/bin/brozzler-worker b/bin/brozzler-worker
index 567c48f..95477e5 100755
--- a/bin/brozzler-worker
+++ b/bin/brozzler-worker
@@ -8,15 +8,10 @@ import logging
 import brozzler
 import threading
 import time
-import surt
 import signal
-import kombu
-from brozzler import hq
 import pprint
 import traceback
-import youtube_dl
-import urllib.request
-import json
+import brozzler.worker
 
 arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),
         description='crawl-url - browse urls, follow links',
@@ -42,109 +37,10 @@ args = arg_parser.parse_args(args=sys.argv[1:])
 logging.basicConfig(stream=sys.stdout, level=args.log_level,
         format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
 
-shutdown_requested = threading.Event()
-
-def next_url(site):
-    """Raises kombu.simple.Empty if queue is empty"""
-    with kombu.Connection(args.amqp_url) as conn:
-        q = conn.SimpleQueue("brozzler.sites.{}.crawl_urls".format(site.id))
-        msg = q.get(block=True, timeout=0.5)
-        crawl_url_dict = msg.payload
-        crawl_url = brozzler.CrawlUrl(**crawl_url_dict)
-        msg.ack()
-        return crawl_url
-
-def completed_url(site, crawl_url):
-    with kombu.Connection(args.amqp_url) as conn:
-        q = conn.SimpleQueue("brozzler.sites.{}.completed_urls".format(site.id))
-        logging.info("putting {} on queue {}".format(crawl_url, q.queue.name))
-        q.put(crawl_url.to_dict())
-
-def disclaim_site(site):
-    # XXX maybe should put on "disclaimed" queue and hq should put back on "unclaimed"
-    with kombu.Connection(args.amqp_url) as conn:
-        q = conn.SimpleQueue("brozzler.sites.unclaimed".format(site.id))
-        logging.info("putting {} on queue {}".format(site, q.queue.name))
-        q.put(site.to_dict())
-
-ydl_opts = {
-    "outtmpl": "/dev/null",
-    "verbose": False,
-    "retries": 1,
-    "logger": logging,
-    "nocheckcertificate": True,
-    "hls_prefer_native": True,
-    "noprogress": True,
-    "nopart": True,
-    "no_color": True,
-}
-if args.proxy_server:
-    ydl_opts["proxy"] = "http://{}".format(args.proxy_server)
-    # see https://github.com/rg3/youtube-dl/issues/6087
-    os.environ["http_proxy"] = "http://{}".format(args.proxy_server)
-ydl = youtube_dl.YoutubeDL(ydl_opts)
-
-def putmeta(url, content_type, payload):
-    assert args.enable_warcprox_features
-    request = urllib.request.Request(url, method="PUTMETA",
-            headers={"Content-Type":content_type}, data=payload)
-
-    # XXX evil hack to keep urllib from trying to tunnel https urls here
-    request.type = "http"
-    request.set_proxy("localhost:8000", "http")
-
-    try:
-        with urllib.request.urlopen(request) as response:
-            if response.status != 204:
-                logging.warn("""got "{} {}" response on warcprox PUTMETA request (expected 204)""".format(response.status, response.reason))
-    except urllib.error.HTTPError as e:
-        logging.warn("""got "{} {}" response on warcprox PUTMETA request (expected 204)""".format(e.getcode(), e.info()))
-
-def try_youtube_dl(site, crawl_url):
-    try:
-        logging.info("trying youtube-dl on {}".format(crawl_url))
-        info = ydl.extract_info(crawl_url.url)
-        if args.proxy_server and args.enable_warcprox_features:
-            info_json = json.dumps(info, sort_keys=True, indent=4)
-            logging.info("sending PUTMETA request to warcprox with youtube-dl json for {}".format(crawl_url))
-            putmeta(url=crawl_url.url,
-                    content_type="application/vnd.youtube-dl_formats+json;charset=utf-8",
-                    payload=info_json.encode("utf-8"))
-    except BaseException as e:
-        if youtube_dl.utils.UnsupportedError in e.exc_info:
-            pass
-        else:
-            raise
-
-def brozzle_site(site, browser):
-    start = time.time()
-    try:
-        with browser:
-            while not shutdown_requested.is_set() and time.time() - start < 60:
-                try:
-                    crawl_url = next_url(site)
-                    logging.info("crawling {}".format(crawl_url))
-                    try_youtube_dl(site, crawl_url)
-                    crawl_url.outlinks = browser.browse_page(crawl_url.url)
-                    completed_url(site, crawl_url)
-                except kombu.simple.Empty:
-                    # if some timeout reached, re-raise?
-                    pass
-    # except kombu.simple.Empty:
-    #     logging.info("finished {} (queue is empty)".format(site))
-    except brozzler.browser.BrowsingAborted:
-        logging.info("{} shut down".format(browser))
-    finally:
-        disclaim_site(site)
-        browser_pool.release(browser)
-
-class ShutdownRequested(Exception):
-    pass
-
 def sigterm(signum, frame):
-    raise ShutdownRequested('shutdown requested (caught SIGTERM)')
+    raise brozzler.ShutdownRequested('shutdown requested (caught SIGTERM)')
 def sigint(signum, frame):
-    raise ShutdownRequested('shutdown requested (caught SIGINT)')
+    raise brozzler.ShutdownRequested('shutdown requested (caught SIGINT)')
 
 def dump_state(signum, frame):
     pp = pprint.PrettyPrinter(indent=4)
@@ -161,50 +57,22 @@ signal.signal(signal.SIGQUIT, dump_state)
 signal.signal(signal.SIGTERM, sigterm)
 signal.signal(signal.SIGINT, sigint)
 
-browser_pool = brozzler.browser.BrowserPool(int(args.max_browsers),
-        chrome_exe=args.chrome_exe, proxy_server=args.proxy_server,
-        ignore_cert_errors=args.ignore_cert_errors)
+worker = brozzler.worker.BrozzlerWorker(amqp_url=args.amqp_url,
+        max_browsers=int(args.max_browsers), chrome_exe=args.chrome_exe,
+        proxy_server=args.proxy_server,
+        ignore_cert_errors=args.ignore_cert_errors,
+        enable_warcprox_features=args.enable_warcprox_features)
+
+worker.start()
 
-latest_state = None
 try:
     while True:
-        with kombu.Connection(args.amqp_url) as conn:
-            q = conn.SimpleQueue("brozzler.sites.unclaimed")
-            q_empty = False
-            if len(q) > 0:
-                try:
-                    browser = browser_pool.acquire()
-                    try:
-                        msg = q.get(block=True, timeout=0.5)
-                        site = hq.Site(**msg.payload)
-                        msg.ack()
-                        logging.info("browsing site {}".format(site))
-                        th = threading.Thread(target=lambda: brozzle_site(site, browser),
-                                name="BrowsingThread-{}".format(site.scope_surt))
-                        th.start()
-                    except kombu.simple.Empty:
-                        q_empty = True
-                except KeyError:
-                    if latest_state != "browsers-busy":
-                        logging.info("all {} browsers are busy".format(args.max_browsers))
-                        latest_state = "browsers-busy"
-            else:
-                q_empty = True
-
-            if q_empty:
-                if latest_state != "no-unclaimed-sites":
-                    logging.info("no unclaimed sites to browse")
-                    latest_state = "no-unclaimed-sites"
         time.sleep(0.5)
-except ShutdownRequested as e:
-    logging.info("shutting down browsers")
-    shutdown_requested.set()
-
-    browser_pool.shutdown_now()
-
+except brozzler.ShutdownRequested as e:
+    worker.shutdown_now()
+
     for th in threading.enumerate():
         if th != threading.current_thread():
             th.join()
-
-    logging.info("all done, exiting")
-
+
+logging.info("all done, exiting")
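
Note that the signal handlers above now raise brozzler.ShutdownRequested, while this patch only removes the old ShutdownRequested class from bin/brozzler-worker and does not add a replacement anywhere; the brozzler package itself is assumed to export it. A minimal sketch of that assumed export, with hypothetical placement in brozzler/__init__.py, matching the class removed above:

    # assumed, not part of this patch: the package-level exception that
    # bin/brozzler-worker and brozzler/worker.py expect brozzler to export
    class ShutdownRequested(Exception):
        pass
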
diff --git a/brozzler/worker.py b/brozzler/worker.py
new file mode 100755
index 0000000..f751bac
--- /dev/null
+++ b/brozzler/worker.py
@@ -0,0 +1,174 @@
+# vim: set sw=4 et:
+
+import os
+import logging
+import brozzler
+import threading
+import time
+import signal
+import kombu
+import brozzler.hq
+import youtube_dl
+import urllib.request
+import json
+
+class BrozzlerWorker:
+    logger = logging.getLogger(__module__ + "." + __qualname__)
+
+    def __init__(self, amqp_url="amqp://guest:guest@localhost:5672/%2f",
+            max_browsers=1, chrome_exe="chromium-browser",
+            proxy_server=None, ignore_cert_errors=False,
+            enable_warcprox_features=False):
+
+        self._amqp_url = amqp_url
+        self._max_browsers = max_browsers
+        self._proxy_server = proxy_server
+        self._enable_warcprox_features = enable_warcprox_features
+
+        self._browser_pool = brozzler.browser.BrowserPool(max_browsers,
+                chrome_exe=chrome_exe, proxy_server=proxy_server,
+                ignore_cert_errors=ignore_cert_errors)
+
+        self._shutdown_requested = threading.Event()
+
+        ydl_opts = {
+            "outtmpl": "/dev/null",
+            "verbose": False,
+            "retries": 1,
+            "logger": logging,
+            "nocheckcertificate": True,
+            "hls_prefer_native": True,
+            "noprogress": True,
+            "nopart": True,
+            "no_color": True,
+        }
+        if self._proxy_server:
+            ydl_opts["proxy"] = "http://{}".format(self._proxy_server)
+            # see https://github.com/rg3/youtube-dl/issues/6087
+            os.environ["http_proxy"] = "http://{}".format(self._proxy_server)
+        self._ydl = youtube_dl.YoutubeDL(ydl_opts)
+
+    def _next_url(self, site):
+        """Raises kombu.simple.Empty if queue is empty"""
+        with kombu.Connection(self._amqp_url) as conn:
+            q = conn.SimpleQueue("brozzler.sites.{}.crawl_urls".format(site.id))
+            msg = q.get(block=True, timeout=0.5)
+            crawl_url_dict = msg.payload
+            crawl_url = brozzler.CrawlUrl(**crawl_url_dict)
+            msg.ack()
+            return crawl_url
+
+    def _completed_url(self, site, crawl_url):
+        with kombu.Connection(self._amqp_url) as conn:
+            q = conn.SimpleQueue("brozzler.sites.{}.completed_urls".format(site.id))
+            logging.info("putting {} on queue {}".format(crawl_url, q.queue.name))
+            q.put(crawl_url.to_dict())
+
+    def _disclaim_site(self, site, crawl_url=None):
+        # XXX maybe should put on "disclaimed" queue and hq should put back on "unclaimed"
+        with kombu.Connection(self._amqp_url) as conn:
+            q = conn.SimpleQueue("brozzler.sites.unclaimed".format(site.id))
+            logging.info("putting {} on queue {}".format(site, q.queue.name))
+            q.put(site.to_dict())
+            if crawl_url:
+                q = conn.SimpleQueue("brozzler.sites.{}.crawl_urls".format(site.id))
+                logging.info("putting unfinished url {} on queue {}".format(crawl_url, q.queue.name))
+                q.put(crawl_url.to_dict())
+
+    def _putmeta(self, url, content_type, payload):
+        assert self._enable_warcprox_features
+        request = urllib.request.Request(url, method="PUTMETA",
+                headers={"Content-Type":content_type}, data=payload)
+
+        # XXX evil hack to keep urllib from trying to tunnel https urls here
+        request.type = "http"
+        request.set_proxy("localhost:8000", "http")
+
+        try:
+            with urllib.request.urlopen(request) as response:
+                if response.status != 204:
+                    logging.warn("""got "{} {}" response on warcprox PUTMETA request (expected 204)""".format(response.status, response.reason))
+        except urllib.error.HTTPError as e:
+            logging.warn("""got "{} {}" response on warcprox PUTMETA request (expected 204)""".format(e.getcode(), e.info()))
+
+    def _try_youtube_dl(self, site, crawl_url):
+        try:
+            logging.info("trying youtube-dl on {}".format(crawl_url))
+            info = self._ydl.extract_info(crawl_url.url)
+            if self._proxy_server and self._enable_warcprox_features:
+                info_json = json.dumps(info, sort_keys=True, indent=4)
+                logging.info("sending PUTMETA request to warcprox with youtube-dl json for {}".format(crawl_url))
+                self._putmeta(url=crawl_url.url,
+                        content_type="application/vnd.youtube-dl_formats+json;charset=utf-8",
+                        payload=info_json.encode("utf-8"))
+        except BaseException as e:
+            if youtube_dl.utils.UnsupportedError in e.exc_info:
+                pass
+            else:
+                raise
+
+    def _brozzle_site(self, browser, site):
+        start = time.time()
+        crawl_url = None
+        try:
+            with browser:
+                while not self._shutdown_requested.is_set() and time.time() - start < 60:
+                    try:
+                        crawl_url = self._next_url(site)
+                        logging.info("crawling {}".format(crawl_url))
+                        self._try_youtube_dl(site, crawl_url)
+                        crawl_url.outlinks = browser.browse_page(crawl_url.url)
+                        self._completed_url(site, crawl_url)
+                        crawl_url = None
+                    except kombu.simple.Empty:
+                        # if some timeout reached, re-raise?
+                        pass
+        # except kombu.simple.Empty:
+        #     logging.info("finished {} (queue is empty)".format(site))
+        except brozzler.browser.BrowsingAborted:
+            logging.info("{} shut down".format(browser))
+        finally:
+            self._disclaim_site(site, crawl_url)
+            self._browser_pool.release(browser)
+
+    def run(self):
+        latest_state = None
+        while not self._shutdown_requested.is_set():
+            with kombu.Connection(self._amqp_url) as conn:
+                q = conn.SimpleQueue("brozzler.sites.unclaimed")
+                q_empty = False
+                if len(q) > 0:
+                    try:
+                        browser = self._browser_pool.acquire()
+                        try:
+                            msg = q.get(block=True, timeout=0.5)
+                            site = brozzler.hq.Site(**msg.payload)
+                            msg.ack() # XXX ack only after browsing finished? kinda complicated
+                            logging.info("browsing site {}".format(site))
+                            th = threading.Thread(target=lambda: self._brozzle_site(browser, site),
+                                    name="BrowsingThread-{}".format(site.scope_surt))
+                            th.start()
+                        except kombu.simple.Empty:
+                            q_empty = True
+                    except KeyError:
+                        if latest_state != "browsers-busy":
+                            logging.info("all {} browsers are busy".format(self._max_browsers))
+                            latest_state = "browsers-busy"
+                else:
+                    q_empty = True
+
+                if q_empty:
+                    if latest_state != "no-unclaimed-sites":
+                        logging.info("no unclaimed sites to browse")
+                        latest_state = "no-unclaimed-sites"
+            time.sleep(0.5)
+
+    def start(self):
+        th = threading.Thread(target=self.run, name="BrozzlerWorker")
+        th.start()
+
+    def shutdown_now(self):
+        logging.info("brozzler worker shutting down")
+        self._shutdown_requested.set()
+        self._browser_pool.shutdown_now()