diff --git a/bin/load_url.py b/bin/load_url.py
index cea8899..08b5322 100755
--- a/bin/load_url.py
+++ b/bin/load_url.py
@@ -1,4 +1,5 @@
 #!/usr/bin/env python
+# vim: set sw=4 et:
 from json import dumps, loads
 import os,sys,argparse, urllib.request, urllib.error, urllib.parse
 import websocket
@@ -7,10 +8,21 @@ import uuid
 import logging
 import threading
 from kombu import Connection, Exchange, Queue
-logging.basicConfig(level=logging.INFO)
+
+logging.basicConfig(stream=sys.stdout, level=logging.INFO,
+        format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
+
+arg_parser = argparse.ArgumentParser(prog=os.path.basename(sys.argv[0]),
+        description='load_url.py - send url to umbra via amqp',
+        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672//',
+        help='URL identifying the amqp server to talk to')
+arg_parser.add_argument('urls', metavar='URL', nargs='+', help='URLs to send to umbra')
+args = arg_parser.parse_args(args=sys.argv[1:])
+

 umbra_exchange = Exchange('umbra', 'direct', durable=True)
-with Connection(sys.argv[1]) as conn:
+with Connection(args.amqp_url) as conn:
     producer = conn.Producer(serializer='json')
-    for url in sys.argv[2:]:
+    for url in args.urls:
         producer.publish({'url': url}, 'url', exchange=umbra_exchange)
diff --git a/umbra/umbra.py b/umbra/umbra.py
index bb8d6e1..30e7d85 100755
--- a/umbra/umbra.py
+++ b/umbra/umbra.py
@@ -7,104 +7,145 @@ import uuid
 import logging
 import threading
 from kombu import Connection, Exchange, Queue
-logging.basicConfig(level=logging.INFO)

 class Umbra:
-    def __init__(self, port, amqpurl):
+    """Loads urls consumed from amqp in the browser behind websocket_url and
+    publishes each Network.requestWillBeSent request back to amqp."""
+    logger = logging.getLogger('umbra.Umbra')
+    def __init__(self, websocket_url, amqp_url):
         self.cmd_id = 0
-        self.chrome_debug_port = port
         self.producer = None
-        self.current_socket = False
-        self.amqpurl = amqpurl
-        self.launch_tab_socket = self.get_websocket(self.on_open)
+        self.browser_lock = threading.Lock()
+        self.amqp_url = amqp_url
         self.producer_lock = threading.Lock()
-        threading.Thread(target=self.launch_tab_socket.run_forever).start()
+        self.websocket_url = websocket_url
+        self.websock = websocket.WebSocketApp(websocket_url, on_message = self.handle_message)
+        self.amqp_thread = threading.Thread(target=self.consume_amqp)
+        self.amqp_stop = threading.Event()
+        self.websock.on_open = lambda ws: self.amqp_thread.start()
+        threading.Thread(target=self.websock.run_forever).start()

-    def get_websocket(self, on_open, url=None):
-        def fetch_debugging_json():
-            return loads(urllib.request.urlopen("http://localhost:%s/json" % self.chrome_debug_port).read().decode('utf-8').replace("\\n",""))
-        time.sleep(0.5)
-        debug_info = []
-        while not (debug_info and 'webSocketDebuggerUrl' in debug_info[0]):
-            debug_info = fetch_debugging_json()
-            if url:
-                debug_info = [x for x in debug_info if x['url'] == url]
-        return_socket = websocket.WebSocketApp(debug_info[0]['webSocketDebuggerUrl'], on_message = self.handle_message)
-        return_socket.on_open = on_open
-        return return_socket
+    def shutdown(self):
+        """Stop the amqp consumer loop and close the websocket."""
+        self.logger.info("shutting down amqp consumer {}".format(self.amqp_url))
+        self.amqp_stop.set()
+        self.logger.info("shutting down websocket {}".format(self.websocket_url))
+        self.websock.close()
+        # amqp_thread is only started by the websocket on_open callback, and
+        # join() on a thread that was never started raises RuntimeError
+        if self.amqp_thread.is_alive():
+            self.amqp_thread.join()

     def handle_message(self, ws, message):
+        # self.logger.debug("handling message from websocket {} - {}".format(ws, message))
         message = loads(message)
         if "method" in message.keys() and message["method"] == "Network.requestWillBeSent":
-            request_queue = Queue('requests', routing_key='request', exchange=self.umbra_exchange)
+            request_queue = Queue('requests', routing_key='request',
+                    exchange=self.umbra_exchange)
             with self.producer_lock:
-                self.producer.publish(message['params']['request'],routing_key='request', exchange=self.umbra_exchange, declare=[request_queue])
+                self.producer.publish(message['params']['request'],
+                        routing_key='request', exchange=self.umbra_exchange,
+                        declare=[request_queue])

-
-    def start_amqp(self):
+    def consume_amqp(self):
+        """Consume the 'urls' queue until amqp_stop is set (body of amqp_thread)."""
         self.umbra_exchange = Exchange('umbra', 'direct', durable=True)
-        url_queue = Queue('urls', routing_key='url', exchange=self.umbra_exchange)
-        with Connection(self.amqpurl) as conn:
+        url_queue = Queue('urls', routing_key='url', exchange=self.umbra_exchange)
+        self.logger.info("connecting to amqp {} at {}".format(repr(self.umbra_exchange), self.amqp_url))
+        with Connection(self.amqp_url) as conn:
             self.producer = conn.Producer(serializer='json')
             with conn.Consumer(url_queue, callbacks=[self.fetch_url]) as consumer:
-                while True:
-                    conn.drain_events()
+                import socket
+                while not self.amqp_stop.is_set():
+                    try:
+                        conn.drain_events(timeout=0.5)
+                    except socket.timeout:
+                        pass

-    def on_open(self, ws):
-        threading.Thread(target=self.start_amqp).start()
-
-
-    def send_command(self,tab=None, **kwargs):
-        if not tab:
-            tab = self.launch_tab_socket
+    def send_command(self, **kwargs):
+        self.logger.debug("sending command kwargs={}".format(kwargs))
         command = {}
         command.update(kwargs)
         self.cmd_id += 1
         command['id'] = self.cmd_id
-        tab.send(dumps(command))
+        self.websock.send(dumps(command))

     def fetch_url(self, body, message):
-        while self.current_socket:
-            time.sleep(1)
-        self.current_socket = True
+        """amqp callback: point the browser at body['url'], wait, then ack."""
+        self.logger.debug("body={} message={} message.headers={} message.payload={}".format(body, message, message.headers, message.payload))
+
         url = body['url']
-        new_page = 'data:text/html;charset=utf-8,%s' % str(uuid.uuid4())
-        self.send_command(method="Runtime.evaluate", params={"expression":"window.open('%s');" % new_page})
-        def on_open(ws):
-            ws.on_message=self.handle_message
-            self.send_command(tab=ws, method="Network.enable")
-            self.send_command(tab=ws, method="Runtime.evaluate", params={"expression":"document.location = '%s';" % url})
-            def do_close():
-                time.sleep(10)
-                self.send_command(tab=ws, method="Runtime.evaluate", params={"expression":"window.open('', '_self', ''); window.close(); "})
-                self.current_socket = False
-            threading.Thread(target=do_close).start()
-        socket = self.get_websocket(on_open, new_page)
-        message.ack()
-        threading.Thread(target=socket.run_forever).start()
+
+        with self.browser_lock:
+            self.send_command(method="Network.enable")
+            # dumps() renders the url as a quoted, escaped string literal, so
+            # a quote in the url can no longer break the javascript expression
+            self.send_command(method="Runtime.evaluate", params={"expression":"document.location = %s;" % dumps(url)})
+
+            # XXX more logic goes here
+            time.sleep(10)
+
+            message.ack()


 class Chrome():
+    """Context manager: runs chrome, yields its about:blank tab's websocket debug url."""
+    logger = logging.getLogger('umbra.Chrome')
+
     def __init__(self, port, executable, browser_wait):
         self.port = port
         self.executable = executable
-        self.browser_wait=browser_wait
+        self.browser_wait = browser_wait
+
+    def fetch_debugging_json(self):
+        """Return the parsed json served at http://localhost:<port>/json."""
+        raw_json = urllib.request.urlopen("http://localhost:%s/json" % self.port).read()
+        json = raw_json.decode('utf-8')
+        return loads(json)

     def __enter__(self):
+        """Start chrome, then poll /json until the about:blank tab's websocket url appears."""
         import subprocess
-        self.chrome_process = subprocess.Popen([self.executable, "--disable-web-sockets", "--disable-cache", "--temp-profile", "--remote-debugging-port=%s" % self.port])
+        chrome_args = [self.executable, "--temp-profile",
+                "--remote-debugging-port=%s" % self.port,
+                "--disable-web-sockets", "--disable-cache",
+                "--window-size=1100,900", "--enable-logging",
+                "--homepage=about:blank", "about:blank"]
+        self.logger.info("running {}".format(chrome_args))
+        self.chrome_process = subprocess.Popen(chrome_args)
+        self.logger.info("chrome running, pid {}".format(self.chrome_process.pid))
         start = time.time()
-        import socket
-        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        while sock.connect_ex(('127.0.0.1',int(self.port))) != 0 and (time.time() - start) < float(self.browser_wait):
-            time.sleep(0.1)
-        if (time.time() - start) > float(self.browser_wait):
-            raise Exception("Browser appears to have failed to start")
+        json_url = "http://localhost:%s/json" % self.port
+        while True:
+            try:
+                raw_json = urllib.request.urlopen(json_url).read()
+                all_debug_info = loads(raw_json.decode('utf-8'))
+                debug_info = [x for x in all_debug_info if x['url'] == 'about:blank']
+
+                if debug_info and 'webSocketDebuggerUrl' in debug_info[0]:
+                    self.logger.debug("{} returned {}".format(json_url, raw_json))
+                    url = debug_info[0]['webSocketDebuggerUrl']
+                    self.logger.info('got chrome window websocket debug url {} from {}'.format(url, json_url))
+                    return url
+            except Exception:
+                # chrome is probably not listening yet; retry until browser_wait expires
+                pass
+
+            if time.time() - start > float(self.browser_wait):
+                # __exit__ never runs if __enter__ raises, so stop chrome here
+                self.chrome_process.kill()
+                raise Exception("failed to retrieve {} after {} seconds".format(json_url, time.time() - start))
+            time.sleep(0.5)

     def __exit__(self, *args):
+        self.logger.info("killing chrome pid {}".format(self.chrome_process.pid))
         self.chrome_process.kill()

 def main():
+    # logging.basicConfig(stream=sys.stdout, level=logging.INFO,
+    logging.basicConfig(stream=sys.stdout, level=logging.DEBUG,
+            format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
+
     arg_parser = argparse.ArgumentParser(prog=os.path.basename(sys.argv[0]),
             description='umbra - Browser automation tool',
             formatter_class=argparse.ArgumentDefaultsHelpFormatter)
@@ -114,13 +155,19 @@ def main():
             help='Executable to use to invoke chrome')
     arg_parser.add_argument('-p', '--port', dest='port', default='9222',
             help='Port to have invoked chrome listen on for debugging connections')
-    arg_parser.add_argument('-u', '--url', dest='amqpurl', default='amqp://guest:guest@localhost:5672//',
+    arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672//',
             help='URL identifying the amqp server to talk to')
     args = arg_parser.parse_args(args=sys.argv[1:])
-    with Chrome(args.port, args.executable, args.browser_wait):
-        Umbra(args.port, args.amqpurl)
-        while True:
-            time.sleep(1)
+    with Chrome(args.port, args.executable, args.browser_wait) as websocket_url:
+        umbra = Umbra(websocket_url, args.amqp_url)
+        try:
+            while True:
+                time.sleep(0.5)
+        except KeyboardInterrupt:
+            pass  # ctrl-c is how umbra is stopped; shutdown happens in finally
+        finally:
+            umbra.shutdown()
+

 if __name__ == "__main__":
     main()