clean shutdown without draining entire amqp queue (only consume urls from amqp when browser activity isn't saturated)

This commit is contained in:
Noah Levitt 2014-05-20 03:02:48 -07:00
parent 3e4232f32c
commit b59e76a5b9
2 changed files with 16 additions and 10 deletions

View file

@ -27,13 +27,13 @@ if __name__=="__main__":
help='Executable to use to invoke chrome') help='Executable to use to invoke chrome')
arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f', arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f',
help='URL identifying the amqp server to talk to') help='URL identifying the amqp server to talk to')
arg_parser.add_argument('-n', '--max-workers', dest='max_workers', default='3', arg_parser.add_argument('-n', '--max-workers', dest='max_browsers', default='3',
help='Max number of chrome instances simultaneously browsing pages') help='Max number of chrome instances simultaneously browsing pages')
args = arg_parser.parse_args(args=sys.argv[1:]) args = arg_parser.parse_args(args=sys.argv[1:])
controller = umbra.controller.AmqpBrowserController(args.amqp_url, controller = umbra.controller.AmqpBrowserController(args.amqp_url,
args.executable, args.browser_wait, args.executable, args.browser_wait,
max_active_workers=int(args.max_workers)) max_active_browsers=int(args.max_browsers))
try: try:
while True: while True:
time.sleep(0.5) time.sleep(0.5)

View file

@ -42,13 +42,13 @@ class AmqpBrowserController:
def __init__(self, amqp_url='amqp://guest:guest@localhost:5672/%2f', def __init__(self, amqp_url='amqp://guest:guest@localhost:5672/%2f',
chrome_exe='chromium-browser', browser_wait=60, chrome_exe='chromium-browser', browser_wait=60,
max_active_workers=1, queue_name='urls', routing_key='url', max_active_browsers=1, queue_name='urls', routing_key='url',
exchange_name='umbra'): exchange_name='umbra'):
self.amqp_url = amqp_url self.amqp_url = amqp_url
self.chrome_exe = chrome_exe self.chrome_exe = chrome_exe
self.browser_wait = browser_wait self.browser_wait = browser_wait
self.max_active_workers = max_active_workers self.max_active_browsers = max_active_browsers
self.queue_name = queue_name self.queue_name = queue_name
self.routing_key = routing_key self.routing_key = routing_key
self.exchange_name = exchange_name self.exchange_name = exchange_name
@ -62,7 +62,7 @@ class AmqpBrowserController:
self.browsers = {} self.browsers = {}
self.browsers_lock = threading.Lock() self.browsers_lock = threading.Lock()
self.num_active_workers = 0 self.num_active_browsers = 0
self.amqp_thread = threading.Thread(target=self._consume_amqp) self.amqp_thread = threading.Thread(target=self._consume_amqp)
self.amqp_stop = threading.Event() self.amqp_stop = threading.Event()
self.amqp_thread.start() self.amqp_thread.start()
@ -71,6 +71,9 @@ class AmqpBrowserController:
self.logger.info("shutting down amqp consumer {}".format(self.amqp_url)) self.logger.info("shutting down amqp consumer {}".format(self.amqp_url))
self.amqp_stop.set() self.amqp_stop.set()
self.amqp_thread.join() self.amqp_thread.join()
with self.producer_lock:
self.producer_conn.close()
self.producer_conn = None
def _consume_amqp(self): def _consume_amqp(self):
# XXX https://webarchive.jira.com/browse/ARI-3811 # XXX https://webarchive.jira.com/browse/ARI-3811
@ -91,7 +94,10 @@ class AmqpBrowserController:
import socket import socket
while (not self.amqp_stop.is_set() and time.time() - conn_opened < RECONNECT_AFTER_SECONDS): while (not self.amqp_stop.is_set() and time.time() - conn_opened < RECONNECT_AFTER_SECONDS):
try: try:
if self.num_active_browsers < self.max_active_browsers:
conn.drain_events(timeout=0.5) conn.drain_events(timeout=0.5)
else:
time.sleep(0.5)
except socket.timeout: except socket.timeout:
pass pass
except BaseException as e: except BaseException as e:
@ -126,15 +132,15 @@ class AmqpBrowserController:
self.logger.info('client_id={} body={}'.format(client_id, body)) self.logger.info('client_id={} body={}'.format(client_id, body))
while True: while True:
with self.browsers_lock: with self.browsers_lock:
if self.num_active_workers < self.max_active_workers: if self.num_active_browsers < self.max_active_browsers:
self.num_active_workers += 1 self.num_active_browsers += 1
break break
time.sleep(0.5) time.sleep(0.5)
browser.browse_page(body['url'], on_request=on_request) browser.browse_page(body['url'], on_request=on_request)
with self.browsers_lock: with self.browsers_lock:
self.num_active_workers -= 1 self.num_active_browsers -= 1
threading.Thread(target=browse_page_async).start() threading.Thread(target=browse_page_async).start()