diff --git a/umbra/behaviors.d/default.js b/umbra/behaviors.d/default.js new file mode 100644 index 0000000..41e0220 --- /dev/null +++ b/umbra/behaviors.d/default.js @@ -0,0 +1,42 @@ +// vim:set sw=8 et: + +// STATES = ['NASCENT', 'NEED_SCROLL', 'WAITING', 'FINISHED'] + +// var transition = prepareTransition(state); +// if (transition.callback) { +// newState.callback(state, newState); +// } +// state = newState; + +// if (state.status === 'NASCENT') { +// } else if (state.status == 'NEED_SCROLL') { +// } else if (state.status == 'FINISHED') { + +var UMBRA_FINISH_AFTER_IDLE_TIME = 10 * 1000; // ms +var umbraState = {'idleSince':null}; +var umbraFinished = false; +var umbraIntervalFunc = function() { + // var needToScroll = (window.scrollY + window.innerHeight + 10 < document.body.clientHeight); + // var needToScroll = (document.documentElement.scrollTop + document.documentElement.clientHeight < document.documentElement.scrollHeight); + var needToScroll = (window.scrollY + window.innerHeight < document.documentElement.scrollHeight); + + // console.log('intervalFunc umbraState.idleSince=' + umbraState.idleSince + ' needToScroll=' + needToScroll + ' window.scrollY=' + window.scrollY + ' window.innerHeight=' + window.innerHeight + ' document.documentElement.scrollHeight=' + document.documentElement.scrollHeight); + if (needToScroll) { + window.scrollBy(0, 200); + umbraState.idleSince = null; + } else if (umbraState.idleSince == null) { + umbraState.idleSince = Date.now(); + } +} + +var umbraBehaviorFinished = function() { + if (umbraState.idleSince != null) { + var idleTime = Date.now() - umbraState.idleSince; + if (idleTime > UMBRA_FINISH_AFTER_IDLE_TIME) { + return true; + } + } + return false; +} + +var umbraIntervalId = setInterval(umbraIntervalFunc, 100); diff --git a/umbra/behaviors.d/facebook.js b/umbra/behaviors.d/facebook.js index 2a35d55..b753e2d 100644 --- a/umbra/behaviors.d/facebook.js +++ b/umbra/behaviors.d/facebook.js @@ -1,4 +1,4 @@ -//^https?://(?:www\.)?facebook.com/.*$ +//^https?://(?:www\.)?facebook\.com/.*$ // vim:set sw=8 et: var aboveBelowOrOnScreen = function(e) { diff --git a/umbra/behaviors.d/flickr.js b/umbra/behaviors.d/flickr.js index c5b6707..e03e525 100644 --- a/umbra/behaviors.d/flickr.js +++ b/umbra/behaviors.d/flickr.js @@ -1,4 +1,4 @@ -//^https?://(?:www\.)?flickr.com/.*$ +//^https?://(?:www\.)?flickr\.com/.*$ setInterval(function() { window.scrollBy(0,50); }, 100); diff --git a/umbra/behaviors.py b/umbra/behaviors.py index 4518f7f..bfb6b44 100644 --- a/umbra/behaviors.py +++ b/umbra/behaviors.py @@ -1,25 +1,90 @@ # vim: set sw=4 et: from json import dumps, load -from time import sleep from itertools import chain import os, re import logging +import time + +class Behavior: + logger = logging.getLogger('umbra.behaviors.Behavior') + + _behaviors = None + _default_behavior_script = None + + @staticmethod + def behaviors(): + if Behavior._behaviors is None: + behaviors_directory = os.path.sep.join(__file__.split(os.path.sep)[:-1] + ['behaviors.d']) + behavior_files = chain(*[[os.path.join(dir, file) for file in files if file.endswith('.js') and file != 'default.js'] for dir, dirs, files in os.walk(behaviors_directory)]) + Behavior._behaviors = [] + for file_name in behavior_files: + Behavior.logger.debug("reading behavior file {}".format(file_name)) + lines = open(file_name).readlines() + pattern, script = lines[0][2:].strip(), ''.join(lines[1:]) + Behavior._behaviors.append({'url_regex': pattern, 'script': script, 'file': file_name}) + Behavior.logger.info("will run behaviors from {} to urls matching {}".format(file_name, pattern)) + + return Behavior._behaviors + + @staticmethod + def default_behavior_script(): + if Behavior._default_behavior_script is None: + behaviors_directory = os.path.sep.join(__file__.split(os.path.sep)[:-1] + ['behaviors.d']) + file_name = os.path.join(behaviors_directory, 'default.js') + Behavior.logger.debug("reading default behavior file {}".format(file_name)) + Behavior._default_behavior_script = open(file_name).read() + return Behavior._default_behavior_script + + def __init__(self, url, websock, command_id): + self.url = url + self.websock = websock + self.command_id = command_id + + self.script_finished = False + self.waiting_result_msg_ids = [] + + def start(self): + self.notify_of_activity() + + script_started = False + for behavior in Behavior.behaviors(): + if re.match(behavior['url_regex'], self.url): + msg = dumps(dict(method="Runtime.evaluate", params={"expression": behavior['script']}, id=next(self.command_id))) + self.logger.debug('sending message to {}: {}'.format(self.websock, msg)) + self.websock.send(msg) + script_started = True + break + + if not script_started: + msg = dumps(dict(method="Runtime.evaluate", params={"expression": Behavior.default_behavior_script()}, id=next(self.command_id))) + self.logger.debug('sending message to {}: {}'.format(self.websock, msg)) + self.websock.send(msg) + + def is_finished(self): + msg_id = next(self.command_id) + self.waiting_result_msg_ids.append(msg_id) + msg = dumps(dict(method="Runtime.evaluate", params={"expression": "umbraBehaviorFinished()"}, id=msg_id)) + self.logger.debug('sending message to {}: {}'.format(self.websock, msg)) + self.websock.send(msg) + + return self.script_finished # XXX and idle_time > behavior_specified_idle_timeout + + def is_waiting_on_result(self, msg_id): + return msg_id in self.waiting_result_msg_ids + + def notify_of_result(self, chrome_message): + # {'id': 59, 'result': {'result': {'type': 'boolean', 'value': True}, 'wasThrown': False}} + self.waiting_result_msg_ids.remove(chrome_message['id']) + if ('result' in chrome_message + and not chrome_message['result']['wasThrown'] + and 'result' in chrome_message['result'] + and type(chrome_message['result']['result']['value']) == bool): + self.script_finished = chrome_message['result']['result']['value'] + else: + self.logger.error("chrome message doesn't look like a boolean result! {}".format(chrome_message)) + + def notify_of_activity(self): + self.last_activity = time.time() -logger = logging.getLogger('behaviors') -behaviors_directory = os.path.sep.join(__file__.split(os.path.sep)[:-1] + ['behaviors.d']) -behavior_files = chain(*[[os.path.join(dir, file) for file in files if file.endswith('.js')] for dir, dirs, files in os.walk(behaviors_directory)]) -behaviors = [] -for file_name in behavior_files: - logger.debug("reading behavior file {}".format(file_name)) - lines = open(file_name).readlines() - pattern, script = lines[0][2:].strip(), ''.join(lines[1:]) - behaviors.append({'url_regex': pattern, 'script': script, 'file': file_name}) - logger.info("will run behaviors from {} to urls matching {}".format(file_name, pattern)) -def execute(url, websock, command_id): - for behavior in behaviors: - if re.match(behavior['url_regex'], url): - msg = dumps(dict(method="Runtime.evaluate", params={"expression": behavior['script']}, id=next(command_id))) - logger.debug('sending message to {}: {}'.format(websock, msg)) - websock.send(msg) diff --git a/umbra/umbra.py b/umbra/umbra.py index 1cc0461..cda8ed0 100755 --- a/umbra/umbra.py +++ b/umbra/umbra.py @@ -3,13 +3,14 @@ import logging import os, sys, argparse + # logging.basicConfig(stream=sys.stdout, level=logging.INFO, logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') from json import dumps, loads import urllib.request, urllib.error, urllib.parse -from itertools import count +import itertools import websocket import time import uuid @@ -18,66 +19,61 @@ import subprocess import signal from kombu import Connection, Exchange, Queue import tempfile -from umbra import behaviors +from umbra.behaviors import Behavior class UmbraWorker: + """Runs chrome/chromium to synchronously browse one page at a time using + worker.browse_page(). Currently the implementation starts up a new instance + of chrome for each page browsed, always on the same debug port. (In the + future, it may keep the browser running indefinitely.)""" logger = logging.getLogger('umbra.UmbraWorker') + HARD_TIMEOUT_SECONDS = 60 * 15 + def __init__(self, umbra, chrome_port=9222, chrome_exe='chromium-browser', chrome_wait=10, client_id='request'): - self.command_id = count(1) + self.command_id = itertools.count(1) self.lock = threading.Lock() self.umbra = umbra self.chrome_port = chrome_port self.chrome_exe = chrome_exe self.chrome_wait = chrome_wait self.client_id = client_id - self.page_done = threading.Event() - self.idle_timer = None - self.hard_stop_timer = None + self._behavior = None def browse_page(self, url, url_metadata): + """Synchronously browse a page and run behaviors.""" with self.lock: self.url = url self.url_metadata = url_metadata with tempfile.TemporaryDirectory() as user_data_dir: with Chrome(self.chrome_port, self.chrome_exe, self.chrome_wait, user_data_dir) as websocket_url: websock = websocket.WebSocketApp(websocket_url, - on_open=self.visit_page, on_message=self.handle_message) - websock_thread = threading.Thread(target=websock.run_forever) + on_open=self._visit_page, + on_message=self._handle_message) + websock_thread = threading.Thread(target=websock.run_forever, kwargs={'ping_timeout':0.5}) websock_thread.start() + start = time.time() - self.page_done.clear() - self._reset_idle_timer() - while not self.page_done.is_set(): + while True: time.sleep(0.5) + if not websock or not websock.sock or not websock.sock.connected: + self.logger.error("websocket closed, did chrome die??? {}".format(websock)) + break + elif time.time() - start > UmbraWorker.HARD_TIMEOUT_SECONDS: + self.logger.info("finished browsing page, reached hard timeout of {} seconds url={}".format(UmbraWorker.HARD_TIMEOUT_SECONDS, self.url)) + break + elif self._behavior != None and self._behavior.is_finished(): + self.logger.info("finished browsing page according to behavior url={}".format(self.url)) + break - websock.close() - self.idle_timer = None + try: + websock.close() + except BaseException as e: + self.logger.error("exception closing websocket {} - {}".format(websock, e)) - def _reset_idle_timer(self): - def _idle_timeout(): - self.logger.debug('idle timeout') - self.page_done.set() - if self.hard_stop_timer: - self.hard_stop_timer.cancel() + websock_thread.join() - def _hard_timeout(): - self.logger.debug('hard timeout') - self.page_done.set() - if self.idle_timer: - self.idle_timer.cancel() - - if self.idle_timer: - self.idle_timer.cancel() - - self.idle_timer = threading.Timer(30, _idle_timeout) - self.idle_timer.start() - - if not self.hard_stop_timer: # 15 minutes is as long as we should give 1 page - self.hard_stop_timer = threading.Timer(900, _hard_timeout) - self.hard_stop_timer.start() - - def visit_page(self, websock): + def _visit_page(self, websock): msg = dumps(dict(method="Network.enable", id=next(self.command_id))) self.logger.debug('sending message to {}: {}'.format(websock, msg)) websock.send(msg) @@ -94,6 +90,11 @@ class UmbraWorker: self.logger.debug('sending message to {}: {}'.format(websock, msg)) websock.send(msg) + msg = dumps(dict(method="Runtime.enable", id=next(self.command_id))) + self.logger.debug('sending message to {}: {}'.format(websock, msg)) + websock.send(msg) + + # disable google analytics, see _handle_message() where breakpoint is caught "Debugger.paused" msg = dumps(dict(method="Debugger.setBreakpointByUrl", id=next(self.command_id), params={"lineNumber": 1, "urlRegex":"https?://www.google-analytics.com/analytics.js"})) self.logger.debug('sending message to {}: {}'.format(websock, msg)) websock.send(msg) @@ -102,7 +103,9 @@ class UmbraWorker: self.logger.debug('sending message to {}: {}'.format(websock, msg)) websock.send(msg) - def send_request_to_amqp(self, chrome_msg): + # XXX should this class know anything about amqp? or should it + # delegate this back up to the Umbra class? + def _send_request_to_amqp(self, chrome_msg): payload = chrome_msg['params']['request'] payload['parentUrl'] = self.url payload['parentUrlMetadata'] = self.url_metadata @@ -112,37 +115,86 @@ class UmbraWorker: exchange=self.umbra.umbra_exchange, routing_key=self.client_id) - def handle_message(self, websock, message): + def _handle_message(self, websock, message): # self.logger.debug("message from {} - {}".format(websock.url, message[:95])) # self.logger.debug("message from {} - {}".format(websock.url, message)) message = loads(message) if "method" in message and message["method"] == "Network.requestWillBeSent": - self._reset_idle_timer() + if self._behavior: + self._behavior.notify_of_activity() if not message["params"]["request"]["url"].lower().startswith("data:"): - self.send_request_to_amqp(message) + self._send_request_to_amqp(message) else: self.logger.debug("ignoring data url {}".format(message["params"]["request"]["url"][:80])) elif "method" in message and message["method"] == "Page.loadEventFired": - self.logger.debug("Page.loadEventFired, starting behaviors url={} message={}".format(self.url, message)) - behaviors.execute(self.url, websock, self.command_id) + if self._behavior is None: + self.logger.info("Page.loadEventFired, starting behaviors url={} message={}".format(self.url, message)) + self._behavior = Behavior(self.url, websock, self.command_id) + self._behavior.start() + else: + self.logger.warn("Page.loadEventFired but behaviors already running url={} message={}".format(self.url, message)) elif "method" in message and message["method"] == "Console.messageAdded": self.logger.debug("{} console {} {}".format(websock.url, message["params"]["message"]["level"], message["params"]["message"]["text"])) elif "method" in message and message["method"] == "Debugger.paused": + # We hit the breakpoint set in visit_page. Get rid of google + # analytics script! + self.logger.debug("debugger paused! message={}".format(message)) scriptId = message['params']['callFrames'][0]['location']['scriptId'] + # replace script msg = dumps(dict(method="Debugger.setScriptSource", id=next(self.command_id), params={"scriptId": scriptId, "scriptSource":"console.log('google analytics is no more!');"})) self.logger.debug('sending message to {}: {}'.format(websock, msg)) websock.send(msg) + # resume execution msg = dumps(dict(method="Debugger.resume", id=next(self.command_id))) self.logger.debug('sending message to {}: {}'.format(websock, msg)) websock.send(msg) + elif "result" in message: + if self._behavior and self._behavior.is_waiting_on_result(message['id']): + self._behavior.notify_of_result(message) + # elif "method" in message and message["method"] in ("Network.dataReceived", "Network.responseReceived", "Network.loadingFinished"): + # pass + # elif "method" in message: + # self.logger.debug("{} {}".format(message["method"], message)) + # else: + # self.logger.debug("[no-method] {}".format(message)) class Umbra: + """Consumes amqp messages representing requests to browse urls, from the + amqp queue "urls" on exchange "umbra". Incoming amqp message is a json + object with 3 attributes: + { + "clientId": "umbra.client.123", + "url": "http://example.com/my_fancy_page", + "metadata": {"arbitrary":"fields", "etc":4} + } + + "url" is the url to browse. + + "clientId" uniquely idenfities the client of + umbra. Umbra uses the clientId to direct information via amqp back to the + client. It sends this information on that same "umbra" exchange, and uses + the clientId as the amqp routing key. + + Each url requested in the browser is published to amqp this way. The + outgoing amqp message is a json object: + + { + 'url': 'http://example.com/images/embedded_thing.jpg', + 'method': 'GET', + 'headers': {'User-Agent': '...', 'Accept': '...'} + 'parentUrl': 'http://example.com/my_fancy_page', + 'parentUrlMetadata': {"arbitrary":"fields", "etc":4}, + } + + POST requests have an additional field, postData. + """ + logger = logging.getLogger('umbra.Umbra') def __init__(self, amqp_url, chrome_exe, browser_wait): @@ -153,7 +205,7 @@ class Umbra: self.producer_lock = None self.workers = {} self.workers_lock = threading.Lock() - self.amqp_thread = threading.Thread(target=self.consume_amqp) + self.amqp_thread = threading.Thread(target=self._consume_amqp) self.amqp_stop = threading.Event() self.amqp_thread.start() @@ -162,7 +214,7 @@ class Umbra: self.amqp_stop.set() self.amqp_thread.join() - def consume_amqp(self): + def _consume_amqp(self): while not self.amqp_stop.is_set(): try: self.umbra_exchange = Exchange(name='umbra', type='direct', durable=True) @@ -173,7 +225,7 @@ class Umbra: self.producer_lock = threading.Lock() with self.producer_lock: self.producer = conn.Producer(serializer='json') - with conn.Consumer(url_queue, callbacks=[self.fetch_url]) as consumer: + with conn.Consumer(url_queue, callbacks=[self._browse_page_requested]) as consumer: import socket while not self.amqp_stop.is_set(): try: @@ -184,7 +236,12 @@ class Umbra: self.logger.error("amqp exception {}".format(e)) self.logger.error("attempting to reopen amqp connection") - def fetch_url(self, body, message): + def _browse_page_requested(self, body, message): + """First waits for the UmbraWorker for the client body['clientId'] to + become available, or creates a new worker if this clientId has not been + served before. Starts a worker browsing the page asynchronously, then + acknowledges the amqp message, which lets the server know it can be + removed from the queue.""" client_id = body['clientId'] with self.workers_lock: if not client_id in self.workers: