diff --git a/bin/browse-url b/bin/browse-url index bbbc925..ebfa906 100755 --- a/bin/browse-url +++ b/bin/browse-url @@ -24,7 +24,7 @@ args = arg_parser.parse_args(args=sys.argv[1:]) logging.basicConfig(stream=sys.stdout, level=args.log_level, format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') -browser = umbra.Browser(chrome_exe=args.chrome_exe, chrome_wait=args.browser_wait) -for url in args.urls: - browser.browse_page(url) +with umbra.Browser(chrome_exe=args.chrome_exe) as browser: + for url in args.urls: + browser.browse_page(url) diff --git a/bin/umbra b/bin/umbra index 891e096..63da89d 100755 --- a/bin/umbra +++ b/bin/umbra @@ -11,16 +11,13 @@ import os import umbra import signal import threading +import traceback +import pprint -if __name__=="__main__": - import faulthandler - faulthandler.register(signal.SIGQUIT) - +if __name__ == "__main__": arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__), description='umbra - browser automation tool communicating via AMQP', formatter_class=argparse.ArgumentDefaultsHelpFormatter) - arg_parser.add_argument('-w', '--browser-wait', dest='browser_wait', default='60', - help='Seconds to wait for browser initialization') arg_parser.add_argument('-e', '--executable', dest='chrome_exe', default='chromium-browser', help='Executable to use to invoke chrome') arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f', @@ -29,9 +26,7 @@ if __name__=="__main__": help='AMQP exchange name') arg_parser.add_argument('--queue', dest='amqp_queue', default='urls', help='AMQP queue to consume urls from') - arg_parser.add_argument('--routing-key', dest='amqp_routing_key', default='url', - help='AMQP routing key to bind to the AMQP queue') - arg_parser.add_argument('-n', '--max-browsers', dest='max_browsers', default='3', + arg_parser.add_argument('-n', '--max-browsers', dest='max_browsers', default='1', help='Max number of chrome instances simultaneously browsing pages') arg_parser.add_argument('-v', '--verbose', dest='log_level', action="store_const", default=logging.INFO, const=logging.DEBUG) @@ -44,10 +39,45 @@ if __name__=="__main__": logging.info("umbra {} starting up".format(umbra.version)) - controller = umbra.Umbra(args.amqp_url, args.chrome_exe, args.browser_wait, + controller = umbra.Umbra(args.amqp_url, args.chrome_exe, max_active_browsers=int(args.max_browsers), - exchange_name=args.amqp_exchange, queue_name=args.amqp_queue, - routing_key=args.amqp_routing_key) + exchange_name=args.amqp_exchange, queue_name=args.amqp_queue) + + def browserdump_str(pp, b): + x = [] + x.append(pp.pformat(b.__dict__)) + if b._chrome_instance: + x.append("=> {} chrome instance:".format(b)) + x.append(pp.pformat(b._chrome_instance.__dict__)) + if b._behavior: + x.append("=> {} active behavior:".format(b)) + x.append(pp.pformat(b._behavior.__dict__)) + return "\n".join(x) + + def dump_state(signum, frame): + pp = pprint.PrettyPrinter(indent=4) + state_strs = [] + + for th in threading.enumerate(): + state_strs.append(str(th)) + stack = traceback.format_stack(sys._current_frames()[th.ident]) + state_strs.append("".join(stack)) + + state_strs.append("umbra controller:") + state_strs.append(pp.pformat(controller.__dict__)) + state_strs.append("") + + for b in controller._browser_pool._in_use: + state_strs.append("{} (in use):".format(b)) + state_strs.append(browserdump_str(pp, b)) + state_strs.append("") + for b in controller._browser_pool._available: + state_strs.append("{} (not in use):".format(b)) + state_strs.append(browserdump_str(pp, b)) + state_strs.append("") + + logging.warn("dumping state (caught signal {})\n{}".format(signum, "\n".join(state_strs))) + class ShutdownRequested(Exception): pass @@ -57,6 +87,8 @@ if __name__=="__main__": def sigint(signum, frame): raise ShutdownRequested('shutdown requested (caught SIGINT)') + signal.signal(signal.SIGQUIT, dump_state) + signal.signal(signal.SIGHUP, controller.reconnect) signal.signal(signal.SIGTERM, sigterm) signal.signal(signal.SIGINT, sigint) @@ -68,7 +100,7 @@ if __name__=="__main__": except ShutdownRequested as e: logging.info(e) except BaseException as e: - logging.fatal(e) + logging.fatal(e, exc_info=sys.exc_info()) finally: try: controller.shutdown() @@ -76,11 +108,16 @@ if __name__=="__main__": if th != threading.current_thread(): th.join() except BaseException as e: - logging.warn("caught {}".format(e)) - controller.shutdown_now() - for th in threading.enumerate(): - if th != threading.current_thread(): - th.join() + logging.warn("caught exception {}".format(e)) + for i in range(6,0,-1): + controller.shutdown_now() + try: + for th in threading.enumerate(): + if th != threading.current_thread(): + th.join() + break # if we get here, we're done, all threads finished + except: + logging.warn("caught exception {}".format(e)) logging.info("all finished, exiting") diff --git a/umbra/browser.py b/umbra/browser.py index a3eccb3..dd042d2 100644 --- a/umbra/browser.py +++ b/umbra/browser.py @@ -20,12 +20,12 @@ class BrowserPool: BASE_PORT = 9200 - def __init__(self, size=3, chrome_exe='chromium-browser', chrome_wait=60): + def __init__(self, size=3, chrome_exe='chromium-browser'): self._available = set() self._in_use = set() for i in range(0, size): - browser = Browser(BrowserPool.BASE_PORT + i, chrome_exe, chrome_wait) + browser = Browser(BrowserPool.BASE_PORT + i, chrome_exe) self._available.add(browser) self._lock = threading.Lock() @@ -46,8 +46,10 @@ class BrowserPool: def shutdown_now(self): for browser in self._in_use: - browser.shutdown_now() + browser.abort_browse_page() +class BrowsingException(Exception): + pass class Browser: """Runs chrome/chromium to synchronously browse one page at a time using @@ -59,70 +61,95 @@ class Browser: HARD_TIMEOUT_SECONDS = 20 * 60 - def __init__(self, chrome_port=9222, chrome_exe='chromium-browser', chrome_wait=60): + def __init__(self, chrome_port=9222, chrome_exe='chromium-browser'): self.command_id = itertools.count(1) - self._lock = threading.Lock() self.chrome_port = chrome_port self.chrome_exe = chrome_exe - self.chrome_wait = chrome_wait self._behavior = None - self.websock = None - self._shutdown_now = False + self._websock = None + self._abort_browse_page = False + self._chrome_instance = None def __repr__(self): return "{}.{}:{}".format(Browser.__module__, Browser.__qualname__, self.chrome_port) - def shutdown_now(self): - self._shutdown_now = True + def __enter__(self): + self.start() + return self + + def __exit__(self, *args): + self.stop() + + def start(self): + # these can raise exceptions + self._work_dir = tempfile.TemporaryDirectory() + self._chrome_instance = Chrome(self.chrome_port, self.chrome_exe, + self._work_dir.name, os.sep.join([self._work_dir.name, "chrome-user-data"])) + self._websocket_url = self._chrome_instance.start() + + def stop(self): + self._chrome_instance.stop() + self._work_dir.cleanup() + + def abort_browse_page(self): + self._abort_browse_page = True def browse_page(self, url, on_request=None): - """Synchronously browses a page and runs behaviors. First blocks to - acquire lock to ensure we only browse one page at a time.""" - with self._lock: - self.url = url - self.on_request = on_request - with tempfile.TemporaryDirectory() as user_home_dir: - with Chrome(self.chrome_port, self.chrome_exe, self.chrome_wait, user_home_dir) as websocket_url: - self.websock = websocket.WebSocketApp(websocket_url, - on_open=self._visit_page, - on_message=self._handle_message) + """Synchronously browses a page and runs behaviors. - import random - threadName = "WebsockThread{}-{}".format(self.chrome_port, - ''.join((random.choice('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789') for _ in range(6)))) - websock_thread = threading.Thread(target=self.websock.run_forever, name=threadName, kwargs={'ping_timeout':0.5}) - websock_thread.start() - start = time.time() + Raises BrowsingException if browsing the page fails in a non-critical + way. + """ + self.url = url + self.on_request = on_request - while True: - time.sleep(0.5) - if not self.websock or not self.websock.sock or not self.websock.sock.connected: - self.logger.error("websocket closed, did chrome die??? {}".format(self.websock)) - break - elif time.time() - start > Browser.HARD_TIMEOUT_SECONDS: - self.logger.info("finished browsing page, reached hard timeout of {} seconds url={}".format(Browser.HARD_TIMEOUT_SECONDS, self.url)) - break - elif self._behavior != None and self._behavior.is_finished(): - self.logger.info("finished browsing page according to behavior url={}".format(self.url)) - break - elif self._shutdown_now: - self.logger.warn("immediate shutdown requested") - break + self._websock = websocket.WebSocketApp(self._websocket_url, + on_open=self._visit_page, on_message=self._handle_message) - try: - self.websock.close() - except BaseException as e: - self.logger.error("exception closing websocket {} - {}".format(self.websock, e)) + import random + threadName = "WebsockThread{}-{}".format(self.chrome_port, + ''.join((random.choice('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789') for _ in range(6)))) + websock_thread = threading.Thread(target=self._websock.run_forever, name=threadName, kwargs={'ping_timeout':0.5}) + websock_thread.start() + self._start = time.time() + aborted = False - websock_thread.join() - self._behavior = None + try: + while True: + time.sleep(0.5) + if not self._websock or not self._websock.sock or not self._websock.sock.connected: + raise BrowsingException("websocket closed, did chrome die? {}".format(self._websocket_url)) + elif time.time() - self._start > Browser.HARD_TIMEOUT_SECONDS: + self.logger.info("finished browsing page, reached hard timeout of {} seconds url={}".format(Browser.HARD_TIMEOUT_SECONDS, self.url)) + return + elif self._behavior != None and self._behavior.is_finished(): + self.logger.info("finished browsing page according to behavior url={}".format(self.url)) + return + elif self._abort_browse_page: + raise BrowsingException("browsing page aborted") + finally: + if self._websock and self._websock.sock and self._websock.sock.connected: + try: + self._websock.close() + except BaseException as e: + self.logger.error("exception closing websocket {} - {}".format(self._websock, e)) + + websock_thread.join(timeout=30) + if websock_thread.is_alive(): + self.logger.error("{} still alive 30 seconds after closing {}, will forcefully nudge it again".format(websock_thread, self._websock)) + self._websock.keep_running = False + websock_thread.join(timeout=30) + if websock_thread.is_alive(): + self.logger.critical("{} still alive 60 seconds after closing {}".format(websock_thread, self._websock)) + + self._behavior = None def send_to_chrome(self, **kwargs): msg_id = next(self.command_id) kwargs['id'] = msg_id msg = json.dumps(kwargs) - self.logger.debug('sending message to {}: {}'.format(self.websock, msg)) - self.websock.send(msg) + self.logger.debug('sending message to {}: {}'.format(self._websock, msg)) + self._websock.send(msg) return msg_id def _visit_page(self, websock): @@ -188,17 +215,27 @@ class Browser: class Chrome: logger = logging.getLogger(__module__ + "." + __qualname__) - def __init__(self, port, executable, browser_wait, user_home_dir): + def __init__(self, port, executable, user_home_dir, user_data_dir): self.port = port self.executable = executable - self.browser_wait = browser_wait self.user_home_dir = user_home_dir + self.user_data_dir = user_data_dir # returns websocket url to chrome window with about:blank loaded def __enter__(self): + return self.start() + + def __exit__(self, *args): + self.stop() + + # returns websocket url to chrome window with about:blank loaded + def start(self): + timeout_sec = 60 new_env = os.environ.copy() new_env["HOME"] = self.user_home_dir chrome_args = [self.executable, + "--use-mock-keychain", # mac thing + "--user-data-dir={}".format(self.user_data_dir), "--remote-debugging-port={}".format(self.port), "--disable-web-sockets", "--disable-cache", "--window-size=1100,900", "--no-default-browser-check", @@ -208,7 +245,7 @@ class Chrome: self.logger.info("running {}".format(chrome_args)) self.chrome_process = subprocess.Popen(chrome_args, env=new_env, start_new_session=True) self.logger.info("chrome running, pid {}".format(self.chrome_process.pid)) - start = time.time() + self._start = time.time() # member variable just so that kill -QUIT reports it json_url = "http://localhost:%s/json" % self.port @@ -226,12 +263,12 @@ class Chrome: except: pass finally: - if time.time() - start > float(self.browser_wait): - raise Exception("failed to retrieve {} after {} seconds".format(json_url, time.time() - start)) + if time.time() - self._start > timeout_sec: + raise Exception("failed to retrieve {} after {} seconds".format(json_url, time.time() - self._start)) else: time.sleep(0.5) - def __exit__(self, *args): + def stop(self): timeout_sec = 60 self.logger.info("terminating chrome pid {}".format(self.chrome_process.pid)) diff --git a/umbra/controller.py b/umbra/controller.py index 20316a6..f22fc69 100644 --- a/umbra/controller.py +++ b/umbra/controller.py @@ -5,7 +5,8 @@ import logging import time import threading import kombu -from umbra.browser import BrowserPool +import socket +from umbra.browser import BrowserPool, BrowsingException class AmqpBrowserController: """ @@ -44,41 +45,91 @@ class AmqpBrowserController: logger = logging.getLogger(__module__ + "." + __qualname__) def __init__(self, amqp_url='amqp://guest:guest@localhost:5672/%2f', - chrome_exe='chromium-browser', browser_wait=60, - max_active_browsers=1, queue_name='urls', routing_key='url', - exchange_name='umbra'): + chrome_exe='chromium-browser', max_active_browsers=1, + queue_name='urls', exchange_name='umbra'): self.amqp_url = amqp_url self.queue_name = queue_name - self.routing_key = routing_key self.exchange_name = exchange_name + self.max_active_browsers = max_active_browsers - self._browser_pool = BrowserPool(size=max_active_browsers, - chrome_exe=chrome_exe, chrome_wait=browser_wait) + self._browser_pool = BrowserPool(size=max_active_browsers, chrome_exe=chrome_exe) def start(self): + self._browsing_threads = set() + self._browsing_threads_lock = threading.Lock() + self._exchange = kombu.Exchange(name=self.exchange_name, type='direct', durable=True) + self._reconnect_requested = False + self._producer = None self._producer_lock = threading.Lock() with self._producer_lock: self._producer_conn = kombu.Connection(self.amqp_url) self._producer = self._producer_conn.Producer(serializer='json') - self._amqp_thread = threading.Thread(target=self._consume_amqp, name='AmqpConsumerThread') - self._amqp_stop = threading.Event() - self._amqp_thread.start() + self._consumer_thread = threading.Thread(target=self._consume_amqp, name='AmqpConsumerThread') + self._consumer_stop = threading.Event() + self._consumer_thread.start() def shutdown(self): self.logger.info("shutting down amqp consumer {}".format(self.amqp_url)) - self._amqp_stop.set() - self._amqp_thread.join() - # with self._producer_lock: - # self._producer_conn.close() - # self._producer_conn = None + self._consumer_stop.set() + self._consumer_thread.join() def shutdown_now(self): + self._consumer_stop.set() self._browser_pool.shutdown_now() + self._consumer_thread.join() + + def reconnect(self, *args, **kwargs): + self._reconnect_requested = True + self._browser_pool.shutdown_now() + + def _wait_for_and_browse_urls(self, conn, consumer, timeout): + start = time.time() + browser = None + consumer.qos(prefetch_count=self.max_active_browsers) + + while not self._consumer_stop.is_set() and time.time() - start < timeout and not self._reconnect_requested: + try: + browser = self._browser_pool.acquire() # raises KeyError if none available + browser.start() + + def callback(body, message): + self._start_browsing_page(browser, message, body['clientId'], body['url'], body['metadata']) + consumer.callbacks = [callback] + + while True: + try: + conn.drain_events(timeout=0.5) + break # out of "while True" to acquire another browser + except socket.timeout: + pass + + if self._consumer_stop.is_set() or time.time() - start >= timeout or self._reconnect_requested: + browser.stop() + self._browser_pool.release(browser) + break + + except KeyError: + # no browsers available + time.sleep(0.5) + except: + self.logger.critical("problem with browser initialization", exc_info=True) + time.sleep(0.5) + finally: + consumer.callbacks = None + + def _wait_for_active_browsers(self): + self.logger.info("waiting for browsing threads to finish") + while True: + with self._browsing_threads_lock: + if len(self._browsing_threads) == 0: + break + time.sleep(0.5) + self.logger.info("active browsing threads finished") def _consume_amqp(self): # XXX https://webarchive.jira.com/browse/ARI-3811 @@ -86,49 +137,29 @@ class AmqpBrowserController: # consumer looks normal but doesn't consume any messages. Not clear if # it's hanging in drain_events() or not. As a temporary measure for # mitigation (if it works) or debugging (if it doesn't work), close and - # reopen the connection every 15 minutes - RECONNECT_AFTER_SECONDS = 15 * 60 + # reopen the connection every 2.5 hours + RECONNECT_AFTER_SECONDS = 150 * 60 - url_queue = kombu.Queue(self.queue_name, routing_key=self.routing_key, - exchange=self._exchange) + url_queue = kombu.Queue(self.queue_name, exchange=self._exchange) - while not self._amqp_stop.is_set(): + while not self._consumer_stop.is_set(): try: self.logger.info("connecting to amqp exchange={} at {}".format(self._exchange.name, self.amqp_url)) + self._reconnect_requested = False with kombu.Connection(self.amqp_url) as conn: - conn_opened = time.time() with conn.Consumer(url_queue) as consumer: - consumer.qos(prefetch_count=1) - while (not self._amqp_stop.is_set() and time.time() - conn_opened < RECONNECT_AFTER_SECONDS): - import socket - try: - browser = self._browser_pool.acquire() # raises KeyError if none available - consumer.callbacks = [self._make_callback(browser)] - conn.drain_events(timeout=0.5) - consumer.callbacks = None - - # browser startup is a heavy operation, so do - # it once every 5 seconds at most - time.sleep(5) - except KeyError: - # thrown by self._browser_pool.acquire() - no browsers available - time.sleep(0.5) - except socket.timeout: - # thrown by conn.drain_events(timeout=0.5) - no urls in the queue - self._browser_pool.release(browser) + self._wait_for_and_browse_urls(conn, consumer, timeout=RECONNECT_AFTER_SECONDS) + # need to wait for browsers to finish here, before closing + # the amqp connection, because they use it to do + # message.ack() after they finish browsing a page + self._wait_for_active_browsers() except BaseException as e: self.logger.error("caught exception {}".format(e), exc_info=True) time.sleep(0.5) self.logger.error("attempting to reopen amqp connection") - def _make_callback(self, browser): - def callback(body, message): - self._browse_page(browser, body['clientId'], body['url'], body['metadata']) - message.ack() - return callback - - def _browse_page(self, browser, client_id, url, parent_url_metadata): + def _start_browsing_page(self, browser, message, client_id, url, parent_url_metadata): def on_request(chrome_msg): payload = chrome_msg['params']['request'] payload['parentUrl'] = url @@ -138,16 +169,31 @@ class AmqpBrowserController: publish = self._producer_conn.ensure(self._producer, self._producer.publish) publish(payload, exchange=self._exchange, routing_key=client_id) - def browse_page_async(): + def browse_page_sync(): self.logger.info('browser={} client_id={} url={}'.format(browser, client_id, url)) try: browser.browse_page(url, on_request=on_request) - self._browser_pool.release(browser) + message.ack() + except BrowsingException as e: + self.logger.warn("browsing did not complete normally, requeuing url {} - {}".format(url, e)) + message.requeue() except: - self.logger.critical("problem browsing page, may have lost browser process", exc_info=True) + self.logger.critical("problem browsing page, requeuing url {}, may have lost browser process".format(url), exc_info=True) + message.requeue() + finally: + browser.stop() + self._browser_pool.release(browser) + + def browse_thread_run_then_cleanup(): + browse_page_sync() + with self._browsing_threads_lock: + self._browsing_threads.remove(threading.current_thread()) import random - threadName = "BrowsingThread{}-{}".format(browser.chrome_port, + thread_name = "BrowsingThread{}-{}".format(browser.chrome_port, ''.join((random.choice('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789') for _ in range(6)))) - threading.Thread(target=browse_page_async, name=threadName).start() + th = threading.Thread(target=browse_thread_run_then_cleanup, name=thread_name) + with self._browsing_threads_lock: + self._browsing_threads.add(th) + th.start()