Merge branch 'master' into nlevitt-master

This commit is contained in:
Noah Levitt 2014-02-12 18:15:05 -08:00
commit 4dbe111aee
2 changed files with 38 additions and 68 deletions

View File

@@ -24,5 +24,5 @@ umbra_exchange = Exchange('umbra', 'direct', durable=True)
with Connection(args.amqp_url) as conn: with Connection(args.amqp_url) as conn:
producer = conn.Producer(serializer='json') producer = conn.Producer(serializer='json')
for url in args.urls: for url in args.urls:
producer.publish({'url': url}, 'url', exchange=umbra_exchange) producer.publish({'url': url, 'metadata' : {}}, 'url', exchange=umbra_exchange)

View File

@@ -2,6 +2,7 @@
# vim: set sw=4 et: # vim: set sw=4 et:
from json import dumps, loads from json import dumps, loads
from itertools import count
import os,sys,argparse, urllib.request, urllib.error, urllib.parse import os,sys,argparse, urllib.request, urllib.error, urllib.parse
import websocket import websocket
import time import time
@@ -12,41 +13,27 @@ from kombu import Connection, Exchange, Queue
class Umbra: class Umbra:
logger = logging.getLogger('umbra.Umbra') logger = logging.getLogger('umbra.Umbra')
def __init__(self, websocket_url, amqp_url): def __init__(self, amqp_url, chrome_args):
self.cmd_id = 0
self.producer = None self.producer = None
self.browser_lock = threading.Lock()
self.amqp_url = amqp_url self.amqp_url = amqp_url
self.chrome_args = chrome_args
self.producer_lock = threading.Lock() self.producer_lock = threading.Lock()
self.websocket_url = websocket_url self.consume_amqp()
self.websock = websocket.WebSocketApp(websocket_url, on_message = self.handle_message)
self.amqp_thread = threading.Thread(target=self.consume_amqp)
self.amqp_stop = threading.Event()
self.websock.on_open = lambda ws: self.amqp_thread.start()
threading.Thread(target=self.websock.run_forever).start()
def shutdown(self): def get_message_handler(self, url, url_metadata):
self.logger.info("shutting down amqp consumer {}".format(self.amqp_url)) def handle_message(ws, message):
self.amqp_stop.set() message = loads(message)
self.logger.info("shutting down websocket {}".format(self.websocket_url)) self.logger.info(message)
self.websock.close() if "method" in message.keys() and message["method"] == "Network.requestWillBeSent":
self.amqp_thread.join() to_send = {}
to_send.update(message['params']['request'])
def handle_message(self, ws, message): to_send.update(dict(parentUrl=url,parentUrlMetadata=url_metadata))
# self.logger.debug("handling message from websocket {} - {}".format(ws, message)) self.logger.debug('sending to amqp: {}'.format(to_send))
message = loads(message) with self.producer_lock:
if "method" in message.keys() and message["method"] == "Network.requestWillBeSent": self.producer.publish(to_send,
to_send = message['params']['request'] routing_key='request',
to_send['parentUrl'] = self.url exchange=self.umbra_exchange)
to_send['parentUrlMetadata'] = self.url_metadata return handle_message
self.logger.debug('sending to amqp: {}'.format(to_send))
request_queue = Queue('requests', routing_key='request',
exchange=self.umbra_exchange)
with self.producer_lock:
self.producer.publish(to_send,
routing_key='request',
exchange=self.umbra_exchange,
declare=[request_queue])
def consume_amqp(self): def consume_amqp(self):
self.umbra_exchange = Exchange('umbra', 'direct', durable=True) self.umbra_exchange = Exchange('umbra', 'direct', durable=True)
@@ -55,36 +42,27 @@ class Umbra:
with Connection(self.amqp_url) as conn: with Connection(self.amqp_url) as conn:
self.producer = conn.Producer(serializer='json') self.producer = conn.Producer(serializer='json')
with conn.Consumer(url_queue, callbacks=[self.fetch_url]) as consumer: with conn.Consumer(url_queue, callbacks=[self.fetch_url]) as consumer:
import socket while True:
while not self.amqp_stop.is_set(): conn.drain_events()
try:
conn.drain_events(timeout=0.5)
except socket.timeout:
pass
def send_command(self, **kwargs):
self.logger.debug("sending command kwargs={}".format(kwargs))
command = {}
command.update(kwargs)
self.cmd_id += 1
command['id'] = self.cmd_id
self.websock.send(dumps(command))
def fetch_url(self, body, message): def fetch_url(self, body, message):
self.logger.debug("body={} message={} message.headers={} message.payload={}".format(repr(body), message, message.headers, message.payload)) url, metadata = body['url'], body['metadata']
def send_websocket_commands(ws):
self.url = body['url'] command_id = count(1)
self.url_metadata = body['metadata'] ws.send(dumps(dict(method="Network.enable", id=next(command_id))))
ws.send(dumps(dict(method="Page.navigate", id=next(command_id), params={"url": url})))
with self.browser_lock:
self.send_command(method="Network.enable") #from umbra import behaviors
self.send_command(method="Page.navigate", params={"url": self.url}) #behaviors.execute(url, ws, command_id)
# XXX more logic goes here
time.sleep(10)
message.ack() message.ack()
with Chrome(*self.chrome_args) as websocket_url:
websock = websocket.WebSocketApp(websocket_url)
websock.on_message = self.get_message_handler(url, metadata)
websock.on_open = send_websocket_commands
websock.run_forever()
class Chrome(): class Chrome():
logger = logging.getLogger('umbra.Chrome') logger = logging.getLogger('umbra.Chrome')
@@ -151,16 +129,8 @@ def main():
arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f', arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f',
help='URL identifying the amqp server to talk to') help='URL identifying the amqp server to talk to')
args = arg_parser.parse_args(args=sys.argv[1:]) args = arg_parser.parse_args(args=sys.argv[1:])
with Chrome(args.port, args.executable, args.browser_wait) as websocket_url: chrome_args = (args.port, args.executable, args.browser_wait)
umbra = Umbra(websocket_url, args.amqp_url) umbra = Umbra(args.amqp_url, chrome_args)
try:
while True:
time.sleep(0.5)
except:
pass
finally:
umbra.shutdown()
if __name__ == "__main__": if __name__ == "__main__":
main() main()