Merge pull request #22 from nlevitt/master

whole bunch of changes (already deployed on QA)
This commit is contained in:
Eldon 2014-05-06 09:13:56 -04:00
commit 154eb6f334
6 changed files with 367 additions and 147 deletions

View file

@ -0,0 +1,37 @@
// {"request_idle_timeout_sec":10}
//
// vim:set sw=8 et:
//
// Scrolls to the bottom of the page. That's it at the moment.
//
var umbraState = {'idleSince':null};
var umbraFinished = false;
var umbraIntervalFunc = function() {
var needToScroll = (window.scrollY + window.innerHeight < document.documentElement.scrollHeight);
// console.log('intervalFunc umbraState.idleSince=' + umbraState.idleSince + ' needToScroll=' + needToScroll + ' window.scrollY=' + window.scrollY + ' window.innerHeight=' + window.innerHeight + ' document.documentElement.scrollHeight=' + document.documentElement.scrollHeight);
if (needToScroll) {
window.scrollBy(0, 200);
umbraState.idleSince = null;
} else if (umbraState.idleSince == null) {
umbraState.idleSince = Date.now();
}
}
// If we haven't had anything to do (scrolled, clicked, etc) in this amount of
// time, then we consider ourselves finished with the page.
var UMBRA_USER_ACTION_IDLE_TIMEOUT_SEC = 10;
// Called from outside of this script.
var umbraBehaviorFinished = function() {
if (umbraState.idleSince != null) {
var idleTimeMs = Date.now() - umbraState.idleSince;
if (idleTimeMs / 1000 > UMBRA_USER_ACTION_IDLE_TIMEOUT_SEC) {
return true;
}
}
return false;
}
var umbraIntervalId = setInterval(umbraIntervalFunc, 100);

View file

@ -1,7 +1,9 @@
//^https?://(?:www\.)?facebook.com/.*$ // {"url_regex":"^https?://(?:www\\.)?facebook\\.com/.*$", "request_idle_timeout_sec":30}
//
// vim:set sw=8 et: // vim:set sw=8 et:
//
var aboveBelowOrOnScreen = function(e) { var umbraAboveBelowOrOnScreen = function(e) {
var eTop = e.getBoundingClientRect().top; var eTop = e.getBoundingClientRect().top;
if (eTop < window.scrollY) { if (eTop < window.scrollY) {
return -1; // above return -1; // above
@ -13,65 +15,94 @@ var aboveBelowOrOnScreen = function(e) {
} }
// comments - 'a.UFIPagerLink > span, a.UFIPagerLink, span.UFIReplySocialSentenceLinkText' // comments - 'a.UFIPagerLink > span, a.UFIPagerLink, span.UFIReplySocialSentenceLinkText'
var THINGS_TO_CLICK_SELECTOR = 'a[href^="/browse/likes"], *[rel="theater"]'; var UMBRA_THINGS_TO_CLICK_SELECTOR = 'a[href^="/browse/likes"], *[rel="theater"]';
var alreadyClicked = {}; var umbraAlreadyClicked = {};
var intervalId; var umbraState = {'idleSince':null,'expectingSomething':null};
var intervalFunc = function() { var umbraIntervalFunc = function() {
var closeButton = document.querySelector('a[title="Close"]'); var closeButtons = document.querySelectorAll('a[title="Close"], a.closeTheater');
if (closeButton) { for (var i = 0; i < closeButtons.length; i++) {
console.log("clicking close button " + closeButton.outerHTML); // XXX closeTheater buttons stick around in the dom after closing, clientWidth>0 is one way to check if they're visible
closeButton.click(); if (closeButtons[i].clientWidth > 0) {
if (umbraState.expectingSomething == 'closeButton') {
console.log("found expected close button, clicking on it " + closeButtons[i].outerHTML);
umbraState.expectingSomething = null;
} else {
console.warn("found UNexpected close button, umbraState.expectingSomething=" + umbraState.expectingSomething + " ... clicking on it " + closeButtons[i].outerHTML);
}
closeButtons[i].click();
return; return;
} }
var closeTheaterButton = document.querySelector('a.closeTheater'); }
if (closeTheaterButton && closeTheaterButton.offsetWidth > 0) { if (umbraState.expectingSomething == 'closeButton') {
console.log("clicking close button " + closeTheaterButton.outerHTML); console.log("waiting for close button, haven't seen it yet");
closeTheaterButton.click();
return; return;
} }
var thingsToClick = document.querySelectorAll(THINGS_TO_CLICK_SELECTOR); var thingsToClick = document.querySelectorAll(UMBRA_THINGS_TO_CLICK_SELECTOR);
var clickedSomething = false; var clickedSomething = false;
var somethingLeftBelow = false; var somethingLeftBelow = false;
var somethingLeftAbove = false;
var missedAbove = 0; var missedAbove = 0;
for (var i = 0; i < thingsToClick.length; i++) { for (var i = 0; i < thingsToClick.length; i++) {
var target = thingsToClick[i]; var target = thingsToClick[i];
if (!(target in alreadyClicked)) { if (!(target in umbraAlreadyClicked)) {
var where = aboveBelowOrOnScreen(target); var where = umbraAboveBelowOrOnScreen(target);
if (where == 0) { // on screen if (where == 0) { // on screen
// var pos = target.getBoundingClientRect().top; // var pos = target.getBoundingClientRect().top;
// window.scrollTo(0, target.getBoundingClientRect().top - 100); // window.scrollTo(0, target.getBoundingClientRect().top - 100);
console.log("clicking at " + target.getBoundingClientRect().top + " on " + target.outerHTML); console.log("clicking at " + target.getBoundingClientRect().top + " on " + target.outerHTML);
if(target.click != undefined) { if (target.click != undefined) {
umbraState.expectingSomething = 'closeButton';
target.click(); target.click();
} }
target.style.border = '1px solid #0a0'; target.style.border = '1px solid #0a0';
alreadyClicked[target] = true; umbraAlreadyClicked[target] = true;
clickedSomething = true; clickedSomething = true;
umbraState.idleSince = null;
break; break;
} else if (where > 0) { } else if (where > 0) {
somethingLeftBelow = true; somethingLeftBelow = true;
} else { } else if (where < 0) {
missedAbove++; somethingLeftAbove = true;
} }
} }
} }
if (missedAbove > 0) {
console.log("somehow missed " + missedAbove + " click targets above");
}
if (!clickedSomething) { if (!clickedSomething) {
if (somethingLeftBelow) { if (somethingLeftAbove) {
console.log("scrolling UP because everything on this screen has been clicked but we missed something above");
window.scrollBy(0, -200);
umbraState.idleSince = null;
} else if (somethingLeftBelow) {
console.log("scrolling because everything on this screen has been clicked but there's more below document.body.clientHeight=" + document.body.clientHeight); console.log("scrolling because everything on this screen has been clicked but there's more below document.body.clientHeight=" + document.body.clientHeight);
window.scrollBy(0, 200); window.scrollBy(0, 200);
} else if (window.scrollY + window.innerHeight + 10 < document.body.clientHeight) { umbraState.idleSince = null;
} else if (window.scrollY + window.innerHeight < document.documentElement.scrollHeight) {
console.log("scrolling because we're not to the bottom yet document.body.clientHeight=" + document.body.clientHeight); console.log("scrolling because we're not to the bottom yet document.body.clientHeight=" + document.body.clientHeight);
window.scrollBy(0, 200); window.scrollBy(0, 200);
umbraState.idleSince = null;
} else if (umbraState.idleSince == null) {
umbraState.idleSince = Date.now();
} }
} }
} }
var intervalId = setInterval(intervalFunc, 200); // If we haven't had anything to do (scrolled, clicked, etc) in this amount of
// time, then we consider ourselves finished with the page.
var UMBRA_USER_ACTION_IDLE_TIMEOUT_SEC = 10;
// Called from outside of this script.
var umbraBehaviorFinished = function() {
if (umbraState.idleSince != null) {
var idleTimeMs = Date.now() - umbraState.idleSince;
if (idleTimeMs / 1000 > UMBRA_USER_ACTION_IDLE_TIMEOUT_SEC) {
return true;
}
}
return false;
}
var umbraIntervalId = setInterval(umbraIntervalFunc, 200);

View file

@ -1,12 +1,15 @@
//^https?://(?:www\.)?flickr.com/.*$ // {"url_regex":"^https?://(?:www\\.)?flickr\\.com/.*$", "request_idle_timeout_sec":10}
//
// vim:set sw=8 et:
//
setInterval(function() { window.scrollBy(0,50); }, 100); setInterval(function() { window.scrollBy(0,50); }, 100);
setTimeout(function() { setTimeout(function() {
a = document.evaluate("//a[contains(@class, 'sn-ico-slideshow')]", document, null, XPathResult.UNORDERED_NODE_ITERATOR_TYPE, null ); a = document.evaluate("//a[contains(@class, 'sn-ico-slideshow')]", document, null, XPathResult.UNORDERED_NODE_ITERATOR_TYPE, null );
f = a.iterateNext(); f = a.iterateNext();
f.click();}, f.click();
5000); }, 5000);
setTimeout(function() { setTimeout(function() {
a = document.evaluate("//a[contains(@data-track, 'photo-click')]", document, null, XPathResult.UNORDERED_NODE_ITERATOR_TYPE, null ); a = document.evaluate("//a[contains(@data-track, 'photo-click')]", document, null, XPathResult.UNORDERED_NODE_ITERATOR_TYPE, null );

View file

@ -1,7 +1,27 @@
//^https?://(?:www\.)?vimeo.com/.*$ // {"url_regex":"^https?://(?:www\\.)?vimeo\\.com/.*$", "request_idle_timeout_sec":10}
//
// vim:set sw=8 et:
//
var videoElements = document.getElementsByTagName('video'); var umbraState = {'idleSince':null};
for (var i = 0; i < videoElements.length; i++) { var umbraVideoElements = document.getElementsByTagName('video');
videoElements[i].play(); for (var i = 0; i < umbraVideoElements.length; i++) {
umbraVideoElements[i].play();
}
umbraState.idleSince = Date.now();
// If we haven't had anything to do (scrolled, clicked, etc) in this amount of
// time, then we consider ourselves finished with the page.
var UMBRA_USER_ACTION_IDLE_TIMEOUT_SEC = 10;
// Called from outside of this script.
var umbraBehaviorFinished = function() {
if (umbraState.idleSince != null) {
var idleTimeMs = Date.now() - umbraState.idleSince;
if (idleTimeMs / 1000 > UMBRA_USER_ACTION_IDLE_TIMEOUT_SEC) {
return true;
}
}
return false;
} }

View file

@ -1,25 +1,106 @@
# vim: set sw=4 et: # vim: set sw=4 et:
from json import dumps, load import json
from time import sleep import itertools
from itertools import chain import os
import os, re import re
import logging import logging
import time
import sys
class Behavior:
logger = logging.getLogger('umbra.behaviors.Behavior')
_behaviors = None
_default_behavior = None
@staticmethod
def behaviors():
if Behavior._behaviors is None:
behaviors_directory = os.path.sep.join(__file__.split(os.path.sep)[:-1] + ['behaviors.d'])
behavior_files = itertools.chain(*[[os.path.join(dir, file) for file in files if file.endswith('.js') and file != 'default.js'] for dir, dirs, files in os.walk(behaviors_directory)])
Behavior._behaviors = []
for file_name in behavior_files:
Behavior.logger.debug("reading behavior file {}".format(file_name))
script = open(file_name, encoding='utf-8').read()
first_line = script[:script.find('\n')]
behavior = json.loads(first_line[2:].strip())
behavior['script'] = script
behavior['file'] = file_name
Behavior._behaviors.append(behavior)
Behavior.logger.info("will run behaviors from {} on urls matching {}".format(file_name, behavior['url_regex']))
return Behavior._behaviors
@staticmethod
def default_behavior():
if Behavior._default_behavior is None:
behaviors_directory = os.path.sep.join(__file__.split(os.path.sep)[:-1] + ['behaviors.d'])
file_name = os.path.join(behaviors_directory, 'default.js')
Behavior.logger.debug("reading default behavior file {}".format(file_name))
script = open(file_name, encoding='utf-8').read()
first_line = script[:script.find('\n')]
behavior = json.loads(first_line[2:].strip())
behavior['script'] = script
behavior['file'] = file_name
Behavior._default_behavior = behavior
return Behavior._default_behavior
def __init__(self, url, umbra_worker):
self.url = url
self.umbra_worker = umbra_worker
self.script_finished = False
self.waiting_result_msg_ids = []
self.active_behavior = None
self.last_activity = time.time()
def start(self):
for behavior in Behavior.behaviors():
if re.match(behavior['url_regex'], self.url):
self.active_behavior = behavior
break
if self.active_behavior is None:
self.active_behavior = Behavior.default_behavior()
self.umbra_worker.send_to_chrome(method="Runtime.evaluate", params={"expression": self.active_behavior['script']})
self.notify_of_activity()
def is_finished(self):
msg_id = self.umbra_worker.send_to_chrome(method="Runtime.evaluate", params={"expression": "umbraBehaviorFinished()"})
self.waiting_result_msg_ids.append(msg_id)
request_idle_timeout_sec = 30
if self.active_behavior and 'request_idle_timeout_sec' in self.active_behavior:
request_idle_timeout_sec = self.active_behavior['request_idle_timeout_sec']
idle_time = time.time() - self.last_activity
return self.script_finished and idle_time > request_idle_timeout_sec
def is_waiting_on_result(self, msg_id):
return msg_id in self.waiting_result_msg_ids
def notify_of_result(self, chrome_message):
# {'id': 59, 'result': {'result': {'type': 'boolean', 'value': True}, 'wasThrown': False}}
# {'id': 59, 'result': {'result': {'type': 'boolean', 'value': False}}
self.waiting_result_msg_ids.remove(chrome_message['id'])
if ('result' in chrome_message
and not ('wasThrown' in chrome_message['result'] and chrome_message['result']['wasThrown'])
and 'result' in chrome_message['result']
and type(chrome_message['result']['result']['value']) == bool):
self.script_finished = chrome_message['result']['result']['value']
else:
self.logger.error("chrome message doesn't look like a boolean result! {}".format(chrome_message))
def notify_of_activity(self):
self.last_activity = time.time()
if __name__ == "__main__":
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG,
format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
logger = logging.getLogger('umbra.behaviors')
logger.info("custom behaviors: {}".format(Behavior.behaviors()))
logger.info("default behavior: {}".format(Behavior.default_behavior()))
logger = logging.getLogger('behaviors')
behaviors_directory = os.path.sep.join(__file__.split(os.path.sep)[:-1] + ['behaviors.d'])
behavior_files = chain(*[[os.path.join(dir, file) for file in files if file.endswith('.js')] for dir, dirs, files in os.walk(behaviors_directory)])
behaviors = []
for file_name in behavior_files:
logger.debug("reading behavior file {}".format(file_name))
lines = open(file_name).readlines()
pattern, script = lines[0][2:].strip(), ''.join(lines[1:])
behaviors.append({'url_regex': pattern, 'script': script, 'file': file_name})
logger.info("will run behaviors from {} to urls matching {}".format(file_name, pattern))
def execute(url, websock, command_id):
for behavior in behaviors:
if re.match(behavior['url_regex'], url):
msg = dumps(dict(method="Runtime.evaluate", params={"expression": behavior['script']}, id=next(command_id)))
logger.debug('sending message to {}: {}'.format(websock, msg))
websock.send(msg)

View file

@ -2,107 +2,105 @@
# vim: set sw=4 et: # vim: set sw=4 et:
import logging import logging
import os, sys, argparse import sys
# logging.basicConfig(stream=sys.stdout, level=logging.INFO, # logging.basicConfig(stream=sys.stdout, level=logging.INFO,
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, logging.basicConfig(stream=sys.stdout, level=logging.DEBUG,
format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
from json import dumps, loads import os
import argparse
import json
import urllib.request, urllib.error, urllib.parse import urllib.request, urllib.error, urllib.parse
from itertools import count import itertools
import websocket import websocket
import time import time
import uuid import uuid
import threading import threading
import subprocess import subprocess
import signal import signal
from kombu import Connection, Exchange, Queue import kombu
import tempfile import tempfile
from umbra import behaviors from umbra.behaviors import Behavior
class UmbraWorker: class UmbraWorker:
"""Runs chrome/chromium to synchronously browse one page at a time using
worker.browse_page(). Currently the implementation starts up a new instance
of chrome for each page browsed, always on the same debug port. (In the
future, it may keep the browser running indefinitely.)"""
logger = logging.getLogger('umbra.UmbraWorker') logger = logging.getLogger('umbra.UmbraWorker')
HARD_TIMEOUT_SECONDS = 20 * 60
def __init__(self, umbra, chrome_port=9222, chrome_exe='chromium-browser', chrome_wait=10, client_id='request'): def __init__(self, umbra, chrome_port=9222, chrome_exe='chromium-browser', chrome_wait=10, client_id='request'):
self.command_id = count(1) self.command_id = itertools.count(1)
self.lock = threading.Lock() self.lock = threading.Lock()
self.umbra = umbra self.umbra = umbra
self.chrome_port = chrome_port self.chrome_port = chrome_port
self.chrome_exe = chrome_exe self.chrome_exe = chrome_exe
self.chrome_wait = chrome_wait self.chrome_wait = chrome_wait
self.client_id = client_id self.client_id = client_id
self.page_done = threading.Event() self._behavior = None
self.idle_timer = None self.websock = None
self.hard_stop_timer = None
def browse_page(self, url, url_metadata): def browse_page(self, url, url_metadata):
"""Synchronously browse a page and run behaviors."""
with self.lock: with self.lock:
self.url = url self.url = url
self.url_metadata = url_metadata self.url_metadata = url_metadata
with tempfile.TemporaryDirectory() as user_data_dir: with tempfile.TemporaryDirectory() as user_data_dir:
with Chrome(self.chrome_port, self.chrome_exe, self.chrome_wait, user_data_dir) as websocket_url: with Chrome(self.chrome_port, self.chrome_exe, self.chrome_wait, user_data_dir) as websocket_url:
websock = websocket.WebSocketApp(websocket_url, self.websock = websocket.WebSocketApp(websocket_url,
on_open=self.visit_page, on_message=self.handle_message) on_open=self._visit_page,
websock_thread = threading.Thread(target=websock.run_forever) on_message=self._handle_message)
websock_thread = threading.Thread(target=self.websock.run_forever, kwargs={'ping_timeout':0.5})
websock_thread.start() websock_thread.start()
start = time.time()
self.page_done.clear() while True:
self._reset_idle_timer()
while not self.page_done.is_set():
time.sleep(0.5) time.sleep(0.5)
if not self.websock or not self.websock.sock or not self.websock.sock.connected:
self.logger.error("websocket closed, did chrome die??? {}".format(self.websock))
break
elif time.time() - start > UmbraWorker.HARD_TIMEOUT_SECONDS:
self.logger.info("finished browsing page, reached hard timeout of {} seconds url={}".format(UmbraWorker.HARD_TIMEOUT_SECONDS, self.url))
break
elif self._behavior != None and self._behavior.is_finished():
self.logger.info("finished browsing page according to behavior url={}".format(self.url))
break
websock.close() try:
self.idle_timer = None self.websock.close()
except BaseException as e:
self.logger.error("exception closing websocket {} - {}".format(self.websock, e))
def _reset_idle_timer(self): websock_thread.join()
def _idle_timeout(): self._behavior = None
self.logger.debug('idle timeout')
self.page_done.set()
if self.hard_stop_timer:
self.hard_stop_timer.cancel()
def _hard_timeout(): def send_to_chrome(self, **kwargs):
self.logger.debug('hard timeout') msg_id = next(self.command_id)
self.page_done.set() kwargs['id'] = msg_id
if self.idle_timer: msg = json.dumps(kwargs)
self.idle_timer.cancel() self.logger.debug('sending message to {}: {}'.format(self.websock, msg))
self.websock.send(msg)
return msg_id
if self.idle_timer: def _visit_page(self, websock):
self.idle_timer.cancel() self.send_to_chrome(method="Network.enable")
self.send_to_chrome(method="Page.enable")
self.send_to_chrome(method="Console.enable")
self.send_to_chrome(method="Debugger.enable")
self.send_to_chrome(method="Runtime.enable")
self.idle_timer = threading.Timer(30, _idle_timeout) # disable google analytics, see _handle_message() where breakpoint is caught "Debugger.paused"
self.idle_timer.start() self.send_to_chrome(method="Debugger.setBreakpointByUrl", params={"lineNumber": 1, "urlRegex":"https?://www.google-analytics.com/analytics.js"})
if not self.hard_stop_timer: # 15 minutes is as long as we should give 1 page # navigate to the page!
self.hard_stop_timer = threading.Timer(900, _hard_timeout) self.send_to_chrome(method="Page.navigate", params={"url": self.url})
self.hard_stop_timer.start()
def visit_page(self, websock): # XXX should this class know anything about amqp? or should it
msg = dumps(dict(method="Network.enable", id=next(self.command_id))) # delegate this back up to the Umbra class?
self.logger.debug('sending message to {}: {}'.format(websock, msg)) def _send_request_to_amqp(self, chrome_msg):
websock.send(msg)
msg = dumps(dict(method="Page.enable", id=next(self.command_id)))
self.logger.debug('sending message to {}: {}'.format(websock, msg))
websock.send(msg)
msg = dumps(dict(method="Console.enable", id=next(self.command_id)))
self.logger.debug('sending message to {}: {}'.format(websock, msg))
websock.send(msg)
msg = dumps(dict(method="Debugger.enable", id=next(self.command_id)))
self.logger.debug('sending message to {}: {}'.format(websock, msg))
websock.send(msg)
msg = dumps(dict(method="Debugger.setBreakpointByUrl", id=next(self.command_id), params={"lineNumber": 1, "urlRegex":"https?://www.google-analytics.com/analytics.js"}))
self.logger.debug('sending message to {}: {}'.format(websock, msg))
websock.send(msg)
msg = dumps(dict(method="Page.navigate", id=next(self.command_id), params={"url": self.url}))
self.logger.debug('sending message to {}: {}'.format(websock, msg))
websock.send(msg)
def send_request_to_amqp(self, chrome_msg):
payload = chrome_msg['params']['request'] payload = chrome_msg['params']['request']
payload['parentUrl'] = self.url payload['parentUrl'] = self.url
payload['parentUrlMetadata'] = self.url_metadata payload['parentUrlMetadata'] = self.url_metadata
@ -112,37 +110,82 @@ class UmbraWorker:
exchange=self.umbra.umbra_exchange, exchange=self.umbra.umbra_exchange,
routing_key=self.client_id) routing_key=self.client_id)
def handle_message(self, websock, message): def _handle_message(self, websock, message):
# self.logger.debug("message from {} - {}".format(websock.url, message[:95])) # self.logger.debug("message from {} - {}".format(websock.url, message[:95]))
# self.logger.debug("message from {} - {}".format(websock.url, message)) # self.logger.debug("message from {} - {}".format(websock.url, message))
message = loads(message) message = json.loads(message)
if "method" in message and message["method"] == "Network.requestWillBeSent": if "method" in message and message["method"] == "Network.requestWillBeSent":
self._reset_idle_timer() if self._behavior:
self._behavior.notify_of_activity()
if not message["params"]["request"]["url"].lower().startswith("data:"): if not message["params"]["request"]["url"].lower().startswith("data:"):
self.send_request_to_amqp(message) self._send_request_to_amqp(message)
else: else:
self.logger.debug("ignoring data url {}".format(message["params"]["request"]["url"][:80])) self.logger.debug("ignoring data url {}".format(message["params"]["request"]["url"][:80]))
elif "method" in message and message["method"] == "Page.loadEventFired": elif "method" in message and message["method"] == "Page.loadEventFired":
self.logger.debug("Page.loadEventFired, starting behaviors url={} message={}".format(self.url, message)) if self._behavior is None:
behaviors.execute(self.url, websock, self.command_id) self.logger.info("Page.loadEventFired, starting behaviors url={} message={}".format(self.url, message))
self._behavior = Behavior(self.url, self)
self._behavior.start()
else:
self.logger.warn("Page.loadEventFired but behaviors already running url={} message={}".format(self.url, message))
elif "method" in message and message["method"] == "Console.messageAdded": elif "method" in message and message["method"] == "Console.messageAdded":
self.logger.debug("{} console {} {}".format(websock.url, self.logger.debug("{} console.{} {}".format(websock.url,
message["params"]["message"]["level"], message["params"]["message"]["level"],
message["params"]["message"]["text"])) message["params"]["message"]["text"]))
elif "method" in message and message["method"] == "Debugger.paused": elif "method" in message and message["method"] == "Debugger.paused":
# We hit the breakpoint set in visit_page. Get rid of google
# analytics script!
self.logger.debug("debugger paused! message={}".format(message)) self.logger.debug("debugger paused! message={}".format(message))
scriptId = message['params']['callFrames'][0]['location']['scriptId'] scriptId = message['params']['callFrames'][0]['location']['scriptId']
msg = dumps(dict(method="Debugger.setScriptSource", id=next(self.command_id), params={"scriptId": scriptId, "scriptSource":"console.log('google analytics is no more!');"})) # replace script
self.logger.debug('sending message to {}: {}'.format(websock, msg)) self.send_to_chrome(method="Debugger.setScriptSource", params={"scriptId": scriptId, "scriptSource":"console.log('google analytics is no more!');"})
websock.send(msg)
msg = dumps(dict(method="Debugger.resume", id=next(self.command_id))) # resume execution
self.logger.debug('sending message to {}: {}'.format(websock, msg)) self.send_to_chrome(method="Debugger.resume")
websock.send(msg) elif "result" in message:
if self._behavior and self._behavior.is_waiting_on_result(message['id']):
self._behavior.notify_of_result(message)
# elif "method" in message and message["method"] in ("Network.dataReceived", "Network.responseReceived", "Network.loadingFinished"):
# pass
# elif "method" in message:
# self.logger.debug("{} {}".format(message["method"], message))
# else:
# self.logger.debug("[no-method] {}".format(message))
class Umbra: class Umbra:
"""Consumes amqp messages representing requests to browse urls, from the
amqp queue "urls" on exchange "umbra". Incoming amqp message is a json
object with 3 attributes:
{
"clientId": "umbra.client.123",
"url": "http://example.com/my_fancy_page",
"metadata": {"arbitrary":"fields", "etc":4}
}
"url" is the url to browse.
"clientId" uniquely idenfities the client of
umbra. Umbra uses the clientId to direct information via amqp back to the
client. It sends this information on that same "umbra" exchange, and uses
the clientId as the amqp routing key.
Each url requested in the browser is published to amqp this way. The
outgoing amqp message is a json object:
{
'url': 'http://example.com/images/embedded_thing.jpg',
'method': 'GET',
'headers': {'User-Agent': '...', 'Accept': '...'}
'parentUrl': 'http://example.com/my_fancy_page',
'parentUrlMetadata': {"arbitrary":"fields", "etc":4},
}
POST requests have an additional field, postData.
"""
logger = logging.getLogger('umbra.Umbra') logger = logging.getLogger('umbra.Umbra')
def __init__(self, amqp_url, chrome_exe, browser_wait): def __init__(self, amqp_url, chrome_exe, browser_wait):
@ -153,7 +196,7 @@ class Umbra:
self.producer_lock = None self.producer_lock = None
self.workers = {} self.workers = {}
self.workers_lock = threading.Lock() self.workers_lock = threading.Lock()
self.amqp_thread = threading.Thread(target=self.consume_amqp) self.amqp_thread = threading.Thread(target=self._consume_amqp)
self.amqp_stop = threading.Event() self.amqp_stop = threading.Event()
self.amqp_thread.start() self.amqp_thread.start()
@ -162,18 +205,18 @@ class Umbra:
self.amqp_stop.set() self.amqp_stop.set()
self.amqp_thread.join() self.amqp_thread.join()
def consume_amqp(self): def _consume_amqp(self):
while not self.amqp_stop.is_set(): while not self.amqp_stop.is_set():
try: try:
self.umbra_exchange = Exchange(name='umbra', type='direct', durable=True) self.umbra_exchange = kombu.Exchange(name='umbra', type='direct', durable=True)
url_queue = Queue('urls', routing_key='url', exchange=self.umbra_exchange) url_queue = kombu.Queue('urls', routing_key='url', exchange=self.umbra_exchange)
self.logger.info("connecting to amqp exchange={} at {}".format(self.umbra_exchange.name, self.amqp_url)) self.logger.info("connecting to amqp exchange={} at {}".format(self.umbra_exchange.name, self.amqp_url))
with Connection(self.amqp_url) as conn: with kombu.Connection(self.amqp_url) as conn:
if self.producer_lock is None: if self.producer_lock is None:
self.producer_lock = threading.Lock() self.producer_lock = threading.Lock()
with self.producer_lock: with self.producer_lock:
self.producer = conn.Producer(serializer='json') self.producer = conn.Producer(serializer='json')
with conn.Consumer(url_queue, callbacks=[self.fetch_url]) as consumer: with conn.Consumer(url_queue, callbacks=[self._browse_page_requested]) as consumer:
import socket import socket
while not self.amqp_stop.is_set(): while not self.amqp_stop.is_set():
try: try:
@ -184,7 +227,12 @@ class Umbra:
self.logger.error("amqp exception {}".format(e)) self.logger.error("amqp exception {}".format(e))
self.logger.error("attempting to reopen amqp connection") self.logger.error("attempting to reopen amqp connection")
def fetch_url(self, body, message): def _browse_page_requested(self, body, message):
"""First waits for the UmbraWorker for the client body['clientId'] to
become available, or creates a new worker if this clientId has not been
served before. Starts a worker browsing the page asynchronously, then
acknowledges the amqp message, which lets the server know it can be
removed from the queue."""
client_id = body['clientId'] client_id = body['clientId']
with self.workers_lock: with self.workers_lock:
if not client_id in self.workers: if not client_id in self.workers:
@ -216,7 +264,7 @@ class Chrome:
def fetch_debugging_json(): def fetch_debugging_json():
raw_json = urllib.request.urlopen("http://localhost:%s/json" % self.port).read() raw_json = urllib.request.urlopen("http://localhost:%s/json" % self.port).read()
json = raw_json.decode('utf-8') json = raw_json.decode('utf-8')
return loads(json) return json.loads(json)
# returns websocket url to chrome window with about:blank loaded # returns websocket url to chrome window with about:blank loaded
def __enter__(self): def __enter__(self):
@ -237,7 +285,7 @@ class Chrome:
while True: while True:
try: try:
raw_json = urllib.request.urlopen(json_url).read() raw_json = urllib.request.urlopen(json_url).read()
all_debug_info = loads(raw_json.decode('utf-8')) all_debug_info = json.loads(raw_json.decode('utf-8'))
debug_info = [x for x in all_debug_info if x['url'] == 'about:blank'] debug_info = [x for x in all_debug_info if x['url'] == 'about:blank']
if debug_info and 'webSocketDebuggerUrl' in debug_info[0]: if debug_info and 'webSocketDebuggerUrl' in debug_info[0]: