brozzler/umbra/umbra.py
2014-02-13 13:10:19 -05:00

158 lines
6.7 KiB
Python
Executable File

#!/usr/bin/env python
# vim: set sw=4 et:
from json import dumps, loads
from itertools import count
import os,sys,argparse, urllib.request, urllib.error, urllib.parse
import websocket
import time
import uuid
import logging
import threading
from kombu import Connection, Exchange, Queue
class Umbra:
    """AMQP-driven browser automation worker.

    Consumes messages from the ``urls`` queue bound to the ``umbra`` exchange,
    loads each URL in a freshly started Chrome through its remote-debugging
    websocket, and publishes every ``Network.requestWillBeSent`` request seen
    on that socket back to the exchange with routing key ``request``.
    """

    logger = logging.getLogger('umbra.Umbra')

    def __init__(self, amqp_url, chrome_args):
        """Store the configuration and immediately start consuming.

        :param amqp_url: URL of the AMQP server to connect to.
        :param chrome_args: positional arguments unpacked into ``Chrome(...)``
            for every fetched URL.

        NOTE: consume_amqp() loops forever, so constructing an instance does
        not return under normal operation.
        """
        self.producer = None
        self.amqp_url = amqp_url
        self.chrome_args = chrome_args
        self.producer_lock = threading.Lock()
        self.consume_amqp()

    def watchdog(self, command_id, timeout=10):
        """Return a primed generator acting as an idle timer for a websocket.

        Every ``send(ws)`` on the returned generator cancels the pending timer
        (if any) and starts a new one.  When a timer fires, a ``window.close()``
        expression is sent over ``ws`` and the socket is closed.

        :param command_id: iterator yielding ids for outgoing commands.
        :param timeout: seconds of silence before the socket is closed.
        :returns: a generator already advanced to its first ``yield``.
        """
        def close_idle_socket(ws):
            close_exp = "window.open('', '_self', ''); window.close(); "
            try:
                ws.send(dumps(dict(method="Runtime.evaluate", params={"expression": close_exp}, id=next(command_id))))
                self.logger.info("Going")
            finally:
                # Close even when the send fails; otherwise nothing else
                # would ever shut this socket down.
                ws.close()

        def wrapped():
            timer = None
            while True:
                ws = yield
                if timer:
                    self.logger.info("Cancelling")
                    timer.cancel()
                timer = threading.Timer(timeout, close_idle_socket, args=(ws,))
                timer.start()

        result = wrapped()
        next(result)  # advance to the first yield so send() works right away
        return result

    def get_message_handler(self, url, url_metadata, command_id):
        """Build the websocket ``on_message`` callback for one page load.

        :param url: the URL being fetched; published as ``parentUrl``.
        :param url_metadata: metadata received with the URL; published as
            ``parentUrlMetadata``.
        :param command_id: iterator yielding ids for outgoing commands.
        :returns: a ``handle_message(ws, message)`` callable.
        """
        this_watchdog = self.watchdog(command_id)

        def handle_message(ws, message):
            # Any incoming message counts as activity and restarts the timer.
            this_watchdog.send(ws)
            message = loads(message)
            if message.get("method") == "Network.requestWillBeSent":
                to_send = {}
                to_send.update(message['params']['request'])
                to_send.update(dict(parentUrl=url, parentUrlMetadata=url_metadata))
                self.logger.debug('sending to amqp: {}'.format(to_send))
                with self.producer_lock:
                    self.producer.publish(to_send,
                                          routing_key='request',
                                          exchange=self.umbra_exchange)

        return handle_message

    def consume_amqp(self):
        """Connect to AMQP and dispatch queued URLs to fetch_url() forever."""
        self.umbra_exchange = Exchange('umbra', 'direct', durable=True)
        url_queue = Queue('urls', routing_key='url', exchange=self.umbra_exchange)
        self.logger.info("connecting to amqp {} at {}".format(repr(self.umbra_exchange), self.amqp_url))
        with Connection(self.amqp_url) as conn:
            self.producer = conn.Producer(serializer='json')
            with conn.Consumer(url_queue, callbacks=[self.fetch_url]):
                while True:
                    conn.drain_events()

    def fetch_url(self, body, message):
        """Consumer callback: load one URL in a new Chrome instance.

        :param body: decoded message payload; must contain ``url`` and
            ``metadata`` keys.
        :param message: the AMQP message, acknowledged once the page commands
            have been issued.

        Blocks until the websocket connection to Chrome ends.
        """
        url, metadata = body['url'], body['metadata']
        command_id = count(1)

        def send_websocket_commands(ws):
            ws.send(dumps(dict(method="Network.enable", id=next(command_id))))
            ws.send(dumps(dict(method="Page.navigate", id=next(command_id), params={"url": url})))
            from umbra import behaviors
            behaviors.execute(url, ws, command_id)
            message.ack()

        with Chrome(*self.chrome_args) as websocket_url:
            websock = websocket.WebSocketApp(websocket_url)
            websock.on_message = self.get_message_handler(url, metadata, command_id)
            websock.on_open = send_websocket_commands
            websock.run_forever()
class Chrome():
    """Context manager running a Chrome process with remote debugging on.

    Entering starts the browser and returns the websocket debugger URL of its
    ``about:blank`` tab; leaving kills the browser process.
    """

    logger = logging.getLogger('umbra.Chrome')

    def __init__(self, port, executable, browser_wait):
        """
        :param port: remote-debugging port Chrome is told to listen on.
        :param executable: program used to start Chrome.
        :param browser_wait: seconds to wait for the debugger URL to appear;
            anything accepted by ``float()``.
        """
        self.port = port
        self.executable = executable
        self.browser_wait = browser_wait

    def fetch_debugging_json(self):
        """Return the decoded JSON served at ``/json`` on the debugging port."""
        raw_json = urllib.request.urlopen("http://localhost:%s/json" % self.port).read()
        return loads(raw_json.decode('utf-8'))

    def _command_line(self):
        """Build the argument list used to start Chrome."""
        return [self.executable, "--temp-profile",
                "--remote-debugging-port=%s" % self.port,
                "--disable-web-sockets", "--disable-cache",
                "--window-size=1100,900", "--enable-logging",
                "--homepage=about:blank", "about:blank"]

    def _find_debugger_url(self, json_url):
        """Make one attempt to read the about:blank tab's debugger URL.

        Returns the URL, or None when the endpoint is not reachable yet or
        does not list a usable tab.
        """
        try:
            raw_json = urllib.request.urlopen(json_url).read()
            all_debug_info = loads(raw_json.decode('utf-8'))
            debug_info = [x for x in all_debug_info if x.get('url') == 'about:blank']
            if debug_info and 'webSocketDebuggerUrl' in debug_info[0]:
                self.logger.debug("{} returned {}".format(json_url, raw_json))
                return debug_info[0]['webSocketDebuggerUrl']
        except Exception as e:
            # Best effort: the browser is usually just not listening yet.
            # KeyboardInterrupt and SystemExit are deliberately not caught.
            self.logger.debug("{} not ready: {!r}".format(json_url, e))
        return None

    def _kill_chrome(self):
        """Kill the browser process and reap it."""
        self.logger.info("killing chrome pid {}".format(self.chrome_process.pid))
        self.chrome_process.kill()
        self.chrome_process.wait()

    def __enter__(self):
        """Start Chrome and poll until its debugger URL is available.

        :returns: websocket debugger URL of the ``about:blank`` tab.
        :raises Exception: if the URL is not available within
            ``browser_wait`` seconds; the browser is killed first.
        """
        import subprocess
        # Convert before launching so a bad value fails without a stray browser.
        wait_limit = float(self.browser_wait)
        chrome_args = self._command_line()
        self.logger.info("running {}".format(chrome_args))
        self.chrome_process = subprocess.Popen(chrome_args)
        self.logger.info("chrome running, pid {}".format(self.chrome_process.pid))
        start = time.time()
        json_url = "http://localhost:%s/json" % self.port
        try:
            while True:
                url = self._find_debugger_url(json_url)
                if url is not None:
                    self.logger.info('got chrome window websocket debug url {} from {}'.format(url, json_url))
                    return url
                elapsed = time.time() - start
                if elapsed > wait_limit:
                    raise Exception("failed to retrieve {} after {} seconds".format(json_url, elapsed))
                time.sleep(0.5)
        except BaseException:
            # __exit__ is not called when __enter__ raises, so clean up here.
            self._kill_chrome()
            raise

    def __exit__(self, *args):
        """Kill the browser; exceptions from the with-body are not suppressed."""
        self._kill_chrome()
def main():
    """Command-line entry point.

    Sets up DEBUG logging on stdout, parses the command-line options and
    hands them to Umbra, whose constructor starts consuming from AMQP.
    """
    log_format = '%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s'
    # Use level=logging.INFO instead for less verbose output.
    logging.basicConfig(stream=sys.stdout, level=logging.DEBUG,
                        format=log_format)

    parser = argparse.ArgumentParser(
        prog=os.path.basename(sys.argv[0]),
        description='umbra - Browser automation tool',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)

    # (short flag, long flag, dest, default, help)
    options = (
        ('-w', '--browser-wait', 'browser_wait', '10',
         'Seconds to wait for browser initialization'),
        ('-e', '--executable', 'executable', 'chromium-browser',
         'Executable to use to invoke chrome'),
        ('-p', '--port', 'port', '9222',
         'Port to have invoked chrome listen on for debugging connections'),
        ('-u', '--url', 'amqp_url', 'amqp://guest:guest@localhost:5672/%2f',
         'URL identifying the amqp server to talk to'),
    )
    for short_flag, long_flag, dest, default, help_text in options:
        parser.add_argument(short_flag, long_flag, dest=dest,
                            default=default, help=help_text)

    parsed = parser.parse_args(args=sys.argv[1:])
    browser_settings = (parsed.port, parsed.executable, parsed.browser_wait)
    Umbra(parsed.amqp_url, browser_settings)


if __name__ == "__main__":
    main()