diff --git a/bin/browse-url b/bin/browse-url index bbbc925..8697413 100755 --- a/bin/browse-url +++ b/bin/browse-url @@ -24,7 +24,7 @@ args = arg_parser.parse_args(args=sys.argv[1:]) logging.basicConfig(stream=sys.stdout, level=args.log_level, format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') -browser = umbra.Browser(chrome_exe=args.chrome_exe, chrome_wait=args.browser_wait) -for url in args.urls: - browser.browse_page(url) +with umbra.Browser(chrome_exe=args.chrome_exe, chrome_wait=args.browser_wait) as browser: + for url in args.urls: + browser.browse_page(url) diff --git a/bin/umbra b/bin/umbra index 891e096..5175043 100755 --- a/bin/umbra +++ b/bin/umbra @@ -76,7 +76,7 @@ if __name__=="__main__": if th != threading.current_thread(): th.join() except BaseException as e: - logging.warn("caught {}".format(e)) + logging.warn("caught exception {}".format(e)) controller.shutdown_now() for th in threading.enumerate(): if th != threading.current_thread(): diff --git a/umbra/browser.py b/umbra/browser.py index a3eccb3..768cd1f 100644 --- a/umbra/browser.py +++ b/umbra/browser.py @@ -46,8 +46,10 @@ class BrowserPool: def shutdown_now(self): for browser in self._in_use: - browser.shutdown_now() + browser.abort_browse_page() +class BrowsingException(Exception): + pass class Browser: """Runs chrome/chromium to synchronously browse one page at a time using @@ -61,68 +63,87 @@ class Browser: def __init__(self, chrome_port=9222, chrome_exe='chromium-browser', chrome_wait=60): self.command_id = itertools.count(1) - self._lock = threading.Lock() self.chrome_port = chrome_port self.chrome_exe = chrome_exe self.chrome_wait = chrome_wait self._behavior = None - self.websock = None - self._shutdown_now = False + self._websock = None + self._abort_browse_page = False def __repr__(self): return "{}.{}:{}".format(Browser.__module__, Browser.__qualname__, self.chrome_port) - def shutdown_now(self): - self._shutdown_now = True + def __enter__(self): + self.start() + return self + + def __exit__(self, *args): + self.stop() + + def start(self): + # these can raise exceptions + self._work_dir = tempfile.TemporaryDirectory() + self._chrome_instance = Chrome(self.chrome_port, self.chrome_exe, + self.chrome_wait, self._work_dir.name, + os.sep.join([self._work_dir.name, "chrome-user-data"])) + self._websocket_url = self._chrome_instance.start() + + def stop(self): + self._chrome_instance.stop() + self._work_dir.cleanup() + + def abort_browse_page(self): + self._abort_browse_page = True def browse_page(self, url, on_request=None): - """Synchronously browses a page and runs behaviors. First blocks to - acquire lock to ensure we only browse one page at a time.""" - with self._lock: - self.url = url - self.on_request = on_request - with tempfile.TemporaryDirectory() as user_home_dir: - with Chrome(self.chrome_port, self.chrome_exe, self.chrome_wait, user_home_dir) as websocket_url: - self.websock = websocket.WebSocketApp(websocket_url, - on_open=self._visit_page, - on_message=self._handle_message) + """Synchronously browses a page and runs behaviors. - import random - threadName = "WebsockThread{}-{}".format(self.chrome_port, - ''.join((random.choice('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789') for _ in range(6)))) - websock_thread = threading.Thread(target=self.websock.run_forever, name=threadName, kwargs={'ping_timeout':0.5}) - websock_thread.start() - start = time.time() + Raises BrowsingException if browsing the page fails in a non-critical + way. + """ + self.url = url + self.on_request = on_request - while True: - time.sleep(0.5) - if not self.websock or not self.websock.sock or not self.websock.sock.connected: - self.logger.error("websocket closed, did chrome die??? {}".format(self.websock)) - break - elif time.time() - start > Browser.HARD_TIMEOUT_SECONDS: - self.logger.info("finished browsing page, reached hard timeout of {} seconds url={}".format(Browser.HARD_TIMEOUT_SECONDS, self.url)) - break - elif self._behavior != None and self._behavior.is_finished(): - self.logger.info("finished browsing page according to behavior url={}".format(self.url)) - break - elif self._shutdown_now: - self.logger.warn("immediate shutdown requested") - break + self._websock = websocket.WebSocketApp(self._websocket_url, + on_open=self._visit_page, on_message=self._handle_message) - try: - self.websock.close() - except BaseException as e: - self.logger.error("exception closing websocket {} - {}".format(self.websock, e)) + import random + threadName = "WebsockThread{}-{}".format(self.chrome_port, + ''.join((random.choice('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789') for _ in range(6)))) + websock_thread = threading.Thread(target=self._websock.run_forever, name=threadName, kwargs={'ping_timeout':0.5}) + websock_thread.start() + start = time.time() + aborted = False - websock_thread.join() - self._behavior = None + try: + while True: + time.sleep(0.5) + if not self._websock or not self._websock.sock or not self._websock.sock.connected: + raise BrowsingException("websocket closed, did chrome die? {}".format(self._websock)) + elif time.time() - start > Browser.HARD_TIMEOUT_SECONDS: + self.logger.info("finished browsing page, reached hard timeout of {} seconds url={}".format(Browser.HARD_TIMEOUT_SECONDS, self.url)) + return + elif self._behavior != None and self._behavior.is_finished(): + self.logger.info("finished browsing page according to behavior url={}".format(self.url)) + return + elif self._abort_browse_page: + raise BrowsingException("browsing page aborted") + finally: + if self._websock and self._websock.sock and self._websock.sock.connected: + try: + self._websock.close() + except BaseException as e: + self.logger.error("exception closing websocket {} - {}".format(self._websock, e)) + + websock_thread.join() + self._behavior = None def send_to_chrome(self, **kwargs): msg_id = next(self.command_id) kwargs['id'] = msg_id msg = json.dumps(kwargs) - self.logger.debug('sending message to {}: {}'.format(self.websock, msg)) - self.websock.send(msg) + self.logger.debug('sending message to {}: {}'.format(self._websock, msg)) + self._websock.send(msg) return msg_id def _visit_page(self, websock): @@ -188,17 +209,27 @@ class Browser: class Chrome: logger = logging.getLogger(__module__ + "." + __qualname__) - def __init__(self, port, executable, browser_wait, user_home_dir): + def __init__(self, port, executable, browser_wait, user_home_dir, user_data_dir): self.port = port self.executable = executable self.browser_wait = browser_wait self.user_home_dir = user_home_dir + self.user_data_dir = user_data_dir # returns websocket url to chrome window with about:blank loaded def __enter__(self): + return self.start() + + def __exit__(self, *args): + self.stop() + + # returns websocket url to chrome window with about:blank loaded + def start(self): new_env = os.environ.copy() new_env["HOME"] = self.user_home_dir chrome_args = [self.executable, + "--use-mock-keychain", # mac thing + "--user-data-dir={}".format(self.user_data_dir), "--remote-debugging-port={}".format(self.port), "--disable-web-sockets", "--disable-cache", "--window-size=1100,900", "--no-default-browser-check", @@ -231,7 +262,7 @@ class Chrome: else: time.sleep(0.5) - def __exit__(self, *args): + def stop(self): timeout_sec = 60 self.logger.info("terminating chrome pid {}".format(self.chrome_process.pid)) diff --git a/umbra/controller.py b/umbra/controller.py index 20316a6..17b9a61 100644 --- a/umbra/controller.py +++ b/umbra/controller.py @@ -5,7 +5,8 @@ import logging import time import threading import kombu -from umbra.browser import BrowserPool +import socket +from umbra.browser import BrowserPool, BrowsingException class AmqpBrowserController: """ @@ -56,6 +57,9 @@ class AmqpBrowserController: chrome_exe=chrome_exe, chrome_wait=browser_wait) def start(self): + self._browsing_threads = set() + self._browsing_threads_lock = threading.Lock() + self._exchange = kombu.Exchange(name=self.exchange_name, type='direct', durable=True) @@ -65,20 +69,19 @@ class AmqpBrowserController: self._producer_conn = kombu.Connection(self.amqp_url) self._producer = self._producer_conn.Producer(serializer='json') - self._amqp_thread = threading.Thread(target=self._consume_amqp, name='AmqpConsumerThread') - self._amqp_stop = threading.Event() - self._amqp_thread.start() + self._consumer_thread = threading.Thread(target=self._consume_amqp, name='AmqpConsumerThread') + self._consumer_stop = threading.Event() + self._consumer_thread.start() def shutdown(self): self.logger.info("shutting down amqp consumer {}".format(self.amqp_url)) - self._amqp_stop.set() - self._amqp_thread.join() - # with self._producer_lock: - # self._producer_conn.close() - # self._producer_conn = None + self._consumer_stop.set() + self._consumer_thread.join() def shutdown_now(self): + self._consumer_stop.set() self._browser_pool.shutdown_now() + self._consumer_thread.join() def _consume_amqp(self): # XXX https://webarchive.jira.com/browse/ARI-3811 @@ -92,30 +95,51 @@ class AmqpBrowserController: url_queue = kombu.Queue(self.queue_name, routing_key=self.routing_key, exchange=self._exchange) - while not self._amqp_stop.is_set(): + while not self._consumer_stop.is_set(): try: self.logger.info("connecting to amqp exchange={} at {}".format(self._exchange.name, self.amqp_url)) with kombu.Connection(self.amqp_url) as conn: conn_opened = time.time() with conn.Consumer(url_queue) as consumer: consumer.qos(prefetch_count=1) - while (not self._amqp_stop.is_set() and time.time() - conn_opened < RECONNECT_AFTER_SECONDS): - import socket + browser = None + while not self._consumer_stop.is_set() and time.time() - conn_opened < RECONNECT_AFTER_SECONDS: try: browser = self._browser_pool.acquire() # raises KeyError if none available + browser.start() consumer.callbacks = [self._make_callback(browser)] - conn.drain_events(timeout=0.5) + + while True: + try: + conn.drain_events(timeout=0.5) + break # out of "while True" to acquire another browser + except socket.timeout: + pass + + if self._consumer_stop.is_set() or time.time() - conn_opened >= RECONNECT_AFTER_SECONDS: + browser.stop() + self._browser_pool.release(browser) + break + + except KeyError: + # no browsers available + time.sleep(0.5) + except: + self.logger.critical("problem with browser initialization", exc_info=True) + time.sleep(0.5) + finally: consumer.callbacks = None - # browser startup is a heavy operation, so do - # it once every 5 seconds at most - time.sleep(5) - except KeyError: - # thrown by self._browser_pool.acquire() - no browsers available - time.sleep(0.5) - except socket.timeout: - # thrown by conn.drain_events(timeout=0.5) - no urls in the queue - self._browser_pool.release(browser) + # need to wait for browsers to finish here, before closing + # the amqp connection, because they use it to do + # message.ack() after they finish browsing a page + self.logger.info("waiting for browsing threads to finish") + while True: + with self._browsing_threads_lock: + if len(self._browsing_threads) == 0: + break + time.sleep(0.5) + self.logger.info("browsing threads finished") except BaseException as e: self.logger.error("caught exception {}".format(e), exc_info=True) @@ -124,11 +148,10 @@ class AmqpBrowserController: def _make_callback(self, browser): def callback(body, message): - self._browse_page(browser, body['clientId'], body['url'], body['metadata']) - message.ack() + self._start_browsing_page(browser, message, body['clientId'], body['url'], body['metadata']) return callback - def _browse_page(self, browser, client_id, url, parent_url_metadata): + def _start_browsing_page(self, browser, message, client_id, url, parent_url_metadata): def on_request(chrome_msg): payload = chrome_msg['params']['request'] payload['parentUrl'] = url @@ -142,12 +165,25 @@ class AmqpBrowserController: self.logger.info('browser={} client_id={} url={}'.format(browser, client_id, url)) try: browser.browse_page(url, on_request=on_request) - self._browser_pool.release(browser) + message.ack() + except BrowsingException as e: + self.logger.warn("browsing did not complete normally, requeuing url {} - {}".format(url, e)) + message.requeue() except: - self.logger.critical("problem browsing page, may have lost browser process", exc_info=True) + self.logger.critical("problem browsing page, requeuing url {}, may have lost browser process".format(url), exc_info=True) + message.requeue() + finally: + browser.stop() + self._browser_pool.release(browser) + + with self._browsing_threads_lock: + self._browsing_threads.remove(threading.current_thread()) import random threadName = "BrowsingThread{}-{}".format(browser.chrome_port, ''.join((random.choice('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789') for _ in range(6)))) - threading.Thread(target=browse_page_async, name=threadName).start() + th = threading.Thread(target=browse_page_async, name=threadName) + with self._browsing_threads_lock: + self._browsing_threads.add(th) + th.start()