From bdf00cc515df9d340dae8b086dddf0a2c5f0c817 Mon Sep 17 00:00:00 2001 From: Eldon Date: Wed, 12 Feb 2014 19:31:03 -0500 Subject: [PATCH] Refactor to pull Chrome execution inside of umbra, simplify some things --- bin/load_url.py | 2 +- umbra/umbra.py | 104 +++++++++++++++++------------------------------- 2 files changed, 38 insertions(+), 68 deletions(-) diff --git a/bin/load_url.py b/bin/load_url.py index 08b5322..43252db 100755 --- a/bin/load_url.py +++ b/bin/load_url.py @@ -24,5 +24,5 @@ umbra_exchange = Exchange('umbra', 'direct', durable=True) with Connection(args.amqp_url) as conn: producer = conn.Producer(serializer='json') for url in args.urls: - producer.publish({'url': url}, 'url', exchange=umbra_exchange) + producer.publish({'url': url, 'metadata' : {}}, 'url', exchange=umbra_exchange) diff --git a/umbra/umbra.py b/umbra/umbra.py index a63c4ea..a5f4959 100755 --- a/umbra/umbra.py +++ b/umbra/umbra.py @@ -2,6 +2,7 @@ # vim: set sw=4 et: from json import dumps, loads +from itertools import count import os,sys,argparse, urllib.request, urllib.error, urllib.parse import websocket import time @@ -12,41 +13,27 @@ from kombu import Connection, Exchange, Queue class Umbra: logger = logging.getLogger('umbra.Umbra') - def __init__(self, websocket_url, amqp_url): - self.cmd_id = 0 + def __init__(self, amqp_url, chrome_args): self.producer = None - self.browser_lock = threading.Lock() self.amqp_url = amqp_url + self.chrome_args = chrome_args self.producer_lock = threading.Lock() - self.websocket_url = websocket_url - self.websock = websocket.WebSocketApp(websocket_url, on_message = self.handle_message) - self.amqp_thread = threading.Thread(target=self.consume_amqp) - self.amqp_stop = threading.Event() - self.websock.on_open = lambda ws: self.amqp_thread.start() - threading.Thread(target=self.websock.run_forever).start() + self.consume_amqp() - def shutdown(self): - self.logger.info("shutting down amqp consumer {}".format(self.amqp_url)) - self.amqp_stop.set() - self.logger.info("shutting down websocket {}".format(self.websocket_url)) - self.websock.close() - self.amqp_thread.join() - - def handle_message(self, ws, message): - # self.logger.debug("handling message from websocket {} - {}".format(ws, message)) - message = loads(message) - if "method" in message.keys() and message["method"] == "Network.requestWillBeSent": - to_send = message['params']['request'] - to_send['parentUrl'] = self.url - to_send['parentUrlMetadata'] = self.url_metadata - self.logger.debug('sending to amqp: {}'.format(to_send)) - request_queue = Queue('requests', routing_key='request', - exchange=self.umbra_exchange) - with self.producer_lock: - self.producer.publish(to_send, - routing_key='request', - exchange=self.umbra_exchange, - declare=[request_queue]) + def get_message_handler(self, url, url_metadata): + def handle_message(ws, message): + message = loads(message) + self.logger.info(message) + if "method" in message.keys() and message["method"] == "Network.requestWillBeSent": + to_send = {} + to_send.update(message['params']['request']) + to_send.update(dict(parentUrl=url,parentUrlMetadata=url_metadata)) + self.logger.debug('sending to amqp: {}'.format(to_send)) + with self.producer_lock: + self.producer.publish(to_send, + routing_key='request', + exchange=self.umbra_exchange) + return handle_message def consume_amqp(self): self.umbra_exchange = Exchange('umbra', 'direct', durable=True) @@ -55,36 +42,27 @@ class Umbra: with Connection(self.amqp_url) as conn: self.producer = conn.Producer(serializer='json') with conn.Consumer(url_queue, callbacks=[self.fetch_url]) as consumer: - import socket - while not self.amqp_stop.is_set(): - try: - conn.drain_events(timeout=0.5) - except socket.timeout: - pass - - def send_command(self, **kwargs): - self.logger.debug("sending command kwargs={}".format(kwargs)) - command = {} - command.update(kwargs) - self.cmd_id += 1 - command['id'] = self.cmd_id - self.websock.send(dumps(command)) + while True: + conn.drain_events() def fetch_url(self, body, message): - self.logger.debug("body={} message={} message.headers={} message.payload={}".format(repr(body), message, message.headers, message.payload)) - - self.url = body['url'] - self.url_metadata = body['metadata'] - - with self.browser_lock: - self.send_command(method="Network.enable") - self.send_command(method="Page.navigate", params={"url": self.url}) - - # XXX more logic goes here - time.sleep(10) - + url, metadata = body['url'], body['metadata'] + def send_websocket_commands(ws): + command_id = count(1) + ws.send(dumps(dict(method="Network.enable", id=next(command_id)))) + ws.send(dumps(dict(method="Page.navigate", id=next(command_id), params={"url": url}))) + + #from umbra import behaviors + #behaviors.execute(url, ws, command_id) + message.ack() + with Chrome(*self.chrome_args) as websocket_url: + websock = websocket.WebSocketApp(websocket_url) + websock.on_message = self.get_message_handler(url, metadata) + websock.on_open = send_websocket_commands + websock.run_forever() + class Chrome(): logger = logging.getLogger('umbra.Chrome') @@ -151,16 +129,8 @@ def main(): arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f', help='URL identifying the amqp server to talk to') args = arg_parser.parse_args(args=sys.argv[1:]) - with Chrome(args.port, args.executable, args.browser_wait) as websocket_url: - umbra = Umbra(websocket_url, args.amqp_url) - try: - while True: - time.sleep(0.5) - except: - pass - finally: - umbra.shutdown() - + chrome_args = (args.port, args.executable, args.browser_wait) + umbra = Umbra(args.amqp_url, chrome_args) if __name__ == "__main__": main()