#!/usr/bin/env python # vim: set sw=4 et: import argparse import os import sys import logging import umbra import threading import time import surt import signal import kombu from umbra import hq arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__), description='crawl-url - browse urls, follow links', formatter_class=argparse.ArgumentDefaultsHelpFormatter) arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f', help='URL identifying the amqp server to talk to') arg_parser.add_argument('-e', '--executable', dest='chrome_exe', default='chromium-browser', help='executable to use to invoke chrome') arg_parser.add_argument('-n', '--max-browsers', dest='max_browsers', default='1', help='max number of chrome instances simultaneously browsing pages') arg_parser.add_argument('-v', '--verbose', dest='log_level', action="store_const", default=logging.INFO, const=logging.DEBUG) arg_parser.add_argument('--version', action='version', version="umbra {} - {}".format(umbra.version, os.path.basename(__file__))) args = arg_parser.parse_args(args=sys.argv[1:]) logging.basicConfig(stream=sys.stdout, level=args.log_level, format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') browsers = set() browsers_lock = threading.Lock() num_browsers = 0 shutdown_requested = threading.Event() def next_url(site): """Raises kombu.simple.Empty if queue is empty""" with kombu.Connection(args.amqp_url) as conn: q = conn.SimpleQueue("brozzler.sites.{}.crawl_urls".format(site.id)) msg = q.get(block=True, timeout=0.5) crawl_url_dict = msg.payload crawl_url = umbra.CrawlUrl(**crawl_url_dict) msg.ack() return crawl_url def completed(site, crawl_url): with kombu.Connection(args.amqp_url) as conn: q = conn.SimpleQueue("brozzler.sites.{}.completed_urls".format(site.id)) logging.info("putting {} on queue {}".format(crawl_url, q.queue.name)) q.put(crawl_url.to_dict()) def brozzle_site(site, chrome_port): with umbra.Browser(chrome_port=chrome_port, chrome_exe=args.chrome_exe) as browser: with browsers_lock: browsers.add(browser) try: while not shutdown_requested.is_set(): try: crawl_url = next_url(site) logging.info("crawling {}".format(crawl_url)) crawl_url.outlinks = browser.browse_page(crawl_url.url) completed(site, crawl_url) except kombu.simple.Empty: # if some timeout reached, re-raise? pass # except kombu.simple.Empty: # logging.info("finished {} (queue is empty)".format(site)) except umbra.browser.BrowsingAborted: logging.info("{} shut down") finally: with browsers_lock: browsers.remove(browser) class ShutdownRequested(Exception): pass def sigterm(signum, frame): raise ShutdownRequested('shutdown requested (caught SIGTERM)') def sigint(signum, frame): raise ShutdownRequested('shutdown requested (caught SIGINT)') signal.signal(signal.SIGTERM, sigterm) signal.signal(signal.SIGINT, sigint) latest_state = None chrome_port = 9200 try: while True: if num_browsers < int(args.max_browsers): with kombu.Connection(args.amqp_url) as conn: q = conn.SimpleQueue("brozzler.sites.unclaimed") try: msg = q.get(block=True, timeout=0.5) site = hq.Site(**msg.payload) logging.info("browsing site {}".format(site)) num_browsers += 1 msg.ack() th = threading.Thread(target=lambda: brozzle_site(site, chrome_port), name="BrowsingThread-{}".format(site.scope_surt)) th.start() chrome_port += 1 except kombu.simple.Empty: if latest_state != "no-unclaimed-sites": logging.info("no unclaimed sites to browse") latest_state = "no-unclaimed-sites" else: if latest_state != "browsers-busy": logging.info("all {} browsers are busy, not looking for unclaimed sites".format(args.max_browsers)) latest_state = "browsers-busy" time.sleep(0.5) except ShutdownRequested as e: logging.info("shutting down browsers") shutdown_requested.set() with browsers_lock: for browser in browsers: browser.abort_browse_page() for th in threading.enumerate(): if th != threading.current_thread(): th.join() logging.info("all done, exiting")