diff --git a/.gitignore b/.gitignore index 2daac0a..743b95b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,8 +1,4 @@ *.pyc -/bin/ -/include/ -/lib/ -/local/ -/share/ -/build/ +*.diff +.*.sw* /umbra.egg-info/ diff --git a/README.md b/README.md index 58db62c..239ed64 100644 --- a/README.md +++ b/README.md @@ -1,18 +1,38 @@ umbra ===== +Umbra is a browser automation tool, developed for the web archiving service +https://archive-it.org/. -Browser automation via chrome debug protocol +Umbra receives urls via AMQP. It opens them in the chrome or chromium browser, +with which it communicates using the chrome remote debug protocol (see +https://developer.chrome.com/devtools/docs/debugger-protocol). It runs +javascript behaviors to simulate user interaction with the page. It publishes +information about the urls requested by the browser back to AMQP. The +format of the incoming and outgoing AMQP messages is described in `pydoc +umbra.controller`. + +Umbra can be used with the Heritrix web crawler, using these heritrix modules: +* [AMQPUrlReceiver](https://github.com/internetarchive/heritrix3/blob/master/contrib/src/main/java/org/archive/crawler/frontier/AMQPUrlReceiver.java) +* [AMQPPublishProcessor](https://github.com/internetarchive/heritrix3/blob/master/contrib/src/main/java/org/archive/modules/AMQPPublishProcessor.java) Install -====== -Install via pip from this repo. +------ +Install via pip from this repo, e.g. + + pip install git+https://github.com/internetarchive/umbra.git + +Umbra requires an AMQP messaging service like RabbitMQ. On Ubuntu, +`sudo apt-get install rabbitmq-server` will install and start RabbitMQ +at amqp://guest:guest@localhost:5672/%2f, which is the default AMQP url for umbra. Run -===== -"umbra" script should be in bin/. -load_url.py takes urls as arguments and puts them onto a rabbitmq queue -dump_queue.py prints resources discovered by the browser and sent over the return queue. 
+--- +The command `umbra` will start umbra with default configuration. `umbra --help` +describes all command line options. + +Umbra also comes with these utilities: +* browse-url - open urls in chrome/chromium and run behaviors (without involving AMQP) +* queue-url - send url to umbra via AMQP +* drain-queue - consume messages from AMQP queue -On ubuntu, rabbitmq install with `sudo apt-get install rabbitmq-server` should automatically -be set up for these three scripts to function on localhost ( the default amqp url ). diff --git a/bin/browse-url b/bin/browse-url new file mode 100755 index 0000000..9fd6311 --- /dev/null +++ b/bin/browse-url @@ -0,0 +1,28 @@ +#!/usr/bin/env python +# vim: set sw=4 et: + +import argparse +import os +import sys +import logging +from umbra.browser import Browser + +arg_parser = argparse.ArgumentParser(prog=os.path.basename(sys.argv[0]), + description='browse-url - open urls in chrome/chromium and run behaviors', + formatter_class=argparse.ArgumentDefaultsHelpFormatter) +arg_parser.add_argument('-w', '--browser-wait', dest='browser_wait', default='60', + help='seconds to wait for browser initialization') +arg_parser.add_argument('-e', '--executable', dest='chrome_exe', default='chromium-browser', + help='executable to use to invoke chrome') +arg_parser.add_argument('-v', '--verbose', dest='log_level', + action="store_const", default=logging.INFO, const=logging.DEBUG) +arg_parser.add_argument('urls', metavar='URL', nargs='+', help='URL(s) to browse') +args = arg_parser.parse_args(args=sys.argv[1:]) + +logging.basicConfig(stream=sys.stdout, level=args.log_level, + format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') + +browser = Browser(chrome_exe=args.chrome_exe, chrome_wait=args.browser_wait) +for url in args.urls: + browser.browse_page(url) + diff --git a/bin/drain-queue b/bin/drain-queue new file mode 100755 index 0000000..f198d15 --- /dev/null +++ b/bin/drain-queue 
@@ -0,0 +1,49 @@ +#!/usr/bin/env python +# vim: set sw=4 et: +import os +import sys +import argparse +import logging +import socket +from kombu import Connection, Exchange, Queue + +arg_parser = argparse.ArgumentParser(prog=os.path.basename(sys.argv[0]), + description='drain-queue - consume messages from AMQP queue', + formatter_class=argparse.ArgumentDefaultsHelpFormatter) +arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f', + help='URL identifying the AMQP server to talk to') +arg_parser.add_argument('--exchange', dest='amqp_exchange', default='umbra', + help='AMQP exchange name') +arg_parser.add_argument('--queue', dest='amqp_queue', default='urls', + help='AMQP queue name') +arg_parser.add_argument('-n', '--no-ack', dest='no_ack', action="store_const", + default=False, const=True, help="leave messages on the queue (default: remove them from the queue)") +arg_parser.add_argument('-r', '--run-forever', dest='run_forever', action="store_const", + default=False, const=True, help="run forever, waiting for new messages to appear on the queue (default: exit when all messages in the queue have been consumed)") +arg_parser.add_argument('-v', '--verbose', dest='log_level', + action="store_const", default=logging.INFO, const=logging.DEBUG) +args = arg_parser.parse_args(args=sys.argv[1:]) + +logging.basicConfig(stream=sys.stderr, level=args.log_level, + format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') + +def print_and_maybe_ack(body, message): + print(body) + if not args.no_ack: + message.ack() + +exchange = Exchange(args.amqp_exchange, 'direct', durable=True) +queue = Queue(args.amqp_queue, exchange=exchange) +try: + with Connection(args.amqp_url) as conn: + with conn.Consumer(queue, callbacks=[print_and_maybe_ack]) as consumer: + consumer.qos(prefetch_count=1) + while True: + try: + conn.drain_events(timeout=0.5) + except socket.timeout: + if not 
args.run_forever: + logging.debug("exiting, no messages left on the queue") + break +except KeyboardInterrupt: + logging.debug("exiting, stopped by user") diff --git a/bin/dump_queue.py b/bin/dump_queue.py deleted file mode 100755 index 9e31619..0000000 --- a/bin/dump_queue.py +++ /dev/null @@ -1,21 +0,0 @@ -#!/usr/bin/env python -from json import dumps, loads -import os,sys,argparse, urllib.request, urllib.error, urllib.parse -import websocket -import time -import uuid -import logging -import threading -from kombu import Connection, Exchange, Queue -logging.basicConfig(level=logging.INFO) - -umbra_exchange = Exchange('umbra', 'direct', durable=True) -requests_queue = Queue('requests', exchange=umbra_exchange) -def print_and_ack(body, message): - print(body['url']) - message.ack() - -with Connection(sys.argv[1] if len(sys.argv) > 1 else "amqp://guest:guest@localhost:5672//") as conn: - with conn.Consumer(requests_queue, callbacks=[print_and_ack]) as consumer: - while True: - conn.drain_events() diff --git a/bin/load_url.py b/bin/load_url.py deleted file mode 100755 index ac87cef..0000000 --- a/bin/load_url.py +++ /dev/null @@ -1,30 +0,0 @@ -#!/usr/bin/env python -# vim: set sw=4 et: -from json import dumps, loads -import os,sys,argparse, urllib.request, urllib.error, urllib.parse -import websocket -import time -import uuid -import logging -import threading -from kombu import Connection, Exchange, Queue - -logging.basicConfig(stream=sys.stdout, level=logging.INFO, - format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') - -arg_parser = argparse.ArgumentParser(prog=os.path.basename(sys.argv[0]), - description='load_url.py - send url to umbra via amqp', - formatter_class=argparse.ArgumentDefaultsHelpFormatter) -arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672//', - help='URL identifying the amqp server to talk to') -arg_parser.add_argument('-i', 
'--client-id', dest='client_id', default='load_url.0', - help='client id - included in the json payload with each url; umbra uses this value as the routing key to send requests back to') -arg_parser.add_argument('urls', metavar='URL', nargs='+', help='URLs to send to umbra') -args = arg_parser.parse_args(args=sys.argv[1:]) - -umbra_exchange = Exchange('umbra', 'direct', durable=True) -with Connection(args.amqp_url) as conn: - producer = conn.Producer(serializer='json') - for url in args.urls: - producer.publish({'url': url, 'metadata': {}, 'clientId': args.client_id}, 'url', exchange=umbra_exchange) - diff --git a/bin/queue-url b/bin/queue-url new file mode 100755 index 0000000..272c2e7 --- /dev/null +++ b/bin/queue-url @@ -0,0 +1,36 @@ +#!/usr/bin/env python +# vim: set sw=4 et: + +import os +import sys +import argparse +import logging +from kombu import Connection, Exchange, Queue + +arg_parser = argparse.ArgumentParser(prog=os.path.basename(sys.argv[0]), + description='queue-url - send url to umbra via AMQP', + formatter_class=argparse.ArgumentDefaultsHelpFormatter) +arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f', + help='URL identifying the AMQP server to talk to') +arg_parser.add_argument('--exchange', dest='amqp_exchange', default='umbra', + help='AMQP exchange name') +arg_parser.add_argument('--routing-key', dest='amqp_routing_key', default='url', + help='AMQP routing key') +arg_parser.add_argument('-i', '--client-id', dest='client_id', default='load_url.0', + help='client id - included in the json payload with each url; umbra uses this value as the routing key to send requests back to') +arg_parser.add_argument('-v', '--verbose', dest='log_level', + action="store_const", default=logging.INFO, const=logging.DEBUG) +arg_parser.add_argument('urls', metavar='URL', nargs='+', help='URLs to send to umbra') +args = arg_parser.parse_args(args=sys.argv[1:]) + +logging.basicConfig(stream=sys.stdout, 
level=args.log_level, + format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') + +exchange = Exchange(args.amqp_exchange, 'direct', durable=True) +with Connection(args.amqp_url) as conn: + producer = conn.Producer(serializer='json') + for url in args.urls: + payload = {'url': url, 'metadata': {}, 'clientId': args.client_id} + logging.info("sending to amqp url={} exchange={} routing_key={} -- {}".format(args.amqp_url, args.amqp_exchange, args.amqp_routing_key, payload)) + producer.publish(payload, routing_key=args.amqp_routing_key, exchange=exchange) + diff --git a/bin/umbra b/bin/umbra index a77c17b..167f200 100755 --- a/bin/umbra +++ b/bin/umbra @@ -1,6 +1,78 @@ #!/usr/bin/env python - -from umbra import umbra +# vim: set sw=4 et: + +import logging +import argparse +import time +import umbra +import sys +import signal +import os +import umbra +import signal +import threading if __name__=="__main__": - umbra.main() + import faulthandler + faulthandler.register(signal.SIGQUIT) + + arg_parser = argparse.ArgumentParser(prog=os.path.basename(sys.argv[0]), + description='umbra - browser automation tool communicating via AMQP', + formatter_class=argparse.ArgumentDefaultsHelpFormatter) + arg_parser.add_argument('-w', '--browser-wait', dest='browser_wait', default='60', + help='Seconds to wait for browser initialization') + arg_parser.add_argument('-e', '--executable', dest='chrome_exe', default='chromium-browser', + help='Executable to use to invoke chrome') + arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f', + help='URL identifying the amqp server to talk to') + arg_parser.add_argument('--exchange', dest='amqp_exchange', default='umbra', + help='AMQP exchange name') + arg_parser.add_argument('--queue', dest='amqp_queue', default='urls', + help='AMQP queue to consume urls from') + arg_parser.add_argument('--routing-key', dest='amqp_routing_key', 
default='url', + help='AMQP routing key to bind to the AMQP queue') + arg_parser.add_argument('-n', '--max-browsers', dest='max_browsers', default='3', + help='Max number of chrome instances simultaneously browsing pages') + arg_parser.add_argument('-v', '--verbose', dest='log_level', + action="store_const", default=logging.INFO, const=logging.DEBUG) + args = arg_parser.parse_args(args=sys.argv[1:]) + + logging.basicConfig(stream=sys.stdout, level=args.log_level, + format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') + + umbra = umbra.Umbra(args.amqp_url, args.chrome_exe, args.browser_wait, + max_active_browsers=int(args.max_browsers), + exchange_name=args.amqp_exchange, queue_name=args.amqp_queue, + routing_key=args.amqp_routing_key) + + class ShutdownRequested(Exception): + pass + + def sigterm(signum, frame): + raise ShutdownRequested('shutdown requested (caught SIGTERM)') + def sigint(signum, frame): + raise ShutdownRequested('shutdown requested (caught SIGINT)') + + signal.signal(signal.SIGTERM, sigterm) + signal.signal(signal.SIGINT, sigint) + + umbra.start() + + try: + while True: + time.sleep(0.5) + except ShutdownRequested as e: + logging.info(e) + except BaseException as e: + logging.fatal(e) + finally: + try: + umbra.shutdown() + for th in threading.enumerate(): + if th != threading.current_thread(): + th.join() + except BaseException as e: + logging.warn("caught {}".format(e)) + umbra.shutdown_now() + + diff --git a/setup.py b/setup.py index b95cdbc..ae5c4ab 100644 --- a/setup.py +++ b/setup.py @@ -1,8 +1,11 @@ -import setuptools +# vim: set sw=4 et: + +import setuptools +import glob setuptools.setup(name='umbra', version='0.1', - description='Google Chrome remote control interface', + description='Browser automation via chrome debug protocol', url='https://github.com/internetarchive/umbra', author='Eldon Stegall', author_email='eldon@archive.org', @@ -10,8 +13,8 @@ 
setuptools.setup(name='umbra', license='Apache License 2.0', packages=['umbra'], package_data={'umbra':['behaviors.d/*.js']}, - install_requires=['kombu', 'websocket-client-py3','argparse'], - scripts=['bin/umbra', 'bin/load_url.py', 'bin/dump_queue.py'], + install_requires=['kombu', 'websocket-client-py3==0.13.1','argparse'], + scripts=glob.glob("bin/*"), zip_safe=False, classifiers=[ 'Development Status :: 3 - Alpha Development Status', diff --git a/umbra/__init__.py b/umbra/__init__.py index e69de29..b8964b2 100644 --- a/umbra/__init__.py +++ b/umbra/__init__.py @@ -0,0 +1,3 @@ +from umbra.browser import Browser +from umbra.controller import AmqpBrowserController +Umbra = AmqpBrowserController diff --git a/umbra/umbra.py b/umbra/browser.py old mode 100755 new mode 100644 similarity index 50% rename from umbra/umbra.py rename to umbra/browser.py index e197921..3ab63bb --- a/umbra/umbra.py +++ b/umbra/browser.py @@ -2,58 +2,105 @@ # vim: set sw=4 et: import logging -import sys - -# logging.basicConfig(stream=sys.stdout, level=logging.INFO, -logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, - format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') - -import os -import argparse import json -import urllib.request, urllib.error, urllib.parse +import urllib.request import itertools import websocket import time -import uuid import threading import subprocess import signal -import kombu import tempfile +import os +import socket from umbra.behaviors import Behavior -class UmbraWorker: +class BrowserPool: + logger = logging.getLogger(__module__ + "." 
+ __qualname__) + + def __init__(self, size=3, chrome_exe='chromium-browser', chrome_wait=60): + self._available = set() + self._in_use = set() + + for i in range(0, size): + port_holder = self._grab_random_port() + browser = Browser(port_holder.getsockname()[1], chrome_exe, chrome_wait) + self._available.add((browser, port_holder)) + + self._lock = threading.Lock() + + self.logger.info("browser ports: {}".format([browser.chrome_port for (browser, port_holder) in self._available])) + + def _grab_random_port(self): + """Returns socket bound to some port.""" + sock = socket.socket() + sock.bind(('127.0.0.1', 0)) + return sock + + def _hold_port(self, port): + """Returns socket bound to supplied port.""" + sock = socket.socket() + sock.bind(('127.0.0.1', port)) + return sock + + def acquire(self): + """Returns browser from pool if available, raises KeyError otherwise.""" + with self._lock: + (browser, port_holder) = self._available.pop() + port_holder.close() + self._in_use.add(browser) + return browser + + def release(self, browser): + with self._lock: + port_holder = self._hold_port(browser.chrome_port) + self._available.add((browser, port_holder)) + self._in_use.remove(browser) + + def shutdown_now(self): + for browser in self._in_use: + browser.shutdown_now() + + +class Browser: """Runs chrome/chromium to synchronously browse one page at a time using worker.browse_page(). Currently the implementation starts up a new instance of chrome for each page browsed, always on the same debug port. (In the future, it may keep the browser running indefinitely.)""" - logger = logging.getLogger('umbra.UmbraWorker') + + logger = logging.getLogger(__module__ + "." 
+ __qualname__) HARD_TIMEOUT_SECONDS = 20 * 60 - def __init__(self, umbra, chrome_port=9222, chrome_exe='chromium-browser', chrome_wait=10, client_id='request'): + def __init__(self, chrome_port=9222, chrome_exe='chromium-browser', chrome_wait=60): self.command_id = itertools.count(1) - self.lock = threading.Lock() - self.umbra = umbra + self._lock = threading.Lock() self.chrome_port = chrome_port self.chrome_exe = chrome_exe self.chrome_wait = chrome_wait - self.client_id = client_id self._behavior = None self.websock = None + self._shutdown_now = False - def browse_page(self, url, url_metadata): - """Synchronously browse a page and run behaviors.""" - with self.lock: + def shutdown_now(self): + self._shutdown_now = True + + def browse_page(self, url, on_request=None): + """Synchronously browses a page and runs behaviors. First blocks to + acquire lock to ensure we only browse one page at a time.""" + with self._lock: self.url = url - self.url_metadata = url_metadata + self.on_request = on_request with tempfile.TemporaryDirectory() as user_data_dir: with Chrome(self.chrome_port, self.chrome_exe, self.chrome_wait, user_data_dir) as websocket_url: self.websock = websocket.WebSocketApp(websocket_url, on_open=self._visit_page, on_message=self._handle_message) - websock_thread = threading.Thread(target=self.websock.run_forever, kwargs={'ping_timeout':0.5}) + + import random + threadName = "WebsockThread{}-{}".format(self.chrome_port, + ''.join((random.choice('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789') for _ in range(6)))) + websock_thread = threading.Thread(target=self.websock.run_forever, name=threadName, kwargs={'ping_timeout':0.5}) websock_thread.start() start = time.time() @@ -62,12 +109,15 @@ class UmbraWorker: if not self.websock or not self.websock.sock or not self.websock.sock.connected: self.logger.error("websocket closed, did chrome die??? 
{}".format(self.websock)) break - elif time.time() - start > UmbraWorker.HARD_TIMEOUT_SECONDS: - self.logger.info("finished browsing page, reached hard timeout of {} seconds url={}".format(UmbraWorker.HARD_TIMEOUT_SECONDS, self.url)) + elif time.time() - start > Browser.HARD_TIMEOUT_SECONDS: + self.logger.info("finished browsing page, reached hard timeout of {} seconds url={}".format(Browser.HARD_TIMEOUT_SECONDS, self.url)) break elif self._behavior != None and self._behavior.is_finished(): self.logger.info("finished browsing page according to behavior url={}".format(self.url)) break + elif self._shutdown_now: + self.logger.warn("immediate shutdown requested") + break try: self.websock.close() @@ -98,18 +148,6 @@ class UmbraWorker: # navigate to the page! self.send_to_chrome(method="Page.navigate", params={"url": self.url}) - # XXX should this class know anything about amqp? or should it - # delegate this back up to the Umbra class? - def _send_request_to_amqp(self, chrome_msg): - payload = chrome_msg['params']['request'] - payload['parentUrl'] = self.url - payload['parentUrlMetadata'] = self.url_metadata - self.logger.debug('sending to amqp exchange={} routing_key={} payload={}'.format(self.umbra.umbra_exchange.name, self.client_id, payload)) - with self.umbra.producer_lock: - self.umbra.producer.publish(payload, - exchange=self.umbra.umbra_exchange, - routing_key=self.client_id) - def _handle_message(self, websock, message): # self.logger.debug("message from {} - {}".format(websock.url, message[:95])) # self.logger.debug("message from {} - {}".format(websock.url, message)) @@ -117,10 +155,10 @@ class UmbraWorker: if "method" in message and message["method"] == "Network.requestWillBeSent": if self._behavior: self._behavior.notify_of_activity() - if not message["params"]["request"]["url"].lower().startswith("data:"): - self._send_request_to_amqp(message) - else: + if message["params"]["request"]["url"].lower().startswith("data:"): self.logger.debug("ignoring data 
url {}".format(message["params"]["request"]["url"][:80])) + elif self.on_request: + self.on_request(message) elif "method" in message and message["method"] == "Page.loadEventFired": if self._behavior is None: self.logger.info("Page.loadEventFired, starting behaviors url={} message={}".format(self.url, message)) @@ -155,105 +193,8 @@ class UmbraWorker: # self.logger.debug("[no-method] {}".format(message)) -class Umbra: - """Consumes amqp messages representing requests to browse urls, from the - amqp queue "urls" on exchange "umbra". Incoming amqp message is a json - object with 3 attributes: - { - "clientId": "umbra.client.123", - "url": "http://example.com/my_fancy_page", - "metadata": {"arbitrary":"fields", "etc":4} - } - - "url" is the url to browse. - - "clientId" uniquely idenfities the client of - umbra. Umbra uses the clientId to direct information via amqp back to the - client. It sends this information on that same "umbra" exchange, and uses - the clientId as the amqp routing key. - - Each url requested in the browser is published to amqp this way. The - outgoing amqp message is a json object: - - { - 'url': 'http://example.com/images/embedded_thing.jpg', - 'method': 'GET', - 'headers': {'User-Agent': '...', 'Accept': '...'} - 'parentUrl': 'http://example.com/my_fancy_page', - 'parentUrlMetadata': {"arbitrary":"fields", "etc":4}, - } - - POST requests have an additional field, postData. 
- """ - - logger = logging.getLogger('umbra.Umbra') - - def __init__(self, amqp_url, chrome_exe, browser_wait): - self.amqp_url = amqp_url - self.chrome_exe = chrome_exe - self.browser_wait = browser_wait - self.producer = None - self.producer_lock = None - self.workers = {} - self.workers_lock = threading.Lock() - self.amqp_thread = threading.Thread(target=self._consume_amqp) - self.amqp_stop = threading.Event() - self.amqp_thread.start() - - def shutdown(self): - self.logger.info("shutting down amqp consumer {}".format(self.amqp_url)) - self.amqp_stop.set() - self.amqp_thread.join() - - def _consume_amqp(self): - while not self.amqp_stop.is_set(): - try: - self.umbra_exchange = kombu.Exchange(name='umbra', type='direct', durable=True) - url_queue = kombu.Queue('urls', routing_key='url', exchange=self.umbra_exchange) - self.logger.info("connecting to amqp exchange={} at {}".format(self.umbra_exchange.name, self.amqp_url)) - with kombu.Connection(self.amqp_url) as conn: - if self.producer_lock is None: - self.producer_lock = threading.Lock() - with self.producer_lock: - self.producer = conn.Producer(serializer='json') - with conn.Consumer(url_queue, callbacks=[self._browse_page_requested]) as consumer: - import socket - while not self.amqp_stop.is_set(): - try: - conn.drain_events(timeout=0.5) - except socket.timeout: - pass - except BaseException as e: - self.logger.error("amqp exception {}".format(e)) - self.logger.error("attempting to reopen amqp connection") - - def _browse_page_requested(self, body, message): - """First waits for the UmbraWorker for the client body['clientId'] to - become available, or creates a new worker if this clientId has not been - served before. 
Starts a worker browsing the page asynchronously, then - acknowledges the amqp message, which lets the server know it can be - removed from the queue.""" - client_id = body['clientId'] - with self.workers_lock: - if not client_id in self.workers: - port = 9222 + len(self.workers) - t = UmbraWorker(umbra=self, chrome_port=port, - chrome_exe=self.chrome_exe, - chrome_wait=self.browser_wait, - client_id=client_id) - self.workers[client_id] = t - - def browse_page_async(): - self.logger.info('client_id={} body={}'.format(client_id, body)) - self.workers[client_id].browse_page(body['url'], body['metadata']) - - threading.Thread(target=browse_page_async).start() - - message.ack() - - class Chrome: - logger = logging.getLogger('umbra.Chrome') + logger = logging.getLogger(__module__ + "." + __qualname__) def __init__(self, port, executable, browser_wait, user_data_dir): self.port = port @@ -261,19 +202,14 @@ class Chrome: self.browser_wait = browser_wait self.user_data_dir = user_data_dir - def fetch_debugging_json(): - raw_json = urllib.request.urlopen("http://localhost:%s/json" % self.port).read() - json = raw_json.decode('utf-8') - return json.loads(json) - # returns websocket url to chrome window with about:blank loaded def __enter__(self): chrome_args = [self.executable, "--user-data-dir={}".format(self.user_data_dir), - "--remote-debugging-port=%s" % self.port, + "--remote-debugging-port={}".format(self.port), "--disable-web-sockets", "--disable-cache", - "--window-size=1100,900", "--enable-logging", - "--no-default-browser-check", "--disable-first-run-ui", "--no-first-run", + "--window-size=1100,900", "--no-default-browser-check", + "--disable-first-run-ui", "--no-first-run", "--homepage=about:blank", "about:blank"] self.logger.info("running {}".format(chrome_args)) self.chrome_process = subprocess.Popen(chrome_args, start_new_session=True) @@ -306,31 +242,3 @@ class Chrome: os.killpg(self.chrome_process.pid, signal.SIGINT) self.chrome_process.wait() -def main(): - 
import faulthandler - faulthandler.register(signal.SIGQUIT) - - arg_parser = argparse.ArgumentParser(prog=os.path.basename(sys.argv[0]), - description='umbra - Browser automation tool', - formatter_class=argparse.ArgumentDefaultsHelpFormatter) - arg_parser.add_argument('-w', '--browser-wait', dest='browser_wait', default='10', - help='Seconds to wait for browser initialization') - arg_parser.add_argument('-e', '--executable', dest='executable', default='chromium-browser', - help='Executable to use to invoke chrome') - arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f', - help='URL identifying the amqp server to talk to') - args = arg_parser.parse_args(args=sys.argv[1:]) - - umbra = Umbra(args.amqp_url, args.executable, args.browser_wait) - try: - while True: - time.sleep(0.5) - except: - pass - finally: - umbra.shutdown() - - -if __name__ == "__main__": - main() - diff --git a/umbra/controller.py b/umbra/controller.py new file mode 100644 index 0000000..198d8bb --- /dev/null +++ b/umbra/controller.py @@ -0,0 +1,148 @@ +#!/usr/bin/env python +# vim: set sw=4 et: + +import logging +import time +import threading +import kombu +from umbra.browser import BrowserPool + +class AmqpBrowserController: + """ + Consumes amqp messages representing requests to browse urls, from the + specified amqp queue (default: "urls") on the specified amqp exchange + (default: "umbra"). Incoming amqp message is a json object with 3 + attributes: + + { + "clientId": "umbra.client.123", + "url": "http://example.com/my_fancy_page", + "metadata": {"arbitrary":"fields", "etc":4} + } + + "url" is the url to browse. + + "clientId" uniquely identifies the client of umbra. Umbra uses the clientId + as the amqp routing key, to direct information via amqp back to the client. + It sends this information on the same specified amqp exchange (default: + "umbra"). + + Each url requested in the browser is published to amqp this way. 
The + outgoing amqp message is a json object: + + { + "url": "http://example.com/images/embedded_thing.jpg", + "method": "GET", + "headers": {"User-Agent": "...", "Accept": "...", ...}, + "parentUrl": "http://example.com/my_fancy_page", + "parentUrlMetadata": {"arbitrary":"fields", "etc":4, ...} + } + + POST requests have an additional field, postData. + """ + + logger = logging.getLogger(__module__ + "." + __qualname__) + + def __init__(self, amqp_url='amqp://guest:guest@localhost:5672/%2f', + chrome_exe='chromium-browser', browser_wait=60, + max_active_browsers=1, queue_name='urls', routing_key='url', + exchange_name='umbra'): + self.amqp_url = amqp_url + self.queue_name = queue_name + self.routing_key = routing_key + self.exchange_name = exchange_name + + self._browser_pool = BrowserPool(size=max_active_browsers, + chrome_exe=chrome_exe, chrome_wait=browser_wait) + + def start(self): + self._exchange = kombu.Exchange(name=self.exchange_name, type='direct', + durable=True) + + self._producer = None + self._producer_lock = threading.Lock() + with self._producer_lock: + self._producer_conn = kombu.Connection(self.amqp_url) + self._producer = self._producer_conn.Producer(serializer='json') + + self._amqp_thread = threading.Thread(target=self._consume_amqp, name='AmqpConsumerThread') + self._amqp_stop = threading.Event() + self._amqp_thread.start() + + def shutdown(self): + self.logger.info("shutting down amqp consumer {}".format(self.amqp_url)) + self._amqp_stop.set() + self._amqp_thread.join() + # with self._producer_lock: + # self._producer_conn.close() + # self._producer_conn = None + + def shutdown_now(self): + self._browser_pool.shutdown_now() + + def _consume_amqp(self): + # XXX https://webarchive.jira.com/browse/ARI-3811 + # After running for some amount of time (3 weeks in the latest case), + # consumer looks normal but doesn't consume any messages. Not clear if + # it's hanging in drain_events() or not. 
As a temporary measure for + # mitigation (if it works) or debugging (if it doesn't work), close and + # reopen the connection every 15 minutes + RECONNECT_AFTER_SECONDS = 15 * 60 + + url_queue = kombu.Queue(self.queue_name, routing_key=self.routing_key, + exchange=self._exchange) + + while not self._amqp_stop.is_set(): + try: + self.logger.info("connecting to amqp exchange={} at {}".format(self._exchange.name, self.amqp_url)) + with kombu.Connection(self.amqp_url) as conn: + conn_opened = time.time() + with conn.Consumer(url_queue) as consumer: + consumer.qos(prefetch_count=1) + while (not self._amqp_stop.is_set() and time.time() - conn_opened < RECONNECT_AFTER_SECONDS): + import socket + try: + browser = self._browser_pool.acquire() # raises KeyError if none available + consumer.callbacks = [self._make_callback(browser)] + conn.drain_events(timeout=0.5) + consumer.callbacks = None + except KeyError: + # no browsers available + time.sleep(0.5) + except socket.timeout: + # no urls in the queue + self._browser_pool.release(browser) + + except BaseException as e: + self.logger.error("amqp exception {}".format(e)) + time.sleep(0.5) + self.logger.error("attempting to reopen amqp connection") + + def _make_callback(self, browser): + def callback(body, message): + self._browse_page(browser, body['clientId'], body['url'], body['metadata']) + message.ack() + return callback + + def _browse_page(self, browser, client_id, url, parent_url_metadata): + def on_request(chrome_msg): + payload = chrome_msg['params']['request'] + payload['parentUrl'] = url + payload['parentUrlMetadata'] = parent_url_metadata + self.logger.debug('sending to amqp exchange={} routing_key={} payload={}'.format(self.exchange_name, client_id, payload)) + with self._producer_lock: + publish = self._producer_conn.ensure(self._producer, self._producer.publish) + publish(payload, exchange=self._exchange, routing_key=client_id) + + def browse_page_async(): + self.logger.info('browser={} client_id={} 
url={}'.format(browser, client_id, url)) + try: + browser.browse_page(url, on_request=on_request) + finally: + self._browser_pool.release(browser) + + import random + threadName = "BrowsingThread{}-{}".format(browser.chrome_port, + ''.join((random.choice('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789') for _ in range(6)))) + threading.Thread(target=browse_page_async, name=threadName).start() +