brozzler/umbra/umbra.py

227 lines
9.2 KiB
Python
Executable File

#!/usr/bin/env python
# vim: set sw=4 et:
from json import dumps, loads
from itertools import count
import os,sys,argparse, urllib.request, urllib.error, urllib.parse
import websocket
import time
import uuid
import logging
import threading
import subprocess
import signal
from kombu import Connection, Exchange, Queue
import tempfile
from umbra import behaviors
class UmbraWorker:
    """Browses pages in a dedicated Chrome instance on behalf of one client.

    Each call to :meth:`browse_page` launches Chrome, connects to its remote
    debugging websocket, navigates to the url and forwards every
    ``Network.requestWillBeSent`` event to AMQP using ``client_id`` as the
    routing key. A page is considered done when no websocket message has been
    received for ``IDLE_TIMEOUT`` seconds, or after ``HARD_STOP_TIMEOUT``
    seconds in total, whichever comes first.
    """

    logger = logging.getLogger('umbra.UmbraWorker')

    # Seconds without any websocket message before the page is considered done.
    IDLE_TIMEOUT = 10
    # Upper bound in seconds on the time given to a single page (10 minutes).
    HARD_STOP_TIMEOUT = 600
    # Seconds to wait for the websocket thread to finish after closing it.
    WEBSOCK_JOIN_TIMEOUT = 5

    def __init__(self, umbra, chrome_port=9222, chrome_exe='chromium-browser', chrome_wait=10, client_id='request'):
        """
        Args:
            umbra: owner object; its ``producer``, ``producer_lock`` and
                ``umbra_exchange`` attributes are used to publish to AMQP.
            chrome_port: remote debugging port passed to Chrome.
            chrome_exe: executable used to launch Chrome.
            chrome_wait: seconds to wait for Chrome to come up.
            client_id: AMQP routing key for the requests that are published.
        """
        self.command_id = count(1)
        # Serializes browse_page calls: one page at a time per worker.
        self.lock = threading.Lock()
        self.umbra = umbra
        self.chrome_port = chrome_port
        self.chrome_exe = chrome_exe
        self.chrome_wait = chrome_wait
        self.client_id = client_id
        self.page_done = threading.Event()
        self.idle_timer = None
        self.hard_stop_timer = None

    def browse_page(self, url, url_metadata):
        """Synchronously browse ``url``; returns once the page is done.

        Args:
            url: the page to load.
            url_metadata: opaque value attached to every published request as
                ``parentUrlMetadata``.
        """
        with self.lock:
            self.url = url
            self.url_metadata = url_metadata
            with tempfile.TemporaryDirectory() as user_data_dir:
                with Chrome(self.chrome_port, self.chrome_exe, self.chrome_wait, user_data_dir) as websocket_url:
                    websock = websocket.WebSocketApp(websocket_url,
                            on_open=self.visit_page, on_message=self.handle_message)
                    websock_thread = threading.Thread(target=websock.run_forever)
                    # Arm the timers before the websocket thread exists, so
                    # that from here on only that thread resets them.
                    self.page_done.clear()
                    self._reset_idle_timer()
                    try:
                        websock_thread.start()
                        try:
                            # One of the two timers is guaranteed to set this.
                            self.page_done.wait()
                        finally:
                            websock.close()
                            # Let in-flight callbacks finish so none of them
                            # re-arms a timer after the cleanup below.
                            websock_thread.join(timeout=self.WEBSOCK_JOIN_TIMEOUT)
                    finally:
                        # A timer left running would end the next page early.
                        self._cancel_timers()

    def _start_timer(self, seconds):
        """Start and return a daemon timer that marks the page done."""
        timer = threading.Timer(seconds, self.page_done.set)
        # Daemon, so a pending timer never keeps the process from exiting.
        timer.daemon = True
        timer.start()
        return timer

    def _reset_idle_timer(self):
        """Restart the idle countdown; start the hard-stop countdown once per page."""
        if self.idle_timer:
            self.idle_timer.cancel()
        self.idle_timer = self._start_timer(self.IDLE_TIMEOUT)
        if not self.hard_stop_timer:
            self.hard_stop_timer = self._start_timer(self.HARD_STOP_TIMEOUT)

    def _cancel_timers(self):
        """Cancel and forget both timers so the next page starts fresh."""
        for timer in (self.idle_timer, self.hard_stop_timer):
            if timer:
                timer.cancel()
        self.idle_timer = None
        self.hard_stop_timer = None

    def _send_command(self, websock, **command):
        """Send one debugging command, tagged with the next command id."""
        msg = dumps(dict(command, id=next(self.command_id)))
        self.logger.debug('sending message to {}: {}'.format(websock, msg))
        websock.send(msg)

    def visit_page(self, websock):
        """Websocket on_open callback: enable events, then navigate to ``self.url``."""
        self._send_command(websock, method="Network.enable")
        self._send_command(websock, method="Page.enable")
        self._send_command(websock, method="Page.navigate", params={"url": self.url})

    def send_request_to_amqp(self, chrome_msg):
        """Publish the request from a ``Network.requestWillBeSent`` message.

        The request dict is annotated in place with ``parentUrl`` and
        ``parentUrlMetadata`` before it is published.
        """
        payload = chrome_msg['params']['request']
        payload['parentUrl'] = self.url
        payload['parentUrlMetadata'] = self.url_metadata
        self.logger.debug('sending to amqp exchange={} routing_key={} payload={}'.format(self.umbra.umbra_exchange, self.client_id, payload))
        # The producer is shared by all workers, hence the lock.
        with self.umbra.producer_lock:
            self.umbra.producer.publish(payload,
                    exchange=self.umbra.umbra_exchange,
                    routing_key=self.client_id)

    def handle_message(self, websock, message):
        """Websocket on_message callback: any message counts as activity."""
        self.logger.debug("handling message from websocket {} - {}".format(websock, message[:95]))
        self._reset_idle_timer()
        message = loads(message)
        method = message.get("method")
        if method == "Network.requestWillBeSent":
            self.send_request_to_amqp(message)
        elif method == "Page.loadEventFired":
            self.logger.debug("got Page.loadEventFired, starting behaviors for {}".format(self.url))
            behaviors.execute(self.url, websock, self.command_id)
class Umbra:
    """Consumes urls from AMQP and hands each one to a per-client UmbraWorker.

    A background thread, started by the constructor, consumes the ``urls``
    queue bound to the durable direct exchange ``umbra`` with routing key
    ``url``. Call :meth:`shutdown` to stop that thread.
    """

    logger = logging.getLogger('umbra.Umbra')

    def __init__(self, amqp_url, chrome_exe, browser_wait):
        """
        Args:
            amqp_url: url of the AMQP server.
            chrome_exe: executable handed to every worker to launch Chrome.
            browser_wait: seconds each worker waits for Chrome to come up.
        """
        self.amqp_url = amqp_url
        self.chrome_exe = chrome_exe
        self.browser_wait = browser_wait
        self.producer = None
        # Created here rather than in the consumer thread, so that it is never
        # None for code that publishes through the shared producer.
        self.producer_lock = threading.Lock()
        self.workers = {}
        self.workers_lock = threading.Lock()
        self.amqp_stop = threading.Event()
        self.amqp_thread = threading.Thread(target=self.consume_amqp)
        self.amqp_thread.start()

    def shutdown(self):
        """Ask the AMQP consumer thread to stop and wait for it to finish."""
        self.logger.info("shutting down amqp consumer {}".format(self.amqp_url))
        self.amqp_stop.set()
        self.amqp_thread.join()

    def consume_amqp(self):
        """Thread body: dispatch incoming url messages until shutdown is requested."""
        import socket

        self.umbra_exchange = Exchange(name='umbra', type='direct', durable=True)
        url_queue = Queue('urls', routing_key='url', exchange=self.umbra_exchange)
        self.logger.info("connecting to amqp {} at {}".format(repr(self.umbra_exchange), self.amqp_url))
        with Connection(self.amqp_url) as conn:
            self.producer = conn.Producer(serializer='json')
            with conn.Consumer(url_queue, callbacks=[self.fetch_url]):
                while not self.amqp_stop.is_set():
                    try:
                        # Short timeout so the stop flag is checked regularly.
                        conn.drain_events(timeout=0.5)
                    except socket.timeout:
                        pass

    def fetch_url(self, body, message):
        """Consumer callback: browse ``body['url']`` in the sender's worker.

        Args:
            body: decoded message; ``clientId`` and ``url`` are read from it,
                and ``metadata`` is passed along when present (``None``
                otherwise).
            message: the AMQP message, acknowledged once the browse has been
                handed off to its own thread.
        """
        client_id = body['clientId']
        with self.workers_lock:
            worker = self.workers.get(client_id)
            if worker is None:
                # One Chrome debugging port per worker, counting up from 9222.
                port = 9222 + len(self.workers)
                worker = UmbraWorker(umbra=self, chrome_port=port,
                        chrome_exe=self.chrome_exe,
                        chrome_wait=self.browser_wait,
                        client_id=client_id)
                self.workers[client_id] = worker

        def browse_page_async():
            self.logger.info('client_id={} body={}'.format(client_id, body))
            try:
                worker.browse_page(body['url'], body.get('metadata'))
            except Exception:
                # Nothing joins this thread, so report the failure here.
                self.logger.exception('failed to browse client_id={} body={}'.format(client_id, body))

        threading.Thread(target=browse_page_async).start()
        message.ack()
class Chrome:
    """Context manager that runs a Chrome process with remote debugging enabled.

    Entering launches Chrome and returns the websocket debugger url of its
    ``about:blank`` window; exiting interrupts Chrome's process group and
    waits for the process to end.
    """

    logger = logging.getLogger('umbra.Chrome')

    def __init__(self, port, executable, browser_wait, user_data_dir):
        """
        Args:
            port: remote debugging port Chrome is told to listen on.
            executable: Chrome executable to launch.
            browser_wait: seconds (number or numeric string) to wait for the
                debugging endpoint to report the window.
            user_data_dir: profile directory passed as ``--user-data-dir``.
        """
        self.port = port
        self.executable = executable
        self.browser_wait = browser_wait
        self.user_data_dir = user_data_dir

    def fetch_debugging_json(self):
        """Return the decoded JSON served at ``http://localhost:<port>/json``."""
        raw_json = urllib.request.urlopen("http://localhost:%s/json" % self.port).read()
        return loads(raw_json.decode('utf-8'))

    @staticmethod
    def _websocket_debugger_url(all_debug_info):
        """Return the debugger url of the first usable ``about:blank`` entry, or None."""
        for entry in all_debug_info:
            if entry.get('url') == 'about:blank' and 'webSocketDebuggerUrl' in entry:
                return entry['webSocketDebuggerUrl']
        return None

    def _wait_for_websocket_url(self):
        """Poll the debugging endpoint until the about:blank window shows up.

        Raises:
            Exception: if the url is not available after ``browser_wait`` seconds.
        """
        json_url = "http://localhost:%s/json" % self.port
        timeout = float(self.browser_wait)
        start = time.time()
        while True:
            try:
                all_debug_info = self.fetch_debugging_json()
                url = self._websocket_debugger_url(all_debug_info)
            except Exception as e:
                # Expected while Chrome is still starting up; keep polling.
                # KeyboardInterrupt and SystemExit are deliberately not caught.
                self.logger.debug("{} not ready yet: {}".format(json_url, e))
                url = None
            if url:
                self.logger.debug("{} returned {}".format(json_url, all_debug_info))
                self.logger.info('got chrome window websocket debug url {} from {}'.format(url, json_url))
                return url
            elapsed = time.time() - start
            if elapsed > timeout:
                raise Exception("failed to retrieve {} after {} seconds".format(json_url, elapsed))
            time.sleep(0.5)

    def _stop_chrome(self):
        """Send SIGINT to Chrome's process group and wait for Chrome to end."""
        self.logger.info("killing chrome pid {}".format(self.chrome_process.pid))
        try:
            # Chrome was started with start_new_session=True, so its pid is
            # also the id of a process group holding all of its children.
            os.killpg(self.chrome_process.pid, signal.SIGINT)
        except ProcessLookupError:
            self.logger.debug("chrome pid {} already gone".format(self.chrome_process.pid))
        self.chrome_process.wait()

    # returns websocket url to chrome window with about:blank loaded
    def __enter__(self):
        chrome_args = [self.executable,
                "--user-data-dir={}".format(self.user_data_dir),
                "--remote-debugging-port=%s" % self.port,
                "--disable-web-sockets", "--disable-cache",
                "--window-size=1100,900", "--enable-logging",
                "--homepage=about:blank", "about:blank"]
        self.logger.info("running {}".format(chrome_args))
        self.chrome_process = subprocess.Popen(chrome_args, start_new_session=True)
        self.logger.info("chrome running, pid {}".format(self.chrome_process.pid))
        try:
            return self._wait_for_websocket_url()
        except BaseException:
            # __exit__ is not called when __enter__ raises, so clean up here
            # rather than leave a stray browser running.
            self._stop_chrome()
            raise

    def __exit__(self, *args):
        self._stop_chrome()
def _build_arg_parser():
    """Build the command-line parser for the umbra executable."""
    arg_parser = argparse.ArgumentParser(prog=os.path.basename(sys.argv[0]),
            description='umbra - Browser automation tool',
            formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    # type=float rejects a non-numeric value at the command line instead of
    # letting it fail later, when the browser is being started.
    arg_parser.add_argument('-w', '--browser-wait', dest='browser_wait', type=float, default=10.0,
            help='Seconds to wait for browser initialization')
    arg_parser.add_argument('-e', '--executable', dest='executable', default='chromium-browser',
            help='Executable to use to invoke chrome')
    arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f',
            help='URL identifying the amqp server to talk to')
    arg_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true',
            help='Log debug messages as well')
    return arg_parser


def main():
    """Command-line entry point: run umbra until interrupted, then shut it down."""
    args = _build_arg_parser().parse_args(args=sys.argv[1:])
    # Without a handler the info and debug messages of this module are dropped.
    logging.basicConfig(stream=sys.stderr,
            level=logging.DEBUG if args.verbose else logging.INFO,
            format='%(asctime)s %(levelname)s %(name)s: %(message)s')
    umbra = Umbra(args.amqp_url, args.executable, args.browser_wait)
    try:
        # Sleep in short slices so the main thread stays responsive to Ctrl-C
        # while the work happens in other threads.
        while True:
            time.sleep(0.5)
    except KeyboardInterrupt:
        logging.getLogger('umbra').info("interrupted, shutting down")
    finally:
        umbra.shutdown()


# Run the entry point only when executed as a script, not when imported.
if __name__ == "__main__":
    main()