configurable max number of instances of chrome simultaneously browsing pages (default=3); close and reopen connection to amqp every 15 minutes (consumer only); increase default browser wait to 60 sec

This commit is contained in:
Noah Levitt 2014-05-20 01:09:11 -07:00
parent cc0ffee508
commit 6fdcdd0bf0

View File

@ -32,7 +32,7 @@ class UmbraWorker:
HARD_TIMEOUT_SECONDS = 20 * 60
def __init__(self, umbra, chrome_port=9222, chrome_exe='chromium-browser', chrome_wait=10, client_id='request'):
def __init__(self, umbra, chrome_port=9222, chrome_exe='chromium-browser', chrome_wait=60, client_id='request'):
self.command_id = itertools.count(1)
self.lock = threading.Lock()
self.umbra = umbra
@ -44,7 +44,8 @@ class UmbraWorker:
self.websock = None
def browse_page(self, url, url_metadata):
"""Synchronously browse a page and run behaviors."""
"""Synchronously browses a page and runs behaviors. First blocks to
acquire lock to ensure self only browses one page at a time."""
with self.lock:
self.url = url
self.url_metadata = url_metadata
@ -106,8 +107,9 @@ class UmbraWorker:
payload['parentUrlMetadata'] = self.url_metadata
self.logger.debug('sending to amqp exchange={} routing_key={} payload={}'.format(self.umbra.umbra_exchange.name, self.client_id, payload))
with self.umbra.producer_lock:
self.umbra.producer.publish(payload,
exchange=self.umbra.umbra_exchange,
publish = self.umbra.producer_conn.ensure(self.umbra.producer,
self.umbra.producer.publish)
publish(payload, exchange=self.umbra.umbra_exchange,
routing_key=self.client_id)
def _handle_message(self, websock, message):
@ -167,7 +169,7 @@ class Umbra:
"url" is the url to browse.
"clientId" uniquely idenfities the client of
"clientId" uniquely identifies the client of
umbra. Umbra uses the clientId to direct information via amqp back to the
client. It sends this information on that same "umbra" exchange, and uses
the clientId as the amqp routing key.
@ -188,14 +190,22 @@ class Umbra:
logger = logging.getLogger('umbra.Umbra')
def __init__(self, amqp_url, chrome_exe, browser_wait):
def __init__(self, amqp_url='amqp://guest:guest@localhost:5672/%2f', chrome_exe='chromium-browser', browser_wait=60, max_active_workers=1):
self.amqp_url = amqp_url
self.chrome_exe = chrome_exe
self.browser_wait = browser_wait
self.max_active_workers = max_active_workers
self.producer = None
self.producer_lock = None
self.producer_lock = threading.Lock()
with self.producer_lock:
self.producer_conn = kombu.Connection(self.amqp_url)
self.producer = self.producer_conn.Producer(serializer='json')
self.workers = {}
self.workers_lock = threading.Lock()
self.num_active_workers = 0
self.amqp_thread = threading.Thread(target=self._consume_amqp)
self.amqp_stop = threading.Event()
self.amqp_thread.start()
@ -206,19 +216,24 @@ class Umbra:
self.amqp_thread.join()
def _consume_amqp(self):
# XXX https://webarchive.jira.com/browse/ARI-3811
# After running for some amount of time (3 weeks in the latest case),
# consumer looks normal but doesn't consume any messages. Not clear if
# it's hanging in drain_events() or not. As a temporary measure for
# mitigation (if it works) or debugging (if it doesn't work), close and
# reopen the connection every 15 minutes
RECONNECT_AFTER_MIN = 15
while not self.amqp_stop.is_set():
try:
self.umbra_exchange = kombu.Exchange(name='umbra', type='direct', durable=True)
url_queue = kombu.Queue('urls', routing_key='url', exchange=self.umbra_exchange)
self.logger.info("connecting to amqp exchange={} at {}".format(self.umbra_exchange.name, self.amqp_url))
with kombu.Connection(self.amqp_url) as conn:
if self.producer_lock is None:
self.producer_lock = threading.Lock()
with self.producer_lock:
self.producer = conn.Producer(serializer='json')
conn_opened = time.time()
with conn.Consumer(url_queue, callbacks=[self._browse_page_requested]) as consumer:
import socket
while not self.amqp_stop.is_set():
while (not self.amqp_stop.is_set() and time.time() - conn_opened < RECONNECT_AFTER_MIN * 60):
try:
conn.drain_events(timeout=0.5)
except socket.timeout:
@ -228,11 +243,8 @@ class Umbra:
self.logger.error("attempting to reopen amqp connection")
def _browse_page_requested(self, body, message):
"""First waits for the UmbraWorker for the client body['clientId'] to
become available, or creates a new worker if this clientId has not been
served before. Starts a worker browsing the page asynchronously, then
acknowledges the amqp message, which lets the server know it can be
removed from the queue."""
"""Kombu Consumer callback. Provisions an UmbraWorker and
asynchronously asks it to browse the requested url."""
client_id = body['clientId']
with self.workers_lock:
if not client_id in self.workers:
@ -245,8 +257,18 @@ class Umbra:
def browse_page_async():
self.logger.info('client_id={} body={}'.format(client_id, body))
while True:
with self.workers_lock:
if self.num_active_workers < self.max_active_workers:
self.num_active_workers += 1
break
time.sleep(0.5)
self.workers[client_id].browse_page(body['url'], body['metadata'])
with self.workers_lock:
self.num_active_workers -= 1
threading.Thread(target=browse_page_async).start()
message.ack()
@ -270,10 +292,10 @@ class Chrome:
def __enter__(self):
chrome_args = [self.executable,
"--user-data-dir={}".format(self.user_data_dir),
"--remote-debugging-port=%s" % self.port,
"--remote-debugging-port={}".format(self.port),
"--disable-web-sockets", "--disable-cache",
"--window-size=1100,900", "--enable-logging",
"--no-default-browser-check", "--disable-first-run-ui", "--no-first-run",
"--window-size=1100,900", "--no-default-browser-check",
"--disable-first-run-ui", "--no-first-run",
"--homepage=about:blank", "about:blank"]
self.logger.info("running {}".format(chrome_args))
self.chrome_process = subprocess.Popen(chrome_args, start_new_session=True)
@ -313,15 +335,17 @@ def main():
arg_parser = argparse.ArgumentParser(prog=os.path.basename(sys.argv[0]),
description='umbra - Browser automation tool',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
arg_parser.add_argument('-w', '--browser-wait', dest='browser_wait', default='10',
arg_parser.add_argument('-w', '--browser-wait', dest='browser_wait', default='60',
help='Seconds to wait for browser initialization')
arg_parser.add_argument('-e', '--executable', dest='executable', default='chromium-browser',
help='Executable to use to invoke chrome')
arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f',
help='URL identifying the amqp server to talk to')
arg_parser.add_argument('-n', '--max-workers', dest='max_workers', default='3',
help='Max number of chrome instances simultaneously browsing pages')
args = arg_parser.parse_args(args=sys.argv[1:])
umbra = Umbra(args.amqp_url, args.executable, args.browser_wait)
umbra = Umbra(args.amqp_url, args.executable, args.browser_wait, max_active_workers=int(args.max_workers))
try:
while True:
time.sleep(0.5)