#!/usr/bin/env python
# vim: set sw=4 et:
"""crawl-url - browse urls, follow links.

Brozzler worker: claims sites from the AMQP "unclaimed" queue, browses each
site's urls with a pool of chrome instances (optionally through warcprox),
tries youtube-dl on every url, and reports completed urls back to hq.
"""

import argparse
import os
import sys
import logging
import brozzler
import threading
import time
import surt
import signal
import kombu
from brozzler import hq
import pprint
import traceback
import youtube_dl
import urllib.request
import json

arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),
        description='crawl-url - browse urls, follow links',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
arg_parser.add_argument('-u', '--url', dest='amqp_url',
        default='amqp://guest:guest@localhost:5672/%2f',
        help='URL identifying the amqp server to talk to')
arg_parser.add_argument('-e', '--executable', dest='chrome_exe',
        default='chromium-browser',
        help='executable to use to invoke chrome')
arg_parser.add_argument('-n', '--max-browsers', dest='max_browsers',
        default='1',
        help='max number of chrome instances simultaneously browsing pages')
arg_parser.add_argument('--proxy-server', dest='proxy_server', default=None,
        help='configure browser to use specified proxy server')
arg_parser.add_argument('--ignore-certificate-errors', dest='ignore_cert_errors',
        action='store_true',
        help='configure browser to ignore certificate errors')
arg_parser.add_argument('--enable-warcprox-features', dest='enable_warcprox_features',
        action='store_true',
        help='enable special features that assume the configured proxy is warcprox')
arg_parser.add_argument('-v', '--verbose', dest='log_level',
        action="store_const", default=logging.INFO, const=logging.DEBUG)
arg_parser.add_argument('--version', action='version',
        version="brozzler {} - {}".format(brozzler.version, os.path.basename(__file__)))
args = arg_parser.parse_args(args=sys.argv[1:])

logging.basicConfig(stream=sys.stdout, level=args.log_level,
        format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')

# set when a shutdown signal is caught; browsing threads poll this
shutdown_requested = threading.Event()

def next_url(site):
    """Pop the next CrawlUrl from the site's crawl_urls queue.

    Raises kombu.simple.Empty if queue is empty.
    """
    with kombu.Connection(args.amqp_url) as conn:
        q = conn.SimpleQueue("brozzler.sites.{}.crawl_urls".format(site.id))
        msg = q.get(block=True, timeout=0.5)
        crawl_url_dict = msg.payload
        crawl_url = brozzler.CrawlUrl(**crawl_url_dict)
        msg.ack()
        return crawl_url

def completed_url(site, crawl_url):
    """Report a finished url (with its outlinks) on the site's completed_urls queue."""
    with kombu.Connection(args.amqp_url) as conn:
        q = conn.SimpleQueue("brozzler.sites.{}.completed_urls".format(site.id))
        logging.info("putting {} on queue {}".format(crawl_url, q.queue.name))
        q.put(crawl_url.to_dict())

def disclaim_site(site):
    """Put the site back on the unclaimed queue so another worker can claim it."""
    # XXX maybe should put on "disclaimed" queue and hq should put back on "unclaimed"
    with kombu.Connection(args.amqp_url) as conn:
        # note: queue name is shared by all sites (the stray
        # .format(site.id) in the old code was a no-op and has been dropped)
        q = conn.SimpleQueue("brozzler.sites.unclaimed")
        logging.info("putting {} on queue {}".format(site, q.queue.name))
        q.put(site.to_dict())

# youtube-dl configuration: download to /dev/null (we only want warcprox to
# capture the traffic), keep quiet, don't fuss over certificates
ydl_opts = {
    "outtmpl": "/dev/null",
    "verbose": False,
    "retries": 1,
    "logger": logging,
    "nocheckcertificate": True,
    "hls_prefer_native": True,
    "noprogress": True,
    "nopart": True,
    "no_color": True,
}
if args.proxy_server:
    ydl_opts["proxy"] = "http://{}".format(args.proxy_server)
    # see https://github.com/rg3/youtube-dl/issues/6087
    os.environ["http_proxy"] = "http://{}".format(args.proxy_server)
ydl = youtube_dl.YoutubeDL(ydl_opts)

def putmeta(url, content_type, payload):
    """Send a warcprox PUTMETA request to record `payload` as metadata for `url`.

    Only valid when --enable-warcprox-features is on; warcprox answers 204 on
    success, anything else is logged as a warning.
    """
    assert args.enable_warcprox_features
    request = urllib.request.Request(url, method="PUTMETA",
            headers={"Content-Type": content_type}, data=payload)

    # XXX evil hack to keep urllib from trying to tunnel https urls here
    request.type = "http"
    request.set_proxy("localhost:8000", "http")

    try:
        with urllib.request.urlopen(request) as response:
            if response.status != 204:
                logging.warning("""got "{} {}" response on warcprox PUTMETA request (expected 204)""".format(response.status, response.reason))
    except urllib.error.HTTPError as e:
        logging.warning("""got "{} {}" response on warcprox PUTMETA request (expected 204)""".format(e.getcode(), e.info()))

def try_youtube_dl(site, crawl_url):
    """Run youtube-dl on the url; ship the extracted format info to warcprox.

    "Unsupported URL" errors are expected (most pages aren't videos) and are
    swallowed; any other exception propagates.
    """
    try:
        logging.info("trying youtube-dl on {}".format(crawl_url))
        info = ydl.extract_info(crawl_url.url)
        if args.proxy_server and args.enable_warcprox_features:
            info_json = json.dumps(info, sort_keys=True, indent=4)
            logging.info("sending PUTMETA request to warcprox with youtube-dl json for {}".format(crawl_url))
            putmeta(url=crawl_url.url,
                    content_type="application/vnd.youtube-dl_formats+json;charset=utf-8",
                    payload=info_json.encode("utf-8"))
    except BaseException as e:
        # youtube-dl wraps the underlying error; only some exceptions carry
        # exc_info. Guard with hasattr -- the old unconditional attribute
        # access raised AttributeError here and masked the real exception.
        if hasattr(e, "exc_info") and youtube_dl.utils.UnsupportedError in e.exc_info:
            pass
        else:
            raise

def brozzle_site(site, browser):
    """Browse the site's urls with `browser` for up to 60 seconds, then
    disclaim the site and return the browser to the pool.

    Runs in its own thread; stops early if shutdown is requested or the
    browser is shut down out from under it.
    """
    start = time.time()
    try:
        with browser:
            while not shutdown_requested.is_set() and time.time() - start < 60:
                try:
                    crawl_url = next_url(site)
                    logging.info("crawling {}".format(crawl_url))
                    try_youtube_dl(site, crawl_url)
                    crawl_url.outlinks = browser.browse_page(crawl_url.url)
                    completed_url(site, crawl_url)
                except kombu.simple.Empty:
                    # if some timeout reached, re-raise?
                    pass
    except brozzler.browser.BrowsingAborted:
        logging.info("{} shut down".format(browser))
    finally:
        disclaim_site(site)
        browser_pool.release(browser)

class ShutdownRequested(Exception):
    pass

def sigterm(signum, frame):
    raise ShutdownRequested('shutdown requested (caught SIGTERM)')

def sigint(signum, frame):
    raise ShutdownRequested('shutdown requested (caught SIGINT)')

def dump_state(signum, frame):
    """SIGQUIT handler: log a stack trace of every live thread."""
    pp = pprint.PrettyPrinter(indent=4)
    state_strs = []
    for th in threading.enumerate():
        state_strs.append(str(th))
        stack = traceback.format_stack(sys._current_frames()[th.ident])
        state_strs.append("".join(stack))
    logging.warning("dumping state (caught signal {})\n{}".format(signum, "\n".join(state_strs)))

signal.signal(signal.SIGQUIT, dump_state)
signal.signal(signal.SIGTERM, sigterm)
signal.signal(signal.SIGINT, sigint)

browser_pool = brozzler.browser.BrowserPool(int(args.max_browsers),
        chrome_exe=args.chrome_exe, proxy_server=args.proxy_server,
        ignore_cert_errors=args.ignore_cert_errors)

latest_state = None
try:
    while True:
        with kombu.Connection(args.amqp_url) as conn:
            q = conn.SimpleQueue("brozzler.sites.unclaimed")
            q_empty = False
            if len(q) > 0:
                try:
                    browser = browser_pool.acquire()
                    try:
                        msg = q.get(block=True, timeout=0.5)
                        site = hq.Site(**msg.payload)
                        msg.ack()
                        logging.info("browsing site {}".format(site))
                        # pass site/browser as args, not via a lambda closure:
                        # the loop rebinds those names and a closure could see
                        # the next iteration's values
                        th = threading.Thread(target=brozzle_site,
                                args=(site, browser),
                                name="BrowsingThread-{}".format(site.scope_surt))
                        th.start()
                    except kombu.simple.Empty:
                        # queue drained between len(q) and get(); return the
                        # browser we already acquired so it isn't leaked
                        browser_pool.release(browser)
                        q_empty = True
                except KeyError:
                    # browser_pool.acquire() found no free browser
                    if latest_state != "browsers-busy":
                        logging.info("all {} browsers are busy".format(args.max_browsers))
                        latest_state = "browsers-busy"
            else:
                q_empty = True

            if q_empty:
                if latest_state != "no-unclaimed-sites":
                    logging.info("no unclaimed sites to browse")
                    latest_state = "no-unclaimed-sites"
        time.sleep(0.5)
except ShutdownRequested:
    logging.info("shutting down browsers")
    shutdown_requested.set()

    browser_pool.shutdown_now()

    for th in threading.enumerate():
        if th != threading.current_thread():
            th.join()

    logging.info("all done, exiting")