get rid of --browser-wait and --routing-key in favor of sensible defaults, some other tweaks

This commit is contained in:
Noah Levitt 2014-06-11 10:58:08 -07:00
parent a78e60f1da
commit 025db91dea
3 changed files with 75 additions and 75 deletions

View file

@ -45,16 +45,13 @@ class AmqpBrowserController:
logger = logging.getLogger(__module__ + "." + __qualname__)
def __init__(self, amqp_url='amqp://guest:guest@localhost:5672/%2f',
chrome_exe='chromium-browser', browser_wait=60,
max_active_browsers=1, queue_name='urls', routing_key='url',
exchange_name='umbra'):
chrome_exe='chromium-browser', max_active_browsers=1,
queue_name='urls', exchange_name='umbra'):
self.amqp_url = amqp_url
self.queue_name = queue_name
self.routing_key = routing_key
self.exchange_name = exchange_name
self._browser_pool = BrowserPool(size=max_active_browsers,
chrome_exe=chrome_exe, chrome_wait=browser_wait)
self._browser_pool = BrowserPool(size=max_active_browsers, chrome_exe=chrome_exe)
def start(self):
self._browsing_threads = set()
@ -83,6 +80,50 @@ class AmqpBrowserController:
self._browser_pool.shutdown_now()
self._consumer_thread.join()
def _wait_for_and_browse_urls(self, conn, consumer, timeout):
start = time.time()
browser = None
consumer.qos(prefetch_count=1)
while not self._consumer_stop.is_set() and time.time() - start < timeout:
try:
browser = self._browser_pool.acquire() # raises KeyError if none available
browser.start()
def callback(body, message):
self._start_browsing_page(browser, message, body['clientId'], body['url'], body['metadata'])
consumer.callbacks = [callback]
while True:
try:
conn.drain_events(timeout=0.5)
break # out of "while True" to acquire another browser
except socket.timeout:
pass
if self._consumer_stop.is_set() or time.time() - start >= timeout:
browser.stop()
self._browser_pool.release(browser)
break
except KeyError:
# no browsers available
time.sleep(0.5)
except:
self.logger.critical("problem with browser initialization", exc_info=True)
time.sleep(0.5)
finally:
consumer.callbacks = None
def _wait_for_active_browsers(self):
self.logger.info("waiting for browsing threads to finish")
while True:
with self._browsing_threads_lock:
if len(self._browsing_threads) == 0:
break
time.sleep(0.5)
self.logger.info("active browsing threads finished")
def _consume_amqp(self):
# XXX https://webarchive.jira.com/browse/ARI-3811
# After running for some amount of time (3 weeks in the latest case),
@ -92,65 +133,24 @@ class AmqpBrowserController:
# reopen the connection every 15 minutes
RECONNECT_AFTER_SECONDS = 15 * 60
url_queue = kombu.Queue(self.queue_name, routing_key=self.routing_key,
exchange=self._exchange)
url_queue = kombu.Queue(self.queue_name, exchange=self._exchange)
while not self._consumer_stop.is_set():
try:
self.logger.info("connecting to amqp exchange={} at {}".format(self._exchange.name, self.amqp_url))
with kombu.Connection(self.amqp_url) as conn:
conn_opened = time.time()
with conn.Consumer(url_queue) as consumer:
consumer.qos(prefetch_count=1)
browser = None
while not self._consumer_stop.is_set() and time.time() - conn_opened < RECONNECT_AFTER_SECONDS:
try:
browser = self._browser_pool.acquire() # raises KeyError if none available
browser.start()
consumer.callbacks = [self._make_callback(browser)]
while True:
try:
conn.drain_events(timeout=0.5)
break # out of "while True" to acquire another browser
except socket.timeout:
pass
if self._consumer_stop.is_set() or time.time() - conn_opened >= RECONNECT_AFTER_SECONDS:
browser.stop()
self._browser_pool.release(browser)
break
except KeyError:
# no browsers available
time.sleep(0.5)
except:
self.logger.critical("problem with browser initialization", exc_info=True)
time.sleep(0.5)
finally:
consumer.callbacks = None
self._wait_for_and_browse_urls(conn, consumer, timeout=RECONNECT_AFTER_SECONDS)
# need to wait for browsers to finish here, before closing
# the amqp connection, because they use it to do
# message.ack() after they finish browsing a page
self.logger.info("waiting for browsing threads to finish")
while True:
with self._browsing_threads_lock:
if len(self._browsing_threads) == 0:
break
time.sleep(0.5)
self.logger.info("browsing threads finished")
self._wait_for_active_browsers()
except BaseException as e:
self.logger.error("caught exception {}".format(e), exc_info=True)
time.sleep(0.5)
self.logger.error("attempting to reopen amqp connection")
def _make_callback(self, browser):
def callback(body, message):
self._start_browsing_page(browser, message, body['clientId'], body['url'], body['metadata'])
return callback
def _start_browsing_page(self, browser, message, client_id, url, parent_url_metadata):
def on_request(chrome_msg):
payload = chrome_msg['params']['request']
@ -161,7 +161,7 @@ class AmqpBrowserController:
publish = self._producer_conn.ensure(self._producer, self._producer.publish)
publish(payload, exchange=self._exchange, routing_key=client_id)
def browse_page_async():
def browse_page_sync():
self.logger.info('browser={} client_id={} url={}'.format(browser, client_id, url))
try:
browser.browse_page(url, on_request=on_request)
@ -176,13 +176,15 @@ class AmqpBrowserController:
browser.stop()
self._browser_pool.release(browser)
def browse_thread_run_then_cleanup():
browse_page_sync()
with self._browsing_threads_lock:
self._browsing_threads.remove(threading.current_thread())
import random
threadName = "BrowsingThread{}-{}".format(browser.chrome_port,
thread_name = "BrowsingThread{}-{}".format(browser.chrome_port,
''.join((random.choice('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789') for _ in range(6))))
th = threading.Thread(target=browse_page_async, name=threadName)
th = threading.Thread(target=browse_thread_run_then_cleanup, name=thread_name)
with self._browsing_threads_lock:
self._browsing_threads.add(th)
th.start()