Make umbra amenable to clustering: use a pool of n browsers; remove the browser-clientId affinity (which is not useful currently, since we start a fresh browser instance for each page browsed); and set prefetch_count=1 on the amqp consumers so that incoming urls are distributed round-robin among umbra instances.

This commit is contained in:
Noah Levitt 2014-05-23 21:59:34 -07:00
parent 8d269f4c56
commit 2c4ba005b5
4 changed files with 94 additions and 70 deletions

View file

@ -5,7 +5,7 @@ import logging
import time
import threading
import kombu
from umbra.browser import Browser
from umbra.browser import BrowserPool
class AmqpBrowserController:
"""
@ -47,36 +47,35 @@ class AmqpBrowserController:
chrome_exe='chromium-browser', browser_wait=60,
max_active_browsers=1, queue_name='urls', routing_key='url',
exchange_name='umbra'):
self.amqp_url = amqp_url
self.chrome_exe = chrome_exe
self.browser_wait = browser_wait
self.max_active_browsers = max_active_browsers
self.queue_name = queue_name
self.routing_key = routing_key
self.exchange_name = exchange_name
self._exchange = kombu.Exchange(name=self.exchange_name, type='direct', durable=True)
self.producer = None
self.producer_lock = threading.Lock()
with self.producer_lock:
self.producer_conn = kombu.Connection(self.amqp_url)
self.producer = self.producer_conn.Producer(serializer='json')
self._browser_pool = BrowserPool(size=max_active_browsers,
chrome_exe=chrome_exe, chrome_wait=browser_wait)
self.browsers = {}
self.browsers_lock = threading.Lock()
self.num_active_browsers = 0
self.amqp_thread = threading.Thread(target=self._consume_amqp)
self.amqp_stop = threading.Event()
self.amqp_thread.start()
def start(self):
    """Open the producer connection and launch the AMQP consumer thread.

    Builds the durable, direct ``kombu.Exchange`` named
    ``self.exchange_name``, opens a producer connection to
    ``self.amqp_url`` that publishes JSON, and starts a thread running
    ``self._consume_amqp``. The thread keeps running until
    ``self._amqp_stop`` is set, which ``shutdown()`` does.
    """
    self._exchange = kombu.Exchange(name=self.exchange_name, type='direct',
                                    durable=True)

    # The producer is shared with the per-page browsing threads, so every
    # access to it (including its creation) happens under this lock.
    self._producer = None
    self._producer_lock = threading.Lock()
    with self._producer_lock:
        self._producer_conn = kombu.Connection(self.amqp_url)
        self._producer = self._producer_conn.Producer(serializer='json')

    # Create the stop flag before the thread so the consumer loop can
    # never observe a controller without it.
    self._amqp_stop = threading.Event()
    self._amqp_thread = threading.Thread(target=self._consume_amqp)
    self._amqp_thread.start()
def shutdown(self):
    """Stop the AMQP consumer thread and close the producer connection.

    Sets the stop flag polled by the consumer loop, waits for the
    consumer thread to exit, then closes the producer connection under
    the producer lock. Calling it again after a completed shutdown is a
    no-op for the producer connection.
    """
    self.logger.info("shutting down amqp consumer {}".format(self.amqp_url))
    self._amqp_stop.set()
    self._amqp_thread.join()
    with self._producer_lock:
        # Guard against a repeated shutdown(): the connection is set to
        # None the first time through.
        if self._producer_conn is not None:
            self._producer_conn.close()
            self._producer_conn = None
def _consume_amqp(self):
# XXX https://webarchive.jira.com/browse/ARI-3811
@ -87,66 +86,58 @@ class AmqpBrowserController:
# reopen the connection every 15 minutes
RECONNECT_AFTER_SECONDS = 15 * 60
while not self.amqp_stop.is_set():
browser = None
while not self._amqp_stop.is_set():
try:
url_queue = kombu.Queue(self.queue_name, routing_key=self.routing_key, exchange=self._exchange)
self.logger.info("connecting to amqp exchange={} at {}".format(self._exchange.name, self.amqp_url))
with kombu.Connection(self.amqp_url) as conn:
conn_opened = time.time()
with conn.Consumer(url_queue, callbacks=[self._browse_page_requested]) as consumer:
import socket
while (not self.amqp_stop.is_set() and time.time() - conn_opened < RECONNECT_AFTER_SECONDS):
with conn.Consumer(url_queue) as consumer:
consumer.qos(prefetch_count=1)
while (not self._amqp_stop.is_set() and time.time() - conn_opened < RECONNECT_AFTER_SECONDS):
import socket
try:
if self.num_active_browsers < self.max_active_browsers:
conn.drain_events(timeout=0.5)
else:
time.sleep(0.5)
except socket.timeout:
browser = self._browser_pool.acquire()
consumer.callbacks = [self._make_callback(browser)]
conn.drain_events(timeout=0.5)
consumer.callbacks = None
except KeyError:
# no browsers available
pass
except socket.timeout:
# no urls in the queue
self._browser_pool.release(browser)
except BaseException as e:
self.logger.error("amqp exception {}".format(e))
time.sleep(0.5)
self.logger.error("attempting to reopen amqp connection")
def _make_callback(self, browser):
    """Build a kombu consumer callback bound to an acquired browser.

    The returned ``callback(body, message)`` reads ``clientId``, ``url``
    and ``metadata`` from ``body``, hands them to ``self._browse_page``
    together with ``browser``, and acks the message.

    If ``body`` is missing one of those keys, or is not a mapping, the
    message is rejected and ``browser`` goes straight back to the pool.
    Without that, the KeyError would propagate out of ``drain_events()``
    in the consumer loop, where ``except KeyError`` is taken to mean
    "no browsers available": the browser would never be released and the
    message would stay unacked.
    """
    def callback(body, message):
        try:
            client_id = body['clientId']
            url = body['url']
            metadata = body['metadata']
        except (KeyError, TypeError) as e:
            self.logger.error(
                "rejecting malformed amqp message body={!r}: {!r}".format(body, e))
            self._browser_pool.release(browser)
            message.reject()
            return
        self._browse_page(browser, client_id, url, metadata)
        message.ack()
    return callback
def _browse_page(self, browser, client_id, url, parent_url_metadata):
    """Browse ``url`` with ``browser`` on a new thread.

    Returns as soon as the thread is started. For each request message
    the browser passes to ``on_request`` while loading the page, the
    request payload is annotated with ``parentUrl`` and
    ``parentUrlMetadata`` and published to ``self._exchange`` with
    ``client_id`` as the routing key.

    ``browser`` is always handed back to ``self._browser_pool`` when
    browsing ends, including when ``browse_page()`` raises, so a failed
    page cannot permanently shrink the pool.
    """
    def on_request(chrome_msg):
        payload = chrome_msg['params']['request']
        payload['parentUrl'] = url
        payload['parentUrlMetadata'] = parent_url_metadata
        self.logger.debug('sending to amqp exchange={} routing_key={} payload={}'.format(self.exchange_name, client_id, payload))
        # The producer is shared by all browsing threads; serialize use.
        with self._producer_lock:
            publish = self._producer_conn.ensure(self._producer, self._producer.publish)
            publish(payload, exchange=self._exchange, routing_key=client_id)

    def browse_page_async():
        self.logger.info('browser={} client_id={} url={}'.format(browser, client_id, url))
        try:
            browser.browse_page(url, on_request=on_request)
        except Exception:
            # Top of the worker thread: nothing upstream can handle this,
            # so record it in the application log.
            self.logger.exception('browser={} failed browsing url={}'.format(browser, url))
        finally:
            self._browser_pool.release(browser)

    threading.Thread(target=browse_page_async).start()