diff --git a/bin/umbra b/bin/umbra index 2b503c1..43bc807 100755 --- a/bin/umbra +++ b/bin/umbra @@ -27,13 +27,13 @@ if __name__=="__main__": help='Executable to use to invoke chrome') arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f', help='URL identifying the amqp server to talk to') - arg_parser.add_argument('-n', '--max-workers', dest='max_workers', default='3', + arg_parser.add_argument('-n', '--max-workers', dest='max_browsers', default='3', help='Max number of chrome instances simultaneously browsing pages') args = arg_parser.parse_args(args=sys.argv[1:]) controller = umbra.controller.AmqpBrowserController(args.amqp_url, args.executable, args.browser_wait, - max_active_workers=int(args.max_workers)) + max_active_browsers=int(args.max_browsers)) try: while True: time.sleep(0.5) diff --git a/umbra/controller.py b/umbra/controller.py index 69aab39..feecbea 100644 --- a/umbra/controller.py +++ b/umbra/controller.py @@ -40,15 +40,15 @@ class AmqpBrowserController: logger = logging.getLogger(__module__ + "." + __qualname__) - def __init__(self, amqp_url='amqp://guest:guest@localhost:5672/%2f', + def __init__(self, amqp_url='amqp://guest:guest@localhost:5672/%2f', chrome_exe='chromium-browser', browser_wait=60, - max_active_workers=1, queue_name='urls', routing_key='url', + max_active_browsers=1, queue_name='urls', routing_key='url', exchange_name='umbra'): self.amqp_url = amqp_url self.chrome_exe = chrome_exe self.browser_wait = browser_wait - self.max_active_workers = max_active_workers + self.max_active_browsers = max_active_browsers self.queue_name = queue_name self.routing_key = routing_key self.exchange_name = exchange_name @@ -62,7 +62,7 @@ class AmqpBrowserController: self.browsers = {} self.browsers_lock = threading.Lock() - self.num_active_workers = 0 + self.num_active_browsers = 0 self.amqp_thread = threading.Thread(target=self._consume_amqp) self.amqp_stop = threading.Event() self.amqp_thread.start() @@ -71,6 +71,9 @@ class AmqpBrowserController: self.logger.info("shutting down amqp consumer {}".format(self.amqp_url)) self.amqp_stop.set() self.amqp_thread.join() + with self.producer_lock: + self.producer_conn.close() + self.producer_conn = None def _consume_amqp(self): # XXX https://webarchive.jira.com/browse/ARI-3811 @@ -91,7 +94,10 @@ class AmqpBrowserController: import socket while (not self.amqp_stop.is_set() and time.time() - conn_opened < RECONNECT_AFTER_SECONDS): try: - conn.drain_events(timeout=0.5) + if self.num_active_browsers < self.max_active_browsers: + conn.drain_events(timeout=0.5) + else: + time.sleep(0.5) except socket.timeout: pass except BaseException as e: @@ -126,15 +132,15 @@ class AmqpBrowserController: self.logger.info('client_id={} body={}'.format(client_id, body)) while True: with self.browsers_lock: - if self.num_active_workers < self.max_active_workers: - self.num_active_workers += 1 + if self.num_active_browsers < self.max_active_browsers: + self.num_active_browsers += 1 break time.sleep(0.5) browser.browse_page(body['url'], on_request=on_request) with self.browsers_lock: - self.num_active_workers -= 1 + self.num_active_browsers -= 1 threading.Thread(target=browse_page_async).start()