diff --git a/bin/load_url.py b/bin/load_url.py index 43252db..ac87cef 100755 --- a/bin/load_url.py +++ b/bin/load_url.py @@ -17,6 +17,8 @@ arg_parser = argparse.ArgumentParser(prog=os.path.basename(sys.argv[0]), formatter_class=argparse.ArgumentDefaultsHelpFormatter) arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672//', help='URL identifying the amqp server to talk to') +arg_parser.add_argument('-i', '--client-id', dest='client_id', default='load_url.0', + help='client id - included in the json payload with each url; umbra uses this value as the routing key to send requests back to') arg_parser.add_argument('urls', metavar='URL', nargs='+', help='URLs to send to umbra') args = arg_parser.parse_args(args=sys.argv[1:]) @@ -24,5 +26,5 @@ umbra_exchange = Exchange('umbra', 'direct', durable=True) with Connection(args.amqp_url) as conn: producer = conn.Producer(serializer='json') for url in args.urls: - producer.publish({'url': url, 'metadata' : {}}, 'url', exchange=umbra_exchange) + producer.publish({'url': url, 'metadata': {}, 'clientId': args.client_id}, 'url', exchange=umbra_exchange) diff --git a/setup.py b/setup.py index 287f98c..616aad9 100644 --- a/setup.py +++ b/setup.py @@ -16,6 +16,6 @@ setuptools.setup(name='umbra', 'Development Status :: 3 - Alpha Development Status', 'Environment :: Console', 'License :: OSI Approved :: Apache Software License', - 'Programming Language :: Python :: 3', + 'Programming Language :: Python :: 3.3', 'Topic :: System :: Archiving', ]) diff --git a/umbra/behaviors.json b/umbra/behaviors.json new file mode 100644 index 0000000..7774ec9 --- /dev/null +++ b/umbra/behaviors.json @@ -0,0 +1,14 @@ +[ + { + "scripts": [ + "setInterval(function() { window.scrollBy(0,500); }, 150);" + ], + "site": ".*facebook.com.*" + }, + { + "scripts": [ + "setInterval(function() { window.scrollBy(0,50); }, 50);" + ], + "site": ".*flickr.com.*" + } +] diff --git a/umbra/behaviors.py b/umbra/behaviors.py new file mode 100644 index 0000000..c92b2f2 --- /dev/null +++ b/umbra/behaviors.py @@ -0,0 +1,18 @@ +# vim: set sw=4 et: + +from json import dumps, load +from time import sleep +import os, re +import logging + +behaviors_file = os.path.sep.join(__file__.split(os.path.sep)[:-1] + ['behaviors.json']) +def execute(url, websock, command_id): + logger = logging.getLogger('behaviors') + with open(behaviors_file) as js: + behaviors = load(js) + for behavior in behaviors: + if re.match(behavior['site'], url): + for script in behavior['scripts']: + msg = dumps(dict(method="Runtime.evaluate", params={"expression": script}, id=next(command_id))) + logger.debug('sending message to {}: {}'.format(websock, msg)) + websock.send(msg) diff --git a/umbra/umbra.py b/umbra/umbra.py index a5f4959..0710990 100755 --- a/umbra/umbra.py +++ b/umbra/umbra.py @@ -9,21 +9,89 @@ import time import uuid import logging import threading +import subprocess +import signal from kombu import Connection, Exchange, Queue +import tempfile -class Umbra: - logger = logging.getLogger('umbra.Umbra') - def __init__(self, amqp_url, chrome_args): - self.producer = None - self.amqp_url = amqp_url - self.chrome_args = chrome_args - self.producer_lock = threading.Lock() - self.consume_amqp() +class UmbraWorker: + logger = logging.getLogger('umbra.UmbraWorker') - def get_message_handler(self, url, url_metadata): + def __init__(self, umbra, chrome_port=9222, chrome_exe='chromium-browser', chrome_wait=10, client_id='request'): + self.command_id = count(1) + self.lock = threading.Lock() + self.umbra = umbra + self.chrome_port = chrome_port + self.chrome_exe = chrome_exe + self.chrome_wait = chrome_wait + self.client_id = client_id + self.page_done = threading.Event() + self.idle_timer = None + + def browse_page(self, url, url_metadata): + with self.lock: + self.url = url + self.url_metadata = url_metadata + with tempfile.TemporaryDirectory() as user_data_dir: + with Chrome(self.chrome_port, self.chrome_exe, self.chrome_wait, user_data_dir) as websocket_url: + websock = websocket.WebSocketApp(websocket_url, + on_open=self.visit_page, on_message=self.handle_message) + websock_thread = threading.Thread(target=websock.run_forever) + websock_thread.start() + + self.page_done.clear() + self._reset_idle_timer() + while not self.page_done.is_set(): + time.sleep(0.5) + + websock.close() + self.idle_timer = None + + def _reset_idle_timer(self): + if self.idle_timer: + self.idle_timer.cancel() + self.idle_timer = threading.Timer(60, self.page_done.set) + self.idle_timer.start() + + def visit_page(self, websock): + msg = dumps(dict(method="Network.enable", id=next(self.command_id))) + self.logger.debug('sending message to {}: {}'.format(websock, msg)) + websock.send(msg) + + msg = dumps(dict(method="Page.enable", id=next(self.command_id))) + self.logger.debug('sending message to {}: {}'.format(websock, msg)) + websock.send(msg) + + msg = dumps(dict(method="Page.navigate", id=next(self.command_id), params={"url": self.url})) + self.logger.debug('sending message to {}: {}'.format(websock, msg)) + websock.send(msg) + + def send_request_to_amqp(self, chrome_msg): + payload = chrome_msg['params']['request'] + payload['parentUrl'] = self.url + payload['parentUrlMetadata'] = self.url_metadata + self.logger.debug('sending to amqp exchange={} routing_key={} payload={}'.format(self.umbra.umbra_exchange, self.client_id, payload)) + with self.umbra.producer_lock: + self.umbra.producer.publish(payload, + exchange=self.umbra.umbra_exchange, + routing_key=self.client_id) + + def handle_message(self, websock, message): + self.logger.debug("handling message from websocket {} - {}".format(websock, message[:95])) + self._reset_idle_timer() + message = loads(message) + if "method" in message.keys() and message["method"] == "Network.requestWillBeSent": + self.send_request_to_amqp(message) + elif "method" in message.keys() and message["method"] == "Page.loadEventFired": + self.logger.debug("got Page.loadEventFired, starting behaviors for {}".format(self.url)) + from umbra import behaviors + behaviors.execute(self.url, websock, self.command_id) + + def get_message_handler(self, url, url_metadata, command_id): + this_watchdog = self.watchdog(command_id) def handle_message(ws, message): + this_watchdog.send(ws) message = loads(message) - self.logger.info(message) if "method" in message.keys() and message["method"] == "Network.requestWillBeSent": to_send = {} to_send.update(message['params']['request']) @@ -35,60 +103,90 @@ class Umbra: exchange=self.umbra_exchange) return handle_message +class Umbra: + logger = logging.getLogger('umbra.Umbra') + + def __init__(self, amqp_url, chrome_exe, browser_wait): + self.amqp_url = amqp_url + self.chrome_exe = chrome_exe + self.browser_wait = browser_wait + self.producer = None + self.producer_lock = None + self.workers = {} + self.workers_lock = threading.Lock() + self.amqp_thread = threading.Thread(target=self.consume_amqp) + self.amqp_stop = threading.Event() + self.amqp_thread.start() + + def shutdown(self): + self.logger.info("shutting down amqp consumer {}".format(self.amqp_url)) + self.amqp_stop.set() + self.amqp_thread.join() + def consume_amqp(self): - self.umbra_exchange = Exchange('umbra', 'direct', durable=True) + self.umbra_exchange = Exchange(name='umbra', type='direct', durable=True) url_queue = Queue('urls', routing_key='url', exchange=self.umbra_exchange) self.logger.info("connecting to amqp {} at {}".format(repr(self.umbra_exchange), self.amqp_url)) with Connection(self.amqp_url) as conn: self.producer = conn.Producer(serializer='json') + self.producer_lock = threading.Lock() with conn.Consumer(url_queue, callbacks=[self.fetch_url]) as consumer: - while True: - conn.drain_events() + import socket + while not self.amqp_stop.is_set(): + try: + conn.drain_events(timeout=0.5) + except socket.timeout: + pass def fetch_url(self, body, message): - url, metadata = body['url'], body['metadata'] - def send_websocket_commands(ws): - command_id = count(1) - ws.send(dumps(dict(method="Network.enable", id=next(command_id)))) - ws.send(dumps(dict(method="Page.navigate", id=next(command_id), params={"url": url}))) - - #from umbra import behaviors - #behaviors.execute(url, ws, command_id) - - message.ack() + client_id = body['clientId'] + with self.workers_lock: + if not client_id in self.workers: + port = 9222 + len(self.workers) + t = UmbraWorker(umbra=self, chrome_port=port, + chrome_exe=self.chrome_exe, + chrome_wait=self.browser_wait, + client_id=client_id) + self.workers[client_id] = t - with Chrome(*self.chrome_args) as websocket_url: - websock = websocket.WebSocketApp(websocket_url) - websock.on_message = self.get_message_handler(url, metadata) - websock.on_open = send_websocket_commands - websock.run_forever() + def browse_page_async(): + self.logger.info('client_id={} body={}'.format(client_id, body)) + self.workers[client_id].browse_page(body['url'], body['metadata']) -class Chrome(): + threading.Thread(target=browse_page_async).start() + + message.ack() + + +class Chrome: logger = logging.getLogger('umbra.Chrome') - def __init__(self, port, executable, browser_wait): + def __init__(self, port, executable, browser_wait, user_data_dir): self.port = port self.executable = executable self.browser_wait = browser_wait + self.user_data_dir = user_data_dir def fetch_debugging_json(): raw_json = urllib.request.urlopen("http://localhost:%s/json" % self.port).read() json = raw_json.decode('utf-8') return loads(json) + # returns websocket url to chrome window with about:blank loaded def __enter__(self): - import subprocess - chrome_args = [self.executable, "--temp-profile", + chrome_args = [self.executable, + "--user-data-dir={}".format(self.user_data_dir), "--remote-debugging-port=%s" % self.port, "--disable-web-sockets", "--disable-cache", - "--window-size=1100,900", "--enable-logging" + "--window-size=1100,900", "--enable-logging", "--homepage=about:blank", "about:blank"] self.logger.info("running {}".format(chrome_args)) - self.chrome_process = subprocess.Popen(chrome_args) + self.chrome_process = subprocess.Popen(chrome_args, start_new_session=True) self.logger.info("chrome running, pid {}".format(self.chrome_process.pid)) start = time.time() json_url = "http://localhost:%s/json" % self.port + while True: try: raw_json = urllib.request.urlopen(json_url).read() @@ -110,7 +208,8 @@ class Chrome(): def __exit__(self, *args): self.logger.info("killing chrome pid {}".format(self.chrome_process.pid)) - self.chrome_process.kill() + os.killpg(self.chrome_process.pid, signal.SIGINT) + self.chrome_process.wait() def main(): # logging.basicConfig(stream=sys.stdout, level=logging.INFO, @@ -124,13 +223,19 @@ def main(): help='Seconds to wait for browser initialization') arg_parser.add_argument('-e', '--executable', dest='executable', default='chromium-browser', help='Executable to use to invoke chrome') - arg_parser.add_argument('-p', '--port', dest='port', default='9222', - help='Port to have invoked chrome listen on for debugging connections') arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f', help='URL identifying the amqp server to talk to') args = arg_parser.parse_args(args=sys.argv[1:]) - chrome_args = (args.port, args.executable, args.browser_wait) - umbra = Umbra(args.amqp_url, chrome_args) + + umbra = Umbra(args.amqp_url, args.executable, args.browser_wait) + try: + while True: + time.sleep(0.5) + except: + pass + finally: + umbra.shutdown() + if __name__ == "__main__": main()