From fd0c3322eec5194a2028d4c4a1bbc359dfb75d52 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Mon, 13 Jul 2015 17:09:39 -0700 Subject: [PATCH] update readme, s/umbra/brozzler/ in most places, delete non-brozzler stuff --- .gitignore | 2 +- README.md | 41 +--- bin/browse-url | 6 +- bin/brozzler-add-site | 4 +- bin/brozzler-hq | 18 +- bin/brozzler-worker | 12 +- bin/crawl-url | 182 --------------- bin/drain-queue | 55 ----- bin/queue-json | 38 ---- bin/queue-url | 39 ---- bin/umbra | 123 ---------- {umbra => brozzler}/__init__.py | 6 +- {umbra => brozzler}/behaviors.d/default.js | 0 {umbra => brozzler}/behaviors.d/facebook.js | 0 {umbra => brozzler}/behaviors.d/flickr.js | 0 {umbra => brozzler}/behaviors.d/instagram.js | 0 .../behaviors.d/marquette_edu.js | 0 .../behaviors.d/simpleclicks.js.in | 0 {umbra => brozzler}/behaviors.d/vimeo.js | 0 {umbra => brozzler}/behaviors.py | 0 {umbra => brozzler}/behaviors.yaml | 0 {umbra => brozzler}/browser.py | 2 +- {umbra => brozzler}/hq.py | 0 {umbra => brozzler}/url.py | 0 requirements.txt | 1 - setup.py | 20 +- umbra/controller.py | 212 ------------------ 27 files changed, 41 insertions(+), 720 deletions(-) mode change 100644 => 100755 bin/brozzler-add-site mode change 100644 => 100755 bin/brozzler-hq delete mode 100755 bin/crawl-url delete mode 100755 bin/drain-queue delete mode 100755 bin/queue-json delete mode 100755 bin/queue-url delete mode 100755 bin/umbra rename {umbra => brozzler}/__init__.py (67%) rename {umbra => brozzler}/behaviors.d/default.js (100%) rename {umbra => brozzler}/behaviors.d/facebook.js (100%) rename {umbra => brozzler}/behaviors.d/flickr.js (100%) rename {umbra => brozzler}/behaviors.d/instagram.js (100%) rename {umbra => brozzler}/behaviors.d/marquette_edu.js (100%) rename {umbra => brozzler}/behaviors.d/simpleclicks.js.in (100%) rename {umbra => brozzler}/behaviors.d/vimeo.js (100%) rename {umbra => brozzler}/behaviors.py (100%) rename {umbra => brozzler}/behaviors.yaml (100%) rename {umbra => brozzler}/browser.py (99%) rename {umbra => brozzler}/hq.py (100%) rename {umbra => brozzler}/url.py (100%) delete mode 100644 umbra/controller.py diff --git a/.gitignore b/.gitignore index 743b95b..14d49f4 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,4 @@ *.pyc *.diff .*.sw* -/umbra.egg-info/ +/brozzler.egg-info/ diff --git a/README.md b/README.md index 11b8d38..4932ca4 100644 --- a/README.md +++ b/README.md @@ -1,43 +1,16 @@ -umbra -===== -Umbra is a browser automation tool, developed for the web archiving service -https://archive-it.org/. +brozzler +======== +"browser" ^ "crawler" = "brozzler" -Umbra receives urls via AMQP. It opens them in the chrome or chromium browser, -with which it communicates using the chrome remote debug protocol (see -https://developer.chrome.com/devtools/docs/debugger-protocol). It runs -javascript behaviors to simulate user interaction with the page. It publishes -information about the the urls requested by the browser back to AMQP. The -format of the incoming and outgoing AMQP messages is described in `pydoc -umbra.controller`. +Brozzler is a distributed web crawler that uses a real browser (chrome or +chromium) to fetch pages and embedded urls and to extract links. 
-Umbra can be used with the Heritrix web crawler, using these heritrix modules: -* [AMQPUrlReceiver](https://github.com/internetarchive/heritrix3/blob/master/contrib/src/main/java/org/archive/crawler/frontier/AMQPUrlReceiver.java) -* [AMQPPublishProcessor](https://github.com/internetarchive/heritrix3/blob/master/contrib/src/main/java/org/archive/modules/AMQPPublishProcessor.java) - -Install ------- -Install via pip from this repo, e.g. - - pip install git+https://github.com/internetarchive/umbra.git - -Umbra requires an AMQP messaging service like RabbitMQ. On Ubuntu, -`sudo apt-get install rabbitmq-server` will install and start RabbitMQ at amqp://guest:guest@localhost:5672/%2f, which is the default AMQP url for umbra. - -Run ---- -The command `umbra` will start umbra with default configuration. `umbra --help` -describes all command line options. - -Umbra also comes with these utilities: -* browse-url - open urls in chrome/chromium and run behaviors (without involving AMQP) -* queue-url - send url to umbra via AMQP -* drain-queue - consume messages from AMQP queue +It is forked from https://github.com/internetarchive/umbra. License ------- -Copyright 2014 Internet Archive +Copyright 2015 Internet Archive Licensed under the Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License. diff --git a/bin/browse-url b/bin/browse-url index e979fda..47e106c 100755 --- a/bin/browse-url +++ b/bin/browse-url @@ -5,7 +5,7 @@ import argparse import os import sys import logging -import umbra +import brozzler import re import datetime @@ -20,14 +20,14 @@ arg_parser.add_argument('-e', '--executable', dest='chrome_exe', default='chromi arg_parser.add_argument('-v', '--verbose', dest='log_level', action="store_const", default=logging.INFO, const=logging.DEBUG) arg_parser.add_argument('--version', action='version', - version="umbra {} - {}".format(umbra.version, os.path.basename(__file__))) + version="brozzler {} - {}".format(brozzler.version, os.path.basename(__file__))) args = arg_parser.parse_args(args=sys.argv[1:]) logging.basicConfig(stream=sys.stdout, level=args.log_level, format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') -with umbra.Browser(chrome_exe=args.chrome_exe) as browser: +with brozzler.Browser(chrome_exe=args.chrome_exe) as browser: for url in args.urls: def on_screenshot(screenshot_png): diff --git a/bin/brozzler-add-site b/bin/brozzler-add-site old mode 100644 new mode 100755 index 641441d..dedb1e7 --- a/bin/brozzler-add-site +++ b/bin/brozzler-add-site @@ -5,7 +5,7 @@ import argparse import os import sys import logging -import umbra +import brozzler import kombu arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__), @@ -17,7 +17,7 @@ arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:gu arg_parser.add_argument("-v", "--verbose", dest="log_level", action="store_const", default=logging.INFO, const=logging.DEBUG) arg_parser.add_argument("--version", action="version", - version="umbra {} - {}".format(umbra.version, os.path.basename(__file__))) + version="brozzler {} - {}".format(brozzler.version, os.path.basename(__file__))) args = arg_parser.parse_args(args=sys.argv[1:]) logging.basicConfig(stream=sys.stdout, level=args.log_level, diff --git a/bin/brozzler-hq b/bin/brozzler-hq old mode 100644 new mode 100755 index 1014287..bdee5c3 --- a/bin/brozzler-hq +++ b/bin/brozzler-hq @@ -5,14 +5,14 @@ import argparse import os import sys 
import logging -import umbra +import brozzler import surt import sqlite3 import time import kombu import kombu.simple import json -import umbra.hq +import brozzler.hq import signal arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__), @@ -25,7 +25,7 @@ arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:gu arg_parser.add_argument("-v", "--verbose", dest="log_level", action="store_const", default=logging.INFO, const=logging.DEBUG) arg_parser.add_argument("--version", action="version", - version="umbra {} - {}".format(umbra.version, os.path.basename(__file__))) + version="brozzler {} - {}".format(brozzler.version, os.path.basename(__file__))) args = arg_parser.parse_args(args=sys.argv[1:]) logging.basicConfig(stream=sys.stdout, level=args.log_level, @@ -101,7 +101,7 @@ class BrozzlerHQDb: break site_dict = json.loads(row[1]) site_dict["id"] = row[0] - yield umbra.hq.Site(**site_dict) + yield brozzler.hq.Site(**site_dict) def update_crawl_url(self, crawl_url): cursor = self._conn.cursor() @@ -111,7 +111,7 @@ class BrozzlerHQDb: if row: # (id, priority, existing_crawl_url) = row new_priority = crawl_url.calc_priority() + row[1] - existing_crawl_url = umbra.CrawlUrl(**json.loads(row[2])) + existing_crawl_url = brozzler.CrawlUrl(**json.loads(row[2])) existing_crawl_url.hops_from_seed = min(crawl_url.hops_from_seed, existing_crawl_url.hops_from_seed) cursor.execute("update brozzler_urls set priority=?, crawl_url_json=? where id=?", (new_priority, existing_crawl_url.to_json(), row[0])) @@ -145,7 +145,7 @@ class BrozzlerHQ: def _new_site(self): try: msg = self._new_sites_q.get(block=False) - new_site = umbra.hq.Site(**msg.payload) + new_site = brozzler.hq.Site(**msg.payload) msg.ack() self.logger.info("new site {}".format(new_site)) @@ -153,7 +153,7 @@ class BrozzlerHQ: new_site.id = site_id if new_site.is_permitted_by_robots(new_site.seed): - crawl_url = umbra.CrawlUrl(new_site.seed, site_id=new_site.id, hops_from_seed=0) + crawl_url = brozzler.CrawlUrl(new_site.seed, site_id=new_site.id, hops_from_seed=0) self._db.schedule_url(crawl_url, priority=1000) self._unclaimed_sites_q.put(new_site.to_dict()) else: @@ -176,7 +176,7 @@ class BrozzlerHQ: for url in parent_url.outlinks: if site.is_in_scope(url): if site.is_permitted_by_robots(url): - crawl_url = umbra.CrawlUrl(url, site_id=site.id, hops_from_seed=parent_url.hops_from_seed+1) + crawl_url = brozzler.CrawlUrl(url, site_id=site.id, hops_from_seed=parent_url.hops_from_seed+1) try: self._db.update_crawl_url(crawl_url) counts["updated"] += 1 @@ -196,7 +196,7 @@ class BrozzlerHQ: q = self._conn.SimpleQueue("brozzler.sites.{}.completed_urls".format(site.id)) try: msg = q.get(block=False) - completed_url = umbra.CrawlUrl(**msg.payload) + completed_url = brozzler.CrawlUrl(**msg.payload) msg.ack() self._db.completed(completed_url) self._scope_and_schedule_outlinks(site, completed_url) diff --git a/bin/brozzler-worker b/bin/brozzler-worker index fb6bfb2..a337456 100755 --- a/bin/brozzler-worker +++ b/bin/brozzler-worker @@ -5,13 +5,13 @@ import argparse import os import sys import logging -import umbra +import brozzler import threading import time import surt import signal import kombu -from umbra import hq +from brozzler import hq import pprint import traceback import youtube_dl @@ -32,7 +32,7 @@ arg_parser.add_argument('--ignore-certificate-errors', dest='ignore_cert_errors' arg_parser.add_argument('-v', '--verbose', dest='log_level', action="store_const", default=logging.INFO, const=logging.DEBUG) 
arg_parser.add_argument('--version', action='version', - version="umbra {} - {}".format(umbra.version, os.path.basename(__file__))) + version="brozzler {} - {}".format(brozzler.version, os.path.basename(__file__))) args = arg_parser.parse_args(args=sys.argv[1:]) logging.basicConfig(stream=sys.stdout, level=args.log_level, @@ -46,7 +46,7 @@ def next_url(site): q = conn.SimpleQueue("brozzler.sites.{}.crawl_urls".format(site.id)) msg = q.get(block=True, timeout=0.5) crawl_url_dict = msg.payload - crawl_url = umbra.CrawlUrl(**crawl_url_dict) + crawl_url = brozzler.CrawlUrl(**crawl_url_dict) msg.ack() return crawl_url @@ -87,7 +87,7 @@ def brozzle_site(site, browser): pass # except kombu.simple.Empty: # logging.info("finished {} (queue is empty)".format(site)) - except umbra.browser.BrowsingAborted: + except brozzler.browser.BrowsingAborted: logging.info("{} shut down".format(browser)) finally: disclaim_site(site) @@ -116,7 +116,7 @@ signal.signal(signal.SIGQUIT, dump_state) signal.signal(signal.SIGTERM, sigterm) signal.signal(signal.SIGINT, sigint) -browser_pool = umbra.browser.BrowserPool(int(args.max_browsers), +browser_pool = brozzler.browser.BrowserPool(int(args.max_browsers), chrome_exe=args.chrome_exe, proxy_server=args.proxy_server, ignore_cert_errors=args.ignore_cert_errors) diff --git a/bin/crawl-url b/bin/crawl-url deleted file mode 100755 index 159c72c..0000000 --- a/bin/crawl-url +++ /dev/null @@ -1,182 +0,0 @@ -#!/usr/bin/env python -# vim: set sw=4 et: - -import argparse -import os -import sys -import logging -import umbra -import threading -import time -import sortedcontainers -import surt -import signal - -arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__), - description='crawl-url - browse urls, follow links', - formatter_class=argparse.ArgumentDefaultsHelpFormatter) -arg_parser.add_argument('urls', metavar='URL', nargs='+', help='URL(s) to browse') -arg_parser.add_argument('-w', '--browser-wait', dest='browser_wait', default='60', - help='seconds to wait for browser initialization') -arg_parser.add_argument('-e', '--executable', dest='chrome_exe', default='chromium-browser', - help='executable to use to invoke chrome') -arg_parser.add_argument('-n', '--max-browsers', dest='max_browsers', default='1', - help='max number of chrome instances simultaneously browsing pages') -arg_parser.add_argument('-v', '--verbose', dest='log_level', - action="store_const", default=logging.INFO, const=logging.DEBUG) -arg_parser.add_argument('--version', action='version', - version="umbra {} - {}".format(umbra.version, os.path.basename(__file__))) -args = arg_parser.parse_args(args=sys.argv[1:]) - -logging.basicConfig(stream=sys.stdout, level=args.log_level, - format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') - -class CrawlUrl: - def __init__(self, url, priority=None, hops_from_seed=0): - self.url = url - self.hops_from_seed = hops_from_seed - self._surt = None - if priority: - self.set_priority(priority) - else: - self.set_priority(self.calc_priority()) - - def set_priority(self, priority): - # priority_key is both a sortable priority (higher value is higher - # priority) and a unique hash key - self.priority_key = (priority << 32) | (hash(self.surt) & (2**32 - 1)) - - def calc_priority(self): - priority = 0 - priority += max(0, 10 - self.hops_from_seed) - priority += max(0, 6 - self.surt.count('/')) - return priority - - def __repr__(self): - return 
"""CrawlUrl(url="{}",priority={},hops_from_seed={})""".format(self.url, self.priority, self.hops_from_seed) - - @property - def surt(self): - if self._surt is None: - self._surt = surt.surt(self.url, canonicalizer=surt.GoogleURLCanonicalizer, trailing_comma=True) - return self._surt - - @property - def priority(self): - return self.priority_key >> 32 - -class CrawlUrlQueue: - def __init__(self): - # {priority_key:CrawlUrl} - self._pq = sortedcontainers.SortedDict() - # {surt:CrawlUrl} - self._urls = {} - self.aggregate_priority = 0 - - def __len__(self): - assert len(self._urls) == len(self._pq) - return len(self._urls) - - def schedule(self, crawl_url): - self.aggregate_priority += crawl_url.priority - - try: - old_priority_key = self._urls.pop(crawl_url.surt).priority_key - old_crawl_url = self._pq.pop(old_priority_key) - - # XXX dumb calculation of new priority, may not belong here - crawl_url.set_priority(crawl_url.priority + old_crawl_url.priority) - crawl_url.hops_from_seed = min(old_crawl_url.hops_from_seed, crawl_url.hops_from_seed) - except KeyError: - pass - - self._urls[crawl_url.surt] = crawl_url - self._pq[crawl_url.priority_key] = crawl_url - - def next_url(self): - res0 = self._pq.popitem(last=True)[1] - res1 = self._urls.pop(res0.surt) - assert res0 is res1 - - new_low_priority = CrawlUrl(res0.url, priority=-1000, hops_from_seed=res0.hops_from_seed) - self.schedule(new_low_priority) - - return res0 - -class Site: - """A seed url, scope definition, and prioritized url queue.""" - def __init__(self, seed_url): - self.seed = CrawlUrl(seed_url, priority=1000) - - self.q = CrawlUrlQueue() - self.q.schedule(self.seed) - - def is_in_scope(self, url): - surtt = surt.surt(url, canonicalizer=surt.GoogleURLCanonicalizer, trailing_comma=True) - return surtt.startswith(self.seed.surt) - - def submit(self, urls, hops_from_seed): - for url in urls: - if self.is_in_scope(url): - logging.debug("{} accepted {}".format(self.seed.surt, url)) - crawl_url = CrawlUrl(url, hops_from_seed=hops_from_seed) - self.q.schedule(crawl_url) - else: - logging.debug("{} rejected {}".format(self.seed.surt, url)) - -browsers = [] -browsers_lock = threading.Lock() - -# "browse" + "crawl" = "brozzle" -def brozzle_site(site, chrome_port): - with umbra.Browser(chrome_port=chrome_port, chrome_exe=args.chrome_exe) as browser: - with browsers_lock: - browsers.append(browser) - - try: - while True: - crawl_url = site.q.next_url() - logging.info("crawling {}".format(crawl_url)) - outlinks = browser.browse_page(crawl_url.url) - site.submit(outlinks, hops_from_seed=crawl_url.hops_from_seed+1) - except umbra.browser.BrowsingAborted: - pass - -class ShutdownRequested(Exception): - pass - -def sigterm(signum, frame): - raise ShutdownRequested('shutdown requested (caught SIGTERM)') -def sigint(signum, frame): - raise ShutdownRequested('shutdown requested (caught SIGINT)') - -signal.signal(signal.SIGTERM, sigterm) -signal.signal(signal.SIGINT, sigint) - -chrome_port = 9200 -for seed_url in args.urls: - site = Site(seed_url) - - th = threading.Thread(target=lambda: brozzle_site(site, chrome_port), - name="BrowsingThread-{}".format(site.seed.surt)) - th.start() - - chrome_port += 1 - -try: - while True: - time.sleep(0.5) -except ShutdownRequested as e: - pass - -logging.info("shutting down browsers") - -with browsers_lock: - for browser in browsers: - browser.abort_browse_page() - -for th in threading.enumerate(): - if th != threading.current_thread(): - th.join() - -logging.info("all done, exiting") diff --git 
a/bin/drain-queue b/bin/drain-queue deleted file mode 100755 index 3a51490..0000000 --- a/bin/drain-queue +++ /dev/null @@ -1,55 +0,0 @@ -#!/usr/bin/env python -# vim: set sw=4 et: -import os -import sys -import argparse -import logging -import socket -import umbra -from kombu import Connection, Exchange, Queue - -arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__), - description='drain-queue - consume messages from AMQP queue', - formatter_class=argparse.ArgumentDefaultsHelpFormatter) -arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f', - help='URL identifying the AMQP server to talk to') -arg_parser.add_argument('--exchange', dest='amqp_exchange', default='umbra', - help='AMQP exchange name') -arg_parser.add_argument('--queue', dest='amqp_queue', default='urls', - help='AMQP queue name') -arg_parser.add_argument('-n', '--no-ack', dest='no_ack', action="store_const", - default=False, const=True, help="leave messages on the queue (default: remove them from the queue)") -arg_parser.add_argument('-r', '--run-forever', dest='run_forever', action="store_const", - default=False, const=True, help="run forever, waiting for new messages to appear on the queue (default: exit when all messages in the queue have been consumed)") -arg_parser.add_argument('-v', '--verbose', dest='log_level', - action="store_const", default=logging.INFO, const=logging.DEBUG) -arg_parser.add_argument('--version', action='version', - version="umbra {} - {}".format(umbra.version, os.path.basename(__file__))) -args = arg_parser.parse_args(args=sys.argv[1:]) - -logging.basicConfig(stream=sys.stderr, level=args.log_level, - format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') - -def print_and_maybe_ack(body, message): - # do this instead of print(body) so that output syntax is json, not python - # dict (they are similar but not identical) - print(message.body.decode('utf-8')) - - if not args.no_ack: - message.ack() - -exchange = Exchange(args.amqp_exchange, 'direct', durable=True) -queue = Queue(args.amqp_queue, exchange=exchange) -try: - with Connection(args.amqp_url) as conn: - with conn.Consumer(queue, callbacks=[print_and_maybe_ack]) as consumer: - consumer.qos(prefetch_count=1) - while True: - try: - conn.drain_events(timeout=0.5) - except socket.timeout: - if not args.run_forever: - logging.debug("exiting, no messages left on the queue") - break -except KeyboardInterrupt: - logging.debug("exiting, stopped by user") diff --git a/bin/queue-json b/bin/queue-json deleted file mode 100755 index 8aa71a8..0000000 --- a/bin/queue-json +++ /dev/null @@ -1,38 +0,0 @@ -#!/usr/bin/python3.4 -# vim: set sw=4 et: - -import os -import sys -import argparse -import logging -import umbra -import json -from kombu import Connection, Exchange, Queue - -arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__), - description='queue-json - send json message to umbra via AMQP', - formatter_class=argparse.ArgumentDefaultsHelpFormatter) -arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f', - help='URL identifying the AMQP server to talk to') -arg_parser.add_argument('--exchange', dest='amqp_exchange', default='umbra', - help='AMQP exchange name') -arg_parser.add_argument('--routing-key', dest='amqp_routing_key', default='url', - help='AMQP routing key') -arg_parser.add_argument('-v', '--verbose', dest='log_level', - action="store_const", 
default=logging.INFO, const=logging.DEBUG) -arg_parser.add_argument('--version', action='version', - version="umbra {} - {}".format(umbra.version, os.path.basename(__file__))) -arg_parser.add_argument('payload_json', metavar='JSON_PAYLOAD', help='json payload to send to umbra') -args = arg_parser.parse_args(args=sys.argv[1:]) - -logging.basicConfig(stream=sys.stdout, level=args.log_level, - format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') - -payload = json.loads(args.payload_json) - -exchange = Exchange(args.amqp_exchange, 'direct', durable=True) -with Connection(args.amqp_url) as conn: - producer = conn.Producer(serializer='json') - logging.info("sending to amqp url={} exchange={} routing_key={} -- {}".format(args.amqp_url, args.amqp_exchange, args.amqp_routing_key, payload)) - producer.publish(payload, routing_key=args.amqp_routing_key, exchange=exchange) - diff --git a/bin/queue-url b/bin/queue-url deleted file mode 100755 index e93cfa8..0000000 --- a/bin/queue-url +++ /dev/null @@ -1,39 +0,0 @@ -#!/usr/bin/env python -# vim: set sw=4 et: - -import os -import sys -import argparse -import logging -import umbra -from kombu import Connection, Exchange, Queue - -arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__), - description='queue-url - send url to umbra via AMQP', - formatter_class=argparse.ArgumentDefaultsHelpFormatter) -arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f', - help='URL identifying the AMQP server to talk to') -arg_parser.add_argument('--exchange', dest='amqp_exchange', default='umbra', - help='AMQP exchange name') -arg_parser.add_argument('--routing-key', dest='amqp_routing_key', default='urls', - help='AMQP routing key') -arg_parser.add_argument('-i', '--client-id', dest='client_id', default='load_url.0', - help='client id - included in the json payload with each url; umbra uses this value as the routing key to send requests back to') -arg_parser.add_argument('-v', '--verbose', dest='log_level', - action="store_const", default=logging.INFO, const=logging.DEBUG) -arg_parser.add_argument('--version', action='version', - version="umbra {} - {}".format(umbra.version, os.path.basename(__file__))) -arg_parser.add_argument('urls', metavar='URL', nargs='+', help='URLs to send to umbra') -args = arg_parser.parse_args(args=sys.argv[1:]) - -logging.basicConfig(stream=sys.stdout, level=args.log_level, - format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') - -exchange = Exchange(args.amqp_exchange, 'direct', durable=True) -with Connection(args.amqp_url) as conn: - producer = conn.Producer(serializer='json') - for url in args.urls: - payload = {'url': url, 'metadata': {}, 'clientId': args.client_id} - logging.info("sending to amqp url={} exchange={} routing_key={} -- {}".format(args.amqp_url, args.amqp_exchange, args.amqp_routing_key, payload)) - producer.publish(payload, routing_key=args.amqp_routing_key, exchange=exchange) - diff --git a/bin/umbra b/bin/umbra deleted file mode 100755 index 63da89d..0000000 --- a/bin/umbra +++ /dev/null @@ -1,123 +0,0 @@ -#!/usr/bin/env python -# vim: set sw=4 et: - -import logging -import argparse -import time -import umbra -import sys -import signal -import os -import umbra -import signal -import threading -import traceback -import pprint - -if __name__ == "__main__": - arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__), 
- description='umbra - browser automation tool communicating via AMQP', - formatter_class=argparse.ArgumentDefaultsHelpFormatter) - arg_parser.add_argument('-e', '--executable', dest='chrome_exe', default='chromium-browser', - help='Executable to use to invoke chrome') - arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f', - help='URL identifying the amqp server to talk to') - arg_parser.add_argument('--exchange', dest='amqp_exchange', default='umbra', - help='AMQP exchange name') - arg_parser.add_argument('--queue', dest='amqp_queue', default='urls', - help='AMQP queue to consume urls from') - arg_parser.add_argument('-n', '--max-browsers', dest='max_browsers', default='1', - help='Max number of chrome instances simultaneously browsing pages') - arg_parser.add_argument('-v', '--verbose', dest='log_level', - action="store_const", default=logging.INFO, const=logging.DEBUG) - arg_parser.add_argument('--version', action='version', - version="umbra {}".format(umbra.version)) - args = arg_parser.parse_args(args=sys.argv[1:]) - - logging.basicConfig(stream=sys.stdout, level=args.log_level, - format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') - - logging.info("umbra {} starting up".format(umbra.version)) - - controller = umbra.Umbra(args.amqp_url, args.chrome_exe, - max_active_browsers=int(args.max_browsers), - exchange_name=args.amqp_exchange, queue_name=args.amqp_queue) - - def browserdump_str(pp, b): - x = [] - x.append(pp.pformat(b.__dict__)) - if b._chrome_instance: - x.append("=> {} chrome instance:".format(b)) - x.append(pp.pformat(b._chrome_instance.__dict__)) - if b._behavior: - x.append("=> {} active behavior:".format(b)) - x.append(pp.pformat(b._behavior.__dict__)) - return "\n".join(x) - - def dump_state(signum, frame): - pp = pprint.PrettyPrinter(indent=4) - state_strs = [] - - for th in threading.enumerate(): - state_strs.append(str(th)) - stack = traceback.format_stack(sys._current_frames()[th.ident]) - state_strs.append("".join(stack)) - - state_strs.append("umbra controller:") - state_strs.append(pp.pformat(controller.__dict__)) - state_strs.append("") - - for b in controller._browser_pool._in_use: - state_strs.append("{} (in use):".format(b)) - state_strs.append(browserdump_str(pp, b)) - state_strs.append("") - for b in controller._browser_pool._available: - state_strs.append("{} (not in use):".format(b)) - state_strs.append(browserdump_str(pp, b)) - state_strs.append("") - - logging.warn("dumping state (caught signal {})\n{}".format(signum, "\n".join(state_strs))) - - - class ShutdownRequested(Exception): - pass - - def sigterm(signum, frame): - raise ShutdownRequested('shutdown requested (caught SIGTERM)') - def sigint(signum, frame): - raise ShutdownRequested('shutdown requested (caught SIGINT)') - - signal.signal(signal.SIGQUIT, dump_state) - signal.signal(signal.SIGHUP, controller.reconnect) - signal.signal(signal.SIGTERM, sigterm) - signal.signal(signal.SIGINT, sigint) - - controller.start() - - try: - while True: - time.sleep(0.5) - except ShutdownRequested as e: - logging.info(e) - except BaseException as e: - logging.fatal(e, exc_info=sys.exc_info()) - finally: - try: - controller.shutdown() - for th in threading.enumerate(): - if th != threading.current_thread(): - th.join() - except BaseException as e: - logging.warn("caught exception {}".format(e)) - for i in range(6,0,-1): - controller.shutdown_now() - try: - for th in threading.enumerate(): - 
if th != threading.current_thread(): - th.join() - break # if we get here, we're done, all threads finished - except: - logging.warn("caught exception {}".format(e)) - - logging.info("all finished, exiting") - diff --git a/umbra/__init__.py b/brozzler/__init__.py similarity index 67% rename from umbra/__init__.py rename to brozzler/__init__.py index be84f77..ce9b71c 100644 --- a/umbra/__init__.py +++ b/brozzler/__init__.py @@ -1,7 +1,5 @@ -from umbra.browser import Browser -from umbra.controller import AmqpBrowserController -from umbra.url import CrawlUrl -Umbra = AmqpBrowserController +from brozzler.browser import Browser +from brozzler.url import CrawlUrl def _read_version(): import os diff --git a/umbra/behaviors.d/default.js b/brozzler/behaviors.d/default.js similarity index 100% rename from umbra/behaviors.d/default.js rename to brozzler/behaviors.d/default.js diff --git a/umbra/behaviors.d/facebook.js b/brozzler/behaviors.d/facebook.js similarity index 100% rename from umbra/behaviors.d/facebook.js rename to brozzler/behaviors.d/facebook.js diff --git a/umbra/behaviors.d/flickr.js b/brozzler/behaviors.d/flickr.js similarity index 100% rename from umbra/behaviors.d/flickr.js rename to brozzler/behaviors.d/flickr.js diff --git a/umbra/behaviors.d/instagram.js b/brozzler/behaviors.d/instagram.js similarity index 100% rename from umbra/behaviors.d/instagram.js rename to brozzler/behaviors.d/instagram.js diff --git a/umbra/behaviors.d/marquette_edu.js b/brozzler/behaviors.d/marquette_edu.js similarity index 100% rename from umbra/behaviors.d/marquette_edu.js rename to brozzler/behaviors.d/marquette_edu.js diff --git a/umbra/behaviors.d/simpleclicks.js.in b/brozzler/behaviors.d/simpleclicks.js.in similarity index 100% rename from umbra/behaviors.d/simpleclicks.js.in rename to brozzler/behaviors.d/simpleclicks.js.in diff --git a/umbra/behaviors.d/vimeo.js b/brozzler/behaviors.d/vimeo.js similarity index 100% rename from umbra/behaviors.d/vimeo.js rename to brozzler/behaviors.d/vimeo.js diff --git a/umbra/behaviors.py b/brozzler/behaviors.py similarity index 100% rename from umbra/behaviors.py rename to brozzler/behaviors.py diff --git a/umbra/behaviors.yaml b/brozzler/behaviors.yaml similarity index 100% rename from umbra/behaviors.yaml rename to brozzler/behaviors.yaml diff --git a/umbra/browser.py b/brozzler/browser.py similarity index 99% rename from umbra/browser.py rename to brozzler/browser.py index 601387a..72973c6 100644 --- a/umbra/browser.py +++ b/brozzler/browser.py @@ -15,7 +15,7 @@ import os import socket import base64 import random -from umbra.behaviors import Behavior +from brozzler.behaviors import Behavior class BrowserPool: logger = logging.getLogger(__module__ + "." 
+ __qualname__) diff --git a/umbra/hq.py b/brozzler/hq.py similarity index 100% rename from umbra/hq.py rename to brozzler/hq.py diff --git a/umbra/url.py b/brozzler/url.py similarity index 100% rename from umbra/url.py rename to brozzler/url.py diff --git a/requirements.txt b/requirements.txt index 836e545..065a8c6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,6 +2,5 @@ kombu websocket-client-py3==0.13.1 argparse PyYAML -sortedcontainers git+https://github.com/ikreymer/surt.git@py3 youtube_dl diff --git a/setup.py b/setup.py index 8af8679..abface2 100644 --- a/setup.py +++ b/setup.py @@ -18,27 +18,27 @@ def full_version_bytes(): return VERSION_BYTES version_bytes = full_version_bytes() -with open('umbra/version.txt', 'wb') as out: +with open('brozzler/version.txt', 'wb') as out: out.write(version_bytes) out.write(b'\n'); -setuptools.setup(name='umbra', +setuptools.setup(name='brozzler', version=version_bytes.decode('utf-8'), - description='Browser automation via chrome debug protocol', - url='https://github.com/internetarchive/umbra', - author='Eldon Stegall', - author_email='eldon@archive.org', + description='Distributed web crawling with browsers', + url='https://github.com/nlevitt/brozzler', + author='Noah Levitt', + author_email='nlevitt@archive.org', long_description=open('README.md').read(), license='Apache License 2.0', - packages=['umbra'], - package_data={'umbra':['behaviors.d/*.js*', 'behaviors.yaml', 'version.txt']}, + packages=['brozzler'], + package_data={'brozzler':['behaviors.d/*.js*', 'behaviors.yaml', 'version.txt']}, scripts=glob.glob('bin/*'), zip_safe=False, classifiers=[ - 'Development Status :: 5 - Production/Stable', + 'Development Status :: 3 - Alpha', 'Environment :: Console', 'License :: OSI Approved :: Apache Software License', - 'Programming Language :: Python :: 3.3', 'Programming Language :: Python :: 3.4', + 'Topic :: Internet :: WWW/HTTP', 'Topic :: System :: Archiving', ]) diff --git a/umbra/controller.py b/umbra/controller.py deleted file mode 100644 index b37a588..0000000 --- a/umbra/controller.py +++ /dev/null @@ -1,212 +0,0 @@ -#!/usr/bin/env python -# vim: set sw=4 et: - -import logging -import time -import threading -import kombu -import socket -from umbra.browser import BrowserPool, BrowsingException - -class AmqpBrowserController: - """ - Consumes amqp messages representing requests to browse urls, from the - specified amqp queue (default: "urls") on the specified amqp exchange - (default: "umbra"). Incoming amqp message is a json object with 3 - attributes: - - { - "clientId": "umbra.client.123", - "url": "http://example.com/my_fancy_page", - "metadata": {"arbitrary":"fields", "etc":4} - } - - "url" is the url to browse. - - "clientId" uniquely identifies the client of umbra. Umbra uses the clientId - as the amqp routing key, to direct information via amqp back to the client. - It sends this information on the same specified amqp exchange (default: - "umbra"). - - Each url requested in the browser is published to amqp this way. The - outgoing amqp message is a json object: - - { - "url": "http://example.com/images/embedded_thing.jpg", - "method": "GET", - "headers": {"User-Agent": "...", "Accept": "...", ...}, - "parentUrl": "http://example.com/my_fancy_page", - "parentUrlMetadata": {"arbitrary":"fields", "etc":4, ...} - } - - POST requests have an additional field, postData. - """ - - logger = logging.getLogger(__module__ + "." 
+ __qualname__) - - def __init__(self, amqp_url='amqp://guest:guest@localhost:5672/%2f', - chrome_exe='chromium-browser', max_active_browsers=1, - queue_name='urls', exchange_name='umbra'): - self.amqp_url = amqp_url - self.queue_name = queue_name - self.exchange_name = exchange_name - self.max_active_browsers = max_active_browsers - - self._browser_pool = BrowserPool(size=max_active_browsers, chrome_exe=chrome_exe) - - def start(self): - self._browsing_threads = set() - self._browsing_threads_lock = threading.Lock() - - self._exchange = kombu.Exchange(name=self.exchange_name, type='direct', - durable=True) - - self._reconnect_requested = False - - self._producer = None - self._producer_lock = threading.Lock() - with self._producer_lock: - self._producer_conn = kombu.Connection(self.amqp_url) - self._producer = self._producer_conn.Producer(serializer='json') - - self._consumer_thread = threading.Thread(target=self._consume_amqp, name='AmqpConsumerThread') - self._consumer_stop = threading.Event() - self._consumer_thread.start() - - def shutdown(self): - self.logger.info("shutting down amqp consumer {}".format(self.amqp_url)) - self._consumer_stop.set() - self._consumer_thread.join() - - def shutdown_now(self): - self._consumer_stop.set() - self._browser_pool.shutdown_now() - self._consumer_thread.join() - - def reconnect(self, *args, **kwargs): - self._reconnect_requested = True - self._browser_pool.shutdown_now() - - def _wait_for_and_browse_urls(self, conn, consumer, timeout): - start = time.time() - browser = None - consumer.qos(prefetch_count=self.max_active_browsers) - - while not self._consumer_stop.is_set() and time.time() - start < timeout and not self._reconnect_requested: - try: - browser = self._browser_pool.acquire() # raises KeyError if none available - browser.start() - - def callback(body, message): - try: - client_id, url, metadata = body['clientId'], body['url'], body['metadata'] - except: - self.logger.error("unable to decipher message {}".format(message), exc_info=True) - self.logger.error("discarding bad message") - message.reject() - browser.stop() - self._browser_pool.release(browser) - return - self._start_browsing_page(browser, message, client_id, url, metadata) - - consumer.callbacks = [callback] - - while True: - try: - conn.drain_events(timeout=0.5) - break # out of "while True" to acquire another browser - except socket.timeout: - pass - except socket.error: - self.logger.error("problem consuming messages from AMQP, will try reconnecting after active browsing finishes", exc_info=True) - self._reconnect_requested = True - - if self._consumer_stop.is_set() or time.time() - start >= timeout or self._reconnect_requested: - browser.stop() - self._browser_pool.release(browser) - break - - except KeyError: - # no browsers available - time.sleep(0.5) - except: - self.logger.critical("problem with browser initialization", exc_info=True) - time.sleep(0.5) - finally: - consumer.callbacks = None - - def _wait_for_active_browsers(self): - self.logger.info("waiting for browsing threads to finish") - while True: - with self._browsing_threads_lock: - if len(self._browsing_threads) == 0: - break - time.sleep(0.5) - self.logger.info("active browsing threads finished") - - def _consume_amqp(self): - # XXX https://webarchive.jira.com/browse/ARI-3811 - # After running for some amount of time (3 weeks in the latest case), - # consumer looks normal but doesn't consume any messages. Not clear if - # it's hanging in drain_events() or not. 
As a temporary measure for - # mitigation (if it works) or debugging (if it doesn't work), close and - # reopen the connection every 2.5 hours - RECONNECT_AFTER_SECONDS = 150 * 60 - - url_queue = kombu.Queue(self.queue_name, exchange=self._exchange) - - while not self._consumer_stop.is_set(): - try: - self.logger.info("connecting to amqp exchange={} at {}".format(self._exchange.name, self.amqp_url)) - self._reconnect_requested = False - with kombu.Connection(self.amqp_url) as conn: - with conn.Consumer(url_queue) as consumer: - self._wait_for_and_browse_urls(conn, consumer, timeout=RECONNECT_AFTER_SECONDS) - - # need to wait for browsers to finish here, before closing - # the amqp connection, because they use it to do - # message.ack() after they finish browsing a page - self._wait_for_active_browsers() - except BaseException as e: - self.logger.error("caught exception {}".format(e), exc_info=True) - time.sleep(0.5) - self.logger.error("attempting to reopen amqp connection") - - def _start_browsing_page(self, browser, message, client_id, url, parent_url_metadata): - def on_request(chrome_msg): - payload = chrome_msg['params']['request'] - payload['parentUrl'] = url - payload['parentUrlMetadata'] = parent_url_metadata - self.logger.debug('sending to amqp exchange={} routing_key={} payload={}'.format(self.exchange_name, client_id, payload)) - with self._producer_lock: - publish = self._producer_conn.ensure(self._producer, self._producer.publish) - publish(payload, exchange=self._exchange, routing_key=client_id) - - def browse_page_sync(): - self.logger.info('browser={} client_id={} url={}'.format(browser, client_id, url)) - try: - browser.browse_page(url, on_request=on_request) - message.ack() - except BrowsingException as e: - self.logger.warn("browsing did not complete normally, requeuing url {} - {}".format(url, e)) - message.requeue() - except: - self.logger.critical("problem browsing page, requeuing url {}, may have lost browser process".format(url), exc_info=True) - message.requeue() - finally: - browser.stop() - self._browser_pool.release(browser) - - def browse_thread_run_then_cleanup(): - browse_page_sync() - with self._browsing_threads_lock: - self._browsing_threads.remove(threading.current_thread()) - - import random - thread_name = "BrowsingThread{}-{}".format(browser.chrome_port, - ''.join((random.choice('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789') for _ in range(6)))) - th = threading.Thread(target=browse_thread_run_then_cleanup, name=thread_name) - with self._browsing_threads_lock: - self._browsing_threads.add(th) - th.start() -
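For reviewer orientation, a minimal sketch of the renamed API, following the
usage in bin/browse-url and the deleted bin/crawl-url above. The target url
and chrome executable below are placeholders, and browse_page() returning the
page's outlinks follows the deleted crawl-url code; treat this as a sketch of
the post-rename module layout, not part of the patch itself.

    #!/usr/bin/env python
    # vim: set sw=4 et:
    import logging
    import sys

    import brozzler

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    # brozzler.Browser (re-exported from brozzler/browser.py, renamed from
    # umbra/browser.py) is a context manager that starts and stops a chrome
    # or chromium instance
    with brozzler.Browser(chrome_exe='chromium-browser') as browser:
        # browse_page() loads the url, runs the javascript behaviors from
        # brozzler/behaviors.d, and (per the deleted bin/crawl-url) returns
        # the links discovered on the page
        outlinks = browser.browse_page('http://example.com/')
        for url in outlinks:
            logging.info("discovered %s", url)

The same Browser object is what bin/brozzler-worker checks out of a
brozzler.browser.BrowserPool when crawling sites fed to it by brozzler-hq.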