2014-01-21 06:41:46 +00:00
#!/usr/bin/env python
2014-02-10 20:45:18 -08:00
# vim: set sw=4 et:
2014-03-06 18:25:46 -08:00
import logging
import os , sys , argparse
# Log everything at DEBUG to stdout. The format records process, thread and
# call site because the AMQP consumer, workers and timers all log from their
# own threads. Change ``level`` to logging.INFO for quieter output.
logging.basicConfig(
    stream=sys.stdout,
    level=logging.DEBUG,
    format=(
        '%(asctime)s %(process)d %(levelname)s %(threadName)s '
        '%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s'
    ),
)
2014-01-21 06:41:46 +00:00
from json import dumps , loads
2014-03-06 18:25:46 -08:00
import urllib . request , urllib . error , urllib . parse
2014-02-12 19:31:03 -05:00
from itertools import count
2014-01-21 06:41:46 +00:00
import websocket
import time
2014-01-21 21:32:33 +00:00
import uuid
import threading
2014-02-13 01:59:09 -08:00
import subprocess
import signal
2014-01-22 07:38:03 +00:00
from kombu import Connection , Exchange , Queue
2014-02-14 19:45:16 -08:00
import tempfile
2014-03-05 23:19:09 -05:00
from umbra import behaviors
2014-01-21 06:41:46 +00:00
2014-02-13 01:59:09 -08:00
class UmbraWorker:
    """Browses pages in Chrome, one at a time, on behalf of a single client.

    Each browse_page() call launches a fresh Chrome with a throwaway profile,
    attaches to it over the remote-debugging websocket, loads the URL and
    publishes every non-``data:`` request the page makes to the umbra AMQP
    exchange, routed by this worker's ``client_id``.
    """

    logger = logging.getLogger('umbra.UmbraWorker')

    # Seconds without a new network request before the page counts as done.
    IDLE_TIMEOUT_SECS = 30
    # 15 minutes is as long as we should give 1 page.
    HARD_TIMEOUT_SECS = 900

    def __init__(self, umbra, chrome_port=9222, chrome_exe='chromium-browser', chrome_wait=10, client_id='request'):
        """Store configuration; nothing is launched until browse_page().

        Args:
            umbra: owning object providing ``producer``, ``producer_lock``
                and ``umbra_exchange`` for publishing.
            chrome_port: remote-debugging port handed to Chrome.
            chrome_exe: executable used to launch Chrome.
            chrome_wait: seconds to wait for Chrome to come up.
            client_id: AMQP routing key for published requests.
        """
        self.command_id = count(1)
        self.lock = threading.Lock()
        self.umbra = umbra
        self.chrome_port = chrome_port
        self.chrome_exe = chrome_exe
        self.chrome_wait = chrome_wait
        self.client_id = client_id
        self.page_done = threading.Event()
        self.idle_timer = None
        self.hard_stop_timer = None

    def browse_page(self, url, url_metadata):
        """Load ``url`` in a new Chrome and block until the page is done.

        The page is done when no request has been seen for
        IDLE_TIMEOUT_SECS, or after HARD_TIMEOUT_SECS at the latest.
        ``url_metadata`` is attached to every request published for the page.
        """
        # The lock serialises pages: this worker owns a single debugging port.
        with self.lock:
            self.url = url
            self.url_metadata = url_metadata
            with tempfile.TemporaryDirectory() as user_data_dir:
                with Chrome(self.chrome_port, self.chrome_exe, self.chrome_wait, user_data_dir) as websocket_url:
                    websock = websocket.WebSocketApp(
                        websocket_url,
                        on_open=self.visit_page,
                        on_message=self.handle_message)
                    websock_thread = threading.Thread(target=websock.run_forever)
                    # Clear before the websocket thread can start signalling.
                    self.page_done.clear()
                    websock_thread.start()
                    try:
                        self._reset_idle_timer()
                        self.page_done.wait()
                    finally:
                        websock.close()
                        # Drop BOTH timers so the next page gets a fresh hard
                        # stop and a stale one cannot end an unrelated page.
                        self._cancel_timers()

    def _cancel_timers(self):
        """Cancel and forget the idle and hard-stop timers, if any."""
        for timer in (self.idle_timer, self.hard_stop_timer):
            if timer:
                timer.cancel()
        self.idle_timer = None
        self.hard_stop_timer = None

    def _reset_idle_timer(self):
        """Restart the idle countdown; start the hard-stop timer if not running."""
        def _idle_timeout():
            self.logger.debug('idle timeout')
            self.page_done.set()
            # Read once: browse_page may clear the attribute concurrently.
            hard_timer = self.hard_stop_timer
            if hard_timer:
                hard_timer.cancel()

        def _hard_timeout():
            self.logger.debug('hard timeout')
            self.page_done.set()
            idle_timer = self.idle_timer
            if idle_timer:
                idle_timer.cancel()

        if self.idle_timer:
            self.idle_timer.cancel()
        self.idle_timer = threading.Timer(self.IDLE_TIMEOUT_SECS, _idle_timeout)
        self.idle_timer.start()
        if not self.hard_stop_timer:
            self.hard_stop_timer = threading.Timer(self.HARD_TIMEOUT_SECS, _hard_timeout)
            self.hard_stop_timer.start()

    def _send_command(self, websock, method, params=None):
        """Send one debugging-protocol command, tagged with the next command id."""
        command = dict(method=method, id=next(self.command_id))
        if params is not None:
            command['params'] = params
        msg = dumps(command)
        self.logger.debug('sending message to {}: {}'.format(websock, msg))
        websock.send(msg)

    def visit_page(self, websock):
        """Websocket on_open callback: enable protocol domains, then navigate to self.url."""
        for domain in ("Network", "Page", "Console", "Debugger"):
            self._send_command(websock, domain + ".enable")
        # Pause when google analytics loads so handle_message can replace it.
        self._send_command(websock, "Debugger.setBreakpointByUrl",
                           {"lineNumber": 1, "urlRegex": "https?://www.google-analytics.com/analytics.js"})
        self._send_command(websock, "Page.navigate", {"url": self.url})

    def send_request_to_amqp(self, chrome_msg):
        """Publish the request from a Network.requestWillBeSent message to AMQP.

        The request dict is annotated in place with the parent page url and
        its metadata before publishing.
        """
        payload = chrome_msg['params']['request']
        payload['parentUrl'] = self.url
        payload['parentUrlMetadata'] = self.url_metadata
        self.logger.debug('sending to amqp exchange={} routing_key={} payload={}'.format(
            self.umbra.umbra_exchange.name, self.client_id, payload))
        with self.umbra.producer_lock:
            self.umbra.producer.publish(payload,
                                        exchange=self.umbra.umbra_exchange,
                                        routing_key=self.client_id)

    def handle_message(self, websock, message):
        """Websocket on_message callback: dispatch on the protocol method name.

        Messages without a "method" key and methods not listed below are
        ignored.
        """
        message = loads(message)
        method = message.get("method")
        if method == "Network.requestWillBeSent":
            # Any new request counts as page activity.
            self._reset_idle_timer()
            request_url = message["params"]["request"]["url"]
            if not request_url.lower().startswith("data:"):
                self.send_request_to_amqp(message)
            else:
                self.logger.debug("ignoring data url {}".format(request_url[:80]))
        elif method == "Page.loadEventFired":
            self.logger.debug("Page.loadEventFired, starting behaviors url={} message={}".format(self.url, message))
            behaviors.execute(self.url, websock, self.command_id)
        elif method == "Console.messageAdded":
            console_message = message["params"]["message"]
            self.logger.debug("{} console {} {}".format(
                websock.url, console_message["level"], console_message["text"]))
        elif method == "Debugger.paused":
            self.logger.debug("debugger paused! message={}".format(message))
            call_frames = message['params'].get('callFrames') or []
            if call_frames:
                script_id = call_frames[0]['location']['scriptId']
                self._send_command(websock, "Debugger.setScriptSource",
                                   {"scriptId": script_id,
                                    "scriptSource": "console.log('google analytics is no more!');"})
            # Always resume, otherwise the page stays frozen at the breakpoint.
            self._send_command(websock, "Debugger.resume")
2014-02-13 01:59:09 -08:00
2014-01-21 21:32:33 +00:00
class Umbra:
    """Consumes urls from AMQP and hands each one to a per-client UmbraWorker.

    A background thread, started by the constructor, keeps a connection to
    the AMQP server open, consumes the 'urls' queue on the 'umbra' exchange
    and exposes ``producer`` / ``producer_lock`` / ``umbra_exchange`` for the
    workers to publish through.
    """

    logger = logging.getLogger('umbra.Umbra')

    # Pause before reconnecting after an AMQP failure, so an unreachable
    # server does not turn the consumer thread into a busy loop.
    RECONNECT_DELAY_SECS = 0.5

    def __init__(self, amqp_url, chrome_exe, browser_wait):
        """Store configuration and start the AMQP consumer thread.

        Args:
            amqp_url: URL of the AMQP server.
            chrome_exe: executable passed on to every worker.
            browser_wait: seconds each worker waits for Chrome to start.
        """
        self.amqp_url = amqp_url
        self.chrome_exe = chrome_exe
        self.browser_wait = browser_wait
        self.producer = None
        # Created eagerly so it is never observed as None by a worker.
        self.producer_lock = threading.Lock()
        self.workers = {}
        self.workers_lock = threading.Lock()
        self.amqp_stop = threading.Event()
        self.amqp_thread = threading.Thread(target=self.consume_amqp)
        self.amqp_thread.start()

    def shutdown(self):
        """Ask the AMQP consumer thread to stop and wait for it to finish."""
        self.logger.info("shutting down amqp consumer {}".format(self.amqp_url))
        self.amqp_stop.set()
        self.amqp_thread.join()

    def consume_amqp(self):
        """Consumer thread body: (re)connect and drain events until stopped."""
        import socket

        while not self.amqp_stop.is_set():
            try:
                self.umbra_exchange = Exchange(name='umbra', type='direct', durable=True)
                url_queue = Queue('urls', routing_key='url', exchange=self.umbra_exchange)
                self.logger.info("connecting to amqp exchange={} at {}".format(
                    self.umbra_exchange.name, self.amqp_url))
                with Connection(self.amqp_url) as conn:
                    with self.producer_lock:
                        self.producer = conn.Producer(serializer='json')
                    with conn.Consumer(url_queue, callbacks=[self.fetch_url]):
                        while not self.amqp_stop.is_set():
                            try:
                                # Short timeout so the stop flag is polled.
                                conn.drain_events(timeout=0.5)
                            except socket.timeout:
                                pass
            except Exception as e:
                self.logger.error("amqp exception {}".format(e), exc_info=True)
                self.logger.error("attempting to reopen amqp connection")
                # Interruptible back-off: returns at once when shutdown() runs.
                self.amqp_stop.wait(self.RECONNECT_DELAY_SECS)

    def fetch_url(self, body, message):
        """AMQP callback: browse ``body['url']`` with the worker for ``body['clientId']``.

        A worker is created on first use of a client id, on the next free
        debugging port counting up from 9222. Browsing runs in its own thread
        and the message is acknowledged immediately.
        """
        client_id = body['clientId']
        with self.workers_lock:
            worker = self.workers.get(client_id)
            if worker is None:
                port = 9222 + len(self.workers)
                worker = UmbraWorker(umbra=self, chrome_port=port,
                                     chrome_exe=self.chrome_exe,
                                     chrome_wait=self.browser_wait,
                                     client_id=client_id)
                self.workers[client_id] = worker

        def browse_page_async():
            self.logger.info('client_id={} body={}'.format(client_id, body))
            worker.browse_page(body['url'], body['metadata'])

        threading.Thread(target=browse_page_async).start()
        message.ack()
class Chrome:
    """Context manager that runs Chrome with remote debugging enabled.

    Entering launches the browser and returns the websocket debugger url of
    its about:blank window; exiting sends SIGINT to the browser's process
    group and waits for the process to exit.
    """

    logger = logging.getLogger('umbra.Chrome')

    def __init__(self, port, executable, browser_wait, user_data_dir):
        """Record launch settings; the browser starts in __enter__.

        Args:
            port: remote-debugging port.
            executable: browser executable to run.
            browser_wait: seconds to wait for the debugging endpoint
                (anything float() accepts).
            user_data_dir: profile directory passed as --user-data-dir.
        """
        self.port = port
        self.executable = executable
        self.browser_wait = browser_wait
        self.user_data_dir = user_data_dir

    def _read_debugging_json(self):
        """Return the raw bytes served at the browser's /json endpoint."""
        with urllib.request.urlopen("http://localhost:%s/json" % self.port) as response:
            return response.read()

    def fetch_debugging_json(self):
        """Return the decoded JSON served at the browser's /json endpoint."""
        return loads(self._read_debugging_json().decode('utf-8'))

    def _stop_process(self):
        """SIGINT the browser's process group and reap the process."""
        self.logger.info("killing chrome pid {}".format(self.chrome_process.pid))
        try:
            # start_new_session=True made the browser its own group leader,
            # so its pid doubles as the process group id.
            os.killpg(self.chrome_process.pid, signal.SIGINT)
        except ProcessLookupError:
            pass  # the whole group is already gone
        self.chrome_process.wait()

    # returns websocket url to chrome window with about:blank loaded
    def __enter__(self):
        """Launch the browser and poll /json until about:blank is debuggable.

        Raises:
            Exception: if the websocket url is not available within
                ``browser_wait`` seconds; the browser is stopped first.
        """
        # Validate before launching so a bad value cannot orphan a browser.
        max_wait = float(self.browser_wait)
        chrome_args = [self.executable,
                       "--user-data-dir={}".format(self.user_data_dir),
                       "--remote-debugging-port=%s" % self.port,
                       "--disable-web-sockets", "--disable-cache",
                       "--window-size=1100,900", "--enable-logging",
                       "--no-default-browser-check", "--disable-first-run-ui", "--no-first-run",
                       "--homepage=about:blank", "about:blank"]
        self.logger.info("running {}".format(chrome_args))
        self.chrome_process = subprocess.Popen(chrome_args, start_new_session=True)
        self.logger.info("chrome running, pid {}".format(self.chrome_process.pid))
        start = time.time()
        json_url = "http://localhost:%s/json" % self.port

        last_error = None
        while True:
            try:
                raw_json = self._read_debugging_json()
                all_debug_info = loads(raw_json.decode('utf-8'))
                debug_info = [x for x in all_debug_info if x.get('url') == 'about:blank']
                if debug_info and 'webSocketDebuggerUrl' in debug_info[0]:
                    self.logger.debug("{} returned {}".format(json_url, raw_json))
                    url = debug_info[0]['webSocketDebuggerUrl']
                    self.logger.info('got chrome window websocket debug url {} from {}'.format(url, json_url))
                    return url
            except Exception as e:
                # Expected while the browser is still starting (connection
                # refused, incomplete JSON); keep polling until the deadline.
                last_error = e

            elapsed = time.time() - start
            if elapsed > max_wait:
                # __exit__ is not called when __enter__ raises, so clean up here.
                self._stop_process()
                raise Exception("failed to retrieve {} after {} seconds".format(json_url, elapsed)) from last_error
            time.sleep(0.5)

    def __exit__(self, *args):
        """Stop the browser; exceptions from the with-body are not suppressed."""
        self._stop_process()
2014-01-21 06:41:46 +00:00
2014-01-22 02:30:12 +00:00
def main():
    """Parse the command line, start Umbra and run until interrupted.

    Umbra does its work in background threads; this thread just sleeps until
    Ctrl-C (or an error) and then shuts the AMQP consumer down.
    """
    import faulthandler
    # SIGQUIT dumps the stack of every thread, for debugging hangs.
    faulthandler.register(signal.SIGQUIT)

    arg_parser = argparse.ArgumentParser(
        prog=os.path.basename(sys.argv[0]),
        description='umbra - Browser automation tool',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    # type=float rejects a non-numeric wait at startup instead of at the
    # first browser launch; argparse also converts the string default.
    arg_parser.add_argument('-w', '--browser-wait', dest='browser_wait', default='10', type=float,
                            help='Seconds to wait for browser initialization')
    arg_parser.add_argument('-e', '--executable', dest='executable', default='chromium-browser',
                            help='Executable to use to invoke chrome')
    arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f',
                            help='URL identifying the amqp server to talk to')
    args = arg_parser.parse_args(args=sys.argv[1:])

    umbra = Umbra(args.amqp_url, args.executable, args.browser_wait)
    try:
        while True:
            time.sleep(0.5)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop; anything else propagates after
        # the shutdown below instead of being silently swallowed.
        pass
    finally:
        umbra.shutdown()
2014-01-22 02:30:12 +00:00
# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()