From 55fad8055395e150469a8637d268b60e868e2320 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Mon, 5 May 2014 12:26:39 -0700 Subject: [PATCH] UmbraWorker.send_to_chrome() - central place to send message to chrome via websocket --- umbra/behaviors.py | 19 ++++------- umbra/umbra.py | 84 ++++++++++++++++++++-------------------------- 2 files changed, 43 insertions(+), 60 deletions(-) diff --git a/umbra/behaviors.py b/umbra/behaviors.py index d7f58ff..26d76af 100644 --- a/umbra/behaviors.py +++ b/umbra/behaviors.py @@ -1,7 +1,7 @@ # vim: set sw=4 et: import json -from itertools import chain +import itertools import os import re import logging @@ -18,7 +18,7 @@ class Behavior: def behaviors(): if Behavior._behaviors is None: behaviors_directory = os.path.sep.join(__file__.split(os.path.sep)[:-1] + ['behaviors.d']) - behavior_files = chain(*[[os.path.join(dir, file) for file in files if file.endswith('.js') and file != 'default.js'] for dir, dirs, files in os.walk(behaviors_directory)]) + behavior_files = itertools.chain(*[[os.path.join(dir, file) for file in files if file.endswith('.js') and file != 'default.js'] for dir, dirs, files in os.walk(behaviors_directory)]) Behavior._behaviors = [] for file_name in behavior_files: Behavior.logger.debug("reading behavior file {}".format(file_name)) @@ -46,10 +46,9 @@ class Behavior: Behavior._default_behavior = behavior return Behavior._default_behavior - def __init__(self, url, websock, command_id): + def __init__(self, url, umbra_worker): self.url = url - self.websock = websock - self.command_id = command_id + self.umbra_worker = umbra_worker self.script_finished = False self.waiting_result_msg_ids = [] @@ -65,18 +64,12 @@ class Behavior: if self.active_behavior is None: self.active_behavior = Behavior.default_behavior() - msg = json.dumps(dict(method="Runtime.evaluate", params={"expression": self.active_behavior['script']}, id=next(self.command_id))) - self.logger.debug('sending message to {}: {}'.format(self.websock, msg)) - self.websock.send(msg) - + self.umbra_worker.send_to_chrome(method="Runtime.evaluate", params={"expression": self.active_behavior['script']}) self.notify_of_activity() def is_finished(self): - msg_id = next(self.command_id) + msg_id = self.umbra_worker.send_to_chrome(method="Runtime.evaluate", params={"expression": "umbraBehaviorFinished()"}) self.waiting_result_msg_ids.append(msg_id) - msg = json.dumps(dict(method="Runtime.evaluate", params={"expression": "umbraBehaviorFinished()"}, id=msg_id)) - self.logger.debug('sending message to {}: {}'.format(self.websock, msg)) - self.websock.send(msg) request_idle_timeout_sec = 30 if self.active_behavior and 'request_idle_timeout_sec' in self.active_behavior: diff --git a/umbra/umbra.py b/umbra/umbra.py index cda8ed0..2d141f1 100755 --- a/umbra/umbra.py +++ b/umbra/umbra.py @@ -2,13 +2,15 @@ # vim: set sw=4 et: import logging -import os, sys, argparse +import sys # logging.basicConfig(stream=sys.stdout, level=logging.INFO, logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') -from json import dumps, loads +import os +import argparse +import json import urllib.request, urllib.error, urllib.parse import itertools import websocket @@ -17,7 +19,7 @@ import uuid import threading import subprocess import signal -from kombu import Connection, Exchange, Queue +import kombu import tempfile from umbra.behaviors import Behavior @@ -39,6 +41,7 @@ class UmbraWorker: self.chrome_wait = chrome_wait self.client_id = client_id self._behavior = None + self.websock = None def browse_page(self, url, url_metadata): """Synchronously browse a page and run behaviors.""" @@ -47,17 +50,17 @@ class UmbraWorker: self.url_metadata = url_metadata with tempfile.TemporaryDirectory() as user_data_dir: with Chrome(self.chrome_port, self.chrome_exe, self.chrome_wait, user_data_dir) as websocket_url: - websock = websocket.WebSocketApp(websocket_url, + self.websock = websocket.WebSocketApp(websocket_url, on_open=self._visit_page, on_message=self._handle_message) - websock_thread = threading.Thread(target=websock.run_forever, kwargs={'ping_timeout':0.5}) + websock_thread = threading.Thread(target=self.websock.run_forever, kwargs={'ping_timeout':0.5}) websock_thread.start() start = time.time() while True: time.sleep(0.5) - if not websock or not websock.sock or not websock.sock.connected: - self.logger.error("websocket closed, did chrome die??? {}".format(websock)) + if not self.websock or not self.websock.sock or not self.websock.sock.connected: + self.logger.error("websocket closed, did chrome die??? {}".format(self.websock)) break elif time.time() - start > UmbraWorker.HARD_TIMEOUT_SECONDS: self.logger.info("finished browsing page, reached hard timeout of {} seconds url={}".format(UmbraWorker.HARD_TIMEOUT_SECONDS, self.url)) @@ -67,41 +70,32 @@ class UmbraWorker: break try: - websock.close() + self.websock.close() except BaseException as e: - self.logger.error("exception closing websocket {} - {}".format(websock, e)) + self.logger.error("exception closing websocket {} - {}".format(self.websock, e)) websock_thread.join() + def send_to_chrome(self, **kwargs): + msg_id = next(self.command_id) + kwargs['id'] = msg_id + msg = json.dumps(kwargs) + self.logger.debug('sending message to {}: {}'.format(self.websock, msg)) + self.websock.send(msg) + return msg_id + def _visit_page(self, websock): - msg = dumps(dict(method="Network.enable", id=next(self.command_id))) - self.logger.debug('sending message to {}: {}'.format(websock, msg)) - websock.send(msg) - - msg = dumps(dict(method="Page.enable", id=next(self.command_id))) - self.logger.debug('sending message to {}: {}'.format(websock, msg)) - websock.send(msg) - - msg = dumps(dict(method="Console.enable", id=next(self.command_id))) - self.logger.debug('sending message to {}: {}'.format(websock, msg)) - websock.send(msg) - - msg = dumps(dict(method="Debugger.enable", id=next(self.command_id))) - self.logger.debug('sending message to {}: {}'.format(websock, msg)) - websock.send(msg) - - msg = dumps(dict(method="Runtime.enable", id=next(self.command_id))) - self.logger.debug('sending message to {}: {}'.format(websock, msg)) - websock.send(msg) + self.send_to_chrome(method="Network.enable") + self.send_to_chrome(method="Page.enable") + self.send_to_chrome(method="Console.enable") + self.send_to_chrome(method="Debugger.enable") + self.send_to_chrome(method="Runtime.enable") # disable google analytics, see _handle_message() where breakpoint is caught "Debugger.paused" - msg = dumps(dict(method="Debugger.setBreakpointByUrl", id=next(self.command_id), params={"lineNumber": 1, "urlRegex":"https?://www.google-analytics.com/analytics.js"})) - self.logger.debug('sending message to {}: {}'.format(websock, msg)) - websock.send(msg) + self.send_to_chrome(method="Debugger.setBreakpointByUrl", params={"lineNumber": 1, "urlRegex":"https?://www.google-analytics.com/analytics.js"}) - msg = dumps(dict(method="Page.navigate", id=next(self.command_id), params={"url": self.url})) - self.logger.debug('sending message to {}: {}'.format(websock, msg)) - websock.send(msg) + # navigate to the page! + self.send_to_chrome(method="Page.navigate", params={"url": self.url}) # XXX should this class know anything about amqp? or should it # delegate this back up to the Umbra class? @@ -118,7 +112,7 @@ class UmbraWorker: def _handle_message(self, websock, message): # self.logger.debug("message from {} - {}".format(websock.url, message[:95])) # self.logger.debug("message from {} - {}".format(websock.url, message)) - message = loads(message) + message = json.loads(message) if "method" in message and message["method"] == "Network.requestWillBeSent": if self._behavior: self._behavior.notify_of_activity() @@ -129,7 +123,7 @@ class UmbraWorker: elif "method" in message and message["method"] == "Page.loadEventFired": if self._behavior is None: self.logger.info("Page.loadEventFired, starting behaviors url={} message={}".format(self.url, message)) - self._behavior = Behavior(self.url, websock, self.command_id) + self._behavior = Behavior(self.url, self) self._behavior.start() else: self.logger.warn("Page.loadEventFired but behaviors already running url={} message={}".format(self.url, message)) @@ -145,14 +139,10 @@ class UmbraWorker: scriptId = message['params']['callFrames'][0]['location']['scriptId'] # replace script - msg = dumps(dict(method="Debugger.setScriptSource", id=next(self.command_id), params={"scriptId": scriptId, "scriptSource":"console.log('google analytics is no more!');"})) - self.logger.debug('sending message to {}: {}'.format(websock, msg)) - websock.send(msg) + self.send_to_chrome(method="Debugger.setScriptSource", params={"scriptId": scriptId, "scriptSource":"console.log('google analytics is no more!');"}) # resume execution - msg = dumps(dict(method="Debugger.resume", id=next(self.command_id))) - self.logger.debug('sending message to {}: {}'.format(websock, msg)) - websock.send(msg) + self.send_to_chrome(method="Debugger.resume") elif "result" in message: if self._behavior and self._behavior.is_waiting_on_result(message['id']): self._behavior.notify_of_result(message) @@ -217,10 +207,10 @@ class Umbra: def _consume_amqp(self): while not self.amqp_stop.is_set(): try: - self.umbra_exchange = Exchange(name='umbra', type='direct', durable=True) - url_queue = Queue('urls', routing_key='url', exchange=self.umbra_exchange) + self.umbra_exchange = kombu.Exchange(name='umbra', type='direct', durable=True) + url_queue = kombu.Queue('urls', routing_key='url', exchange=self.umbra_exchange) self.logger.info("connecting to amqp exchange={} at {}".format(self.umbra_exchange.name, self.amqp_url)) - with Connection(self.amqp_url) as conn: + with kombu.Connection(self.amqp_url) as conn: if self.producer_lock is None: self.producer_lock = threading.Lock() with self.producer_lock: @@ -273,7 +263,7 @@ class Chrome: def fetch_debugging_json(): raw_json = urllib.request.urlopen("http://localhost:%s/json" % self.port).read() json = raw_json.decode('utf-8') - return loads(json) + return json.loads(json) # returns websocket url to chrome window with about:blank loaded def __enter__(self): @@ -294,7 +284,7 @@ class Chrome: while True: try: raw_json = urllib.request.urlopen(json_url).read() - all_debug_info = loads(raw_json.decode('utf-8')) + all_debug_info = json.loads(raw_json.decode('utf-8')) debug_info = [x for x in all_debug_info if x['url'] == 'about:blank'] if debug_info and 'webSocketDebuggerUrl' in debug_info[0]: