diff --git a/umbra/behaviors.d/default.js b/umbra/behaviors.d/default.js new file mode 100644 index 0000000..dc66819 --- /dev/null +++ b/umbra/behaviors.d/default.js @@ -0,0 +1,37 @@ +// {"request_idle_timeout_sec":10} +// +// vim:set sw=8 et: +// +// Scrolls to the bottom of the page. That's it at the moment. +// + +var umbraState = {'idleSince':null}; +var umbraFinished = false; +var umbraIntervalFunc = function() { + var needToScroll = (window.scrollY + window.innerHeight < document.documentElement.scrollHeight); + + // console.log('intervalFunc umbraState.idleSince=' + umbraState.idleSince + ' needToScroll=' + needToScroll + ' window.scrollY=' + window.scrollY + ' window.innerHeight=' + window.innerHeight + ' document.documentElement.scrollHeight=' + document.documentElement.scrollHeight); + if (needToScroll) { + window.scrollBy(0, 200); + umbraState.idleSince = null; + } else if (umbraState.idleSince == null) { + umbraState.idleSince = Date.now(); + } +} + +// If we haven't had anything to do (scrolled, clicked, etc) in this amount of +// time, then we consider ourselves finished with the page. +var UMBRA_USER_ACTION_IDLE_TIMEOUT_SEC = 10; + +// Called from outside of this script. 
+var umbraBehaviorFinished = function() { + if (umbraState.idleSince != null) { + var idleTimeMs = Date.now() - umbraState.idleSince; + if (idleTimeMs / 1000 > UMBRA_USER_ACTION_IDLE_TIMEOUT_SEC) { + return true; + } + } + return false; +} + +var umbraIntervalId = setInterval(umbraIntervalFunc, 100); diff --git a/umbra/behaviors.d/facebook.js b/umbra/behaviors.d/facebook.js index 2a35d55..10adbef 100644 --- a/umbra/behaviors.d/facebook.js +++ b/umbra/behaviors.d/facebook.js @@ -1,7 +1,9 @@ -//^https?://(?:www\.)?facebook.com/.*$ +// {"url_regex":"^https?://(?:www\\.)?facebook\\.com/.*$", "request_idle_timeout_sec":30} +// // vim:set sw=8 et: +// -var aboveBelowOrOnScreen = function(e) { +var umbraAboveBelowOrOnScreen = function(e) { var eTop = e.getBoundingClientRect().top; if (eTop < window.scrollY) { return -1; // above @@ -13,65 +15,94 @@ var aboveBelowOrOnScreen = function(e) { } // comments - 'a.UFIPagerLink > span, a.UFIPagerLink, span.UFIReplySocialSentenceLinkText' -var THINGS_TO_CLICK_SELECTOR = 'a[href^="/browse/likes"], *[rel="theater"]'; -var alreadyClicked = {}; -var intervalId; +var UMBRA_THINGS_TO_CLICK_SELECTOR = 'a[href^="/browse/likes"], *[rel="theater"]'; +var umbraAlreadyClicked = {}; +var umbraState = {'idleSince':null,'expectingSomething':null}; -var intervalFunc = function() { - var closeButton = document.querySelector('a[title="Close"]'); - if (closeButton) { - console.log("clicking close button " + closeButton.outerHTML); - closeButton.click(); - return; +var umbraIntervalFunc = function() { + var closeButtons = document.querySelectorAll('a[title="Close"], a.closeTheater'); + for (var i = 0; i < closeButtons.length; i++) { + // XXX closeTheater buttons stick around in the dom after closing, clientWidth>0 is one way to check if they're visible + if (closeButtons[i].clientWidth > 0) { + if (umbraState.expectingSomething == 'closeButton') { + console.log("found expected close button, clicking on it " + closeButtons[i].outerHTML); + 
umbraState.expectingSomething = null; + } else { + console.warn("found UNexpected close button, umbraState.expectingSomething=" + umbraState.expectingSomething + " ... clicking on it " + closeButtons[i].outerHTML); + } + closeButtons[i].click(); + return; + } } - var closeTheaterButton = document.querySelector('a.closeTheater'); - if (closeTheaterButton && closeTheaterButton.offsetWidth > 0) { - console.log("clicking close button " + closeTheaterButton.outerHTML); - closeTheaterButton.click(); + if (umbraState.expectingSomething == 'closeButton') { + console.log("waiting for close button, haven't seen it yet"); return; } - var thingsToClick = document.querySelectorAll(THINGS_TO_CLICK_SELECTOR); + var thingsToClick = document.querySelectorAll(UMBRA_THINGS_TO_CLICK_SELECTOR); var clickedSomething = false; var somethingLeftBelow = false; + var somethingLeftAbove = false; var missedAbove = 0; for (var i = 0; i < thingsToClick.length; i++) { var target = thingsToClick[i]; - if (!(target in alreadyClicked)) { - var where = aboveBelowOrOnScreen(target); + if (!(target in umbraAlreadyClicked)) { + var where = umbraAboveBelowOrOnScreen(target); if (where == 0) { // on screen // var pos = target.getBoundingClientRect().top; // window.scrollTo(0, target.getBoundingClientRect().top - 100); console.log("clicking at " + target.getBoundingClientRect().top + " on " + target.outerHTML); - if(target.click != undefined) { + if (target.click != undefined) { + umbraState.expectingSomething = 'closeButton'; target.click(); } target.style.border = '1px solid #0a0'; - alreadyClicked[target] = true; + umbraAlreadyClicked[target] = true; clickedSomething = true; + umbraState.idleSince = null; break; } else if (where > 0) { somethingLeftBelow = true; - } else { - missedAbove++; + } else if (where < 0) { + somethingLeftAbove = true; } } } - if (missedAbove > 0) { - console.log("somehow missed " + missedAbove + " click targets above"); - } - if (!clickedSomething) { - if (somethingLeftBelow) { 
+ if (somethingLeftAbove) { + console.log("scrolling UP because everything on this screen has been clicked but we missed something above"); + window.scrollBy(0, -200); + umbraState.idleSince = null; + } else if (somethingLeftBelow) { console.log("scrolling because everything on this screen has been clicked but there's more below document.body.clientHeight=" + document.body.clientHeight); window.scrollBy(0, 200); - } else if (window.scrollY + window.innerHeight + 10 < document.body.clientHeight) { + umbraState.idleSince = null; + } else if (window.scrollY + window.innerHeight < document.documentElement.scrollHeight) { console.log("scrolling because we're not to the bottom yet document.body.clientHeight=" + document.body.clientHeight); window.scrollBy(0, 200); - } + umbraState.idleSince = null; + } else if (umbraState.idleSince == null) { + umbraState.idleSince = Date.now(); + } } } -var intervalId = setInterval(intervalFunc, 200); +// If we haven't had anything to do (scrolled, clicked, etc) in this amount of +// time, then we consider ourselves finished with the page. +var UMBRA_USER_ACTION_IDLE_TIMEOUT_SEC = 10; + +// Called from outside of this script. 
+var umbraBehaviorFinished = function() { + if (umbraState.idleSince != null) { + var idleTimeMs = Date.now() - umbraState.idleSince; + if (idleTimeMs / 1000 > UMBRA_USER_ACTION_IDLE_TIMEOUT_SEC) { + return true; + } + } + return false; +} + + +var umbraIntervalId = setInterval(umbraIntervalFunc, 200); diff --git a/umbra/behaviors.d/flickr.js b/umbra/behaviors.d/flickr.js index c5b6707..65787a9 100644 --- a/umbra/behaviors.d/flickr.js +++ b/umbra/behaviors.d/flickr.js @@ -1,17 +1,20 @@ -//^https?://(?:www\.)?flickr.com/.*$ +// {"url_regex":"^https?://(?:www\\.)?flickr\\.com/.*$", "request_idle_timeout_sec":10} +// +// vim:set sw=8 et: +// setInterval(function() { window.scrollBy(0,50); }, 100); setTimeout(function() { - a = document.evaluate("//a[contains(@class, 'sn-ico-slideshow')]", document, null, XPathResult.UNORDERED_NODE_ITERATOR_TYPE, null ); - f = a.iterateNext(); - f.click();}, -5000); - -setTimeout(function() { - a = document.evaluate("//a[contains(@data-track, 'photo-click')]", document, null, XPathResult.UNORDERED_NODE_ITERATOR_TYPE, null ); - setInterval(function() { + a = document.evaluate("//a[contains(@class, 'sn-ico-slideshow')]", document, null, XPathResult.UNORDERED_NODE_ITERATOR_TYPE, null ); f = a.iterateNext(); f.click(); - }, 5000); +}, 5000); + +setTimeout(function() { + a = document.evaluate("//a[contains(@data-track, 'photo-click')]", document, null, XPathResult.UNORDERED_NODE_ITERATOR_TYPE, null ); + setInterval(function() { + f = a.iterateNext(); + f.click(); + }, 5000); }, 5000); diff --git a/umbra/behaviors.d/vimeo.js b/umbra/behaviors.d/vimeo.js index dc779c8..53860f2 100644 --- a/umbra/behaviors.d/vimeo.js +++ b/umbra/behaviors.d/vimeo.js @@ -1,7 +1,27 @@ -//^https?://(?:www\.)?vimeo.com/.*$ +// {"url_regex":"^https?://(?:www\\.)?vimeo\\.com/.*$", "request_idle_timeout_sec":10} +// +// vim:set sw=8 et: +// -var videoElements = document.getElementsByTagName('video'); -for (var i = 0; i < videoElements.length; i++) { - 
videoElements[i].play(); +var umbraState = {'idleSince':null}; +var umbraVideoElements = document.getElementsByTagName('video'); +for (var i = 0; i < umbraVideoElements.length; i++) { + umbraVideoElements[i].play(); +} +umbraState.idleSince = Date.now(); + +// If we haven't had anything to do (scrolled, clicked, etc) in this amount of +// time, then we consider ourselves finished with the page. +var UMBRA_USER_ACTION_IDLE_TIMEOUT_SEC = 10; + +// Called from outside of this script. +var umbraBehaviorFinished = function() { + if (umbraState.idleSince != null) { + var idleTimeMs = Date.now() - umbraState.idleSince; + if (idleTimeMs / 1000 > UMBRA_USER_ACTION_IDLE_TIMEOUT_SEC) { + return true; + } + } + return false; } diff --git a/umbra/behaviors.py b/umbra/behaviors.py index 4518f7f..c8acfbf 100644 --- a/umbra/behaviors.py +++ b/umbra/behaviors.py @@ -1,25 +1,106 @@ # vim: set sw=4 et: -from json import dumps, load -from time import sleep -from itertools import chain -import os, re +import json +import itertools +import os +import re import logging +import time +import sys + +class Behavior: + logger = logging.getLogger('umbra.behaviors.Behavior') + + _behaviors = None + _default_behavior = None + + @staticmethod + def behaviors(): + if Behavior._behaviors is None: + behaviors_directory = os.path.sep.join(__file__.split(os.path.sep)[:-1] + ['behaviors.d']) + behavior_files = itertools.chain(*[[os.path.join(dir, file) for file in files if file.endswith('.js') and file != 'default.js'] for dir, dirs, files in os.walk(behaviors_directory)]) + Behavior._behaviors = [] + for file_name in behavior_files: + Behavior.logger.debug("reading behavior file {}".format(file_name)) + script = open(file_name, encoding='utf-8').read() + first_line = script[:script.find('\n')] + behavior = json.loads(first_line[2:].strip()) + behavior['script'] = script + behavior['file'] = file_name + Behavior._behaviors.append(behavior) + Behavior.logger.info("will run behaviors from {} on urls 
matching {}".format(file_name, behavior['url_regex'])) + + return Behavior._behaviors + + @staticmethod + def default_behavior(): + if Behavior._default_behavior is None: + behaviors_directory = os.path.sep.join(__file__.split(os.path.sep)[:-1] + ['behaviors.d']) + file_name = os.path.join(behaviors_directory, 'default.js') + Behavior.logger.debug("reading default behavior file {}".format(file_name)) + script = open(file_name, encoding='utf-8').read() + first_line = script[:script.find('\n')] + behavior = json.loads(first_line[2:].strip()) + behavior['script'] = script + behavior['file'] = file_name + Behavior._default_behavior = behavior + return Behavior._default_behavior + + def __init__(self, url, umbra_worker): + self.url = url + self.umbra_worker = umbra_worker + + self.script_finished = False + self.waiting_result_msg_ids = [] + self.active_behavior = None + self.last_activity = time.time() + + def start(self): + for behavior in Behavior.behaviors(): + if re.match(behavior['url_regex'], self.url): + self.active_behavior = behavior + break + + if self.active_behavior is None: + self.active_behavior = Behavior.default_behavior() + + self.umbra_worker.send_to_chrome(method="Runtime.evaluate", params={"expression": self.active_behavior['script']}) + self.notify_of_activity() + + def is_finished(self): + msg_id = self.umbra_worker.send_to_chrome(method="Runtime.evaluate", params={"expression": "umbraBehaviorFinished()"}) + self.waiting_result_msg_ids.append(msg_id) + + request_idle_timeout_sec = 30 + if self.active_behavior and 'request_idle_timeout_sec' in self.active_behavior: + request_idle_timeout_sec = self.active_behavior['request_idle_timeout_sec'] + idle_time = time.time() - self.last_activity + + return self.script_finished and idle_time > request_idle_timeout_sec + + def is_waiting_on_result(self, msg_id): + return msg_id in self.waiting_result_msg_ids + + def notify_of_result(self, chrome_message): + # {'id': 59, 'result': {'result': {'type': 
'boolean', 'value': True}, 'wasThrown': False}} + # {'id': 59, 'result': {'result': {'type': 'boolean', 'value': False}} + self.waiting_result_msg_ids.remove(chrome_message['id']) + if ('result' in chrome_message + and not ('wasThrown' in chrome_message['result'] and chrome_message['result']['wasThrown']) + and 'result' in chrome_message['result'] + and type(chrome_message['result']['result']['value']) == bool): + self.script_finished = chrome_message['result']['result']['value'] + else: + self.logger.error("chrome message doesn't look like a boolean result! {}".format(chrome_message)) + + def notify_of_activity(self): + self.last_activity = time.time() + +if __name__ == "__main__": + logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, + format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') + logger = logging.getLogger('umbra.behaviors') + logger.info("custom behaviors: {}".format(Behavior.behaviors())) + logger.info("default behavior: {}".format(Behavior.default_behavior())) -logger = logging.getLogger('behaviors') -behaviors_directory = os.path.sep.join(__file__.split(os.path.sep)[:-1] + ['behaviors.d']) -behavior_files = chain(*[[os.path.join(dir, file) for file in files if file.endswith('.js')] for dir, dirs, files in os.walk(behaviors_directory)]) -behaviors = [] -for file_name in behavior_files: - logger.debug("reading behavior file {}".format(file_name)) - lines = open(file_name).readlines() - pattern, script = lines[0][2:].strip(), ''.join(lines[1:]) - behaviors.append({'url_regex': pattern, 'script': script, 'file': file_name}) - logger.info("will run behaviors from {} to urls matching {}".format(file_name, pattern)) -def execute(url, websock, command_id): - for behavior in behaviors: - if re.match(behavior['url_regex'], url): - msg = dumps(dict(method="Runtime.evaluate", params={"expression": behavior['script']}, id=next(command_id))) - logger.debug('sending message to {}: 
{}'.format(websock, msg)) - websock.send(msg) diff --git a/umbra/umbra.py b/umbra/umbra.py index 1cc0461..e197921 100755 --- a/umbra/umbra.py +++ b/umbra/umbra.py @@ -2,107 +2,105 @@ # vim: set sw=4 et: import logging -import os, sys, argparse +import sys + # logging.basicConfig(stream=sys.stdout, level=logging.INFO, logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') -from json import dumps, loads +import os +import argparse +import json import urllib.request, urllib.error, urllib.parse -from itertools import count +import itertools import websocket import time import uuid import threading import subprocess import signal -from kombu import Connection, Exchange, Queue +import kombu import tempfile -from umbra import behaviors +from umbra.behaviors import Behavior class UmbraWorker: + """Runs chrome/chromium to synchronously browse one page at a time using + worker.browse_page(). Currently the implementation starts up a new instance + of chrome for each page browsed, always on the same debug port. 
(In the + future, it may keep the browser running indefinitely.)""" logger = logging.getLogger('umbra.UmbraWorker') + HARD_TIMEOUT_SECONDS = 20 * 60 + def __init__(self, umbra, chrome_port=9222, chrome_exe='chromium-browser', chrome_wait=10, client_id='request'): - self.command_id = count(1) + self.command_id = itertools.count(1) self.lock = threading.Lock() self.umbra = umbra self.chrome_port = chrome_port self.chrome_exe = chrome_exe self.chrome_wait = chrome_wait self.client_id = client_id - self.page_done = threading.Event() - self.idle_timer = None - self.hard_stop_timer = None + self._behavior = None + self.websock = None def browse_page(self, url, url_metadata): + """Synchronously browse a page and run behaviors.""" with self.lock: self.url = url self.url_metadata = url_metadata with tempfile.TemporaryDirectory() as user_data_dir: with Chrome(self.chrome_port, self.chrome_exe, self.chrome_wait, user_data_dir) as websocket_url: - websock = websocket.WebSocketApp(websocket_url, - on_open=self.visit_page, on_message=self.handle_message) - websock_thread = threading.Thread(target=websock.run_forever) + self.websock = websocket.WebSocketApp(websocket_url, + on_open=self._visit_page, + on_message=self._handle_message) + websock_thread = threading.Thread(target=self.websock.run_forever, kwargs={'ping_timeout':0.5}) websock_thread.start() + start = time.time() - self.page_done.clear() - self._reset_idle_timer() - while not self.page_done.is_set(): + while True: time.sleep(0.5) + if not self.websock or not self.websock.sock or not self.websock.sock.connected: + self.logger.error("websocket closed, did chrome die??? 
{}".format(self.websock)) + break + elif time.time() - start > UmbraWorker.HARD_TIMEOUT_SECONDS: + self.logger.info("finished browsing page, reached hard timeout of {} seconds url={}".format(UmbraWorker.HARD_TIMEOUT_SECONDS, self.url)) + break + elif self._behavior != None and self._behavior.is_finished(): + self.logger.info("finished browsing page according to behavior url={}".format(self.url)) + break - websock.close() - self.idle_timer = None + try: + self.websock.close() + except BaseException as e: + self.logger.error("exception closing websocket {} - {}".format(self.websock, e)) - def _reset_idle_timer(self): - def _idle_timeout(): - self.logger.debug('idle timeout') - self.page_done.set() - if self.hard_stop_timer: - self.hard_stop_timer.cancel() + websock_thread.join() + self._behavior = None - def _hard_timeout(): - self.logger.debug('hard timeout') - self.page_done.set() - if self.idle_timer: - self.idle_timer.cancel() + def send_to_chrome(self, **kwargs): + msg_id = next(self.command_id) + kwargs['id'] = msg_id + msg = json.dumps(kwargs) + self.logger.debug('sending message to {}: {}'.format(self.websock, msg)) + self.websock.send(msg) + return msg_id - if self.idle_timer: - self.idle_timer.cancel() + def _visit_page(self, websock): + self.send_to_chrome(method="Network.enable") + self.send_to_chrome(method="Page.enable") + self.send_to_chrome(method="Console.enable") + self.send_to_chrome(method="Debugger.enable") + self.send_to_chrome(method="Runtime.enable") - self.idle_timer = threading.Timer(30, _idle_timeout) - self.idle_timer.start() + # disable google analytics, see _handle_message() where breakpoint is caught "Debugger.paused" + self.send_to_chrome(method="Debugger.setBreakpointByUrl", params={"lineNumber": 1, "urlRegex":"https?://www.google-analytics.com/analytics.js"}) - if not self.hard_stop_timer: # 15 minutes is as long as we should give 1 page - self.hard_stop_timer = threading.Timer(900, _hard_timeout) - self.hard_stop_timer.start() + # 
navigate to the page! + self.send_to_chrome(method="Page.navigate", params={"url": self.url}) - def visit_page(self, websock): - msg = dumps(dict(method="Network.enable", id=next(self.command_id))) - self.logger.debug('sending message to {}: {}'.format(websock, msg)) - websock.send(msg) - - msg = dumps(dict(method="Page.enable", id=next(self.command_id))) - self.logger.debug('sending message to {}: {}'.format(websock, msg)) - websock.send(msg) - - msg = dumps(dict(method="Console.enable", id=next(self.command_id))) - self.logger.debug('sending message to {}: {}'.format(websock, msg)) - websock.send(msg) - - msg = dumps(dict(method="Debugger.enable", id=next(self.command_id))) - self.logger.debug('sending message to {}: {}'.format(websock, msg)) - websock.send(msg) - - msg = dumps(dict(method="Debugger.setBreakpointByUrl", id=next(self.command_id), params={"lineNumber": 1, "urlRegex":"https?://www.google-analytics.com/analytics.js"})) - self.logger.debug('sending message to {}: {}'.format(websock, msg)) - websock.send(msg) - - msg = dumps(dict(method="Page.navigate", id=next(self.command_id), params={"url": self.url})) - self.logger.debug('sending message to {}: {}'.format(websock, msg)) - websock.send(msg) - - def send_request_to_amqp(self, chrome_msg): + # XXX should this class know anything about amqp? or should it + # delegate this back up to the Umbra class? 
+ def _send_request_to_amqp(self, chrome_msg): payload = chrome_msg['params']['request'] payload['parentUrl'] = self.url payload['parentUrlMetadata'] = self.url_metadata @@ -112,37 +110,82 @@ class UmbraWorker: exchange=self.umbra.umbra_exchange, routing_key=self.client_id) - def handle_message(self, websock, message): + def _handle_message(self, websock, message): # self.logger.debug("message from {} - {}".format(websock.url, message[:95])) # self.logger.debug("message from {} - {}".format(websock.url, message)) - message = loads(message) + message = json.loads(message) if "method" in message and message["method"] == "Network.requestWillBeSent": - self._reset_idle_timer() + if self._behavior: + self._behavior.notify_of_activity() if not message["params"]["request"]["url"].lower().startswith("data:"): - self.send_request_to_amqp(message) + self._send_request_to_amqp(message) else: self.logger.debug("ignoring data url {}".format(message["params"]["request"]["url"][:80])) elif "method" in message and message["method"] == "Page.loadEventFired": - self.logger.debug("Page.loadEventFired, starting behaviors url={} message={}".format(self.url, message)) - behaviors.execute(self.url, websock, self.command_id) + if self._behavior is None: + self.logger.info("Page.loadEventFired, starting behaviors url={} message={}".format(self.url, message)) + self._behavior = Behavior(self.url, self) + self._behavior.start() + else: + self.logger.warn("Page.loadEventFired but behaviors already running url={} message={}".format(self.url, message)) elif "method" in message and message["method"] == "Console.messageAdded": - self.logger.debug("{} console {} {}".format(websock.url, + self.logger.debug("{} console.{} {}".format(websock.url, message["params"]["message"]["level"], message["params"]["message"]["text"])) elif "method" in message and message["method"] == "Debugger.paused": + # We hit the breakpoint set in _visit_page. Get rid of google + # analytics script! 
+ self.logger.debug("debugger paused! message={}".format(message)) scriptId = message['params']['callFrames'][0]['location']['scriptId'] - msg = dumps(dict(method="Debugger.setScriptSource", id=next(self.command_id), params={"scriptId": scriptId, "scriptSource":"console.log('google analytics is no more!');"})) - self.logger.debug('sending message to {}: {}'.format(websock, msg)) - websock.send(msg) + # replace script + self.send_to_chrome(method="Debugger.setScriptSource", params={"scriptId": scriptId, "scriptSource":"console.log('google analytics is no more!');"}) - msg = dumps(dict(method="Debugger.resume", id=next(self.command_id))) - self.logger.debug('sending message to {}: {}'.format(websock, msg)) - websock.send(msg) + # resume execution + self.send_to_chrome(method="Debugger.resume") + elif "result" in message: + if self._behavior and self._behavior.is_waiting_on_result(message['id']): + self._behavior.notify_of_result(message) + # elif "method" in message and message["method"] in ("Network.dataReceived", "Network.responseReceived", "Network.loadingFinished"): + # pass + # elif "method" in message: + # self.logger.debug("{} {}".format(message["method"], message)) + # else: + # self.logger.debug("[no-method] {}".format(message)) class Umbra: + """Consumes amqp messages representing requests to browse urls, from the + amqp queue "urls" on exchange "umbra". Incoming amqp message is a json + object with 3 attributes: + { + "clientId": "umbra.client.123", + "url": "http://example.com/my_fancy_page", + "metadata": {"arbitrary":"fields", "etc":4} + } + + "url" is the url to browse. + + "clientId" uniquely identifies the client of + umbra. Umbra uses the clientId to direct information via amqp back to the + client. It sends this information on that same "umbra" exchange, and uses + the clientId as the amqp routing key. + + Each url requested in the browser is published to amqp this way. 
The + outgoing amqp message is a json object: + + { + 'url': 'http://example.com/images/embedded_thing.jpg', + 'method': 'GET', + 'headers': {'User-Agent': '...', 'Accept': '...'}, + 'parentUrl': 'http://example.com/my_fancy_page', + 'parentUrlMetadata': {"arbitrary":"fields", "etc":4}, + } + + POST requests have an additional field, postData. + """ + logger = logging.getLogger('umbra.Umbra') def __init__(self, amqp_url, chrome_exe, browser_wait): @@ -153,7 +196,7 @@ class Umbra: self.producer_lock = None self.workers = {} self.workers_lock = threading.Lock() - self.amqp_thread = threading.Thread(target=self.consume_amqp) + self.amqp_thread = threading.Thread(target=self._consume_amqp) self.amqp_stop = threading.Event() self.amqp_thread.start() @@ -162,18 +205,18 @@ class Umbra: self.amqp_stop.set() self.amqp_thread.join() - def consume_amqp(self): + def _consume_amqp(self): while not self.amqp_stop.is_set(): try: - self.umbra_exchange = Exchange(name='umbra', type='direct', durable=True) - url_queue = Queue('urls', routing_key='url', exchange=self.umbra_exchange) + self.umbra_exchange = kombu.Exchange(name='umbra', type='direct', durable=True) + url_queue = kombu.Queue('urls', routing_key='url', exchange=self.umbra_exchange) self.logger.info("connecting to amqp exchange={} at {}".format(self.umbra_exchange.name, self.amqp_url)) - with Connection(self.amqp_url) as conn: + with kombu.Connection(self.amqp_url) as conn: if self.producer_lock is None: self.producer_lock = threading.Lock() with self.producer_lock: self.producer = conn.Producer(serializer='json') - with conn.Consumer(url_queue, callbacks=[self.fetch_url]) as consumer: + with conn.Consumer(url_queue, callbacks=[self._browse_page_requested]) as consumer: import socket while not self.amqp_stop.is_set(): try: @@ -184,7 +227,12 @@ class Umbra: self.logger.error("amqp exception {}".format(e)) self.logger.error("attempting to reopen amqp connection") - def fetch_url(self, body, message): + def 
_browse_page_requested(self, body, message): + """First waits for the UmbraWorker for the client body['clientId'] to + become available, or creates a new worker if this clientId has not been + served before. Starts a worker browsing the page asynchronously, then + acknowledges the amqp message, which lets the server know it can be + removed from the queue.""" client_id = body['clientId'] with self.workers_lock: if not client_id in self.workers: @@ -216,7 +264,7 @@ class Chrome: def fetch_debugging_json(): raw_json = urllib.request.urlopen("http://localhost:%s/json" % self.port).read() - json = raw_json.decode('utf-8') - return loads(json) + debug_json = raw_json.decode('utf-8') + return json.loads(debug_json) # returns websocket url to chrome window with about:blank loaded def __enter__(self): @@ -237,7 +285,7 @@ class Chrome: while True: try: raw_json = urllib.request.urlopen(json_url).read() - all_debug_info = loads(raw_json.decode('utf-8')) + all_debug_info = json.loads(raw_json.decode('utf-8')) debug_info = [x for x in all_debug_info if x['url'] == 'about:blank'] if debug_info and 'webSocketDebuggerUrl' in debug_info[0]: