Switch brozzler-worker to a shared umbra.browser.BrowserPool

bin/brozzler-worker no longer starts one umbra.Browser per site on an
incrementing chrome port. It builds a BrowserPool sized by
args.max_browsers, acquires a browser before taking a site off
"brozzler.sites.unclaimed", and hands both to brozzle_site(). A
brozzle_site() run stops taking new urls once 60 seconds have elapsed,
then puts the site back on "brozzler.sites.unclaimed" (disclaim_site)
and releases the browser. completed() is renamed completed_url().
BrowserPool.__init__ now forwards **kwargs to Browser.__init__.

NOTE(review): this patch was recovered from a copy whose line breaks and
indentation had been collapsed. The line structure was rebuilt so that
every hunk matches its @@ line counts, but leading whitespace is
inferred; confirm against blobs 47de730..d1be9cd and d86173d..601387a
before applying.

NOTE(review): in the new main loop, a browser acquired just before
q.get() raises kombu.simple.Empty is not released anywhere in this file
(release only happens in the finally of brozzle_site). Looks like a pool
leak; confirm against BrowserPool.

NOTE(review): "brozzler.sites.unclaimed".format(site.id) has no
placeholder, so the format() call is a no-op.

diff --git a/bin/brozzler-worker b/bin/brozzler-worker
index 47de730..d1be9cd 100755
--- a/bin/brozzler-worker
+++ b/bin/brozzler-worker
@@ -35,10 +35,6 @@ args = arg_parser.parse_args(args=sys.argv[1:])
 logging.basicConfig(stream=sys.stdout, level=args.log_level,
         format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
 
-browsers = set()
-browsers_lock = threading.Lock()
-num_browsers = 0
-
 shutdown_requested = threading.Event()
 
 def next_url(site):
@@ -51,34 +47,39 @@ def next_url(site):
         msg.ack()
         return crawl_url
 
-def completed(site, crawl_url):
+def completed_url(site, crawl_url):
     with kombu.Connection(args.amqp_url) as conn:
         q = conn.SimpleQueue("brozzler.sites.{}.completed_urls".format(site.id))
         logging.info("putting {} on queue {}".format(crawl_url, q.queue.name))
         q.put(crawl_url.to_dict())
 
-def brozzle_site(site, chrome_port):
-    with umbra.Browser(chrome_port=chrome_port, chrome_exe=args.chrome_exe,
-            proxy_server=args.proxy_server, ignore_cert_errors=args.ignore_cert_errors) as browser:
-        with browsers_lock:
-            browsers.add(browser)
-        try:
-            while not shutdown_requested.is_set():
+def disclaim_site(site):
+    # XXX maybe should put on "disclaimed" queue and hq should put back on "unclaimed"
+    with kombu.Connection(args.amqp_url) as conn:
+        q = conn.SimpleQueue("brozzler.sites.unclaimed".format(site.id))
+        logging.info("putting {} on queue {}".format(site, q.queue.name))
+        q.put(site.to_dict())
+
+def brozzle_site(site, browser):
+    start = time.time()
+    try:
+        with browser:
+            while not shutdown_requested.is_set() and time.time() - start < 60:
                 try:
                     crawl_url = next_url(site)
                     logging.info("crawling {}".format(crawl_url))
                     crawl_url.outlinks = browser.browse_page(crawl_url.url)
-                    completed(site, crawl_url)
+                    completed_url(site, crawl_url)
                 except kombu.simple.Empty:
                     # if some timeout reached, re-raise?
                     pass
-        # except kombu.simple.Empty:
-        #     logging.info("finished {} (queue is empty)".format(site))
-        except umbra.browser.BrowsingAborted:
-            logging.info("{} shut down")
-        finally:
-            with browsers_lock:
-                browsers.remove(browser)
+    # except kombu.simple.Empty:
+    #     logging.info("finished {} (queue is empty)".format(site))
+    except umbra.browser.BrowsingAborted:
+        logging.info("{} shut down".format(browser))
+    finally:
+        disclaim_site(site)
+        browser_pool.release(browser)
 
 class ShutdownRequested(Exception):
     pass
@@ -91,39 +92,46 @@ def sigint(signum, frame):
 signal.signal(signal.SIGTERM, sigterm)
 signal.signal(signal.SIGINT, sigint)
 
+browser_pool = umbra.browser.BrowserPool(int(args.max_browsers),
+        chrome_exe=args.chrome_exe, proxy_server=args.proxy_server,
+        ignore_cert_errors=args.ignore_cert_errors)
+
 latest_state = None
-chrome_port = 9200
 try:
     while True:
-        if num_browsers < int(args.max_browsers):
-            with kombu.Connection(args.amqp_url) as conn:
-                q = conn.SimpleQueue("brozzler.sites.unclaimed")
+        with kombu.Connection(args.amqp_url) as conn:
+            q = conn.SimpleQueue("brozzler.sites.unclaimed")
+            q_empty = False
+            if len(q) > 0:
                 try:
-                    msg = q.get(block=True, timeout=0.5)
-                    site = hq.Site(**msg.payload)
-                    logging.info("browsing site {}".format(site))
-                    num_browsers += 1
-                    msg.ack()
-                    th = threading.Thread(target=lambda: brozzle_site(site, chrome_port),
-                            name="BrowsingThread-{}".format(site.scope_surt))
-                    th.start()
-                    chrome_port += 1
-                except kombu.simple.Empty:
-                    if latest_state != "no-unclaimed-sites":
-                        logging.info("no unclaimed sites to browse")
-                        latest_state = "no-unclaimed-sites"
-        else:
-            if latest_state != "browsers-busy":
-                logging.info("all {} browsers are busy, not looking for unclaimed sites".format(args.max_browsers))
-                latest_state = "browsers-busy"
-        time.sleep(0.5)
+                    browser = browser_pool.acquire()
+                    try:
+                        msg = q.get(block=True, timeout=0.5)
+                        site = hq.Site(**msg.payload)
+                        msg.ack()
+                        logging.info("browsing site {}".format(site))
+                        th = threading.Thread(target=lambda: brozzle_site(site, browser),
+                                name="BrowsingThread-{}".format(site.scope_surt))
+                        th.start()
+                    except kombu.simple.Empty:
+                        q_empty = True
+                except KeyError:
+                    if latest_state != "browsers-busy":
+                        logging.info("all {} browsers are busy".format(args.max_browsers))
+                        latest_state = "browsers-busy"
+            else:
+                q_empty = True
+
+        if q_empty:
+            if latest_state != "no-unclaimed-sites":
+                logging.info("no unclaimed sites to browse")
+                latest_state = "no-unclaimed-sites"
+            time.sleep(0.5)
 except ShutdownRequested as e:
     logging.info("shutting down browsers")
     shutdown_requested.set()
 
-    with browsers_lock:
-        for browser in browsers:
-            browser.abort_browse_page()
+    browser_pool.shutdown_now()
 
     for th in threading.enumerate():
         if th != threading.current_thread():
diff --git a/umbra/browser.py b/umbra/browser.py
index d86173d..601387a 100644
--- a/umbra/browser.py
+++ b/umbra/browser.py
@@ -22,12 +22,13 @@ class BrowserPool:
 
     BASE_PORT = 9200
 
-    def __init__(self, size=3, chrome_exe='chromium-browser'):
+    def __init__(self, size=3, **kwargs):
+        """kwargs are passed on to Browser.__init__"""
         self._available = set()
         self._in_use = set()
 
         for i in range(0, size):
-            browser = Browser(BrowserPool.BASE_PORT + i, chrome_exe)
+            browser = Browser(BrowserPool.BASE_PORT + i, **kwargs)
             self._available.add(browser)
 
         self._lock = threading.Lock()