Merge pull request #23 from nlevitt/master

improve robustness, refactor, tweaks
This commit is contained in:
vonrosen 2014-05-28 12:38:11 -07:00
commit 2dc30cc8bc
12 changed files with 458 additions and 246 deletions

8
.gitignore vendored
View file

@ -1,8 +1,4 @@
*.pyc
/bin/
/include/
/lib/
/local/
/share/
/build/
*.diff
.*.sw*
/umbra.egg-info/

View file

@ -1,18 +1,38 @@
umbra
=====
Umbra is a browser automation tool, developed for the web archiving service
https://archive-it.org/.
Browser automation via chrome debug protocol
Umbra receives urls via AMQP. It opens them in the chrome or chromium browser,
with which it communicates using the chrome remote debug protocol (see
https://developer.chrome.com/devtools/docs/debugger-protocol). It runs
javascript behaviors to simulate user interaction with the page. It publishes
information about the urls requested by the browser back to AMQP. The
format of the incoming and outgoing AMQP messages is described in `pydoc
umbra.controller`.
Umbra can be used with the Heritrix web crawler, using these heritrix modules:
* [AMQPUrlReceiver](https://github.com/internetarchive/heritrix3/blob/master/contrib/src/main/java/org/archive/crawler/frontier/AMQPUrlReceiver.java)
* [AMQPPublishProcessor](https://github.com/internetarchive/heritrix3/blob/master/contrib/src/main/java/org/archive/modules/AMQPPublishProcessor.java)
Install
======
Install via pip from this repo.
------
Install via pip from this repo, e.g.
pip install git+https://github.com/internetarchive/umbra.git
Umbra requires an AMQP messaging service like RabbitMQ. On Ubuntu,
`sudo apt-get install rabbitmq-server` will install and start RabbitMQ
at amqp://guest:guest@localhost:5672/%2f, which is the default AMQP url for umbra.
Run
=====
"umbra" script should be in bin/.
load_url.py takes urls as arguments and puts them onto a rabbitmq queue
dump_queue.py prints resources discovered by the browser and sent over the return queue.
---
The command `umbra` will start umbra with default configuration. `umbra --help`
describes all command line options.
Umbra also comes with these utilities:
* browse-url - open urls in chrome/chromium and run behaviors (without involving AMQP)
* queue-url - send url to umbra via AMQP
* drain-queue - consume messages from AMQP queue
On ubuntu, rabbitmq install with `sudo apt-get install rabbitmq-server` should automatically
be set up for these three scripts to function on localhost ( the default amqp url ).

28
bin/browse-url Executable file
View file

@ -0,0 +1,28 @@
#!/usr/bin/env python
# vim: set sw=4 et:
import argparse
import os
import sys
import logging
from umbra.browser import Browser
# Command line interface. ArgumentDefaultsHelpFormatter appends each option's
# default value to its --help text.
arg_parser = argparse.ArgumentParser(prog=os.path.basename(sys.argv[0]),
        description='browse-url - open urls in chrome/chromium and run behaviors',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
# type=float makes argparse reject a non-numeric wait with a usage error up
# front instead of handing an arbitrary string to Browser (whose own default
# for chrome_wait is numeric). argparse runs the string default through the
# same conversion, so args.browser_wait is always a float.
arg_parser.add_argument('-w', '--browser-wait', dest='browser_wait', default='60', type=float,
        help='seconds to wait for browser initialization')
arg_parser.add_argument('-e', '--executable', dest='chrome_exe', default='chromium-browser',
        help='executable to use to invoke chrome')
# -v switches the log level from INFO to DEBUG
arg_parser.add_argument('-v', '--verbose', dest='log_level',
        action="store_const", default=logging.INFO, const=logging.DEBUG)
arg_parser.add_argument('urls', metavar='URL', nargs='+', help='URL(s) to browse')
args = arg_parser.parse_args(args=sys.argv[1:])

logging.basicConfig(stream=sys.stdout, level=args.log_level,
        format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')

# Browse the urls one after another with a single Browser; browse_page() is
# called without an on_request callback, so nothing is reported anywhere
# beyond the log output.
browser = Browser(chrome_exe=args.chrome_exe, chrome_wait=args.browser_wait)
for url in args.urls:
    browser.browse_page(url)

49
bin/drain-queue Executable file
View file

@ -0,0 +1,49 @@
#!/usr/bin/env python
# vim: set sw=4 et:
import os
import sys
import argparse
import logging
import socket
from kombu import Connection, Exchange, Queue
# Command line options; ArgumentDefaultsHelpFormatter echoes every default in
# the --help output.
arg_parser = argparse.ArgumentParser(
        prog=os.path.basename(sys.argv[0]),
        description='drain-queue - consume messages from AMQP queue',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
arg_parser.add_argument(
        '-u', '--url', dest='amqp_url',
        default='amqp://guest:guest@localhost:5672/%2f',
        help='URL identifying the AMQP server to talk to')
arg_parser.add_argument(
        '--exchange', dest='amqp_exchange', default='umbra',
        help='AMQP exchange name')
arg_parser.add_argument(
        '--queue', dest='amqp_queue', default='urls',
        help='AMQP queue name')
# Boolean switches: absent -> False, present -> True.
arg_parser.add_argument(
        '-n', '--no-ack', dest='no_ack', action="store_true",
        help="leave messages on the queue (default: remove them from the queue)")
arg_parser.add_argument(
        '-r', '--run-forever', dest='run_forever', action="store_true",
        help="run forever, waiting for new messages to appear on the queue (default: exit when all messages in the queue have been consumed)")
# -v lowers the log threshold from INFO to DEBUG.
arg_parser.add_argument(
        '-v', '--verbose', dest='log_level', action="store_const",
        const=logging.DEBUG, default=logging.INFO)
args = arg_parser.parse_args(args=sys.argv[1:])

# Log to stderr so that stdout carries only the drained message bodies.
logging.basicConfig(
        stream=sys.stderr, level=args.log_level,
        format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
def print_and_maybe_ack(body, message):
    """Consumer callback: print the message body to stdout and acknowledge
    the message, unless --no-ack was given on the command line (in which
    case the message is left unacknowledged)."""
    print(body)
    if args.no_ack:
        return
    message.ack()
exchange = Exchange(args.amqp_exchange, 'direct', durable=True)
queue = Queue(args.amqp_queue, exchange=exchange)

# Consume until the queue yields nothing for half a second (or forever with
# --run-forever). Ctrl-C ends the script quietly.
try:
    with Connection(args.amqp_url) as conn, \
            conn.Consumer(queue, callbacks=[print_and_maybe_ack]) as consumer:
        # ask for one message at a time
        consumer.qos(prefetch_count=1)
        while True:
            try:
                conn.drain_events(timeout=0.5)
            except socket.timeout:
                # nothing arrived within the timeout
                if args.run_forever:
                    continue
                logging.debug("exiting, no messages left on the queue")
                break
except KeyboardInterrupt:
    logging.debug("exiting, stopped by user")

View file

@ -1,21 +0,0 @@
#!/usr/bin/env python
from json import dumps, loads
import os,sys,argparse, urllib.request, urllib.error, urllib.parse
import websocket
import time
import uuid
import logging
import threading
from kombu import Connection, Exchange, Queue
logging.basicConfig(level=logging.INFO)
umbra_exchange = Exchange('umbra', 'direct', durable=True)
requests_queue = Queue('requests', exchange=umbra_exchange)
def print_and_ack(body, message):
print(body['url'])
message.ack()
with Connection(sys.argv[1] if len(sys.argv) > 1 else "amqp://guest:guest@localhost:5672//") as conn:
with conn.Consumer(requests_queue, callbacks=[print_and_ack]) as consumer:
while True:
conn.drain_events()

View file

@ -1,30 +0,0 @@
#!/usr/bin/env python
# vim: set sw=4 et:
from json import dumps, loads
import os,sys,argparse, urllib.request, urllib.error, urllib.parse
import websocket
import time
import uuid
import logging
import threading
from kombu import Connection, Exchange, Queue
logging.basicConfig(stream=sys.stdout, level=logging.INFO,
format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
arg_parser = argparse.ArgumentParser(prog=os.path.basename(sys.argv[0]),
description='load_url.py - send url to umbra via amqp',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672//',
help='URL identifying the amqp server to talk to')
arg_parser.add_argument('-i', '--client-id', dest='client_id', default='load_url.0',
help='client id - included in the json payload with each url; umbra uses this value as the routing key to send requests back to')
arg_parser.add_argument('urls', metavar='URL', nargs='+', help='URLs to send to umbra')
args = arg_parser.parse_args(args=sys.argv[1:])
umbra_exchange = Exchange('umbra', 'direct', durable=True)
with Connection(args.amqp_url) as conn:
producer = conn.Producer(serializer='json')
for url in args.urls:
producer.publish({'url': url, 'metadata': {}, 'clientId': args.client_id}, 'url', exchange=umbra_exchange)

36
bin/queue-url Executable file
View file

@ -0,0 +1,36 @@
#!/usr/bin/env python
# vim: set sw=4 et:
import os
import sys
import argparse
import logging
from kombu import Connection, Exchange, Queue
# Command line options; ArgumentDefaultsHelpFormatter echoes every default in
# the --help output.
arg_parser = argparse.ArgumentParser(
        prog=os.path.basename(sys.argv[0]),
        description='queue-url - send url to umbra via AMQP',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
arg_parser.add_argument(
        '-u', '--url', dest='amqp_url',
        default='amqp://guest:guest@localhost:5672/%2f',
        help='URL identifying the AMQP server to talk to')
arg_parser.add_argument(
        '--exchange', dest='amqp_exchange', default='umbra',
        help='AMQP exchange name')
arg_parser.add_argument(
        '--routing-key', dest='amqp_routing_key', default='url',
        help='AMQP routing key')
arg_parser.add_argument(
        '-i', '--client-id', dest='client_id', default='load_url.0',
        help='client id - included in the json payload with each url; umbra uses this value as the routing key to send requests back to')
# -v lowers the log threshold from INFO to DEBUG.
arg_parser.add_argument(
        '-v', '--verbose', dest='log_level', action="store_const",
        const=logging.DEBUG, default=logging.INFO)
arg_parser.add_argument(
        'urls', metavar='URL', nargs='+', help='URLs to send to umbra')
args = arg_parser.parse_args(args=sys.argv[1:])

logging.basicConfig(
        stream=sys.stdout, level=args.log_level,
        format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')

exchange = Exchange(args.amqp_exchange, 'direct', durable=True)

# Publish one json message per url: the url itself, an empty metadata object
# and the client id.
with Connection(args.amqp_url) as conn:
    producer = conn.Producer(serializer='json')
    for url in args.urls:
        payload = dict(url=url, metadata={}, clientId=args.client_id)
        logging.info("sending to amqp url={} exchange={} routing_key={} -- {}".format(
                args.amqp_url, args.amqp_exchange, args.amqp_routing_key, payload))
        producer.publish(payload, exchange=exchange,
                routing_key=args.amqp_routing_key)

View file

@ -1,6 +1,78 @@
#!/usr/bin/env python
from umbra import umbra
# vim: set sw=4 et:
import logging
import argparse
import time
import umbra
import sys
import signal
import os
import umbra
import signal
import threading
if __name__=="__main__":
umbra.main()
import faulthandler
faulthandler.register(signal.SIGQUIT)
arg_parser = argparse.ArgumentParser(prog=os.path.basename(sys.argv[0]),
description='umbra - browser automation tool communicating via AMQP',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
arg_parser.add_argument('-w', '--browser-wait', dest='browser_wait', default='60',
help='Seconds to wait for browser initialization')
arg_parser.add_argument('-e', '--executable', dest='chrome_exe', default='chromium-browser',
help='Executable to use to invoke chrome')
arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f',
help='URL identifying the amqp server to talk to')
arg_parser.add_argument('--exchange', dest='amqp_exchange', default='umbra',
help='AMQP exchange name')
arg_parser.add_argument('--queue', dest='amqp_queue', default='urls',
help='AMQP queue to consume urls from')
arg_parser.add_argument('--routing-key', dest='amqp_routing_key', default='url',
help='AMQP routing key to bind to the AMQP queue')
arg_parser.add_argument('-n', '--max-browsers', dest='max_browsers', default='3',
help='Max number of chrome instances simultaneously browsing pages')
arg_parser.add_argument('-v', '--verbose', dest='log_level',
action="store_const", default=logging.INFO, const=logging.DEBUG)
args = arg_parser.parse_args(args=sys.argv[1:])
logging.basicConfig(stream=sys.stdout, level=args.log_level,
format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
umbra = umbra.Umbra(args.amqp_url, args.chrome_exe, args.browser_wait,
max_active_browsers=int(args.max_browsers),
exchange_name=args.amqp_exchange, queue_name=args.amqp_queue,
routing_key=args.amqp_routing_key)
class ShutdownRequested(Exception):
pass
def sigterm(signum, frame):
raise ShutdownRequested('shutdown requested (caught SIGTERM)')
def sigint(signum, frame):
raise ShutdownRequested('shutdown requested (caught SIGINT)')
signal.signal(signal.SIGTERM, sigterm)
signal.signal(signal.SIGINT, sigint)
umbra.start()
try:
while True:
time.sleep(0.5)
except ShutdownRequested as e:
logging.info(e)
except BaseException as e:
logging.fatal(e)
finally:
try:
umbra.shutdown()
for th in threading.enumerate():
if th != threading.current_thread():
th.join()
except BaseException as e:
logging.warn("caught {}".format(e))
umbra.shutdown_now()

View file

@ -1,8 +1,11 @@
import setuptools
# vim: set sw=4 et:
import setuptools
import glob
setuptools.setup(name='umbra',
version='0.1',
description='Google Chrome remote control interface',
description='Browser automation via chrome debug protocol',
url='https://github.com/internetarchive/umbra',
author='Eldon Stegall',
author_email='eldon@archive.org',
@ -10,8 +13,8 @@ setuptools.setup(name='umbra',
license='Apache License 2.0',
packages=['umbra'],
package_data={'umbra':['behaviors.d/*.js']},
install_requires=['kombu', 'websocket-client-py3','argparse'],
scripts=['bin/umbra', 'bin/load_url.py', 'bin/dump_queue.py'],
install_requires=['kombu', 'websocket-client-py3==0.13.1','argparse'],
scripts=glob.glob("bin/*"),
zip_safe=False,
classifiers=[
'Development Status :: 3 - Alpha Development Status',

View file

@ -0,0 +1,3 @@
from umbra.browser import Browser
from umbra.controller import AmqpBrowserController
Umbra = AmqpBrowserController

254
umbra/umbra.py → umbra/browser.py Executable file → Normal file
View file

@ -2,58 +2,105 @@
# vim: set sw=4 et:
import logging
import sys
# logging.basicConfig(stream=sys.stdout, level=logging.INFO,
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG,
format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
import os
import argparse
import json
import urllib.request, urllib.error, urllib.parse
import urllib.request
import itertools
import websocket
import time
import uuid
import threading
import subprocess
import signal
import kombu
import tempfile
import os
import socket
from umbra.behaviors import Behavior
class UmbraWorker:
class BrowserPool:
    """Fixed-size pool of Browser objects, each with its own chrome debug port.

    For every browser that is sitting idle in the pool, the pool keeps a
    socket bound to that browser's port on 127.0.0.1; the socket is closed
    when the browser is acquired and a new one is bound when the browser is
    released (presumably so that no other process takes the port while the
    browser is idle).

    acquire() and release() are serialized by an internal lock.
    """

    logger = logging.getLogger(__module__ + "." + __qualname__)

    def __init__(self, size=3, chrome_exe='chromium-browser', chrome_wait=60):
        """Creates `size` browsers, each on a port picked by the operating
        system. `chrome_exe` and `chrome_wait` are passed through to every
        Browser."""
        self._available = set()   # (browser, port_holder socket) tuples
        self._in_use = set()      # browsers currently handed out
        self._lock = threading.Lock()

        for _ in range(size):
            port_holder = self._grab_random_port()
            browser = Browser(port_holder.getsockname()[1], chrome_exe, chrome_wait)
            self._available.add((browser, port_holder))

        self.logger.info("browser ports: {}".format([browser.chrome_port for (browser, port_holder) in self._available]))

    def _grab_random_port(self):
        """Returns socket bound to some port."""
        sock = socket.socket()
        sock.bind(('127.0.0.1', 0))
        return sock

    def _hold_port(self, port):
        """Returns socket bound to supplied port."""
        sock = socket.socket()
        sock.bind(('127.0.0.1', port))
        return sock

    def acquire(self):
        """Returns browser from pool if available, raises KeyError otherwise."""
        with self._lock:
            (browser, port_holder) = self._available.pop()
            # free the port so that chrome can listen on it
            port_holder.close()
            self._in_use.add(browser)
            return browser

    def release(self, browser):
        """Returns `browser` to the pool and binds its port again.

        Raises KeyError if `browser` is not currently checked out. The check
        happens before anything is modified, so a stray or repeated release
        cannot leave the browser listed as both available and in use.
        """
        with self._lock:
            if browser not in self._in_use:
                raise KeyError(browser)
            port_holder = self._hold_port(browser.chrome_port)
            self._in_use.remove(browser)
            self._available.add((browser, port_holder))

    def shutdown_now(self):
        """Asks every browser that is currently checked out to stop
        immediately."""
        # Iterate over a snapshot: browsing threads release their browsers
        # concurrently, and mutating a set while it is being iterated raises
        # RuntimeError. The lock is not held while calling into the browsers.
        with self._lock:
            in_use = list(self._in_use)
        for browser in in_use:
            browser.shutdown_now()
class Browser:
"""Runs chrome/chromium to synchronously browse one page at a time using
worker.browse_page(). Currently the implementation starts up a new instance
of chrome for each page browsed, always on the same debug port. (In the
future, it may keep the browser running indefinitely.)"""
logger = logging.getLogger('umbra.UmbraWorker')
logger = logging.getLogger(__module__ + "." + __qualname__)
HARD_TIMEOUT_SECONDS = 20 * 60
def __init__(self, umbra, chrome_port=9222, chrome_exe='chromium-browser', chrome_wait=10, client_id='request'):
def __init__(self, chrome_port=9222, chrome_exe='chromium-browser', chrome_wait=60):
self.command_id = itertools.count(1)
self.lock = threading.Lock()
self.umbra = umbra
self._lock = threading.Lock()
self.chrome_port = chrome_port
self.chrome_exe = chrome_exe
self.chrome_wait = chrome_wait
self.client_id = client_id
self._behavior = None
self.websock = None
self._shutdown_now = False
def browse_page(self, url, url_metadata):
"""Synchronously browse a page and run behaviors."""
with self.lock:
def shutdown_now(self):
self._shutdown_now = True
def browse_page(self, url, on_request=None):
"""Synchronously browses a page and runs behaviors. First blocks to
acquire lock to ensure we only browse one page at a time."""
with self._lock:
self.url = url
self.url_metadata = url_metadata
self.on_request = on_request
with tempfile.TemporaryDirectory() as user_data_dir:
with Chrome(self.chrome_port, self.chrome_exe, self.chrome_wait, user_data_dir) as websocket_url:
self.websock = websocket.WebSocketApp(websocket_url,
on_open=self._visit_page,
on_message=self._handle_message)
websock_thread = threading.Thread(target=self.websock.run_forever, kwargs={'ping_timeout':0.5})
import random
threadName = "WebsockThread{}-{}".format(self.chrome_port,
''.join((random.choice('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789') for _ in range(6))))
websock_thread = threading.Thread(target=self.websock.run_forever, name=threadName, kwargs={'ping_timeout':0.5})
websock_thread.start()
start = time.time()
@ -62,12 +109,15 @@ class UmbraWorker:
if not self.websock or not self.websock.sock or not self.websock.sock.connected:
self.logger.error("websocket closed, did chrome die??? {}".format(self.websock))
break
elif time.time() - start > UmbraWorker.HARD_TIMEOUT_SECONDS:
self.logger.info("finished browsing page, reached hard timeout of {} seconds url={}".format(UmbraWorker.HARD_TIMEOUT_SECONDS, self.url))
elif time.time() - start > Browser.HARD_TIMEOUT_SECONDS:
self.logger.info("finished browsing page, reached hard timeout of {} seconds url={}".format(Browser.HARD_TIMEOUT_SECONDS, self.url))
break
elif self._behavior != None and self._behavior.is_finished():
self.logger.info("finished browsing page according to behavior url={}".format(self.url))
break
elif self._shutdown_now:
self.logger.warn("immediate shutdown requested")
break
try:
self.websock.close()
@ -98,18 +148,6 @@ class UmbraWorker:
# navigate to the page!
self.send_to_chrome(method="Page.navigate", params={"url": self.url})
# XXX should this class know anything about amqp? or should it
# delegate this back up to the Umbra class?
def _send_request_to_amqp(self, chrome_msg):
payload = chrome_msg['params']['request']
payload['parentUrl'] = self.url
payload['parentUrlMetadata'] = self.url_metadata
self.logger.debug('sending to amqp exchange={} routing_key={} payload={}'.format(self.umbra.umbra_exchange.name, self.client_id, payload))
with self.umbra.producer_lock:
self.umbra.producer.publish(payload,
exchange=self.umbra.umbra_exchange,
routing_key=self.client_id)
def _handle_message(self, websock, message):
# self.logger.debug("message from {} - {}".format(websock.url, message[:95]))
# self.logger.debug("message from {} - {}".format(websock.url, message))
@ -117,10 +155,10 @@ class UmbraWorker:
if "method" in message and message["method"] == "Network.requestWillBeSent":
if self._behavior:
self._behavior.notify_of_activity()
if not message["params"]["request"]["url"].lower().startswith("data:"):
self._send_request_to_amqp(message)
else:
if message["params"]["request"]["url"].lower().startswith("data:"):
self.logger.debug("ignoring data url {}".format(message["params"]["request"]["url"][:80]))
elif self.on_request:
self.on_request(message)
elif "method" in message and message["method"] == "Page.loadEventFired":
if self._behavior is None:
self.logger.info("Page.loadEventFired, starting behaviors url={} message={}".format(self.url, message))
@ -155,105 +193,8 @@ class UmbraWorker:
# self.logger.debug("[no-method] {}".format(message))
class Umbra:
"""Consumes amqp messages representing requests to browse urls, from the
amqp queue "urls" on exchange "umbra". Incoming amqp message is a json
object with 3 attributes:
{
"clientId": "umbra.client.123",
"url": "http://example.com/my_fancy_page",
"metadata": {"arbitrary":"fields", "etc":4}
}
"url" is the url to browse.
"clientId" uniquely idenfities the client of
umbra. Umbra uses the clientId to direct information via amqp back to the
client. It sends this information on that same "umbra" exchange, and uses
the clientId as the amqp routing key.
Each url requested in the browser is published to amqp this way. The
outgoing amqp message is a json object:
{
'url': 'http://example.com/images/embedded_thing.jpg',
'method': 'GET',
'headers': {'User-Agent': '...', 'Accept': '...'}
'parentUrl': 'http://example.com/my_fancy_page',
'parentUrlMetadata': {"arbitrary":"fields", "etc":4},
}
POST requests have an additional field, postData.
"""
logger = logging.getLogger('umbra.Umbra')
def __init__(self, amqp_url, chrome_exe, browser_wait):
self.amqp_url = amqp_url
self.chrome_exe = chrome_exe
self.browser_wait = browser_wait
self.producer = None
self.producer_lock = None
self.workers = {}
self.workers_lock = threading.Lock()
self.amqp_thread = threading.Thread(target=self._consume_amqp)
self.amqp_stop = threading.Event()
self.amqp_thread.start()
def shutdown(self):
self.logger.info("shutting down amqp consumer {}".format(self.amqp_url))
self.amqp_stop.set()
self.amqp_thread.join()
def _consume_amqp(self):
while not self.amqp_stop.is_set():
try:
self.umbra_exchange = kombu.Exchange(name='umbra', type='direct', durable=True)
url_queue = kombu.Queue('urls', routing_key='url', exchange=self.umbra_exchange)
self.logger.info("connecting to amqp exchange={} at {}".format(self.umbra_exchange.name, self.amqp_url))
with kombu.Connection(self.amqp_url) as conn:
if self.producer_lock is None:
self.producer_lock = threading.Lock()
with self.producer_lock:
self.producer = conn.Producer(serializer='json')
with conn.Consumer(url_queue, callbacks=[self._browse_page_requested]) as consumer:
import socket
while not self.amqp_stop.is_set():
try:
conn.drain_events(timeout=0.5)
except socket.timeout:
pass
except BaseException as e:
self.logger.error("amqp exception {}".format(e))
self.logger.error("attempting to reopen amqp connection")
def _browse_page_requested(self, body, message):
"""First waits for the UmbraWorker for the client body['clientId'] to
become available, or creates a new worker if this clientId has not been
served before. Starts a worker browsing the page asynchronously, then
acknowledges the amqp message, which lets the server know it can be
removed from the queue."""
client_id = body['clientId']
with self.workers_lock:
if not client_id in self.workers:
port = 9222 + len(self.workers)
t = UmbraWorker(umbra=self, chrome_port=port,
chrome_exe=self.chrome_exe,
chrome_wait=self.browser_wait,
client_id=client_id)
self.workers[client_id] = t
def browse_page_async():
self.logger.info('client_id={} body={}'.format(client_id, body))
self.workers[client_id].browse_page(body['url'], body['metadata'])
threading.Thread(target=browse_page_async).start()
message.ack()
class Chrome:
logger = logging.getLogger('umbra.Chrome')
logger = logging.getLogger(__module__ + "." + __qualname__)
def __init__(self, port, executable, browser_wait, user_data_dir):
self.port = port
@ -261,19 +202,14 @@ class Chrome:
self.browser_wait = browser_wait
self.user_data_dir = user_data_dir
def fetch_debugging_json():
raw_json = urllib.request.urlopen("http://localhost:%s/json" % self.port).read()
json = raw_json.decode('utf-8')
return json.loads(json)
# returns websocket url to chrome window with about:blank loaded
def __enter__(self):
chrome_args = [self.executable,
"--user-data-dir={}".format(self.user_data_dir),
"--remote-debugging-port=%s" % self.port,
"--remote-debugging-port={}".format(self.port),
"--disable-web-sockets", "--disable-cache",
"--window-size=1100,900", "--enable-logging",
"--no-default-browser-check", "--disable-first-run-ui", "--no-first-run",
"--window-size=1100,900", "--no-default-browser-check",
"--disable-first-run-ui", "--no-first-run",
"--homepage=about:blank", "about:blank"]
self.logger.info("running {}".format(chrome_args))
self.chrome_process = subprocess.Popen(chrome_args, start_new_session=True)
@ -306,31 +242,3 @@ class Chrome:
os.killpg(self.chrome_process.pid, signal.SIGINT)
self.chrome_process.wait()
def main():
import faulthandler
faulthandler.register(signal.SIGQUIT)
arg_parser = argparse.ArgumentParser(prog=os.path.basename(sys.argv[0]),
description='umbra - Browser automation tool',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
arg_parser.add_argument('-w', '--browser-wait', dest='browser_wait', default='10',
help='Seconds to wait for browser initialization')
arg_parser.add_argument('-e', '--executable', dest='executable', default='chromium-browser',
help='Executable to use to invoke chrome')
arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f',
help='URL identifying the amqp server to talk to')
args = arg_parser.parse_args(args=sys.argv[1:])
umbra = Umbra(args.amqp_url, args.executable, args.browser_wait)
try:
while True:
time.sleep(0.5)
except:
pass
finally:
umbra.shutdown()
if __name__ == "__main__":
main()

148
umbra/controller.py Normal file
View file

@ -0,0 +1,148 @@
#!/usr/bin/env python
# vim: set sw=4 et:
import logging
import time
import threading
import kombu
from umbra.browser import BrowserPool
class AmqpBrowserController:
    """
    Consumes amqp messages representing requests to browse urls, from the
    specified amqp queue (default: "urls") on the specified amqp exchange
    (default: "umbra"). Incoming amqp message is a json object with 3
    attributes:

      {
        "clientId": "umbra.client.123",
        "url": "http://example.com/my_fancy_page",
        "metadata": {"arbitrary":"fields", "etc":4}
      }

    "url" is the url to browse.

    "clientId" uniquely identifies the client of umbra. Umbra uses the clientId
    as the amqp routing key, to direct information via amqp back to the client.
    It sends this information on the same specified amqp exchange (default:
    "umbra").

    Each url requested in the browser is published to amqp this way. The
    outgoing amqp message is a json object:

      {
        "url": "http://example.com/images/embedded_thing.jpg",
        "method": "GET",
        "headers": {"User-Agent": "...", "Accept": "...", ...},
        "parentUrl": "http://example.com/my_fancy_page",
        "parentUrlMetadata": {"arbitrary":"fields", "etc":4, ...}
      }

    POST requests have an additional field, postData.
    """

    logger = logging.getLogger(__module__ + "." + __qualname__)

    def __init__(self, amqp_url='amqp://guest:guest@localhost:5672/%2f',
            chrome_exe='chromium-browser', browser_wait=60,
            max_active_browsers=1, queue_name='urls', routing_key='url',
            exchange_name='umbra'):
        """Stores the amqp settings and creates the browser pool. Nothing is
        connected until start() is called.

        amqp_url -- url of the amqp server
        chrome_exe -- executable used to run chrome, passed to BrowserPool
        browser_wait -- passed to BrowserPool as chrome_wait
        max_active_browsers -- size of the browser pool
        queue_name -- amqp queue to consume urls from
        routing_key -- routing key binding the queue to the exchange
        exchange_name -- amqp exchange used for both consuming and publishing
        """
        self.amqp_url = amqp_url
        self.queue_name = queue_name
        self.routing_key = routing_key
        self.exchange_name = exchange_name

        self._browser_pool = BrowserPool(size=max_active_browsers,
                chrome_exe=chrome_exe, chrome_wait=browser_wait)

    def start(self):
        """Opens the producer connection and starts the thread that consumes
        urls from amqp."""
        self._exchange = kombu.Exchange(name=self.exchange_name, type='direct',
                durable=True)

        self._producer = None
        self._producer_lock = threading.Lock()
        with self._producer_lock:
            self._producer_conn = kombu.Connection(self.amqp_url)
            self._producer = self._producer_conn.Producer(serializer='json')

        self._amqp_thread = threading.Thread(target=self._consume_amqp, name='AmqpConsumerThread')
        self._amqp_stop = threading.Event()
        self._amqp_thread.start()

    def shutdown(self):
        """Stops consuming from amqp and waits for the consumer thread to
        exit. Pages that are already being browsed are not interrupted."""
        self.logger.info("shutting down amqp consumer {}".format(self.amqp_url))
        self._amqp_stop.set()
        self._amqp_thread.join()
        # NOTE(review): the producer connection is left open here (closing it
        # was commented out in the original); browsing threads may still be
        # publishing through it at this point - confirm before closing it.

    def shutdown_now(self):
        """Asks all browsers that are currently browsing to stop immediately."""
        self._browser_pool.shutdown_now()

    def _consume_amqp(self):
        """Body of the consumer thread. Loops until shutdown() is called:
        acquires a browser, waits up to half a second for a url message and,
        if one arrives, hands the browser to a browsing thread. A browser
        that was not handed off is always returned to the pool."""
        # imported here, once, rather than on every pass through the loop
        import socket

        # XXX https://webarchive.jira.com/browse/ARI-3811
        # After running for some amount of time (3 weeks in the latest case),
        # consumer looks normal but doesn't consume any messages. Not clear if
        # it's hanging in drain_events() or not. As a temporary measure for
        # mitigation (if it works) or debugging (if it doesn't work), close and
        # reopen the connection every 15 minutes
        RECONNECT_AFTER_SECONDS = 15 * 60

        url_queue = kombu.Queue(self.queue_name, routing_key=self.routing_key,
                exchange=self._exchange)

        while not self._amqp_stop.is_set():
            try:
                self.logger.info("connecting to amqp exchange={} at {}".format(self._exchange.name, self.amqp_url))
                with kombu.Connection(self.amqp_url) as conn:
                    conn_opened = time.time()
                    with conn.Consumer(url_queue) as consumer:
                        consumer.qos(prefetch_count=1)
                        while (not self._amqp_stop.is_set()
                                and time.time() - conn_opened < RECONNECT_AFTER_SECONDS):
                            # Only acquire() is guarded by KeyError, so that a
                            # KeyError raised while handling a message cannot
                            # be mistaken for "no browsers available".
                            try:
                                browser = self._browser_pool.acquire()
                            except KeyError:
                                # no browsers available
                                time.sleep(0.5)
                                continue

                            callback = self._make_callback(browser)
                            consumer.callbacks = [callback]
                            try:
                                conn.drain_events(timeout=0.5)
                            except socket.timeout:
                                # no urls in the queue
                                pass
                            finally:
                                consumer.callbacks = None
                                # Unless a browsing thread now owns the browser
                                # (and will release it), give it back whatever
                                # happened: timeout, non-message event,
                                # rejected message or connection error.
                                if not callback.browser_handed_off:
                                    self._browser_pool.release(browser)
            except BaseException as e:
                self.logger.error("amqp exception {}".format(e))
                time.sleep(0.5)
                self.logger.error("attempting to reopen amqp connection")

    def _make_callback(self, browser):
        """Returns the consumer callback that browses an incoming url with
        `browser`.

        The returned function carries an attribute `browser_handed_off`,
        False until a browsing thread has been started for the browser. It
        tells _consume_amqp() whether it still has to release the browser.
        A message without the expected fields is logged and rejected instead
        of raising.
        """
        def callback(body, message):
            try:
                client_id = body['clientId']
                url = body['url']
                metadata = body['metadata']
            except (KeyError, TypeError) as e:
                self.logger.error("rejecting malformed amqp message {} ({!r})".format(body, e))
                message.reject()
                return

            self._browse_page(browser, client_id, url, metadata)
            # set before ack(): from here on the browsing thread is the one
            # that releases the browser, even if ack() fails
            callback.browser_handed_off = True
            message.ack()

        callback.browser_handed_off = False
        return callback

    def _browse_page(self, browser, client_id, url, parent_url_metadata):
        """Starts a thread that browses `url` with `browser` and releases the
        browser when done. Every request the browser reports is published to
        amqp with `client_id` as the routing key. Returns as soon as the
        thread has been started."""
        import random

        def on_request(chrome_msg):
            # the request object from chrome is extended in place and
            # published as is
            payload = chrome_msg['params']['request']
            payload['parentUrl'] = url
            payload['parentUrlMetadata'] = parent_url_metadata
            self.logger.debug('sending to amqp exchange={} routing_key={} payload={}'.format(self.exchange_name, client_id, payload))
            # the producer is shared by all browsing threads
            with self._producer_lock:
                publish = self._producer_conn.ensure(self._producer, self._producer.publish)
                publish(payload, exchange=self._exchange, routing_key=client_id)

        def browse_page_async():
            self.logger.info('browser={} client_id={} url={}'.format(browser, client_id, url))
            try:
                browser.browse_page(url, on_request=on_request)
            finally:
                self._browser_pool.release(browser)

        # random suffix to tell apart successive threads on the same port
        threadName = "BrowsingThread{}-{}".format(browser.chrome_port,
                ''.join((random.choice('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789') for _ in range(6))))
        threading.Thread(target=browse_page_async, name=threadName).start()