update readme, s/umbra/brozzler/ in most places, delete non-brozzler stuff

Noah Levitt 2015-07-13 17:09:39 -07:00
parent 3eff099b16
commit fd0c3322ee
27 changed files with 41 additions and 720 deletions

.gitignore

@@ -1,4 +1,4 @@
*.pyc
*.diff
.*.sw*
-/umbra.egg-info/
+/brozzler.egg-info/

README.md

@@ -1,43 +1,16 @@
-umbra
-=====
-Umbra is a browser automation tool, developed for the web archiving service
-https://archive-it.org/.
+brozzler
+========
+"browser" ^ "crawler" = "brozzler"
-Umbra receives urls via AMQP. It opens them in the chrome or chromium browser,
-with which it communicates using the chrome remote debug protocol (see
-https://developer.chrome.com/devtools/docs/debugger-protocol). It runs
-javascript behaviors to simulate user interaction with the page. It publishes
-information about the urls requested by the browser back to AMQP. The
-format of the incoming and outgoing AMQP messages is described in `pydoc
-umbra.controller`.
+Brozzler is a distributed web crawler that uses a real browser (chrome or
+chromium) to fetch pages and embedded urls and to extract links.
-Umbra can be used with the Heritrix web crawler, using these Heritrix modules:
-* [AMQPUrlReceiver](https://github.com/internetarchive/heritrix3/blob/master/contrib/src/main/java/org/archive/crawler/frontier/AMQPUrlReceiver.java)
-* [AMQPPublishProcessor](https://github.com/internetarchive/heritrix3/blob/master/contrib/src/main/java/org/archive/modules/AMQPPublishProcessor.java)
-Install
--------
-Install via pip from this repo, e.g.
-    pip install git+https://github.com/internetarchive/umbra.git
-Umbra requires an AMQP messaging service like RabbitMQ. On Ubuntu,
-`sudo apt-get install rabbitmq-server` will install and start RabbitMQ at amqp://guest:guest@localhost:5672/%2f, which is the default AMQP url for umbra.
-Run
----
-The command `umbra` will start umbra with default configuration. `umbra --help`
-describes all command line options.
-Umbra also comes with these utilities:
-* browse-url - open urls in chrome/chromium and run behaviors (without involving AMQP)
-* queue-url - send url to umbra via AMQP
-* drain-queue - consume messages from AMQP queue
+It is forked from https://github.com/internetarchive/umbra.
License
-------
-Copyright 2014 Internet Archive
+Copyright 2015 Internet Archive
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this software except in compliance with the License.
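
Since the new README no longer documents the message flow, here is a minimal sketch (not part of this commit) of feeding brozzler a new site over AMQP with kombu, in the spirit of bin/brozzler-add-site below. The queue name "brozzler.sites.new" and the payload shape are assumptions, not confirmed by this diff: brozzler-hq constructs brozzler.hq.Site(**msg.payload), so the payload keys must match Site's constructor, and "seed" is the only attribute visible in the code shown here.

    #!/usr/bin/env python
    # hedged sketch: queue a new site for brozzler-hq to claim
    import kombu

    amqp_url = 'amqp://guest:guest@localhost:5672/%2f'  # default amqp url used throughout this repo
    with kombu.Connection(amqp_url) as conn:
        q = conn.SimpleQueue('brozzler.sites.new')  # hypothetical queue name
        q.put({'seed': 'http://example.com/'})      # keys must match brozzler.hq.Site(**payload)
        q.close()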


@@ -5,7 +5,7 @@ import argparse
import os
import sys
import logging
-import umbra
+import brozzler
import re
import datetime
@@ -20,14 +20,14 @@ arg_parser.add_argument('-e', '--executable', dest='chrome_exe', default='chromi
arg_parser.add_argument('-v', '--verbose', dest='log_level',
action="store_const", default=logging.INFO, const=logging.DEBUG)
arg_parser.add_argument('--version', action='version',
version="umbra {} - {}".format(umbra.version, os.path.basename(__file__)))
version="brozzler {} - {}".format(brozzler.version, os.path.basename(__file__)))
args = arg_parser.parse_args(args=sys.argv[1:])
logging.basicConfig(stream=sys.stdout, level=args.log_level,
format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
-with umbra.Browser(chrome_exe=args.chrome_exe) as browser:
+with brozzler.Browser(chrome_exe=args.chrome_exe) as browser:
for url in args.urls:
def on_screenshot(screenshot_png):

bin/brozzler-add-site (normal file → executable file)

@@ -5,7 +5,7 @@ import argparse
import os
import sys
import logging
-import umbra
+import brozzler
import kombu
arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),
@@ -17,7 +17,7 @@ arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:gu
arg_parser.add_argument("-v", "--verbose", dest="log_level",
action="store_const", default=logging.INFO, const=logging.DEBUG)
arg_parser.add_argument("--version", action="version",
version="umbra {} - {}".format(umbra.version, os.path.basename(__file__)))
version="brozzler {} - {}".format(brozzler.version, os.path.basename(__file__)))
args = arg_parser.parse_args(args=sys.argv[1:])
logging.basicConfig(stream=sys.stdout, level=args.log_level,

bin/brozzler-hq (normal file → executable file)

@@ -5,14 +5,14 @@ import argparse
import os
import sys
import logging
-import umbra
+import brozzler
import surt
import sqlite3
import time
import kombu
import kombu.simple
import json
-import umbra.hq
+import brozzler.hq
import signal
arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),
@@ -25,7 +25,7 @@ arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:gu
arg_parser.add_argument("-v", "--verbose", dest="log_level",
action="store_const", default=logging.INFO, const=logging.DEBUG)
arg_parser.add_argument("--version", action="version",
version="umbra {} - {}".format(umbra.version, os.path.basename(__file__)))
version="brozzler {} - {}".format(brozzler.version, os.path.basename(__file__)))
args = arg_parser.parse_args(args=sys.argv[1:])
logging.basicConfig(stream=sys.stdout, level=args.log_level,
@@ -101,7 +101,7 @@ class BrozzlerHQDb:
break
site_dict = json.loads(row[1])
site_dict["id"] = row[0]
-yield umbra.hq.Site(**site_dict)
+yield brozzler.hq.Site(**site_dict)
def update_crawl_url(self, crawl_url):
cursor = self._conn.cursor()
@@ -111,7 +111,7 @@ class BrozzlerHQDb:
if row:
# (id, priority, existing_crawl_url) = row
new_priority = crawl_url.calc_priority() + row[1]
-existing_crawl_url = umbra.CrawlUrl(**json.loads(row[2]))
+existing_crawl_url = brozzler.CrawlUrl(**json.loads(row[2]))
existing_crawl_url.hops_from_seed = min(crawl_url.hops_from_seed, existing_crawl_url.hops_from_seed)
cursor.execute("update brozzler_urls set priority=?, crawl_url_json=? where id=?", (new_priority, existing_crawl_url.to_json(), row[0]))
@@ -145,7 +145,7 @@ class BrozzlerHQ:
def _new_site(self):
try:
msg = self._new_sites_q.get(block=False)
-new_site = umbra.hq.Site(**msg.payload)
+new_site = brozzler.hq.Site(**msg.payload)
msg.ack()
self.logger.info("new site {}".format(new_site))
@@ -153,7 +153,7 @@ class BrozzlerHQ:
new_site.id = site_id
if new_site.is_permitted_by_robots(new_site.seed):
-crawl_url = umbra.CrawlUrl(new_site.seed, site_id=new_site.id, hops_from_seed=0)
+crawl_url = brozzler.CrawlUrl(new_site.seed, site_id=new_site.id, hops_from_seed=0)
self._db.schedule_url(crawl_url, priority=1000)
self._unclaimed_sites_q.put(new_site.to_dict())
else:
@@ -176,7 +176,7 @@ class BrozzlerHQ:
for url in parent_url.outlinks:
if site.is_in_scope(url):
if site.is_permitted_by_robots(url):
-crawl_url = umbra.CrawlUrl(url, site_id=site.id, hops_from_seed=parent_url.hops_from_seed+1)
+crawl_url = brozzler.CrawlUrl(url, site_id=site.id, hops_from_seed=parent_url.hops_from_seed+1)
try:
self._db.update_crawl_url(crawl_url)
counts["updated"] += 1
@@ -196,7 +196,7 @@ class BrozzlerHQ:
q = self._conn.SimpleQueue("brozzler.sites.{}.completed_urls".format(site.id))
try:
msg = q.get(block=False)
-completed_url = umbra.CrawlUrl(**msg.payload)
+completed_url = brozzler.CrawlUrl(**msg.payload)
msg.ack()
self._db.completed(completed_url)
self._scope_and_schedule_outlinks(site, completed_url)


@@ -5,13 +5,13 @@ import argparse
import os
import sys
import logging
-import umbra
+import brozzler
import threading
import time
import surt
import signal
import kombu
-from umbra import hq
+from brozzler import hq
import pprint
import traceback
import youtube_dl
@@ -32,7 +32,7 @@ arg_parser.add_argument('--ignore-certificate-errors', dest='ignore_cert_errors'
arg_parser.add_argument('-v', '--verbose', dest='log_level',
action="store_const", default=logging.INFO, const=logging.DEBUG)
arg_parser.add_argument('--version', action='version',
version="umbra {} - {}".format(umbra.version, os.path.basename(__file__)))
version="brozzler {} - {}".format(brozzler.version, os.path.basename(__file__)))
args = arg_parser.parse_args(args=sys.argv[1:])
logging.basicConfig(stream=sys.stdout, level=args.log_level,
@@ -46,7 +46,7 @@ def next_url(site):
q = conn.SimpleQueue("brozzler.sites.{}.crawl_urls".format(site.id))
msg = q.get(block=True, timeout=0.5)
crawl_url_dict = msg.payload
-crawl_url = umbra.CrawlUrl(**crawl_url_dict)
+crawl_url = brozzler.CrawlUrl(**crawl_url_dict)
msg.ack()
return crawl_url
@@ -87,7 +87,7 @@ def brozzle_site(site, browser):
pass
# except kombu.simple.Empty:
# logging.info("finished {} (queue is empty)".format(site))
-except umbra.browser.BrowsingAborted:
+except brozzler.browser.BrowsingAborted:
logging.info("{} shut down".format(browser))
finally:
disclaim_site(site)
@@ -116,7 +116,7 @@ signal.signal(signal.SIGQUIT, dump_state)
signal.signal(signal.SIGTERM, sigterm)
signal.signal(signal.SIGINT, sigint)
-browser_pool = umbra.browser.BrowserPool(int(args.max_browsers),
+browser_pool = brozzler.browser.BrowserPool(int(args.max_browsers),
chrome_exe=args.chrome_exe, proxy_server=args.proxy_server,
ignore_cert_errors=args.ignore_cert_errors)
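
A note on the queue naming convention visible above and in bin/brozzler-hq: each site gets a pair of kombu SimpleQueues, brozzler.sites.<id>.crawl_urls (hq to worker) and brozzler.sites.<id>.completed_urls (worker back to hq). A sketch of the worker-side consumer under that convention; the site_id and timeout values are illustrative only:

    import kombu
    import kombu.simple

    site_id = 1  # illustrative; real ids are assigned by BrozzlerHQDb
    with kombu.Connection('amqp://guest:guest@localhost:5672/%2f') as conn:
        crawl_q = conn.SimpleQueue('brozzler.sites.{}.crawl_urls'.format(site_id))
        try:
            msg = crawl_q.get(block=True, timeout=0.5)
            crawl_url_dict = msg.payload  # kwargs for brozzler.CrawlUrl, as in next_url() above
            msg.ack()
        except kombu.simple.Empty:
            pass  # no urls scheduled for this site right now
        finally:
            crawl_q.close()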

bin/crawl-url

@@ -1,182 +0,0 @@
#!/usr/bin/env python
# vim: set sw=4 et:
import argparse
import os
import sys
import logging
import umbra
import threading
import time
import sortedcontainers
import surt
import signal
arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),
description='crawl-url - browse urls, follow links',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
arg_parser.add_argument('urls', metavar='URL', nargs='+', help='URL(s) to browse')
arg_parser.add_argument('-w', '--browser-wait', dest='browser_wait', default='60',
help='seconds to wait for browser initialization')
arg_parser.add_argument('-e', '--executable', dest='chrome_exe', default='chromium-browser',
help='executable to use to invoke chrome')
arg_parser.add_argument('-n', '--max-browsers', dest='max_browsers', default='1',
help='max number of chrome instances simultaneously browsing pages')
arg_parser.add_argument('-v', '--verbose', dest='log_level',
action="store_const", default=logging.INFO, const=logging.DEBUG)
arg_parser.add_argument('--version', action='version',
version="umbra {} - {}".format(umbra.version, os.path.basename(__file__)))
args = arg_parser.parse_args(args=sys.argv[1:])
logging.basicConfig(stream=sys.stdout, level=args.log_level,
format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
class CrawlUrl:
def __init__(self, url, priority=None, hops_from_seed=0):
self.url = url
self.hops_from_seed = hops_from_seed
self._surt = None
if priority:
self.set_priority(priority)
else:
self.set_priority(self.calc_priority())
def set_priority(self, priority):
# priority_key is both a sortable priority (higher value is higher
# priority) and a unique hash key
self.priority_key = (priority << 32) | (hash(self.surt) & (2**32 - 1))
def calc_priority(self):
priority = 0
priority += max(0, 10 - self.hops_from_seed)
priority += max(0, 6 - self.surt.count('/'))
return priority
def __repr__(self):
return """CrawlUrl(url="{}",priority={},hops_from_seed={})""".format(self.url, self.priority, self.hops_from_seed)
@property
def surt(self):
if self._surt is None:
self._surt = surt.surt(self.url, canonicalizer=surt.GoogleURLCanonicalizer, trailing_comma=True)
return self._surt
@property
def priority(self):
return self.priority_key >> 32
class CrawlUrlQueue:
def __init__(self):
# {priority_key:CrawlUrl}
self._pq = sortedcontainers.SortedDict()
# {surt:CrawlUrl}
self._urls = {}
self.aggregate_priority = 0
def __len__(self):
assert len(self._urls) == len(self._pq)
return len(self._urls)
def schedule(self, crawl_url):
self.aggregate_priority += crawl_url.priority
try:
old_priority_key = self._urls.pop(crawl_url.surt).priority_key
old_crawl_url = self._pq.pop(old_priority_key)
# XXX dumb calculation of new priority, may not belong here
crawl_url.set_priority(crawl_url.priority + old_crawl_url.priority)
crawl_url.hops_from_seed = min(old_crawl_url.hops_from_seed, crawl_url.hops_from_seed)
except KeyError:
pass
self._urls[crawl_url.surt] = crawl_url
self._pq[crawl_url.priority_key] = crawl_url
def next_url(self):
res0 = self._pq.popitem(last=True)[1]
res1 = self._urls.pop(res0.surt)
assert res0 is res1
new_low_priority = CrawlUrl(res0.url, priority=-1000, hops_from_seed=res0.hops_from_seed)
self.schedule(new_low_priority)
return res0
class Site:
"""A seed url, scope definition, and prioritized url queue."""
def __init__(self, seed_url):
self.seed = CrawlUrl(seed_url, priority=1000)
self.q = CrawlUrlQueue()
self.q.schedule(self.seed)
def is_in_scope(self, url):
surtt = surt.surt(url, canonicalizer=surt.GoogleURLCanonicalizer, trailing_comma=True)
return surtt.startswith(self.seed.surt)
def submit(self, urls, hops_from_seed):
for url in urls:
if self.is_in_scope(url):
logging.debug("{} accepted {}".format(self.seed.surt, url))
crawl_url = CrawlUrl(url, hops_from_seed=hops_from_seed)
self.q.schedule(crawl_url)
else:
logging.debug("{} rejected {}".format(self.seed.surt, url))
browsers = []
browsers_lock = threading.Lock()
# "browse" + "crawl" = "brozzle"
def brozzle_site(site, chrome_port):
with umbra.Browser(chrome_port=chrome_port, chrome_exe=args.chrome_exe) as browser:
with browsers_lock:
browsers.append(browser)
try:
while True:
crawl_url = site.q.next_url()
logging.info("crawling {}".format(crawl_url))
outlinks = browser.browse_page(crawl_url.url)
site.submit(outlinks, hops_from_seed=crawl_url.hops_from_seed+1)
except umbra.browser.BrowsingAborted:
pass
class ShutdownRequested(Exception):
pass
def sigterm(signum, frame):
raise ShutdownRequested('shutdown requested (caught SIGTERM)')
def sigint(signum, frame):
raise ShutdownRequested('shutdown requested (caught SIGINT)')
signal.signal(signal.SIGTERM, sigterm)
signal.signal(signal.SIGINT, sigint)
chrome_port = 9200
for seed_url in args.urls:
site = Site(seed_url)
th = threading.Thread(target=lambda: brozzle_site(site, chrome_port),
name="BrowsingThread-{}".format(site.seed.surt))
th.start()
chrome_port += 1
try:
while True:
time.sleep(0.5)
except ShutdownRequested as e:
pass
logging.info("shutting down browsers")
with browsers_lock:
for browser in browsers:
browser.abort_browse_page()
for th in threading.enumerate():
if th != threading.current_thread():
th.join()
logging.info("all done, exiting")

bin/drain-queue

@@ -1,55 +0,0 @@
#!/usr/bin/env python
# vim: set sw=4 et:
import os
import sys
import argparse
import logging
import socket
import umbra
from kombu import Connection, Exchange, Queue
arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),
description='drain-queue - consume messages from AMQP queue',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f',
help='URL identifying the AMQP server to talk to')
arg_parser.add_argument('--exchange', dest='amqp_exchange', default='umbra',
help='AMQP exchange name')
arg_parser.add_argument('--queue', dest='amqp_queue', default='urls',
help='AMQP queue name')
arg_parser.add_argument('-n', '--no-ack', dest='no_ack', action="store_const",
default=False, const=True, help="leave messages on the queue (default: remove them from the queue)")
arg_parser.add_argument('-r', '--run-forever', dest='run_forever', action="store_const",
default=False, const=True, help="run forever, waiting for new messages to appear on the queue (default: exit when all messages in the queue have been consumed)")
arg_parser.add_argument('-v', '--verbose', dest='log_level',
action="store_const", default=logging.INFO, const=logging.DEBUG)
arg_parser.add_argument('--version', action='version',
version="umbra {} - {}".format(umbra.version, os.path.basename(__file__)))
args = arg_parser.parse_args(args=sys.argv[1:])
logging.basicConfig(stream=sys.stderr, level=args.log_level,
format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
def print_and_maybe_ack(body, message):
# do this instead of print(body) so that output syntax is json, not python
# dict (they are similar but not identical)
print(message.body.decode('utf-8'))
if not args.no_ack:
message.ack()
exchange = Exchange(args.amqp_exchange, 'direct', durable=True)
queue = Queue(args.amqp_queue, exchange=exchange)
try:
with Connection(args.amqp_url) as conn:
with conn.Consumer(queue, callbacks=[print_and_maybe_ack]) as consumer:
consumer.qos(prefetch_count=1)
while True:
try:
conn.drain_events(timeout=0.5)
except socket.timeout:
if not args.run_forever:
logging.debug("exiting, no messages left on the queue")
break
except KeyboardInterrupt:
logging.debug("exiting, stopped by user")

bin/queue-json

@@ -1,38 +0,0 @@
#!/usr/bin/python3.4
# vim: set sw=4 et:
import os
import sys
import argparse
import logging
import umbra
import json
from kombu import Connection, Exchange, Queue
arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),
description='queue-json - send json message to umbra via AMQP',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f',
help='URL identifying the AMQP server to talk to')
arg_parser.add_argument('--exchange', dest='amqp_exchange', default='umbra',
help='AMQP exchange name')
arg_parser.add_argument('--routing-key', dest='amqp_routing_key', default='url',
help='AMQP routing key')
arg_parser.add_argument('-v', '--verbose', dest='log_level',
action="store_const", default=logging.INFO, const=logging.DEBUG)
arg_parser.add_argument('--version', action='version',
version="umbra {} - {}".format(umbra.version, os.path.basename(__file__)))
arg_parser.add_argument('payload_json', metavar='JSON_PAYLOAD', help='json payload to send to umbra')
args = arg_parser.parse_args(args=sys.argv[1:])
logging.basicConfig(stream=sys.stdout, level=args.log_level,
format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
payload = json.loads(args.payload_json)
exchange = Exchange(args.amqp_exchange, 'direct', durable=True)
with Connection(args.amqp_url) as conn:
producer = conn.Producer(serializer='json')
logging.info("sending to amqp url={} exchange={} routing_key={} -- {}".format(args.amqp_url, args.amqp_exchange, args.amqp_routing_key, payload))
producer.publish(payload, routing_key=args.amqp_routing_key, exchange=exchange)

bin/queue-url

@@ -1,39 +0,0 @@
#!/usr/bin/env python
# vim: set sw=4 et:
import os
import sys
import argparse
import logging
import umbra
from kombu import Connection, Exchange, Queue
arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),
description='queue-url - send url to umbra via AMQP',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f',
help='URL identifying the AMQP server to talk to')
arg_parser.add_argument('--exchange', dest='amqp_exchange', default='umbra',
help='AMQP exchange name')
arg_parser.add_argument('--routing-key', dest='amqp_routing_key', default='urls',
help='AMQP routing key')
arg_parser.add_argument('-i', '--client-id', dest='client_id', default='load_url.0',
help='client id - included in the json payload with each url; umbra uses this value as the routing key to send requests back to')
arg_parser.add_argument('-v', '--verbose', dest='log_level',
action="store_const", default=logging.INFO, const=logging.DEBUG)
arg_parser.add_argument('--version', action='version',
version="umbra {} - {}".format(umbra.version, os.path.basename(__file__)))
arg_parser.add_argument('urls', metavar='URL', nargs='+', help='URLs to send to umbra')
args = arg_parser.parse_args(args=sys.argv[1:])
logging.basicConfig(stream=sys.stdout, level=args.log_level,
format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
exchange = Exchange(args.amqp_exchange, 'direct', durable=True)
with Connection(args.amqp_url) as conn:
producer = conn.Producer(serializer='json')
for url in args.urls:
payload = {'url': url, 'metadata': {}, 'clientId': args.client_id}
logging.info("sending to amqp url={} exchange={} routing_key={} -- {}".format(args.amqp_url, args.amqp_exchange, args.amqp_routing_key, payload))
producer.publish(payload, routing_key=args.amqp_routing_key, exchange=exchange)

bin/umbra

@@ -1,123 +0,0 @@
#!/usr/bin/env python
# vim: set sw=4 et:
import logging
import argparse
import time
import umbra
import sys
import signal
import os
import umbra
import signal
import threading
import traceback
import pprint
if __name__ == "__main__":
arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),
description='umbra - browser automation tool communicating via AMQP',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
arg_parser.add_argument('-e', '--executable', dest='chrome_exe', default='chromium-browser',
help='Executable to use to invoke chrome')
arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f',
help='URL identifying the amqp server to talk to')
arg_parser.add_argument('--exchange', dest='amqp_exchange', default='umbra',
help='AMQP exchange name')
arg_parser.add_argument('--queue', dest='amqp_queue', default='urls',
help='AMQP queue to consume urls from')
arg_parser.add_argument('-n', '--max-browsers', dest='max_browsers', default='1',
help='Max number of chrome instances simultaneously browsing pages')
arg_parser.add_argument('-v', '--verbose', dest='log_level',
action="store_const", default=logging.INFO, const=logging.DEBUG)
arg_parser.add_argument('--version', action='version',
version="umbra {}".format(umbra.version))
args = arg_parser.parse_args(args=sys.argv[1:])
logging.basicConfig(stream=sys.stdout, level=args.log_level,
format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
logging.info("umbra {} starting up".format(umbra.version))
controller = umbra.Umbra(args.amqp_url, args.chrome_exe,
max_active_browsers=int(args.max_browsers),
exchange_name=args.amqp_exchange, queue_name=args.amqp_queue)
def browserdump_str(pp, b):
x = []
x.append(pp.pformat(b.__dict__))
if b._chrome_instance:
x.append("=> {} chrome instance:".format(b))
x.append(pp.pformat(b._chrome_instance.__dict__))
if b._behavior:
x.append("=> {} active behavior:".format(b))
x.append(pp.pformat(b._behavior.__dict__))
return "\n".join(x)
def dump_state(signum, frame):
pp = pprint.PrettyPrinter(indent=4)
state_strs = []
for th in threading.enumerate():
state_strs.append(str(th))
stack = traceback.format_stack(sys._current_frames()[th.ident])
state_strs.append("".join(stack))
state_strs.append("umbra controller:")
state_strs.append(pp.pformat(controller.__dict__))
state_strs.append("")
for b in controller._browser_pool._in_use:
state_strs.append("{} (in use):".format(b))
state_strs.append(browserdump_str(pp, b))
state_strs.append("")
for b in controller._browser_pool._available:
state_strs.append("{} (not in use):".format(b))
state_strs.append(browserdump_str(pp, b))
state_strs.append("")
logging.warn("dumping state (caught signal {})\n{}".format(signum, "\n".join(state_strs)))
class ShutdownRequested(Exception):
pass
def sigterm(signum, frame):
raise ShutdownRequested('shutdown requested (caught SIGTERM)')
def sigint(signum, frame):
raise ShutdownRequested('shutdown requested (caught SIGINT)')
signal.signal(signal.SIGQUIT, dump_state)
signal.signal(signal.SIGHUP, controller.reconnect)
signal.signal(signal.SIGTERM, sigterm)
signal.signal(signal.SIGINT, sigint)
controller.start()
try:
while True:
time.sleep(0.5)
except ShutdownRequested as e:
logging.info(e)
except BaseException as e:
logging.fatal(e, exc_info=sys.exc_info())
finally:
try:
controller.shutdown()
for th in threading.enumerate():
if th != threading.current_thread():
th.join()
except BaseException as e:
logging.warn("caught exception {}".format(e))
for i in range(6,0,-1):
controller.shutdown_now()
try:
for th in threading.enumerate():
if th != threading.current_thread():
th.join()
break # if we get here, we're done, all threads finished
except:
logging.warn("caught exception {}".format(e))
logging.info("all finished, exiting")

brozzler/__init__.py

@@ -1,7 +1,5 @@
-from umbra.browser import Browser
-from umbra.controller import AmqpBrowserController
-from umbra.url import CrawlUrl
-Umbra = AmqpBrowserController
+from brozzler.browser import Browser
+from brozzler.url import CrawlUrl
def _read_version():
import os

brozzler/browser.py

@@ -15,7 +15,7 @@ import os
import socket
import base64
import random
-from umbra.behaviors import Behavior
+from brozzler.behaviors import Behavior
class BrowserPool:
logger = logging.getLogger(__module__ + "." + __qualname__)

requirements.txt

@@ -2,6 +2,5 @@ kombu
websocket-client-py3==0.13.1
argparse
PyYAML
-sortedcontainers
git+https://github.com/ikreymer/surt.git@py3
youtube_dl

setup.py

@@ -18,27 +18,27 @@ def full_version_bytes():
return VERSION_BYTES
version_bytes = full_version_bytes()
-with open('umbra/version.txt', 'wb') as out:
+with open('brozzler/version.txt', 'wb') as out:
out.write(version_bytes)
out.write(b'\n');
-setuptools.setup(name='umbra',
+setuptools.setup(name='brozzler',
version=version_bytes.decode('utf-8'),
-description='Browser automation via chrome debug protocol',
-url='https://github.com/internetarchive/umbra',
-author='Eldon Stegall',
-author_email='eldon@archive.org',
+description='Distributed web crawling with browsers',
+url='https://github.com/nlevitt/brozzler',
+author='Noah Levitt',
+author_email='nlevitt@archive.org',
long_description=open('README.md').read(),
license='Apache License 2.0',
-packages=['umbra'],
-package_data={'umbra':['behaviors.d/*.js*', 'behaviors.yaml', 'version.txt']},
+packages=['brozzler'],
+package_data={'brozzler':['behaviors.d/*.js*', 'behaviors.yaml', 'version.txt']},
scripts=glob.glob('bin/*'),
zip_safe=False,
classifiers=[
-'Development Status :: 5 - Production/Stable',
+'Development Status :: 3 - Alpha',
'Environment :: Console',
'License :: OSI Approved :: Apache Software License',
'Programming Language :: Python :: 3.3',
'Programming Language :: Python :: 3.4',
'Topic :: Internet :: WWW/HTTP',
'Topic :: System :: Archiving',
])
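
The _read_version stub shown in the brozzler/__init__.py hunk earlier is cut off by this diff; presumably it reads back the version.txt that setup.py writes above and ships via package_data. A sketch under that assumption (the actual body is not shown anywhere in this commit):

    import os

    def _read_version():
        # read the version string that setup.py wrote into brozzler/version.txt
        path = os.path.join(os.path.dirname(__file__), 'version.txt')
        with open(path, 'rb') as f:
            return f.read().decode('utf-8').strip()

    version = _read_version()  # the brozzler.version that the bin scripts print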

umbra/controller.py

@@ -1,212 +0,0 @@
#!/usr/bin/env python
# vim: set sw=4 et:
import logging
import time
import threading
import kombu
import socket
from umbra.browser import BrowserPool, BrowsingException
class AmqpBrowserController:
"""
Consumes amqp messages representing requests to browse urls, from the
specified amqp queue (default: "urls") on the specified amqp exchange
(default: "umbra"). Incoming amqp message is a json object with 3
attributes:
{
"clientId": "umbra.client.123",
"url": "http://example.com/my_fancy_page",
"metadata": {"arbitrary":"fields", "etc":4}
}
"url" is the url to browse.
"clientId" uniquely identifies the client of umbra. Umbra uses the clientId
as the amqp routing key, to direct information via amqp back to the client.
It sends this information on the same specified amqp exchange (default:
"umbra").
Each url requested in the browser is published to amqp this way. The
outgoing amqp message is a json object:
{
"url": "http://example.com/images/embedded_thing.jpg",
"method": "GET",
"headers": {"User-Agent": "...", "Accept": "...", ...},
"parentUrl": "http://example.com/my_fancy_page",
"parentUrlMetadata": {"arbitrary":"fields", "etc":4, ...}
}
POST requests have an additional field, postData.
"""
logger = logging.getLogger(__module__ + "." + __qualname__)
def __init__(self, amqp_url='amqp://guest:guest@localhost:5672/%2f',
chrome_exe='chromium-browser', max_active_browsers=1,
queue_name='urls', exchange_name='umbra'):
self.amqp_url = amqp_url
self.queue_name = queue_name
self.exchange_name = exchange_name
self.max_active_browsers = max_active_browsers
self._browser_pool = BrowserPool(size=max_active_browsers, chrome_exe=chrome_exe)
def start(self):
self._browsing_threads = set()
self._browsing_threads_lock = threading.Lock()
self._exchange = kombu.Exchange(name=self.exchange_name, type='direct',
durable=True)
self._reconnect_requested = False
self._producer = None
self._producer_lock = threading.Lock()
with self._producer_lock:
self._producer_conn = kombu.Connection(self.amqp_url)
self._producer = self._producer_conn.Producer(serializer='json')
self._consumer_thread = threading.Thread(target=self._consume_amqp, name='AmqpConsumerThread')
self._consumer_stop = threading.Event()
self._consumer_thread.start()
def shutdown(self):
self.logger.info("shutting down amqp consumer {}".format(self.amqp_url))
self._consumer_stop.set()
self._consumer_thread.join()
def shutdown_now(self):
self._consumer_stop.set()
self._browser_pool.shutdown_now()
self._consumer_thread.join()
def reconnect(self, *args, **kwargs):
self._reconnect_requested = True
self._browser_pool.shutdown_now()
def _wait_for_and_browse_urls(self, conn, consumer, timeout):
start = time.time()
browser = None
consumer.qos(prefetch_count=self.max_active_browsers)
while not self._consumer_stop.is_set() and time.time() - start < timeout and not self._reconnect_requested:
try:
browser = self._browser_pool.acquire() # raises KeyError if none available
browser.start()
def callback(body, message):
try:
client_id, url, metadata = body['clientId'], body['url'], body['metadata']
except:
self.logger.error("unable to decipher message {}".format(message), exc_info=True)
self.logger.error("discarding bad message")
message.reject()
browser.stop()
self._browser_pool.release(browser)
return
self._start_browsing_page(browser, message, client_id, url, metadata)
consumer.callbacks = [callback]
while True:
try:
conn.drain_events(timeout=0.5)
break # out of "while True" to acquire another browser
except socket.timeout:
pass
except socket.error:
self.logger.error("problem consuming messages from AMQP, will try reconnecting after active browsing finishes", exc_info=True)
self._reconnect_requested = True
if self._consumer_stop.is_set() or time.time() - start >= timeout or self._reconnect_requested:
browser.stop()
self._browser_pool.release(browser)
break
except KeyError:
# no browsers available
time.sleep(0.5)
except:
self.logger.critical("problem with browser initialization", exc_info=True)
time.sleep(0.5)
finally:
consumer.callbacks = None
def _wait_for_active_browsers(self):
self.logger.info("waiting for browsing threads to finish")
while True:
with self._browsing_threads_lock:
if len(self._browsing_threads) == 0:
break
time.sleep(0.5)
self.logger.info("active browsing threads finished")
def _consume_amqp(self):
# XXX https://webarchive.jira.com/browse/ARI-3811
# After running for some amount of time (3 weeks in the latest case),
# consumer looks normal but doesn't consume any messages. Not clear if
# it's hanging in drain_events() or not. As a temporary measure for
# mitigation (if it works) or debugging (if it doesn't work), close and
# reopen the connection every 2.5 hours
RECONNECT_AFTER_SECONDS = 150 * 60
url_queue = kombu.Queue(self.queue_name, exchange=self._exchange)
while not self._consumer_stop.is_set():
try:
self.logger.info("connecting to amqp exchange={} at {}".format(self._exchange.name, self.amqp_url))
self._reconnect_requested = False
with kombu.Connection(self.amqp_url) as conn:
with conn.Consumer(url_queue) as consumer:
self._wait_for_and_browse_urls(conn, consumer, timeout=RECONNECT_AFTER_SECONDS)
# need to wait for browsers to finish here, before closing
# the amqp connection, because they use it to do
# message.ack() after they finish browsing a page
self._wait_for_active_browsers()
except BaseException as e:
self.logger.error("caught exception {}".format(e), exc_info=True)
time.sleep(0.5)
self.logger.error("attempting to reopen amqp connection")
def _start_browsing_page(self, browser, message, client_id, url, parent_url_metadata):
def on_request(chrome_msg):
payload = chrome_msg['params']['request']
payload['parentUrl'] = url
payload['parentUrlMetadata'] = parent_url_metadata
self.logger.debug('sending to amqp exchange={} routing_key={} payload={}'.format(self.exchange_name, client_id, payload))
with self._producer_lock:
publish = self._producer_conn.ensure(self._producer, self._producer.publish)
publish(payload, exchange=self._exchange, routing_key=client_id)
def browse_page_sync():
self.logger.info('browser={} client_id={} url={}'.format(browser, client_id, url))
try:
browser.browse_page(url, on_request=on_request)
message.ack()
except BrowsingException as e:
self.logger.warn("browsing did not complete normally, requeuing url {} - {}".format(url, e))
message.requeue()
except:
self.logger.critical("problem browsing page, requeuing url {}, may have lost browser process".format(url), exc_info=True)
message.requeue()
finally:
browser.stop()
self._browser_pool.release(browser)
def browse_thread_run_then_cleanup():
browse_page_sync()
with self._browsing_threads_lock:
self._browsing_threads.remove(threading.current_thread())
import random
thread_name = "BrowsingThread{}-{}".format(browser.chrome_port,
''.join((random.choice('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789') for _ in range(6))))
th = threading.Thread(target=browse_thread_run_then_cleanup, name=thread_name)
with self._browsing_threads_lock:
self._browsing_threads.add(th)
th.start()
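
For completeness, the client side of the protocol described in the AmqpBrowserController docstring above, as a minimal sketch: bind a queue to the "umbra" exchange with your clientId as the routing key and consume the per-request messages umbra publishes back. The queue name, clientId, and timeout here are illustrative assumptions.

    import socket
    import kombu

    amqp_url = 'amqp://guest:guest@localhost:5672/%2f'
    exchange = kombu.Exchange('umbra', type='direct', durable=True)
    client_id = 'umbra.client.123'  # must match the clientId sent with the url
    queue = kombu.Queue(client_id, exchange=exchange, routing_key=client_id)

    def on_request(body, message):
        # body is the outgoing json object documented in the docstring above
        print(body['method'], body['url'], 'parent:', body['parentUrl'])
        message.ack()

    with kombu.Connection(amqp_url) as conn:
        with conn.Consumer(queue, callbacks=[on_request]):
            try:
                while True:
                    conn.drain_events(timeout=0.5)
            except socket.timeout:
                pass  # idle; a real client would keep polling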