From cc0ffee5088542cc8781e609d1bec2943e4860fe Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Tue, 20 May 2014 00:57:07 -0700 Subject: [PATCH 01/19] only websocket-client-py3==0.13.1 works right with python3 at the moment, see https://github.com/liris/websocket-client/issues/84 --- setup.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index b95cdbc..4dd994f 100644 --- a/setup.py +++ b/setup.py @@ -1,8 +1,10 @@ +# vim: set sw=4 et: + import setuptools setuptools.setup(name='umbra', version='0.1', - description='Google Chrome remote control interface', + description='Browser automation via chrome debug protocol', url='https://github.com/internetarchive/umbra', author='Eldon Stegall', author_email='eldon@archive.org', @@ -10,7 +12,7 @@ setuptools.setup(name='umbra', license='Apache License 2.0', packages=['umbra'], package_data={'umbra':['behaviors.d/*.js']}, - install_requires=['kombu', 'websocket-client-py3','argparse'], + install_requires=['kombu', 'websocket-client-py3==0.13.1','argparse'], scripts=['bin/umbra', 'bin/load_url.py', 'bin/dump_queue.py'], zip_safe=False, classifiers=[ From 6fdcdd0bf0bc8d0986553c0369952e8008790b26 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Tue, 20 May 2014 01:09:11 -0700 Subject: [PATCH 02/19] configurable max number of instances of chrome simultaneously browsing pages (default=3); close and reopen connection to amqp every 15 minutes (consumer only); increase default browser wait to 60 sec --- umbra/umbra.py | 68 ++++++++++++++++++++++++++++++++++---------------- 1 file changed, 46 insertions(+), 22 deletions(-) diff --git a/umbra/umbra.py b/umbra/umbra.py index e197921..cd767ff 100755 --- a/umbra/umbra.py +++ b/umbra/umbra.py @@ -32,7 +32,7 @@ class UmbraWorker: HARD_TIMEOUT_SECONDS = 20 * 60 - def __init__(self, umbra, chrome_port=9222, chrome_exe='chromium-browser', chrome_wait=10, client_id='request'): + def __init__(self, umbra, chrome_port=9222, chrome_exe='chromium-browser', chrome_wait=60, client_id='request'): self.command_id = itertools.count(1) self.lock = threading.Lock() self.umbra = umbra @@ -44,7 +44,8 @@ class UmbraWorker: self.websock = None def browse_page(self, url, url_metadata): - """Synchronously browse a page and run behaviors.""" + """Synchronously browses a page and runs behaviors. First blocks to + acquire lock to ensure self only browses one page at a time.""" with self.lock: self.url = url self.url_metadata = url_metadata @@ -106,8 +107,9 @@ class UmbraWorker: payload['parentUrlMetadata'] = self.url_metadata self.logger.debug('sending to amqp exchange={} routing_key={} payload={}'.format(self.umbra.umbra_exchange.name, self.client_id, payload)) with self.umbra.producer_lock: - self.umbra.producer.publish(payload, - exchange=self.umbra.umbra_exchange, + publish = self.umbra.producer_conn.ensure(self.umbra.producer, + self.umbra.producer.publish) + publish(payload, exchange=self.umbra.umbra_exchange, routing_key=self.client_id) def _handle_message(self, websock, message): @@ -167,7 +169,7 @@ class Umbra: "url" is the url to browse. - "clientId" uniquely idenfities the client of + "clientId" uniquely identifies the client of umbra. Umbra uses the clientId to direct information via amqp back to the client. It sends this information on that same "umbra" exchange, and uses the clientId as the amqp routing key. @@ -188,14 +190,22 @@ class Umbra: logger = logging.getLogger('umbra.Umbra') - def __init__(self, amqp_url, chrome_exe, browser_wait): + def __init__(self, amqp_url='amqp://guest:guest@localhost:5672/%2f', chrome_exe='chromium-browser', browser_wait=60, max_active_workers=1): self.amqp_url = amqp_url self.chrome_exe = chrome_exe self.browser_wait = browser_wait + self.max_active_workers = max_active_workers + self.producer = None - self.producer_lock = None + + self.producer_lock = threading.Lock() + with self.producer_lock: + self.producer_conn = kombu.Connection(self.amqp_url) + self.producer = self.producer_conn.Producer(serializer='json') + self.workers = {} self.workers_lock = threading.Lock() + self.num_active_workers = 0 self.amqp_thread = threading.Thread(target=self._consume_amqp) self.amqp_stop = threading.Event() self.amqp_thread.start() @@ -206,19 +216,24 @@ class Umbra: self.amqp_thread.join() def _consume_amqp(self): + # XXX https://webarchive.jira.com/browse/ARI-3811 + # After running for some amount of time (3 weeks in the latest case), + # consumer looks normal but doesn't consume any messages. Not clear if + # it's hanging in drain_events() or not. As a temporary measure for + # mitigation (if it works) or debugging (if it doesn't work), close and + # reopen the connection every 15 minutes + RECONNECT_AFTER_MIN = 15 + while not self.amqp_stop.is_set(): try: self.umbra_exchange = kombu.Exchange(name='umbra', type='direct', durable=True) url_queue = kombu.Queue('urls', routing_key='url', exchange=self.umbra_exchange) self.logger.info("connecting to amqp exchange={} at {}".format(self.umbra_exchange.name, self.amqp_url)) with kombu.Connection(self.amqp_url) as conn: - if self.producer_lock is None: - self.producer_lock = threading.Lock() - with self.producer_lock: - self.producer = conn.Producer(serializer='json') + conn_opened = time.time() with conn.Consumer(url_queue, callbacks=[self._browse_page_requested]) as consumer: import socket - while not self.amqp_stop.is_set(): + while (not self.amqp_stop.is_set() and time.time() - conn_opened < RECONNECT_AFTER_MIN * 60): try: conn.drain_events(timeout=0.5) except socket.timeout: @@ -228,11 +243,8 @@ class Umbra: self.logger.error("attempting to reopen amqp connection") def _browse_page_requested(self, body, message): - """First waits for the UmbraWorker for the client body['clientId'] to - become available, or creates a new worker if this clientId has not been - served before. Starts a worker browsing the page asynchronously, then - acknowledges the amqp message, which lets the server know it can be - removed from the queue.""" + """Kombu Consumer callback. Provisions an UmbraWorker and + asynchronously asks it to browse the requested url.""" client_id = body['clientId'] with self.workers_lock: if not client_id in self.workers: @@ -245,8 +257,18 @@ class Umbra: def browse_page_async(): self.logger.info('client_id={} body={}'.format(client_id, body)) + while True: + with self.workers_lock: + if self.num_active_workers < self.max_active_workers: + self.num_active_workers += 1 + break + time.sleep(0.5) + self.workers[client_id].browse_page(body['url'], body['metadata']) + with self.workers_lock: + self.num_active_workers -= 1 + threading.Thread(target=browse_page_async).start() message.ack() @@ -270,10 +292,10 @@ class Chrome: def __enter__(self): chrome_args = [self.executable, "--user-data-dir={}".format(self.user_data_dir), - "--remote-debugging-port=%s" % self.port, + "--remote-debugging-port={}".format(self.port), "--disable-web-sockets", "--disable-cache", - "--window-size=1100,900", "--enable-logging", - "--no-default-browser-check", "--disable-first-run-ui", "--no-first-run", + "--window-size=1100,900", "--no-default-browser-check", + "--disable-first-run-ui", "--no-first-run", "--homepage=about:blank", "about:blank"] self.logger.info("running {}".format(chrome_args)) self.chrome_process = subprocess.Popen(chrome_args, start_new_session=True) @@ -313,15 +335,17 @@ def main(): arg_parser = argparse.ArgumentParser(prog=os.path.basename(sys.argv[0]), description='umbra - Browser automation tool', formatter_class=argparse.ArgumentDefaultsHelpFormatter) - arg_parser.add_argument('-w', '--browser-wait', dest='browser_wait', default='10', + arg_parser.add_argument('-w', '--browser-wait', dest='browser_wait', default='60', help='Seconds to wait for browser initialization') arg_parser.add_argument('-e', '--executable', dest='executable', default='chromium-browser', help='Executable to use to invoke chrome') arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f', help='URL identifying the amqp server to talk to') + arg_parser.add_argument('-n', '--max-workers', dest='max_workers', default='3', + help='Max number of chrome instances simultaneously browsing pages') args = arg_parser.parse_args(args=sys.argv[1:]) - umbra = Umbra(args.amqp_url, args.executable, args.browser_wait) + umbra = Umbra(args.amqp_url, args.executable, args.browser_wait, max_active_workers=int(args.max_workers)) try: while True: time.sleep(0.5) From 3e4232f32c8f6b2a2735aaaedf8d0adf08ee1f16 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Tue, 20 May 2014 02:42:40 -0700 Subject: [PATCH 03/19] refactor umbra.py into controller.py and browser.py, improve class names --- bin/umbra | 43 +++++- setup.py | 2 +- umbra/controller.py | 142 +++++++++++++++++ umbra/umbra.py | 360 -------------------------------------------- 4 files changed, 184 insertions(+), 363 deletions(-) create mode 100644 umbra/controller.py delete mode 100755 umbra/umbra.py diff --git a/bin/umbra b/bin/umbra index a77c17b..2b503c1 100755 --- a/bin/umbra +++ b/bin/umbra @@ -1,6 +1,45 @@ #!/usr/bin/env python +# vim: set sw=4 et: -from umbra import umbra +import logging +import argparse +import time +import umbra +import sys +import signal +import os +import umbra.controller if __name__=="__main__": - umbra.main() + # logging.basicConfig(stream=sys.stdout, level=logging.INFO, + logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, + format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') + + import faulthandler + faulthandler.register(signal.SIGQUIT) + + arg_parser = argparse.ArgumentParser(prog=os.path.basename(sys.argv[0]), + description='umbra - Browser automation tool', + formatter_class=argparse.ArgumentDefaultsHelpFormatter) + arg_parser.add_argument('-w', '--browser-wait', dest='browser_wait', default='60', + help='Seconds to wait for browser initialization') + arg_parser.add_argument('-e', '--executable', dest='executable', default='chromium-browser', + help='Executable to use to invoke chrome') + arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f', + help='URL identifying the amqp server to talk to') + arg_parser.add_argument('-n', '--max-workers', dest='max_workers', default='3', + help='Max number of chrome instances simultaneously browsing pages') + args = arg_parser.parse_args(args=sys.argv[1:]) + + controller = umbra.controller.AmqpBrowserController(args.amqp_url, + args.executable, args.browser_wait, + max_active_workers=int(args.max_workers)) + try: + while True: + time.sleep(0.5) + except: + pass + finally: + controller.shutdown() + + diff --git a/setup.py b/setup.py index 4dd994f..f0fd904 100644 --- a/setup.py +++ b/setup.py @@ -1,6 +1,6 @@ # vim: set sw=4 et: -import setuptools +import setuptools setuptools.setup(name='umbra', version='0.1', diff --git a/umbra/controller.py b/umbra/controller.py new file mode 100644 index 0000000..69aab39 --- /dev/null +++ b/umbra/controller.py @@ -0,0 +1,142 @@ +#!/usr/bin/env python +# vim: set sw=4 et: + +import logging +import time +import threading +import kombu +from umbra.browser import Browser + +class AmqpBrowserController: + """Consumes amqp messages representing requests to browse urls, from the + amqp queue "urls" on exchange "umbra". Incoming amqp message is a json + object with 3 attributes: + { + "clientId": "umbra.client.123", + "url": "http://example.com/my_fancy_page", + "metadata": {"arbitrary":"fields", "etc":4} + } + + "url" is the url to browse. + + "clientId" uniquely identifies the client of + umbra. Umbra uses the clientId to direct information via amqp back to the + client. It sends this information on that same "umbra" exchange, and uses + the clientId as the amqp routing key. + + Each url requested in the browser is published to amqp this way. The + outgoing amqp message is a json object: + + { + 'url': 'http://example.com/images/embedded_thing.jpg', + 'method': 'GET', + 'headers': {'User-Agent': '...', 'Accept': '...'} + 'parentUrl': 'http://example.com/my_fancy_page', + 'parentUrlMetadata': {"arbitrary":"fields", "etc":4}, + } + + POST requests have an additional field, postData. + """ + + logger = logging.getLogger(__module__ + "." + __qualname__) + + def __init__(self, amqp_url='amqp://guest:guest@localhost:5672/%2f', + chrome_exe='chromium-browser', browser_wait=60, + max_active_workers=1, queue_name='urls', routing_key='url', + exchange_name='umbra'): + + self.amqp_url = amqp_url + self.chrome_exe = chrome_exe + self.browser_wait = browser_wait + self.max_active_workers = max_active_workers + self.queue_name = queue_name + self.routing_key = routing_key + self.exchange_name = exchange_name + self._exchange = kombu.Exchange(name=self.exchange_name, type='direct', durable=True) + + self.producer = None + self.producer_lock = threading.Lock() + with self.producer_lock: + self.producer_conn = kombu.Connection(self.amqp_url) + self.producer = self.producer_conn.Producer(serializer='json') + + self.browsers = {} + self.browsers_lock = threading.Lock() + self.num_active_workers = 0 + self.amqp_thread = threading.Thread(target=self._consume_amqp) + self.amqp_stop = threading.Event() + self.amqp_thread.start() + + def shutdown(self): + self.logger.info("shutting down amqp consumer {}".format(self.amqp_url)) + self.amqp_stop.set() + self.amqp_thread.join() + + def _consume_amqp(self): + # XXX https://webarchive.jira.com/browse/ARI-3811 + # After running for some amount of time (3 weeks in the latest case), + # consumer looks normal but doesn't consume any messages. Not clear if + # it's hanging in drain_events() or not. As a temporary measure for + # mitigation (if it works) or debugging (if it doesn't work), close and + # reopen the connection every 15 minutes + RECONNECT_AFTER_SECONDS = 15 * 60 + + while not self.amqp_stop.is_set(): + try: + url_queue = kombu.Queue(self.queue_name, routing_key=self.routing_key, exchange=self._exchange) + self.logger.info("connecting to amqp exchange={} at {}".format(self._exchange.name, self.amqp_url)) + with kombu.Connection(self.amqp_url) as conn: + conn_opened = time.time() + with conn.Consumer(url_queue, callbacks=[self._browse_page_requested]) as consumer: + import socket + while (not self.amqp_stop.is_set() and time.time() - conn_opened < RECONNECT_AFTER_SECONDS): + try: + conn.drain_events(timeout=0.5) + except socket.timeout: + pass + except BaseException as e: + self.logger.error("amqp exception {}".format(e)) + self.logger.error("attempting to reopen amqp connection") + + def _browse_page_requested(self, body, message): + """Kombu Consumer callback. Provisions a Browser and + asynchronously asks it to browse the requested url.""" + client_id = body['clientId'] + + def on_request(chrome_msg): + payload = chrome_msg['params']['request'] + payload['parentUrl'] = body['url'] + payload['parentUrlMetadata'] = body['metadata'] + self.logger.debug('sending to amqp exchange={} routing_key={} payload={}'.format(self.exchange_name, client_id, payload)) + with self.producer_lock: + publish = self.producer_conn.ensure(self.producer, self.producer.publish) + publish(payload, exchange=self._exchange, routing_key=client_id) + + with self.browsers_lock: + if client_id in self.browsers: + browser = self.browsers[client_id] + else: + # XXX should reuse ports + port = 9222 + len(self.browsers) + browser = Browser(chrome_port=port, chrome_exe=self.chrome_exe, + chrome_wait=self.browser_wait, client_id=client_id) + self.browsers[client_id] = browser + + def browse_page_async(): + self.logger.info('client_id={} body={}'.format(client_id, body)) + while True: + with self.browsers_lock: + if self.num_active_workers < self.max_active_workers: + self.num_active_workers += 1 + break + time.sleep(0.5) + + browser.browse_page(body['url'], on_request=on_request) + + with self.browsers_lock: + self.num_active_workers -= 1 + + threading.Thread(target=browse_page_async).start() + + message.ack() + diff --git a/umbra/umbra.py b/umbra/umbra.py deleted file mode 100755 index cd767ff..0000000 --- a/umbra/umbra.py +++ /dev/null @@ -1,360 +0,0 @@ -#!/usr/bin/env python -# vim: set sw=4 et: - -import logging -import sys - -# logging.basicConfig(stream=sys.stdout, level=logging.INFO, -logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, - format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') - -import os -import argparse -import json -import urllib.request, urllib.error, urllib.parse -import itertools -import websocket -import time -import uuid -import threading -import subprocess -import signal -import kombu -import tempfile -from umbra.behaviors import Behavior - -class UmbraWorker: - """Runs chrome/chromium to synchronously browse one page at a time using - worker.browse_page(). Currently the implementation starts up a new instance - of chrome for each page browsed, always on the same debug port. (In the - future, it may keep the browser running indefinitely.)""" - logger = logging.getLogger('umbra.UmbraWorker') - - HARD_TIMEOUT_SECONDS = 20 * 60 - - def __init__(self, umbra, chrome_port=9222, chrome_exe='chromium-browser', chrome_wait=60, client_id='request'): - self.command_id = itertools.count(1) - self.lock = threading.Lock() - self.umbra = umbra - self.chrome_port = chrome_port - self.chrome_exe = chrome_exe - self.chrome_wait = chrome_wait - self.client_id = client_id - self._behavior = None - self.websock = None - - def browse_page(self, url, url_metadata): - """Synchronously browses a page and runs behaviors. First blocks to - acquire lock to ensure self only browses one page at a time.""" - with self.lock: - self.url = url - self.url_metadata = url_metadata - with tempfile.TemporaryDirectory() as user_data_dir: - with Chrome(self.chrome_port, self.chrome_exe, self.chrome_wait, user_data_dir) as websocket_url: - self.websock = websocket.WebSocketApp(websocket_url, - on_open=self._visit_page, - on_message=self._handle_message) - websock_thread = threading.Thread(target=self.websock.run_forever, kwargs={'ping_timeout':0.5}) - websock_thread.start() - start = time.time() - - while True: - time.sleep(0.5) - if not self.websock or not self.websock.sock or not self.websock.sock.connected: - self.logger.error("websocket closed, did chrome die??? {}".format(self.websock)) - break - elif time.time() - start > UmbraWorker.HARD_TIMEOUT_SECONDS: - self.logger.info("finished browsing page, reached hard timeout of {} seconds url={}".format(UmbraWorker.HARD_TIMEOUT_SECONDS, self.url)) - break - elif self._behavior != None and self._behavior.is_finished(): - self.logger.info("finished browsing page according to behavior url={}".format(self.url)) - break - - try: - self.websock.close() - except BaseException as e: - self.logger.error("exception closing websocket {} - {}".format(self.websock, e)) - - websock_thread.join() - self._behavior = None - - def send_to_chrome(self, **kwargs): - msg_id = next(self.command_id) - kwargs['id'] = msg_id - msg = json.dumps(kwargs) - self.logger.debug('sending message to {}: {}'.format(self.websock, msg)) - self.websock.send(msg) - return msg_id - - def _visit_page(self, websock): - self.send_to_chrome(method="Network.enable") - self.send_to_chrome(method="Page.enable") - self.send_to_chrome(method="Console.enable") - self.send_to_chrome(method="Debugger.enable") - self.send_to_chrome(method="Runtime.enable") - - # disable google analytics, see _handle_message() where breakpoint is caught "Debugger.paused" - self.send_to_chrome(method="Debugger.setBreakpointByUrl", params={"lineNumber": 1, "urlRegex":"https?://www.google-analytics.com/analytics.js"}) - - # navigate to the page! - self.send_to_chrome(method="Page.navigate", params={"url": self.url}) - - # XXX should this class know anything about amqp? or should it - # delegate this back up to the Umbra class? - def _send_request_to_amqp(self, chrome_msg): - payload = chrome_msg['params']['request'] - payload['parentUrl'] = self.url - payload['parentUrlMetadata'] = self.url_metadata - self.logger.debug('sending to amqp exchange={} routing_key={} payload={}'.format(self.umbra.umbra_exchange.name, self.client_id, payload)) - with self.umbra.producer_lock: - publish = self.umbra.producer_conn.ensure(self.umbra.producer, - self.umbra.producer.publish) - publish(payload, exchange=self.umbra.umbra_exchange, - routing_key=self.client_id) - - def _handle_message(self, websock, message): - # self.logger.debug("message from {} - {}".format(websock.url, message[:95])) - # self.logger.debug("message from {} - {}".format(websock.url, message)) - message = json.loads(message) - if "method" in message and message["method"] == "Network.requestWillBeSent": - if self._behavior: - self._behavior.notify_of_activity() - if not message["params"]["request"]["url"].lower().startswith("data:"): - self._send_request_to_amqp(message) - else: - self.logger.debug("ignoring data url {}".format(message["params"]["request"]["url"][:80])) - elif "method" in message and message["method"] == "Page.loadEventFired": - if self._behavior is None: - self.logger.info("Page.loadEventFired, starting behaviors url={} message={}".format(self.url, message)) - self._behavior = Behavior(self.url, self) - self._behavior.start() - else: - self.logger.warn("Page.loadEventFired but behaviors already running url={} message={}".format(self.url, message)) - elif "method" in message and message["method"] == "Console.messageAdded": - self.logger.debug("{} console.{} {}".format(websock.url, - message["params"]["message"]["level"], - message["params"]["message"]["text"])) - elif "method" in message and message["method"] == "Debugger.paused": - # We hit the breakpoint set in visit_page. Get rid of google - # analytics script! - - self.logger.debug("debugger paused! message={}".format(message)) - scriptId = message['params']['callFrames'][0]['location']['scriptId'] - - # replace script - self.send_to_chrome(method="Debugger.setScriptSource", params={"scriptId": scriptId, "scriptSource":"console.log('google analytics is no more!');"}) - - # resume execution - self.send_to_chrome(method="Debugger.resume") - elif "result" in message: - if self._behavior and self._behavior.is_waiting_on_result(message['id']): - self._behavior.notify_of_result(message) - # elif "method" in message and message["method"] in ("Network.dataReceived", "Network.responseReceived", "Network.loadingFinished"): - # pass - # elif "method" in message: - # self.logger.debug("{} {}".format(message["method"], message)) - # else: - # self.logger.debug("[no-method] {}".format(message)) - - -class Umbra: - """Consumes amqp messages representing requests to browse urls, from the - amqp queue "urls" on exchange "umbra". Incoming amqp message is a json - object with 3 attributes: - { - "clientId": "umbra.client.123", - "url": "http://example.com/my_fancy_page", - "metadata": {"arbitrary":"fields", "etc":4} - } - - "url" is the url to browse. - - "clientId" uniquely identifies the client of - umbra. Umbra uses the clientId to direct information via amqp back to the - client. It sends this information on that same "umbra" exchange, and uses - the clientId as the amqp routing key. - - Each url requested in the browser is published to amqp this way. The - outgoing amqp message is a json object: - - { - 'url': 'http://example.com/images/embedded_thing.jpg', - 'method': 'GET', - 'headers': {'User-Agent': '...', 'Accept': '...'} - 'parentUrl': 'http://example.com/my_fancy_page', - 'parentUrlMetadata': {"arbitrary":"fields", "etc":4}, - } - - POST requests have an additional field, postData. - """ - - logger = logging.getLogger('umbra.Umbra') - - def __init__(self, amqp_url='amqp://guest:guest@localhost:5672/%2f', chrome_exe='chromium-browser', browser_wait=60, max_active_workers=1): - self.amqp_url = amqp_url - self.chrome_exe = chrome_exe - self.browser_wait = browser_wait - self.max_active_workers = max_active_workers - - self.producer = None - - self.producer_lock = threading.Lock() - with self.producer_lock: - self.producer_conn = kombu.Connection(self.amqp_url) - self.producer = self.producer_conn.Producer(serializer='json') - - self.workers = {} - self.workers_lock = threading.Lock() - self.num_active_workers = 0 - self.amqp_thread = threading.Thread(target=self._consume_amqp) - self.amqp_stop = threading.Event() - self.amqp_thread.start() - - def shutdown(self): - self.logger.info("shutting down amqp consumer {}".format(self.amqp_url)) - self.amqp_stop.set() - self.amqp_thread.join() - - def _consume_amqp(self): - # XXX https://webarchive.jira.com/browse/ARI-3811 - # After running for some amount of time (3 weeks in the latest case), - # consumer looks normal but doesn't consume any messages. Not clear if - # it's hanging in drain_events() or not. As a temporary measure for - # mitigation (if it works) or debugging (if it doesn't work), close and - # reopen the connection every 15 minutes - RECONNECT_AFTER_MIN = 15 - - while not self.amqp_stop.is_set(): - try: - self.umbra_exchange = kombu.Exchange(name='umbra', type='direct', durable=True) - url_queue = kombu.Queue('urls', routing_key='url', exchange=self.umbra_exchange) - self.logger.info("connecting to amqp exchange={} at {}".format(self.umbra_exchange.name, self.amqp_url)) - with kombu.Connection(self.amqp_url) as conn: - conn_opened = time.time() - with conn.Consumer(url_queue, callbacks=[self._browse_page_requested]) as consumer: - import socket - while (not self.amqp_stop.is_set() and time.time() - conn_opened < RECONNECT_AFTER_MIN * 60): - try: - conn.drain_events(timeout=0.5) - except socket.timeout: - pass - except BaseException as e: - self.logger.error("amqp exception {}".format(e)) - self.logger.error("attempting to reopen amqp connection") - - def _browse_page_requested(self, body, message): - """Kombu Consumer callback. Provisions an UmbraWorker and - asynchronously asks it to browse the requested url.""" - client_id = body['clientId'] - with self.workers_lock: - if not client_id in self.workers: - port = 9222 + len(self.workers) - t = UmbraWorker(umbra=self, chrome_port=port, - chrome_exe=self.chrome_exe, - chrome_wait=self.browser_wait, - client_id=client_id) - self.workers[client_id] = t - - def browse_page_async(): - self.logger.info('client_id={} body={}'.format(client_id, body)) - while True: - with self.workers_lock: - if self.num_active_workers < self.max_active_workers: - self.num_active_workers += 1 - break - time.sleep(0.5) - - self.workers[client_id].browse_page(body['url'], body['metadata']) - - with self.workers_lock: - self.num_active_workers -= 1 - - threading.Thread(target=browse_page_async).start() - - message.ack() - - -class Chrome: - logger = logging.getLogger('umbra.Chrome') - - def __init__(self, port, executable, browser_wait, user_data_dir): - self.port = port - self.executable = executable - self.browser_wait = browser_wait - self.user_data_dir = user_data_dir - - def fetch_debugging_json(): - raw_json = urllib.request.urlopen("http://localhost:%s/json" % self.port).read() - json = raw_json.decode('utf-8') - return json.loads(json) - - # returns websocket url to chrome window with about:blank loaded - def __enter__(self): - chrome_args = [self.executable, - "--user-data-dir={}".format(self.user_data_dir), - "--remote-debugging-port={}".format(self.port), - "--disable-web-sockets", "--disable-cache", - "--window-size=1100,900", "--no-default-browser-check", - "--disable-first-run-ui", "--no-first-run", - "--homepage=about:blank", "about:blank"] - self.logger.info("running {}".format(chrome_args)) - self.chrome_process = subprocess.Popen(chrome_args, start_new_session=True) - self.logger.info("chrome running, pid {}".format(self.chrome_process.pid)) - start = time.time() - - json_url = "http://localhost:%s/json" % self.port - - while True: - try: - raw_json = urllib.request.urlopen(json_url).read() - all_debug_info = json.loads(raw_json.decode('utf-8')) - debug_info = [x for x in all_debug_info if x['url'] == 'about:blank'] - - if debug_info and 'webSocketDebuggerUrl' in debug_info[0]: - self.logger.debug("{} returned {}".format(json_url, raw_json)) - url = debug_info[0]['webSocketDebuggerUrl'] - self.logger.info('got chrome window websocket debug url {} from {}'.format(url, json_url)) - return url - except: - pass - finally: - if time.time() - start > float(self.browser_wait): - raise Exception("failed to retrieve {} after {} seconds".format(json_url, time.time() - start)) - else: - time.sleep(0.5) - - def __exit__(self, *args): - self.logger.info("killing chrome pid {}".format(self.chrome_process.pid)) - os.killpg(self.chrome_process.pid, signal.SIGINT) - self.chrome_process.wait() - -def main(): - import faulthandler - faulthandler.register(signal.SIGQUIT) - - arg_parser = argparse.ArgumentParser(prog=os.path.basename(sys.argv[0]), - description='umbra - Browser automation tool', - formatter_class=argparse.ArgumentDefaultsHelpFormatter) - arg_parser.add_argument('-w', '--browser-wait', dest='browser_wait', default='60', - help='Seconds to wait for browser initialization') - arg_parser.add_argument('-e', '--executable', dest='executable', default='chromium-browser', - help='Executable to use to invoke chrome') - arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f', - help='URL identifying the amqp server to talk to') - arg_parser.add_argument('-n', '--max-workers', dest='max_workers', default='3', - help='Max number of chrome instances simultaneously browsing pages') - args = arg_parser.parse_args(args=sys.argv[1:]) - - umbra = Umbra(args.amqp_url, args.executable, args.browser_wait, max_active_workers=int(args.max_workers)) - try: - while True: - time.sleep(0.5) - except: - pass - finally: - umbra.shutdown() - - -if __name__ == "__main__": - main() - From b59e76a5b9b23340f0fa1fceed7db5de2c6d641e Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Tue, 20 May 2014 03:02:48 -0700 Subject: [PATCH 04/19] clean shutdown without draining entire amqp queue (only consume urls from amqp when browser activity isn't saturated) --- bin/umbra | 4 ++-- umbra/controller.py | 22 ++++++++++++++-------- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/bin/umbra b/bin/umbra index 2b503c1..43bc807 100755 --- a/bin/umbra +++ b/bin/umbra @@ -27,13 +27,13 @@ if __name__=="__main__": help='Executable to use to invoke chrome') arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f', help='URL identifying the amqp server to talk to') - arg_parser.add_argument('-n', '--max-workers', dest='max_workers', default='3', + arg_parser.add_argument('-n', '--max-workers', dest='max_browsers', default='3', help='Max number of chrome instances simultaneously browsing pages') args = arg_parser.parse_args(args=sys.argv[1:]) controller = umbra.controller.AmqpBrowserController(args.amqp_url, args.executable, args.browser_wait, - max_active_workers=int(args.max_workers)) + max_active_browsers=int(args.max_browsers)) try: while True: time.sleep(0.5) diff --git a/umbra/controller.py b/umbra/controller.py index 69aab39..feecbea 100644 --- a/umbra/controller.py +++ b/umbra/controller.py @@ -40,15 +40,15 @@ class AmqpBrowserController: logger = logging.getLogger(__module__ + "." + __qualname__) - def __init__(self, amqp_url='amqp://guest:guest@localhost:5672/%2f', + def __init__(self, amqp_url='amqp://guest:guest@localhost:5672/%2f', chrome_exe='chromium-browser', browser_wait=60, - max_active_workers=1, queue_name='urls', routing_key='url', + max_active_browsers=1, queue_name='urls', routing_key='url', exchange_name='umbra'): self.amqp_url = amqp_url self.chrome_exe = chrome_exe self.browser_wait = browser_wait - self.max_active_workers = max_active_workers + self.max_active_browsers = max_active_browsers self.queue_name = queue_name self.routing_key = routing_key self.exchange_name = exchange_name @@ -62,7 +62,7 @@ class AmqpBrowserController: self.browsers = {} self.browsers_lock = threading.Lock() - self.num_active_workers = 0 + self.num_active_browsers = 0 self.amqp_thread = threading.Thread(target=self._consume_amqp) self.amqp_stop = threading.Event() self.amqp_thread.start() @@ -71,6 +71,9 @@ class AmqpBrowserController: self.logger.info("shutting down amqp consumer {}".format(self.amqp_url)) self.amqp_stop.set() self.amqp_thread.join() + with self.producer_lock: + self.producer_conn.close() + self.producer_conn = None def _consume_amqp(self): # XXX https://webarchive.jira.com/browse/ARI-3811 @@ -91,7 +94,10 @@ class AmqpBrowserController: import socket while (not self.amqp_stop.is_set() and time.time() - conn_opened < RECONNECT_AFTER_SECONDS): try: - conn.drain_events(timeout=0.5) + if self.num_active_browsers < self.max_active_browsers: + conn.drain_events(timeout=0.5) + else: + time.sleep(0.5) except socket.timeout: pass except BaseException as e: @@ -126,15 +132,15 @@ class AmqpBrowserController: self.logger.info('client_id={} body={}'.format(client_id, body)) while True: with self.browsers_lock: - if self.num_active_workers < self.max_active_workers: - self.num_active_workers += 1 + if self.num_active_browsers < self.max_active_browsers: + self.num_active_browsers += 1 break time.sleep(0.5) browser.browse_page(body['url'], on_request=on_request) with self.browsers_lock: - self.num_active_workers -= 1 + self.num_active_browsers -= 1 threading.Thread(target=browse_page_async).start() From 8749b97811f3917b7fdeefad78bc461a6f52422c Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Tue, 20 May 2014 03:10:33 -0700 Subject: [PATCH 05/19] oops, check in browser.py --- umbra/browser.py | 191 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 191 insertions(+) create mode 100644 umbra/browser.py diff --git a/umbra/browser.py b/umbra/browser.py new file mode 100644 index 0000000..0f0dae5 --- /dev/null +++ b/umbra/browser.py @@ -0,0 +1,191 @@ +#!/usr/bin/env python +# vim: set sw=4 et: + +import logging +import json +import urllib.request +import itertools +import websocket +import time +import threading +import subprocess +import signal +import tempfile +import os +from umbra.behaviors import Behavior + +class Browser: + """Runs chrome/chromium to synchronously browse one page at a time using + worker.browse_page(). Currently the implementation starts up a new instance + of chrome for each page browsed, always on the same debug port. (In the + future, it may keep the browser running indefinitely.)""" + + logger = logging.getLogger(__module__ + "." + __qualname__) + + HARD_TIMEOUT_SECONDS = 20 * 60 + + def __init__(self, chrome_port=9222, chrome_exe='chromium-browser', chrome_wait=60, client_id='request'): + self.command_id = itertools.count(1) + self._lock = threading.Lock() + self.chrome_port = chrome_port + self.chrome_exe = chrome_exe + self.chrome_wait = chrome_wait + self.client_id = client_id + self._behavior = None + self.websock = None + + def browse_page(self, url, on_request=None): + """Synchronously browses a page and runs behaviors. First blocks to + acquire lock to ensure we only browse one page at a time.""" + with self._lock: + self.url = url + self.on_request = on_request + with tempfile.TemporaryDirectory() as user_data_dir: + with Chrome(self.chrome_port, self.chrome_exe, self.chrome_wait, user_data_dir) as websocket_url: + self.websock = websocket.WebSocketApp(websocket_url, + on_open=self._visit_page, + on_message=self._handle_message) + websock_thread = threading.Thread(target=self.websock.run_forever, kwargs={'ping_timeout':0.5}) + websock_thread.start() + start = time.time() + + while True: + time.sleep(0.5) + if not self.websock or not self.websock.sock or not self.websock.sock.connected: + self.logger.error("websocket closed, did chrome die??? {}".format(self.websock)) + break + elif time.time() - start > Browser.HARD_TIMEOUT_SECONDS: + self.logger.info("finished browsing page, reached hard timeout of {} seconds url={}".format(Browser.HARD_TIMEOUT_SECONDS, self.url)) + break + elif self._behavior != None and self._behavior.is_finished(): + self.logger.info("finished browsing page according to behavior url={}".format(self.url)) + break + + try: + self.websock.close() + except BaseException as e: + self.logger.error("exception closing websocket {} - {}".format(self.websock, e)) + + websock_thread.join() + self._behavior = None + + def send_to_chrome(self, **kwargs): + msg_id = next(self.command_id) + kwargs['id'] = msg_id + msg = json.dumps(kwargs) + self.logger.debug('sending message to {}: {}'.format(self.websock, msg)) + self.websock.send(msg) + return msg_id + + def _visit_page(self, websock): + self.send_to_chrome(method="Network.enable") + self.send_to_chrome(method="Page.enable") + self.send_to_chrome(method="Console.enable") + self.send_to_chrome(method="Debugger.enable") + self.send_to_chrome(method="Runtime.enable") + + # disable google analytics, see _handle_message() where breakpoint is caught "Debugger.paused" + self.send_to_chrome(method="Debugger.setBreakpointByUrl", params={"lineNumber": 1, "urlRegex":"https?://www.google-analytics.com/analytics.js"}) + + # navigate to the page! + self.send_to_chrome(method="Page.navigate", params={"url": self.url}) + + def _handle_message(self, websock, message): + # self.logger.debug("message from {} - {}".format(websock.url, message[:95])) + # self.logger.debug("message from {} - {}".format(websock.url, message)) + message = json.loads(message) + if "method" in message and message["method"] == "Network.requestWillBeSent": + if self._behavior: + self._behavior.notify_of_activity() + if message["params"]["request"]["url"].lower().startswith("data:"): + self.logger.debug("ignoring data url {}".format(message["params"]["request"]["url"][:80])) + else: + self.on_request(message) + elif "method" in message and message["method"] == "Page.loadEventFired": + if self._behavior is None: + self.logger.info("Page.loadEventFired, starting behaviors url={} message={}".format(self.url, message)) + self._behavior = Behavior(self.url, self) + self._behavior.start() + else: + self.logger.warn("Page.loadEventFired but behaviors already running url={} message={}".format(self.url, message)) + elif "method" in message and message["method"] == "Console.messageAdded": + self.logger.debug("{} console.{} {}".format(websock.url, + message["params"]["message"]["level"], + message["params"]["message"]["text"])) + elif "method" in message and message["method"] == "Debugger.paused": + # We hit the breakpoint set in visit_page. Get rid of google + # analytics script! + + self.logger.debug("debugger paused! message={}".format(message)) + scriptId = message['params']['callFrames'][0]['location']['scriptId'] + + # replace script + self.send_to_chrome(method="Debugger.setScriptSource", params={"scriptId": scriptId, "scriptSource":"console.log('google analytics is no more!');"}) + + # resume execution + self.send_to_chrome(method="Debugger.resume") + elif "result" in message: + if self._behavior and self._behavior.is_waiting_on_result(message['id']): + self._behavior.notify_of_result(message) + # elif "method" in message and message["method"] in ("Network.dataReceived", "Network.responseReceived", "Network.loadingFinished"): + # pass + # elif "method" in message: + # self.logger.debug("{} {}".format(message["method"], message)) + # else: + # self.logger.debug("[no-method] {}".format(message)) + + +class Chrome: + logger = logging.getLogger(__module__ + "." + __qualname__) + + def __init__(self, port, executable, browser_wait, user_data_dir): + self.port = port + self.executable = executable + self.browser_wait = browser_wait + self.user_data_dir = user_data_dir + + def fetch_debugging_json(): + raw_json = urllib.request.urlopen("http://localhost:%s/json" % self.port).read() + json = raw_json.decode('utf-8') + return json.loads(json) + + # returns websocket url to chrome window with about:blank loaded + def __enter__(self): + chrome_args = [self.executable, + "--user-data-dir={}".format(self.user_data_dir), + "--remote-debugging-port={}".format(self.port), + "--disable-web-sockets", "--disable-cache", + "--window-size=1100,900", "--no-default-browser-check", + "--disable-first-run-ui", "--no-first-run", + "--homepage=about:blank", "about:blank"] + self.logger.info("running {}".format(chrome_args)) + self.chrome_process = subprocess.Popen(chrome_args, start_new_session=True) + self.logger.info("chrome running, pid {}".format(self.chrome_process.pid)) + start = time.time() + + json_url = "http://localhost:%s/json" % self.port + + while True: + try: + raw_json = urllib.request.urlopen(json_url).read() + all_debug_info = json.loads(raw_json.decode('utf-8')) + debug_info = [x for x in all_debug_info if x['url'] == 'about:blank'] + + if debug_info and 'webSocketDebuggerUrl' in debug_info[0]: + self.logger.debug("{} returned {}".format(json_url, raw_json)) + url = debug_info[0]['webSocketDebuggerUrl'] + self.logger.info('got chrome window websocket debug url {} from {}'.format(url, json_url)) + return url + except: + pass + finally: + if time.time() - start > float(self.browser_wait): + raise Exception("failed to retrieve {} after {} seconds".format(json_url, time.time() - start)) + else: + time.sleep(0.5) + + def __exit__(self, *args): + self.logger.info("killing chrome pid {}".format(self.chrome_process.pid)) + os.killpg(self.chrome_process.pid, signal.SIGINT) + self.chrome_process.wait() + From 1e18c2ca74840e96283386235aa7813b99f795a1 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Tue, 20 May 2014 16:44:13 -0700 Subject: [PATCH 06/19] improve helper utilities --- bin/drain-queue | 48 ++++++++++++++++++++++++++++++++++ bin/dump_queue.py | 21 --------------- bin/{load_url.py => queue-url} | 26 +++++++++++------- setup.py | 3 ++- 4 files changed, 67 insertions(+), 31 deletions(-) create mode 100755 bin/drain-queue delete mode 100755 bin/dump_queue.py rename bin/{load_url.py => queue-url} (52%) diff --git a/bin/drain-queue b/bin/drain-queue new file mode 100755 index 0000000..ad9fd24 --- /dev/null +++ b/bin/drain-queue @@ -0,0 +1,48 @@ +#!/usr/bin/env python +# vim: set sw=4 et: +import os +import sys +import argparse +import logging +import socket +from kombu import Connection, Exchange, Queue + +arg_parser = argparse.ArgumentParser(prog=os.path.basename(sys.argv[0]), + description='drain-queue - consume messages from AMQP queue', + formatter_class=argparse.ArgumentDefaultsHelpFormatter) +arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f', + help='URL identifying the AMQP server to talk to') +arg_parser.add_argument('--exchange', dest='amqp_exchange', default='umbra', + help='AMQP exchange name') +arg_parser.add_argument('--queue', dest='amqp_queue', default='urls', + help='AMQP queue name') +arg_parser.add_argument('-n', '--no-ack', dest='no_ack', action="store_const", + default=False, const=True, help="leave messages on the queue (default: remove them from the queue)") +arg_parser.add_argument('-r', '--run-forever', dest='run_forever', action="store_const", + default=False, const=True, help="run forever, waiting for new messages to appear on the queue (default: exit when all messages in the queue have been consumed)") +arg_parser.add_argument('-v', '--verbose', dest='log_level', + action="store_const", default=logging.INFO, const=logging.DEBUG) +args = arg_parser.parse_args(args=sys.argv[1:]) + +logging.basicConfig(stream=sys.stderr, level=args.log_level, + format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') + +def print_and_maybe_ack(body, message): + print(body) + if not args.no_ack: + message.ack() + +exchange = Exchange(args.amqp_exchange, 'direct', durable=True) +queue = Queue(args.amqp_queue, exchange=exchange) +try: + with Connection(args.amqp_url) as conn: + with conn.Consumer(queue, callbacks=[print_and_maybe_ack]) as consumer: + while True: + try: + conn.drain_events(timeout=0.5) + except socket.timeout: + if not args.run_forever: + logging.debug("exiting, no messages left on the queue") + break +except KeyboardInterrupt: + logging.debug("exiting, stopped by user") diff --git a/bin/dump_queue.py b/bin/dump_queue.py deleted file mode 100755 index 9e31619..0000000 --- a/bin/dump_queue.py +++ /dev/null @@ -1,21 +0,0 @@ -#!/usr/bin/env python -from json import dumps, loads -import os,sys,argparse, urllib.request, urllib.error, urllib.parse -import websocket -import time -import uuid -import logging -import threading -from kombu import Connection, Exchange, Queue -logging.basicConfig(level=logging.INFO) - -umbra_exchange = Exchange('umbra', 'direct', durable=True) -requests_queue = Queue('requests', exchange=umbra_exchange) -def print_and_ack(body, message): - print(body['url']) - message.ack() - -with Connection(sys.argv[1] if len(sys.argv) > 1 else "amqp://guest:guest@localhost:5672//") as conn: - with conn.Consumer(requests_queue, callbacks=[print_and_ack]) as consumer: - while True: - conn.drain_events() diff --git a/bin/load_url.py b/bin/queue-url similarity index 52% rename from bin/load_url.py rename to bin/queue-url index ac87cef..2e4d01c 100755 --- a/bin/load_url.py +++ b/bin/queue-url @@ -9,22 +9,30 @@ import logging import threading from kombu import Connection, Exchange, Queue -logging.basicConfig(stream=sys.stdout, level=logging.INFO, - format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') - arg_parser = argparse.ArgumentParser(prog=os.path.basename(sys.argv[0]), - description='load_url.py - send url to umbra via amqp', + description='queue-url - send url to umbra via amqp', formatter_class=argparse.ArgumentDefaultsHelpFormatter) -arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672//', - help='URL identifying the amqp server to talk to') +arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f', + help='URL identifying the AMQP server to talk to') +arg_parser.add_argument('--exchange', dest='amqp_exchange', default='umbra', + help='AMQP exchange name') +arg_parser.add_argument('--routing-key', dest='amqp_routing_key', default='url', + help='AMQP routing key') arg_parser.add_argument('-i', '--client-id', dest='client_id', default='load_url.0', help='client id - included in the json payload with each url; umbra uses this value as the routing key to send requests back to') +arg_parser.add_argument('-v', '--verbose', dest='log_level', + action="store_const", default=logging.INFO, const=logging.DEBUG) arg_parser.add_argument('urls', metavar='URL', nargs='+', help='URLs to send to umbra') args = arg_parser.parse_args(args=sys.argv[1:]) -umbra_exchange = Exchange('umbra', 'direct', durable=True) +logging.basicConfig(stream=sys.stdout, level=args.log_level, + format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') + +exchange = Exchange(args.amqp_exchange, 'direct', durable=True) with Connection(args.amqp_url) as conn: producer = conn.Producer(serializer='json') for url in args.urls: - producer.publish({'url': url, 'metadata': {}, 'clientId': args.client_id}, 'url', exchange=umbra_exchange) - + payload = {'url': url, 'metadata': {}, 'clientId': args.client_id} + logging.info("sending to amqp url={} exchange={} routing_key={} -- {}".format(args.amqp_url, args.amqp_exchange, args.amqp_routing_key, payload)) + producer.publish(payload, routing_key=args.amqp_routing_key, exchange=exchange) + diff --git a/setup.py b/setup.py index f0fd904..ae5c4ab 100644 --- a/setup.py +++ b/setup.py @@ -1,6 +1,7 @@ # vim: set sw=4 et: import setuptools +import glob setuptools.setup(name='umbra', version='0.1', @@ -13,7 +14,7 @@ setuptools.setup(name='umbra', packages=['umbra'], package_data={'umbra':['behaviors.d/*.js']}, install_requires=['kombu', 'websocket-client-py3==0.13.1','argparse'], - scripts=['bin/umbra', 'bin/load_url.py', 'bin/dump_queue.py'], + scripts=glob.glob("bin/*"), zip_safe=False, classifiers=[ 'Development Status :: 3 - Alpha Development Status', From 99d219dfda1e06b2886bc9b47a21a34bd6f738d2 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Tue, 20 May 2014 17:06:26 -0700 Subject: [PATCH 07/19] not sure why /bin/ et al were in .gitignore... replace with a couple of useful things --- .gitignore | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/.gitignore b/.gitignore index 2daac0a..743b95b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,8 +1,4 @@ *.pyc -/bin/ -/include/ -/lib/ -/local/ -/share/ -/build/ +*.diff +.*.sw* /umbra.egg-info/ From d4693b2aba9d51488684cf62601e439458f691e7 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Tue, 20 May 2014 17:07:42 -0700 Subject: [PATCH 08/19] remove unused param to __init__, avoid exception when on_request callback not provided --- umbra/browser.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/umbra/browser.py b/umbra/browser.py index 0f0dae5..45e277d 100644 --- a/umbra/browser.py +++ b/umbra/browser.py @@ -24,13 +24,12 @@ class Browser: HARD_TIMEOUT_SECONDS = 20 * 60 - def __init__(self, chrome_port=9222, chrome_exe='chromium-browser', chrome_wait=60, client_id='request'): + def __init__(self, chrome_port=9222, chrome_exe='chromium-browser', chrome_wait=60): self.command_id = itertools.count(1) self._lock = threading.Lock() self.chrome_port = chrome_port self.chrome_exe = chrome_exe self.chrome_wait = chrome_wait - self.client_id = client_id self._behavior = None self.websock = None @@ -99,7 +98,7 @@ class Browser: self._behavior.notify_of_activity() if message["params"]["request"]["url"].lower().startswith("data:"): self.logger.debug("ignoring data url {}".format(message["params"]["request"]["url"][:80])) - else: + elif self.on_request: self.on_request(message) elif "method" in message and message["method"] == "Page.loadEventFired": if self._behavior is None: From 6c69b68771688b025a3fbf37fdcd13f0991a9e93 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Tue, 20 May 2014 17:10:41 -0700 Subject: [PATCH 09/19] organize imports, tweak command line args --- bin/queue-url | 9 +++------ bin/umbra | 6 +++--- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/bin/queue-url b/bin/queue-url index 2e4d01c..de570fd 100755 --- a/bin/queue-url +++ b/bin/queue-url @@ -1,12 +1,9 @@ #!/usr/bin/env python # vim: set sw=4 et: -from json import dumps, loads -import os,sys,argparse, urllib.request, urllib.error, urllib.parse -import websocket -import time -import uuid +import os +import sys +import argparse import logging -import threading from kombu import Connection, Exchange, Queue arg_parser = argparse.ArgumentParser(prog=os.path.basename(sys.argv[0]), diff --git a/bin/umbra b/bin/umbra index 43bc807..dc8decd 100755 --- a/bin/umbra +++ b/bin/umbra @@ -23,16 +23,16 @@ if __name__=="__main__": formatter_class=argparse.ArgumentDefaultsHelpFormatter) arg_parser.add_argument('-w', '--browser-wait', dest='browser_wait', default='60', help='Seconds to wait for browser initialization') - arg_parser.add_argument('-e', '--executable', dest='executable', default='chromium-browser', + arg_parser.add_argument('-e', '--executable', dest='chrome_exe', default='chromium-browser', help='Executable to use to invoke chrome') arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f', help='URL identifying the amqp server to talk to') - arg_parser.add_argument('-n', '--max-workers', dest='max_browsers', default='3', + arg_parser.add_argument('-n', '--max-browsers', dest='max_browsers', default='3', help='Max number of chrome instances simultaneously browsing pages') args = arg_parser.parse_args(args=sys.argv[1:]) controller = umbra.controller.AmqpBrowserController(args.amqp_url, - args.executable, args.browser_wait, + args.chrome_exe, args.browser_wait, max_active_browsers=int(args.max_browsers)) try: while True: From d7cfcbf2333cc7e751fbcd17d6a4e35314b4200a Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Tue, 20 May 2014 17:11:16 -0700 Subject: [PATCH 10/19] new helper utility to browse urls provided as command line args --- bin/browse-url | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100755 bin/browse-url diff --git a/bin/browse-url b/bin/browse-url new file mode 100755 index 0000000..9fd6311 --- /dev/null +++ b/bin/browse-url @@ -0,0 +1,28 @@ +#!/usr/bin/env python +# vim: set sw=4 et: + +import argparse +import os +import sys +import logging +from umbra.browser import Browser + +arg_parser = argparse.ArgumentParser(prog=os.path.basename(sys.argv[0]), + description='browse-url - open urls in chrome/chromium and run behaviors', + formatter_class=argparse.ArgumentDefaultsHelpFormatter) +arg_parser.add_argument('-w', '--browser-wait', dest='browser_wait', default='60', + help='seconds to wait for browser initialization') +arg_parser.add_argument('-e', '--executable', dest='chrome_exe', default='chromium-browser', + help='executable to use to invoke chrome') +arg_parser.add_argument('-v', '--verbose', dest='log_level', + action="store_const", default=logging.INFO, const=logging.DEBUG) +arg_parser.add_argument('urls', metavar='URL', nargs='+', help='URL(s) to browse') +args = arg_parser.parse_args(args=sys.argv[1:]) + +logging.basicConfig(stream=sys.stdout, level=args.log_level, + format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') + +browser = Browser(chrome_exe=args.chrome_exe, chrome_wait=args.browser_wait) +for url in args.urls: + browser.browse_page(url) + From bf3afcccb99a1ee23afa6651f2d4230507879e3a Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Tue, 20 May 2014 19:27:53 -0700 Subject: [PATCH 11/19] oops, Browser.__init__ doesn't take client_id anymore --- umbra/controller.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/umbra/controller.py b/umbra/controller.py index feecbea..ff4e4cc 100644 --- a/umbra/controller.py +++ b/umbra/controller.py @@ -125,7 +125,7 @@ class AmqpBrowserController: # XXX should reuse ports port = 9222 + len(self.browsers) browser = Browser(chrome_port=port, chrome_exe=self.chrome_exe, - chrome_wait=self.browser_wait, client_id=client_id) + chrome_wait=self.browser_wait) self.browsers[client_id] = browser def browse_page_async(): From 155db96461bbaeed765bee02371c23a9bc8a143e Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Fri, 23 May 2014 13:27:00 -0700 Subject: [PATCH 12/19] provide abbreviated api --- umbra/__init__.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/umbra/__init__.py b/umbra/__init__.py index e69de29..b8964b2 100644 --- a/umbra/__init__.py +++ b/umbra/__init__.py @@ -0,0 +1,3 @@ +from umbra.browser import Browser +from umbra.controller import AmqpBrowserController +Umbra = AmqpBrowserController From a7cd872b95495352f038624e68e4de957313576d Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Fri, 23 May 2014 13:34:07 -0700 Subject: [PATCH 13/19] sleep for 0.5 sec before attempting to reconnect to amqp; documentation tweaks --- umbra/controller.py | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/umbra/controller.py b/umbra/controller.py index ff4e4cc..a94cc96 100644 --- a/umbra/controller.py +++ b/umbra/controller.py @@ -8,9 +8,12 @@ import kombu from umbra.browser import Browser class AmqpBrowserController: - """Consumes amqp messages representing requests to browse urls, from the - amqp queue "urls" on exchange "umbra". Incoming amqp message is a json - object with 3 attributes: + """ + Consumes amqp messages representing requests to browse urls, from the + specified amqp queue (default: "urls") on the specified amqp exchange + (default: "umbra"). Incoming amqp message is a json object with 3 + attributes: + { "clientId": "umbra.client.123", "url": "http://example.com/my_fancy_page", @@ -19,20 +22,20 @@ class AmqpBrowserController: "url" is the url to browse. - "clientId" uniquely identifies the client of - umbra. Umbra uses the clientId to direct information via amqp back to the - client. It sends this information on that same "umbra" exchange, and uses - the clientId as the amqp routing key. + "clientId" uniquely identifies the client of umbra. Umbra uses the clientId + as the amqp routing key, to direct information via amqp back to the client. + It sends this information on the same specified amqp exchange (default: + "umbra"). Each url requested in the browser is published to amqp this way. The outgoing amqp message is a json object: { - 'url': 'http://example.com/images/embedded_thing.jpg', - 'method': 'GET', - 'headers': {'User-Agent': '...', 'Accept': '...'} - 'parentUrl': 'http://example.com/my_fancy_page', - 'parentUrlMetadata': {"arbitrary":"fields", "etc":4}, + "url": "http://example.com/images/embedded_thing.jpg", + "method": "GET", + "headers": {"User-Agent": "...", "Accept": "...", ...}, + "parentUrl": "http://example.com/my_fancy_page", + "parentUrlMetadata": {"arbitrary":"fields", "etc":4, ...} } POST requests have an additional field, postData. @@ -102,6 +105,7 @@ class AmqpBrowserController: pass except BaseException as e: self.logger.error("amqp exception {}".format(e)) + time.sleep(0.5) self.logger.error("attempting to reopen amqp connection") def _browse_page_requested(self, body, message): From 6f61d0289b98ded032a4e446372915769aaf370c Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Fri, 23 May 2014 13:34:51 -0700 Subject: [PATCH 14/19] improve readme, mentioning archive-it per kristine --- README.md | 38 +++++++++++++++++++++++++++++--------- 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 58db62c..239ed64 100644 --- a/README.md +++ b/README.md @@ -1,18 +1,38 @@ umbra ===== +Umbra is a browser automation tool, developed for the web archiving service +https://archive-it.org/. -Browser automation via chrome debug protocol +Umbra receives urls via AMQP. It opens them in the chrome or chromium browser, +with which it communicates using the chrome remote debug protocol (see +https://developer.chrome.com/devtools/docs/debugger-protocol). It runs +javascript behaviors to simulate user interaction with the page. It publishes +information about the the urls requested by the browser back to AMQP. The +format of the incoming and outgoing AMQP messages is described in `pydoc +umbra.controller`. + +Umbra can be used with the Heritrix web crawler, using these heritrix modules: +* [AMQPUrlReceiver](https://github.com/internetarchive/heritrix3/blob/master/contrib/src/main/java/org/archive/crawler/frontier/AMQPUrlReceiver.java) +* [AMQPPublishProcessor](https://github.com/internetarchive/heritrix3/blob/master/contrib/src/main/java/org/archive/modules/AMQPPublishProcessor.java) Install -====== -Install via pip from this repo. +------ +Install via pip from this repo, e.g. + + pip install git+https://github.com/internetarchive/umbra.git + +Umbra requires an AMQP messaging service like RabbitMQ. On Ubuntu, +`sudo apt-get install rabbitmq-server` will install and start RabbitMQ +at amqp://guest:guest@localhost:5672/%2f, which the default AMQP url for umbra. Run -===== -"umbra" script should be in bin/. -load_url.py takes urls as arguments and puts them onto a rabbitmq queue -dump_queue.py prints resources discovered by the browser and sent over the return queue. +--- +The command `umbra` will start umbra with default configuration. `umbra --help` +describes all command line options. + +Umbra also comes with these utilities: +* browse-url - open urls in chrome/chromium and run behaviors (without involving AMQP) +* queue-url - send url to umbra via AMQP +* drain-queue - consume messages from AMQP queue -On ubuntu, rabbitmq install with `sudo apt-get install rabbitmq-server` should automatically -be set up for these three scripts to function on localhost ( the default amqp url ). From bd3f979b56b0e612fd636fcb5b91a11ae5b8db0d Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Fri, 23 May 2014 13:39:08 -0700 Subject: [PATCH 15/19] capitalize AMQP in description --- bin/queue-url | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bin/queue-url b/bin/queue-url index de570fd..272c2e7 100755 --- a/bin/queue-url +++ b/bin/queue-url @@ -1,5 +1,6 @@ #!/usr/bin/env python # vim: set sw=4 et: + import os import sys import argparse @@ -7,7 +8,7 @@ import logging from kombu import Connection, Exchange, Queue arg_parser = argparse.ArgumentParser(prog=os.path.basename(sys.argv[0]), - description='queue-url - send url to umbra via amqp', + description='queue-url - send url to umbra via AMQP', formatter_class=argparse.ArgumentDefaultsHelpFormatter) arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f', help='URL identifying the AMQP server to talk to') From 8d269f4c5687681a979048944cf73a7370861dda Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Fri, 23 May 2014 13:39:39 -0700 Subject: [PATCH 16/19] add options --verbose, --exchange, --queue, --routing-key --- bin/umbra | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/bin/umbra b/bin/umbra index dc8decd..06a2f32 100755 --- a/bin/umbra +++ b/bin/umbra @@ -1,6 +1,6 @@ #!/usr/bin/env python # vim: set sw=4 et: - + import logging import argparse import time @@ -11,15 +11,11 @@ import os import umbra.controller if __name__=="__main__": - # logging.basicConfig(stream=sys.stdout, level=logging.INFO, - logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, - format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') - import faulthandler faulthandler.register(signal.SIGQUIT) arg_parser = argparse.ArgumentParser(prog=os.path.basename(sys.argv[0]), - description='umbra - Browser automation tool', + description='umbra - browser automation tool communicating via AMQP', formatter_class=argparse.ArgumentDefaultsHelpFormatter) arg_parser.add_argument('-w', '--browser-wait', dest='browser_wait', default='60', help='Seconds to wait for browser initialization') @@ -27,13 +23,27 @@ if __name__=="__main__": help='Executable to use to invoke chrome') arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f', help='URL identifying the amqp server to talk to') + arg_parser.add_argument('--exchange', dest='amqp_exchange', default='umbra', + help='AMQP exchange name') + arg_parser.add_argument('--queue', dest='amqp_queue', default='urls', + help='AMQP queue to consume urls from') + arg_parser.add_argument('--routing-key', dest='amqp_routing_key', default='url', + help='AMQP routing key to bind to the AMQP queue') arg_parser.add_argument('-n', '--max-browsers', dest='max_browsers', default='3', help='Max number of chrome instances simultaneously browsing pages') + arg_parser.add_argument('-v', '--verbose', dest='log_level', + action="store_const", default=logging.INFO, const=logging.DEBUG) args = arg_parser.parse_args(args=sys.argv[1:]) + logging.basicConfig(stream=sys.stdout, level=args.log_level, + format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') + controller = umbra.controller.AmqpBrowserController(args.amqp_url, - args.chrome_exe, args.browser_wait, - max_active_browsers=int(args.max_browsers)) + args.chrome_exe, args.browser_wait, + max_active_browsers=int(args.max_browsers), + exchange_name=args.amqp_exchange, queue_name=args.amqp_queue, + routing_key=args.amqp_routing_key) + try: while True: time.sleep(0.5) From 2c4ba005b53b28fb21a6af5a507a74471a2aacf5 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Fri, 23 May 2014 21:59:34 -0700 Subject: [PATCH 17/19] make umbra amenable to clustering by using a pool of n browsers and removing the browser-clientId affinity (not useful currently since we start a fresh browser instance for each page browsed), and set prefetch_count=1 on amqp consumers to round-robin incoming urls among umbra instances --- bin/drain-queue | 1 + bin/umbra | 9 ++-- umbra/browser.py | 41 ++++++++++++++-- umbra/controller.py | 113 ++++++++++++++++++++------------------------ 4 files changed, 94 insertions(+), 70 deletions(-) diff --git a/bin/drain-queue b/bin/drain-queue index ad9fd24..f198d15 100755 --- a/bin/drain-queue +++ b/bin/drain-queue @@ -37,6 +37,7 @@ queue = Queue(args.amqp_queue, exchange=exchange) try: with Connection(args.amqp_url) as conn: with conn.Consumer(queue, callbacks=[print_and_maybe_ack]) as consumer: + consumer.qos(prefetch_count=1) while True: try: conn.drain_events(timeout=0.5) diff --git a/bin/umbra b/bin/umbra index 06a2f32..9200736 100755 --- a/bin/umbra +++ b/bin/umbra @@ -8,7 +8,7 @@ import umbra import sys import signal import os -import umbra.controller +import umbra if __name__=="__main__": import faulthandler @@ -38,18 +38,19 @@ if __name__=="__main__": logging.basicConfig(stream=sys.stdout, level=args.log_level, format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') - controller = umbra.controller.AmqpBrowserController(args.amqp_url, - args.chrome_exe, args.browser_wait, + umbra = umbra.Umbra(args.amqp_url, args.chrome_exe, args.browser_wait, max_active_browsers=int(args.max_browsers), exchange_name=args.amqp_exchange, queue_name=args.amqp_queue, routing_key=args.amqp_routing_key) + umbra.start() + try: while True: time.sleep(0.5) except: pass finally: - controller.shutdown() + umbra.shutdown() diff --git a/umbra/browser.py b/umbra/browser.py index 45e277d..91a89ff 100644 --- a/umbra/browser.py +++ b/umbra/browser.py @@ -12,8 +12,44 @@ import subprocess import signal import tempfile import os +import socket from umbra.behaviors import Behavior +class BrowserPool: + def __init__(self, size=3, chrome_exe='chromium-browser', chrome_wait=60): + self._available = set() + + for i in range(0, size): + port_holder = self._grab_random_port() + browser = Browser(port_holder.getsockname()[1], chrome_exe, chrome_wait) + self._available.add((browser, port_holder)) + + self._lock = threading.Lock() + + def _grab_random_port(self): + """Returns socket bound to some port.""" + sock = socket.socket() + sock.bind(('127.0.0.1', 0)) + return sock + + def _hold_port(self, port): + """Returns socket bound to supplied port.""" + sock = socket.socket() + sock.bind(('127.0.0.1', port)) + return sock + + def acquire(self): + """Returns browser from pool if available, raises KeyError otherwise.""" + with self._lock: + (browser, port_holder) = self._available.pop() + port_holder.close() + return browser + + def release(self, browser): + with self._lock: + port_holder = self._hold_port(browser.chrome_port) + self._available.add((browser, port_holder)) + class Browser: """Runs chrome/chromium to synchronously browse one page at a time using worker.browse_page(). Currently the implementation starts up a new instance @@ -143,11 +179,6 @@ class Chrome: self.browser_wait = browser_wait self.user_data_dir = user_data_dir - def fetch_debugging_json(): - raw_json = urllib.request.urlopen("http://localhost:%s/json" % self.port).read() - json = raw_json.decode('utf-8') - return json.loads(json) - # returns websocket url to chrome window with about:blank loaded def __enter__(self): chrome_args = [self.executable, diff --git a/umbra/controller.py b/umbra/controller.py index a94cc96..8faf0ee 100644 --- a/umbra/controller.py +++ b/umbra/controller.py @@ -5,7 +5,7 @@ import logging import time import threading import kombu -from umbra.browser import Browser +from umbra.browser import BrowserPool class AmqpBrowserController: """ @@ -47,36 +47,35 @@ class AmqpBrowserController: chrome_exe='chromium-browser', browser_wait=60, max_active_browsers=1, queue_name='urls', routing_key='url', exchange_name='umbra'): - self.amqp_url = amqp_url - self.chrome_exe = chrome_exe - self.browser_wait = browser_wait - self.max_active_browsers = max_active_browsers self.queue_name = queue_name self.routing_key = routing_key self.exchange_name = exchange_name - self._exchange = kombu.Exchange(name=self.exchange_name, type='direct', durable=True) - self.producer = None - self.producer_lock = threading.Lock() - with self.producer_lock: - self.producer_conn = kombu.Connection(self.amqp_url) - self.producer = self.producer_conn.Producer(serializer='json') + self._browser_pool = BrowserPool(size=max_active_browsers, + chrome_exe=chrome_exe, chrome_wait=browser_wait) - self.browsers = {} - self.browsers_lock = threading.Lock() - self.num_active_browsers = 0 - self.amqp_thread = threading.Thread(target=self._consume_amqp) - self.amqp_stop = threading.Event() - self.amqp_thread.start() + def start(self): + self._exchange = kombu.Exchange(name=self.exchange_name, type='direct', + durable=True) + + self._producer = None + self._producer_lock = threading.Lock() + with self._producer_lock: + self._producer_conn = kombu.Connection(self.amqp_url) + self._producer = self._producer_conn.Producer(serializer='json') + + self._amqp_thread = threading.Thread(target=self._consume_amqp) + self._amqp_stop = threading.Event() + self._amqp_thread.start() def shutdown(self): self.logger.info("shutting down amqp consumer {}".format(self.amqp_url)) - self.amqp_stop.set() - self.amqp_thread.join() - with self.producer_lock: - self.producer_conn.close() - self.producer_conn = None + self._amqp_stop.set() + self._amqp_thread.join() + with self._producer_lock: + self._producer_conn.close() + self._producer_conn = None def _consume_amqp(self): # XXX https://webarchive.jira.com/browse/ARI-3811 @@ -87,66 +86,58 @@ class AmqpBrowserController: # reopen the connection every 15 minutes RECONNECT_AFTER_SECONDS = 15 * 60 - while not self.amqp_stop.is_set(): + browser = None + + while not self._amqp_stop.is_set(): try: url_queue = kombu.Queue(self.queue_name, routing_key=self.routing_key, exchange=self._exchange) self.logger.info("connecting to amqp exchange={} at {}".format(self._exchange.name, self.amqp_url)) with kombu.Connection(self.amqp_url) as conn: conn_opened = time.time() - with conn.Consumer(url_queue, callbacks=[self._browse_page_requested]) as consumer: - import socket - while (not self.amqp_stop.is_set() and time.time() - conn_opened < RECONNECT_AFTER_SECONDS): + with conn.Consumer(url_queue) as consumer: + consumer.qos(prefetch_count=1) + while (not self._amqp_stop.is_set() and time.time() - conn_opened < RECONNECT_AFTER_SECONDS): + import socket try: - if self.num_active_browsers < self.max_active_browsers: - conn.drain_events(timeout=0.5) - else: - time.sleep(0.5) - except socket.timeout: + browser = self._browser_pool.acquire() + consumer.callbacks = [self._make_callback(browser)] + conn.drain_events(timeout=0.5) + consumer.callbacks = None + except KeyError: + # no browsers available pass + except socket.timeout: + # no urls in the queue + self._browser_pool.release(browser) + except BaseException as e: self.logger.error("amqp exception {}".format(e)) time.sleep(0.5) self.logger.error("attempting to reopen amqp connection") - def _browse_page_requested(self, body, message): + def _make_callback(self, browser): + def callback(body, message): + self._browse_page(browser, body['clientId'], body['url'], body['metadata']) + message.ack() + return callback + + def _browse_page(self, browser, client_id, url, parent_url_metadata): """Kombu Consumer callback. Provisions a Browser and asynchronously asks it to browse the requested url.""" - client_id = body['clientId'] def on_request(chrome_msg): payload = chrome_msg['params']['request'] - payload['parentUrl'] = body['url'] - payload['parentUrlMetadata'] = body['metadata'] + payload['parentUrl'] = url + payload['parentUrlMetadata'] = parent_url_metadata self.logger.debug('sending to amqp exchange={} routing_key={} payload={}'.format(self.exchange_name, client_id, payload)) - with self.producer_lock: - publish = self.producer_conn.ensure(self.producer, self.producer.publish) + with self._producer_lock: + publish = self._producer_conn.ensure(self._producer, self._producer.publish) publish(payload, exchange=self._exchange, routing_key=client_id) - with self.browsers_lock: - if client_id in self.browsers: - browser = self.browsers[client_id] - else: - # XXX should reuse ports - port = 9222 + len(self.browsers) - browser = Browser(chrome_port=port, chrome_exe=self.chrome_exe, - chrome_wait=self.browser_wait) - self.browsers[client_id] = browser - def browse_page_async(): - self.logger.info('client_id={} body={}'.format(client_id, body)) - while True: - with self.browsers_lock: - if self.num_active_browsers < self.max_active_browsers: - self.num_active_browsers += 1 - break - time.sleep(0.5) - - browser.browse_page(body['url'], on_request=on_request) - - with self.browsers_lock: - self.num_active_browsers -= 1 + self.logger.info('browser={} client_id={} url={}'.format(browser, client_id, url)) + browser.browse_page(url, on_request=on_request) + self._browser_pool.release(browser) threading.Thread(target=browse_page_async).start() - message.ack() - From b67d9fadf0ab334ae6e14fe99f89961cb2ba7114 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Fri, 23 May 2014 22:30:25 -0700 Subject: [PATCH 18/19] log ports chose for browsers, and give threads nice names to make logs easier to understand --- umbra/browser.py | 10 +++++++++- umbra/controller.py | 7 +++++-- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/umbra/browser.py b/umbra/browser.py index 91a89ff..d868782 100644 --- a/umbra/browser.py +++ b/umbra/browser.py @@ -16,6 +16,8 @@ import socket from umbra.behaviors import Behavior class BrowserPool: + logger = logging.getLogger(__module__ + "." + __qualname__) + def __init__(self, size=3, chrome_exe='chromium-browser', chrome_wait=60): self._available = set() @@ -26,6 +28,8 @@ class BrowserPool: self._lock = threading.Lock() + self.logger.info("browser ports: {}".format([browser.chrome_port for (browser, port_holder) in self._available])) + def _grab_random_port(self): """Returns socket bound to some port.""" sock = socket.socket() @@ -80,7 +84,11 @@ class Browser: self.websock = websocket.WebSocketApp(websocket_url, on_open=self._visit_page, on_message=self._handle_message) - websock_thread = threading.Thread(target=self.websock.run_forever, kwargs={'ping_timeout':0.5}) + + import random + threadName = "WebsockThread{}-{}".format(self.chrome_port, + ''.join((random.choice('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789') for _ in range(6)))) + websock_thread = threading.Thread(target=self.websock.run_forever, name=threadName, kwargs={'ping_timeout':0.5}) websock_thread.start() start = time.time() diff --git a/umbra/controller.py b/umbra/controller.py index 8faf0ee..1d40317 100644 --- a/umbra/controller.py +++ b/umbra/controller.py @@ -65,7 +65,7 @@ class AmqpBrowserController: self._producer_conn = kombu.Connection(self.amqp_url) self._producer = self._producer_conn.Producer(serializer='json') - self._amqp_thread = threading.Thread(target=self._consume_amqp) + self._amqp_thread = threading.Thread(target=self._consume_amqp, name='AmqpConsumerThread') self._amqp_stop = threading.Event() self._amqp_thread.start() @@ -139,5 +139,8 @@ class AmqpBrowserController: browser.browse_page(url, on_request=on_request) self._browser_pool.release(browser) - threading.Thread(target=browse_page_async).start() + import random + threadName = "BrowsingThread{}-{}".format(browser.chrome_port, + ''.join((random.choice('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789') for _ in range(6)))) + threading.Thread(target=browse_page_async, name=threadName).start() From 9c08be2699add28591bf7f8cf339ab2ac0cd548d Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Sat, 24 May 2014 01:52:22 -0700 Subject: [PATCH 19/19] sigterm and sigint both shutdown request shutdown, which stops consuming urls and waits for active browsers to finish; a second sigint/sigterm immediately shuts down active browsers --- bin/umbra | 28 +++++++++++++++++++++++++--- umbra/browser.py | 15 +++++++++++++++ umbra/controller.py | 26 ++++++++++++++------------ 3 files changed, 54 insertions(+), 15 deletions(-) diff --git a/bin/umbra b/bin/umbra index 9200736..167f200 100755 --- a/bin/umbra +++ b/bin/umbra @@ -9,6 +9,8 @@ import sys import signal import os import umbra +import signal +import threading if __name__=="__main__": import faulthandler @@ -43,14 +45,34 @@ if __name__=="__main__": exchange_name=args.amqp_exchange, queue_name=args.amqp_queue, routing_key=args.amqp_routing_key) + class ShutdownRequested(Exception): + pass + + def sigterm(signum, frame): + raise ShutdownRequested('shutdown requested (caught SIGTERM)') + def sigint(signum, frame): + raise ShutdownRequested('shutdown requested (caught SIGINT)') + + signal.signal(signal.SIGTERM, sigterm) + signal.signal(signal.SIGINT, sigint) + umbra.start() try: while True: time.sleep(0.5) - except: - pass + except ShutdownRequested as e: + logging.info(e) + except BaseException as e: + logging.fatal(e) finally: - umbra.shutdown() + try: + umbra.shutdown() + for th in threading.enumerate(): + if th != threading.current_thread(): + th.join() + except BaseException as e: + logging.warn("caught {}".format(e)) + umbra.shutdown_now() diff --git a/umbra/browser.py b/umbra/browser.py index d868782..3ab63bb 100644 --- a/umbra/browser.py +++ b/umbra/browser.py @@ -20,6 +20,7 @@ class BrowserPool: def __init__(self, size=3, chrome_exe='chromium-browser', chrome_wait=60): self._available = set() + self._in_use = set() for i in range(0, size): port_holder = self._grab_random_port() @@ -47,12 +48,19 @@ class BrowserPool: with self._lock: (browser, port_holder) = self._available.pop() port_holder.close() + self._in_use.add(browser) return browser def release(self, browser): with self._lock: port_holder = self._hold_port(browser.chrome_port) self._available.add((browser, port_holder)) + self._in_use.remove(browser) + + def shutdown_now(self): + for browser in self._in_use: + browser.shutdown_now() + class Browser: """Runs chrome/chromium to synchronously browse one page at a time using @@ -72,6 +80,10 @@ class Browser: self.chrome_wait = chrome_wait self._behavior = None self.websock = None + self._shutdown_now = False + + def shutdown_now(self): + self._shutdown_now = True def browse_page(self, url, on_request=None): """Synchronously browses a page and runs behaviors. First blocks to @@ -103,6 +115,9 @@ class Browser: elif self._behavior != None and self._behavior.is_finished(): self.logger.info("finished browsing page according to behavior url={}".format(self.url)) break + elif self._shutdown_now: + self.logger.warn("immediate shutdown requested") + break try: self.websock.close() diff --git a/umbra/controller.py b/umbra/controller.py index 1d40317..198d8bb 100644 --- a/umbra/controller.py +++ b/umbra/controller.py @@ -73,9 +73,12 @@ class AmqpBrowserController: self.logger.info("shutting down amqp consumer {}".format(self.amqp_url)) self._amqp_stop.set() self._amqp_thread.join() - with self._producer_lock: - self._producer_conn.close() - self._producer_conn = None + # with self._producer_lock: + # self._producer_conn.close() + # self._producer_conn = None + + def shutdown_now(self): + self._browser_pool.shutdown_now() def _consume_amqp(self): # XXX https://webarchive.jira.com/browse/ARI-3811 @@ -86,11 +89,11 @@ class AmqpBrowserController: # reopen the connection every 15 minutes RECONNECT_AFTER_SECONDS = 15 * 60 - browser = None + url_queue = kombu.Queue(self.queue_name, routing_key=self.routing_key, + exchange=self._exchange) while not self._amqp_stop.is_set(): try: - url_queue = kombu.Queue(self.queue_name, routing_key=self.routing_key, exchange=self._exchange) self.logger.info("connecting to amqp exchange={} at {}".format(self._exchange.name, self.amqp_url)) with kombu.Connection(self.amqp_url) as conn: conn_opened = time.time() @@ -99,13 +102,13 @@ class AmqpBrowserController: while (not self._amqp_stop.is_set() and time.time() - conn_opened < RECONNECT_AFTER_SECONDS): import socket try: - browser = self._browser_pool.acquire() + browser = self._browser_pool.acquire() # raises KeyError if none available consumer.callbacks = [self._make_callback(browser)] conn.drain_events(timeout=0.5) consumer.callbacks = None except KeyError: # no browsers available - pass + time.sleep(0.5) except socket.timeout: # no urls in the queue self._browser_pool.release(browser) @@ -122,9 +125,6 @@ class AmqpBrowserController: return callback def _browse_page(self, browser, client_id, url, parent_url_metadata): - """Kombu Consumer callback. Provisions a Browser and - asynchronously asks it to browse the requested url.""" - def on_request(chrome_msg): payload = chrome_msg['params']['request'] payload['parentUrl'] = url @@ -136,8 +136,10 @@ class AmqpBrowserController: def browse_page_async(): self.logger.info('browser={} client_id={} url={}'.format(browser, client_id, url)) - browser.browse_page(url, on_request=on_request) - self._browser_pool.release(browser) + try: + browser.browse_page(url, on_request=on_request) + finally: + self._browser_pool.release(browser) import random threadName = "BrowsingThread{}-{}".format(browser.chrome_port,