diff --git a/bin/browse-url b/bin/browse-url index 9fd6311..bbbc925 100755 --- a/bin/browse-url +++ b/bin/browse-url @@ -5,24 +5,26 @@ import argparse import os import sys import logging -from umbra.browser import Browser +import umbra -arg_parser = argparse.ArgumentParser(prog=os.path.basename(sys.argv[0]), +arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__), description='browse-url - open urls in chrome/chromium and run behaviors', formatter_class=argparse.ArgumentDefaultsHelpFormatter) +arg_parser.add_argument('urls', metavar='URL', nargs='+', help='URL(s) to browse') arg_parser.add_argument('-w', '--browser-wait', dest='browser_wait', default='60', help='seconds to wait for browser initialization') arg_parser.add_argument('-e', '--executable', dest='chrome_exe', default='chromium-browser', help='executable to use to invoke chrome') arg_parser.add_argument('-v', '--verbose', dest='log_level', action="store_const", default=logging.INFO, const=logging.DEBUG) -arg_parser.add_argument('urls', metavar='URL', nargs='+', help='URL(s) to browse') +arg_parser.add_argument('--version', action='version', + version="umbra {} - {}".format(umbra.version, os.path.basename(__file__))) args = arg_parser.parse_args(args=sys.argv[1:]) logging.basicConfig(stream=sys.stdout, level=args.log_level, format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') -browser = Browser(chrome_exe=args.chrome_exe, chrome_wait=args.browser_wait) +browser = umbra.Browser(chrome_exe=args.chrome_exe, chrome_wait=args.browser_wait) for url in args.urls: browser.browse_page(url) diff --git a/bin/drain-queue b/bin/drain-queue index f198d15..2798c8c 100755 --- a/bin/drain-queue +++ b/bin/drain-queue @@ -5,9 +5,10 @@ import sys import argparse import logging import socket +import umbra from kombu import Connection, Exchange, Queue -arg_parser = argparse.ArgumentParser(prog=os.path.basename(sys.argv[0]), +arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__), description='drain-queue - consume messages from AMQP queue', formatter_class=argparse.ArgumentDefaultsHelpFormatter) arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f', @@ -22,6 +23,8 @@ arg_parser.add_argument('-r', '--run-forever', dest='run_forever', action="store default=False, const=True, help="run forever, waiting for new messages to appear on the queue (default: exit when all messages in the queue have been consumed)") arg_parser.add_argument('-v', '--verbose', dest='log_level', action="store_const", default=logging.INFO, const=logging.DEBUG) +arg_parser.add_argument('--version', action='version', + version="umbra {} - {}".format(umbra.version, os.path.basename(__file__))) args = arg_parser.parse_args(args=sys.argv[1:]) logging.basicConfig(stream=sys.stderr, level=args.log_level, diff --git a/bin/queue-url b/bin/queue-url index 272c2e7..a56d806 100755 --- a/bin/queue-url +++ b/bin/queue-url @@ -5,9 +5,10 @@ import os import sys import argparse import logging +import umbra from kombu import Connection, Exchange, Queue -arg_parser = argparse.ArgumentParser(prog=os.path.basename(sys.argv[0]), +arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__), description='queue-url - send url to umbra via AMQP', formatter_class=argparse.ArgumentDefaultsHelpFormatter) arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f', @@ -20,6 +21,8 @@ arg_parser.add_argument('-i', '--client-id', dest='client_id', default='load_url help='client id - included in the json payload with each url; umbra uses this value as the routing key to send requests back to') arg_parser.add_argument('-v', '--verbose', dest='log_level', action="store_const", default=logging.INFO, const=logging.DEBUG) +arg_parser.add_argument('--version', action='version', + version="umbra {} - {}".format(umbra.version, os.path.basename(__file__))) arg_parser.add_argument('urls', metavar='URL', nargs='+', help='URLs to send to umbra') args = arg_parser.parse_args(args=sys.argv[1:]) diff --git a/bin/umbra b/bin/umbra index 167f200..891e096 100755 --- a/bin/umbra +++ b/bin/umbra @@ -16,7 +16,7 @@ if __name__=="__main__": import faulthandler faulthandler.register(signal.SIGQUIT) - arg_parser = argparse.ArgumentParser(prog=os.path.basename(sys.argv[0]), + arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__), description='umbra - browser automation tool communicating via AMQP', formatter_class=argparse.ArgumentDefaultsHelpFormatter) arg_parser.add_argument('-w', '--browser-wait', dest='browser_wait', default='60', @@ -35,12 +35,16 @@ if __name__=="__main__": help='Max number of chrome instances simultaneously browsing pages') arg_parser.add_argument('-v', '--verbose', dest='log_level', action="store_const", default=logging.INFO, const=logging.DEBUG) + arg_parser.add_argument('--version', action='version', + version="umbra {}".format(umbra.version)) args = arg_parser.parse_args(args=sys.argv[1:]) logging.basicConfig(stream=sys.stdout, level=args.log_level, format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') - umbra = umbra.Umbra(args.amqp_url, args.chrome_exe, args.browser_wait, + logging.info("umbra {} starting up".format(umbra.version)) + + controller = umbra.Umbra(args.amqp_url, args.chrome_exe, args.browser_wait, max_active_browsers=int(args.max_browsers), exchange_name=args.amqp_exchange, queue_name=args.amqp_queue, routing_key=args.amqp_routing_key) @@ -56,7 +60,7 @@ if __name__=="__main__": signal.signal(signal.SIGTERM, sigterm) signal.signal(signal.SIGINT, sigint) - umbra.start() + controller.start() try: while True: @@ -67,12 +71,16 @@ if __name__=="__main__": logging.fatal(e) finally: try: - umbra.shutdown() + controller.shutdown() for th in threading.enumerate(): if th != threading.current_thread(): th.join() except BaseException as e: logging.warn("caught {}".format(e)) - umbra.shutdown_now() + controller.shutdown_now() + for th in threading.enumerate(): + if th != threading.current_thread(): + th.join() + logging.info("all finished, exiting") diff --git a/setup.py b/setup.py index ae5c4ab..aec3953 100644 --- a/setup.py +++ b/setup.py @@ -3,8 +3,28 @@ import setuptools import glob +VERSION_BYTES = b'0.2' + +def full_version_bytes(): + import subprocess, time + try: + commit_bytes = subprocess.check_output(['git', 'log', '-1', '--pretty=format:%h']) + + t_bytes = subprocess.check_output(['git', 'log', '-1', '--pretty=format:%ct']) + t = int(t_bytes.strip().decode('utf-8')) + tm = time.gmtime(t) + timestamp_utc = time.strftime("%Y%m%d%H%M%S", time.gmtime(t)) + return VERSION_BYTES + b'-' + timestamp_utc.encode('utf-8') + b'-' + commit_bytes.strip() + except subprocess.CalledProcessError: + return VERSION_BYTES + +version_bytes = full_version_bytes() +with open('umbra/version.txt', 'wb') as out: + out.write(version_bytes) + out.write(b'\n'); + setuptools.setup(name='umbra', - version='0.1', + version=version_bytes.decode('utf-8'), description='Browser automation via chrome debug protocol', url='https://github.com/internetarchive/umbra', author='Eldon Stegall', @@ -12,9 +32,9 @@ setuptools.setup(name='umbra', long_description=open('README.md').read(), license='Apache License 2.0', packages=['umbra'], - package_data={'umbra':['behaviors.d/*.js']}, + package_data={'umbra':['behaviors.d/*.js', 'version.txt']}, install_requires=['kombu', 'websocket-client-py3==0.13.1','argparse'], - scripts=glob.glob("bin/*"), + scripts=glob.glob('bin/*'), zip_safe=False, classifiers=[ 'Development Status :: 3 - Alpha Development Status', diff --git a/umbra/__init__.py b/umbra/__init__.py index b8964b2..7372e40 100644 --- a/umbra/__init__.py +++ b/umbra/__init__.py @@ -1,3 +1,14 @@ from umbra.browser import Browser from umbra.controller import AmqpBrowserController Umbra = AmqpBrowserController + +def _read_version(): + import os + version_txt = os.path.sep.join(__file__.split(os.path.sep)[:-1] + ['version.txt']) + with open(version_txt, 'rb') as fin: + version_bytes = fin.read() + return version_bytes.strip().decode('utf-8') + +version = _read_version() + +# vim: set sw=4 et: diff --git a/umbra/behaviors.d/facebook.js b/umbra/behaviors.d/facebook.js index 10adbef..475e3c4 100644 --- a/umbra/behaviors.d/facebook.js +++ b/umbra/behaviors.d/facebook.js @@ -73,7 +73,7 @@ var umbraIntervalFunc = function() { if (!clickedSomething) { if (somethingLeftAbove) { console.log("scrolling UP because everything on this screen has been clicked but we missed something above"); - window.scrollBy(0, -200); + window.scrollBy(0, -500); umbraState.idleSince = null; } else if (somethingLeftBelow) { console.log("scrolling because everything on this screen has been clicked but there's more below document.body.clientHeight=" + document.body.clientHeight); diff --git a/umbra/behaviors.py b/umbra/behaviors.py index c8acfbf..52bce1c 100644 --- a/umbra/behaviors.py +++ b/umbra/behaviors.py @@ -9,7 +9,7 @@ import time import sys class Behavior: - logger = logging.getLogger('umbra.behaviors.Behavior') + logger = logging.getLogger(__module__ + "." + __qualname__) _behaviors = None _default_behavior = None diff --git a/umbra/browser.py b/umbra/browser.py index 3ab63bb..f1bdce4 100644 --- a/umbra/browser.py +++ b/umbra/browser.py @@ -18,43 +18,30 @@ from umbra.behaviors import Behavior class BrowserPool: logger = logging.getLogger(__module__ + "." + __qualname__) + BASE_PORT = 9200 + def __init__(self, size=3, chrome_exe='chromium-browser', chrome_wait=60): self._available = set() self._in_use = set() for i in range(0, size): - port_holder = self._grab_random_port() - browser = Browser(port_holder.getsockname()[1], chrome_exe, chrome_wait) - self._available.add((browser, port_holder)) + browser = Browser(BrowserPool.BASE_PORT + i, chrome_exe, chrome_wait) + self._available.add(browser) self._lock = threading.Lock() - self.logger.info("browser ports: {}".format([browser.chrome_port for (browser, port_holder) in self._available])) - - def _grab_random_port(self): - """Returns socket bound to some port.""" - sock = socket.socket() - sock.bind(('127.0.0.1', 0)) - return sock - - def _hold_port(self, port): - """Returns socket bound to supplied port.""" - sock = socket.socket() - sock.bind(('127.0.0.1', port)) - return sock + self.logger.info("browser ports: {}".format([browser.chrome_port for browser in self._available])) def acquire(self): """Returns browser from pool if available, raises KeyError otherwise.""" with self._lock: - (browser, port_holder) = self._available.pop() - port_holder.close() + browser = self._available.pop() self._in_use.add(browser) return browser def release(self, browser): with self._lock: - port_holder = self._hold_port(browser.chrome_port) - self._available.add((browser, port_holder)) + self._available.add(browser) self._in_use.remove(browser) def shutdown_now(self): @@ -82,6 +69,9 @@ class Browser: self.websock = None self._shutdown_now = False + def __repr__(self): + return "{}.{}:{}".format(Browser.__module__, Browser.__qualname__, self.chrome_port) + def shutdown_now(self): self._shutdown_now = True @@ -91,8 +81,8 @@ class Browser: with self._lock: self.url = url self.on_request = on_request - with tempfile.TemporaryDirectory() as user_data_dir: - with Chrome(self.chrome_port, self.chrome_exe, self.chrome_wait, user_data_dir) as websocket_url: + with tempfile.TemporaryDirectory() as user_home_dir: + with Chrome(self.chrome_port, self.chrome_exe, self.chrome_wait, user_home_dir) as websocket_url: self.websock = websocket.WebSocketApp(websocket_url, on_open=self._visit_page, on_message=self._handle_message) @@ -165,7 +155,9 @@ class Browser: self._behavior = Behavior(self.url, self) self._behavior.start() else: - self.logger.warn("Page.loadEventFired but behaviors already running url={} message={}".format(self.url, message)) + self.logger.warn("Page.loadEventFired again, perhaps original url had a meta refresh, or behaviors accidentally navigated to another page? starting behaviors again url={} message={}".format(self.url, message)) + self._behavior = Behavior(self.url, self) + self._behavior.start() elif "method" in message and message["method"] == "Console.messageAdded": self.logger.debug("{} console.{} {}".format(websock.url, message["params"]["message"]["level"], @@ -196,23 +188,24 @@ class Browser: class Chrome: logger = logging.getLogger(__module__ + "." + __qualname__) - def __init__(self, port, executable, browser_wait, user_data_dir): + def __init__(self, port, executable, browser_wait, user_home_dir): self.port = port self.executable = executable self.browser_wait = browser_wait - self.user_data_dir = user_data_dir + self.user_home_dir = user_home_dir # returns websocket url to chrome window with about:blank loaded def __enter__(self): + new_env = os.environ.copy() + new_env["HOME"] = self.user_home_dir chrome_args = [self.executable, - "--user-data-dir={}".format(self.user_data_dir), "--remote-debugging-port={}".format(self.port), "--disable-web-sockets", "--disable-cache", "--window-size=1100,900", "--no-default-browser-check", "--disable-first-run-ui", "--no-first-run", "--homepage=about:blank", "about:blank"] self.logger.info("running {}".format(chrome_args)) - self.chrome_process = subprocess.Popen(chrome_args, start_new_session=True) + self.chrome_process = subprocess.Popen(chrome_args, env=new_env, start_new_session=True) self.logger.info("chrome running, pid {}".format(self.chrome_process.pid)) start = time.time() @@ -238,7 +231,29 @@ class Chrome: time.sleep(0.5) def __exit__(self, *args): - self.logger.info("killing chrome pid {}".format(self.chrome_process.pid)) - os.killpg(self.chrome_process.pid, signal.SIGINT) - self.chrome_process.wait() + timeout_sec = 60 + self.logger.info("terminating chrome pid {}".format(self.chrome_process.pid)) + self.chrome_process.terminate() + first_sigterm = last_sigterm = time.time() + + while time.time() - first_sigterm < timeout_sec: + time.sleep(0.5) + + status = self.chrome_process.poll() + if status is not None: + if status == 0: + self.logger.info("chrome pid {} exited normally".format(self.chrome_process.pid, status)) + else: + self.logger.warn("chrome pid {} exited with nonzero status {}".format(self.chrome_process.pid, status)) + return + + # sometimes a hung chrome process will terminate on repeated sigterms + if time.time() - last_sigterm > 10: + self.chrome_process.terminate() + last_sigterm = time.time() + + self.logger.warn("chrome pid {} still alive {} seconds after sending SIGTERM, sending SIGKILL".format(self.chrome_process.pid, timeout_sec)) + self.chrome_process.kill() + status = self.chrome_process.wait() + self.logger.warn("chrome pid {} reaped (status={}) after killing with SIGKILL".format(self.chrome_process.pid, status)) diff --git a/umbra/controller.py b/umbra/controller.py index 198d8bb..20316a6 100644 --- a/umbra/controller.py +++ b/umbra/controller.py @@ -106,15 +106,19 @@ class AmqpBrowserController: consumer.callbacks = [self._make_callback(browser)] conn.drain_events(timeout=0.5) consumer.callbacks = None + + # browser startup is a heavy operation, so do + # it once every 5 seconds at most + time.sleep(5) except KeyError: - # no browsers available + # thrown by self._browser_pool.acquire() - no browsers available time.sleep(0.5) except socket.timeout: - # no urls in the queue + # thrown by conn.drain_events(timeout=0.5) - no urls in the queue self._browser_pool.release(browser) except BaseException as e: - self.logger.error("amqp exception {}".format(e)) + self.logger.error("caught exception {}".format(e), exc_info=True) time.sleep(0.5) self.logger.error("attempting to reopen amqp connection") @@ -138,8 +142,9 @@ class AmqpBrowserController: self.logger.info('browser={} client_id={} url={}'.format(browser, client_id, url)) try: browser.browse_page(url, on_request=on_request) - finally: self._browser_pool.release(browser) + except: + self.logger.critical("problem browsing page, may have lost browser process", exc_info=True) import random threadName = "BrowsingThread{}-{}".format(browser.chrome_port,