mirror of
https://github.com/internetarchive/brozzler.git
synced 2025-02-24 00:29:53 -05:00
Merge pull request #24 from nlevitt/dev
more improvements, mostly for robustness
This commit is contained in:
commit
e8456e0a62
@ -5,24 +5,26 @@ import argparse
|
||||
import os
|
||||
import sys
|
||||
import logging
|
||||
from umbra.browser import Browser
|
||||
import umbra
|
||||
|
||||
arg_parser = argparse.ArgumentParser(prog=os.path.basename(sys.argv[0]),
|
||||
arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),
|
||||
description='browse-url - open urls in chrome/chromium and run behaviors',
|
||||
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
||||
arg_parser.add_argument('urls', metavar='URL', nargs='+', help='URL(s) to browse')
|
||||
arg_parser.add_argument('-w', '--browser-wait', dest='browser_wait', default='60',
|
||||
help='seconds to wait for browser initialization')
|
||||
arg_parser.add_argument('-e', '--executable', dest='chrome_exe', default='chromium-browser',
|
||||
help='executable to use to invoke chrome')
|
||||
arg_parser.add_argument('-v', '--verbose', dest='log_level',
|
||||
action="store_const", default=logging.INFO, const=logging.DEBUG)
|
||||
arg_parser.add_argument('urls', metavar='URL', nargs='+', help='URL(s) to browse')
|
||||
arg_parser.add_argument('--version', action='version',
|
||||
version="umbra {} - {}".format(umbra.version, os.path.basename(__file__)))
|
||||
args = arg_parser.parse_args(args=sys.argv[1:])
|
||||
|
||||
logging.basicConfig(stream=sys.stdout, level=args.log_level,
|
||||
format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
|
||||
|
||||
browser = Browser(chrome_exe=args.chrome_exe, chrome_wait=args.browser_wait)
|
||||
browser = umbra.Browser(chrome_exe=args.chrome_exe, chrome_wait=args.browser_wait)
|
||||
for url in args.urls:
|
||||
browser.browse_page(url)
|
||||
|
||||
|
@ -5,9 +5,10 @@ import sys
|
||||
import argparse
|
||||
import logging
|
||||
import socket
|
||||
import umbra
|
||||
from kombu import Connection, Exchange, Queue
|
||||
|
||||
arg_parser = argparse.ArgumentParser(prog=os.path.basename(sys.argv[0]),
|
||||
arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),
|
||||
description='drain-queue - consume messages from AMQP queue',
|
||||
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
||||
arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f',
|
||||
@ -22,6 +23,8 @@ arg_parser.add_argument('-r', '--run-forever', dest='run_forever', action="store
|
||||
default=False, const=True, help="run forever, waiting for new messages to appear on the queue (default: exit when all messages in the queue have been consumed)")
|
||||
arg_parser.add_argument('-v', '--verbose', dest='log_level',
|
||||
action="store_const", default=logging.INFO, const=logging.DEBUG)
|
||||
arg_parser.add_argument('--version', action='version',
|
||||
version="umbra {} - {}".format(umbra.version, os.path.basename(__file__)))
|
||||
args = arg_parser.parse_args(args=sys.argv[1:])
|
||||
|
||||
logging.basicConfig(stream=sys.stderr, level=args.log_level,
|
||||
|
@ -5,9 +5,10 @@ import os
|
||||
import sys
|
||||
import argparse
|
||||
import logging
|
||||
import umbra
|
||||
from kombu import Connection, Exchange, Queue
|
||||
|
||||
arg_parser = argparse.ArgumentParser(prog=os.path.basename(sys.argv[0]),
|
||||
arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),
|
||||
description='queue-url - send url to umbra via AMQP',
|
||||
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
||||
arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f',
|
||||
@ -20,6 +21,8 @@ arg_parser.add_argument('-i', '--client-id', dest='client_id', default='load_url
|
||||
help='client id - included in the json payload with each url; umbra uses this value as the routing key to send requests back to')
|
||||
arg_parser.add_argument('-v', '--verbose', dest='log_level',
|
||||
action="store_const", default=logging.INFO, const=logging.DEBUG)
|
||||
arg_parser.add_argument('--version', action='version',
|
||||
version="umbra {} - {}".format(umbra.version, os.path.basename(__file__)))
|
||||
arg_parser.add_argument('urls', metavar='URL', nargs='+', help='URLs to send to umbra')
|
||||
args = arg_parser.parse_args(args=sys.argv[1:])
|
||||
|
||||
|
18
bin/umbra
18
bin/umbra
@ -16,7 +16,7 @@ if __name__=="__main__":
|
||||
import faulthandler
|
||||
faulthandler.register(signal.SIGQUIT)
|
||||
|
||||
arg_parser = argparse.ArgumentParser(prog=os.path.basename(sys.argv[0]),
|
||||
arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),
|
||||
description='umbra - browser automation tool communicating via AMQP',
|
||||
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
||||
arg_parser.add_argument('-w', '--browser-wait', dest='browser_wait', default='60',
|
||||
@ -35,12 +35,16 @@ if __name__=="__main__":
|
||||
help='Max number of chrome instances simultaneously browsing pages')
|
||||
arg_parser.add_argument('-v', '--verbose', dest='log_level',
|
||||
action="store_const", default=logging.INFO, const=logging.DEBUG)
|
||||
arg_parser.add_argument('--version', action='version',
|
||||
version="umbra {}".format(umbra.version))
|
||||
args = arg_parser.parse_args(args=sys.argv[1:])
|
||||
|
||||
logging.basicConfig(stream=sys.stdout, level=args.log_level,
|
||||
format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
|
||||
|
||||
umbra = umbra.Umbra(args.amqp_url, args.chrome_exe, args.browser_wait,
|
||||
logging.info("umbra {} starting up".format(umbra.version))
|
||||
|
||||
controller = umbra.Umbra(args.amqp_url, args.chrome_exe, args.browser_wait,
|
||||
max_active_browsers=int(args.max_browsers),
|
||||
exchange_name=args.amqp_exchange, queue_name=args.amqp_queue,
|
||||
routing_key=args.amqp_routing_key)
|
||||
@ -56,7 +60,7 @@ if __name__=="__main__":
|
||||
signal.signal(signal.SIGTERM, sigterm)
|
||||
signal.signal(signal.SIGINT, sigint)
|
||||
|
||||
umbra.start()
|
||||
controller.start()
|
||||
|
||||
try:
|
||||
while True:
|
||||
@ -67,12 +71,16 @@ if __name__=="__main__":
|
||||
logging.fatal(e)
|
||||
finally:
|
||||
try:
|
||||
umbra.shutdown()
|
||||
controller.shutdown()
|
||||
for th in threading.enumerate():
|
||||
if th != threading.current_thread():
|
||||
th.join()
|
||||
except BaseException as e:
|
||||
logging.warn("caught {}".format(e))
|
||||
umbra.shutdown_now()
|
||||
controller.shutdown_now()
|
||||
for th in threading.enumerate():
|
||||
if th != threading.current_thread():
|
||||
th.join()
|
||||
|
||||
logging.info("all finished, exiting")
|
||||
|
||||
|
26
setup.py
26
setup.py
@ -3,8 +3,28 @@
|
||||
import setuptools
|
||||
import glob
|
||||
|
||||
VERSION_BYTES = b'0.2'
|
||||
|
||||
def full_version_bytes():
|
||||
import subprocess, time
|
||||
try:
|
||||
commit_bytes = subprocess.check_output(['git', 'log', '-1', '--pretty=format:%h'])
|
||||
|
||||
t_bytes = subprocess.check_output(['git', 'log', '-1', '--pretty=format:%ct'])
|
||||
t = int(t_bytes.strip().decode('utf-8'))
|
||||
tm = time.gmtime(t)
|
||||
timestamp_utc = time.strftime("%Y%m%d%H%M%S", time.gmtime(t))
|
||||
return VERSION_BYTES + b'-' + timestamp_utc.encode('utf-8') + b'-' + commit_bytes.strip()
|
||||
except subprocess.CalledProcessError:
|
||||
return VERSION_BYTES
|
||||
|
||||
version_bytes = full_version_bytes()
|
||||
with open('umbra/version.txt', 'wb') as out:
|
||||
out.write(version_bytes)
|
||||
out.write(b'\n');
|
||||
|
||||
setuptools.setup(name='umbra',
|
||||
version='0.1',
|
||||
version=version_bytes.decode('utf-8'),
|
||||
description='Browser automation via chrome debug protocol',
|
||||
url='https://github.com/internetarchive/umbra',
|
||||
author='Eldon Stegall',
|
||||
@ -12,9 +32,9 @@ setuptools.setup(name='umbra',
|
||||
long_description=open('README.md').read(),
|
||||
license='Apache License 2.0',
|
||||
packages=['umbra'],
|
||||
package_data={'umbra':['behaviors.d/*.js']},
|
||||
package_data={'umbra':['behaviors.d/*.js', 'version.txt']},
|
||||
install_requires=['kombu', 'websocket-client-py3==0.13.1','argparse'],
|
||||
scripts=glob.glob("bin/*"),
|
||||
scripts=glob.glob('bin/*'),
|
||||
zip_safe=False,
|
||||
classifiers=[
|
||||
'Development Status :: 3 - Alpha Development Status',
|
||||
|
@ -1,3 +1,14 @@
|
||||
from umbra.browser import Browser
|
||||
from umbra.controller import AmqpBrowserController
|
||||
Umbra = AmqpBrowserController
|
||||
|
||||
def _read_version():
|
||||
import os
|
||||
version_txt = os.path.sep.join(__file__.split(os.path.sep)[:-1] + ['version.txt'])
|
||||
with open(version_txt, 'rb') as fin:
|
||||
version_bytes = fin.read()
|
||||
return version_bytes.strip().decode('utf-8')
|
||||
|
||||
version = _read_version()
|
||||
|
||||
# vim: set sw=4 et:
|
||||
|
@ -73,7 +73,7 @@ var umbraIntervalFunc = function() {
|
||||
if (!clickedSomething) {
|
||||
if (somethingLeftAbove) {
|
||||
console.log("scrolling UP because everything on this screen has been clicked but we missed something above");
|
||||
window.scrollBy(0, -200);
|
||||
window.scrollBy(0, -500);
|
||||
umbraState.idleSince = null;
|
||||
} else if (somethingLeftBelow) {
|
||||
console.log("scrolling because everything on this screen has been clicked but there's more below document.body.clientHeight=" + document.body.clientHeight);
|
||||
|
@ -9,7 +9,7 @@ import time
|
||||
import sys
|
||||
|
||||
class Behavior:
|
||||
logger = logging.getLogger('umbra.behaviors.Behavior')
|
||||
logger = logging.getLogger(__module__ + "." + __qualname__)
|
||||
|
||||
_behaviors = None
|
||||
_default_behavior = None
|
||||
|
@ -18,43 +18,30 @@ from umbra.behaviors import Behavior
|
||||
class BrowserPool:
|
||||
logger = logging.getLogger(__module__ + "." + __qualname__)
|
||||
|
||||
BASE_PORT = 9200
|
||||
|
||||
def __init__(self, size=3, chrome_exe='chromium-browser', chrome_wait=60):
|
||||
self._available = set()
|
||||
self._in_use = set()
|
||||
|
||||
for i in range(0, size):
|
||||
port_holder = self._grab_random_port()
|
||||
browser = Browser(port_holder.getsockname()[1], chrome_exe, chrome_wait)
|
||||
self._available.add((browser, port_holder))
|
||||
browser = Browser(BrowserPool.BASE_PORT + i, chrome_exe, chrome_wait)
|
||||
self._available.add(browser)
|
||||
|
||||
self._lock = threading.Lock()
|
||||
|
||||
self.logger.info("browser ports: {}".format([browser.chrome_port for (browser, port_holder) in self._available]))
|
||||
|
||||
def _grab_random_port(self):
|
||||
"""Returns socket bound to some port."""
|
||||
sock = socket.socket()
|
||||
sock.bind(('127.0.0.1', 0))
|
||||
return sock
|
||||
|
||||
def _hold_port(self, port):
|
||||
"""Returns socket bound to supplied port."""
|
||||
sock = socket.socket()
|
||||
sock.bind(('127.0.0.1', port))
|
||||
return sock
|
||||
self.logger.info("browser ports: {}".format([browser.chrome_port for browser in self._available]))
|
||||
|
||||
def acquire(self):
|
||||
"""Returns browser from pool if available, raises KeyError otherwise."""
|
||||
with self._lock:
|
||||
(browser, port_holder) = self._available.pop()
|
||||
port_holder.close()
|
||||
browser = self._available.pop()
|
||||
self._in_use.add(browser)
|
||||
return browser
|
||||
|
||||
def release(self, browser):
|
||||
with self._lock:
|
||||
port_holder = self._hold_port(browser.chrome_port)
|
||||
self._available.add((browser, port_holder))
|
||||
self._available.add(browser)
|
||||
self._in_use.remove(browser)
|
||||
|
||||
def shutdown_now(self):
|
||||
@ -82,6 +69,9 @@ class Browser:
|
||||
self.websock = None
|
||||
self._shutdown_now = False
|
||||
|
||||
def __repr__(self):
|
||||
return "{}.{}:{}".format(Browser.__module__, Browser.__qualname__, self.chrome_port)
|
||||
|
||||
def shutdown_now(self):
|
||||
self._shutdown_now = True
|
||||
|
||||
@ -91,8 +81,8 @@ class Browser:
|
||||
with self._lock:
|
||||
self.url = url
|
||||
self.on_request = on_request
|
||||
with tempfile.TemporaryDirectory() as user_data_dir:
|
||||
with Chrome(self.chrome_port, self.chrome_exe, self.chrome_wait, user_data_dir) as websocket_url:
|
||||
with tempfile.TemporaryDirectory() as user_home_dir:
|
||||
with Chrome(self.chrome_port, self.chrome_exe, self.chrome_wait, user_home_dir) as websocket_url:
|
||||
self.websock = websocket.WebSocketApp(websocket_url,
|
||||
on_open=self._visit_page,
|
||||
on_message=self._handle_message)
|
||||
@ -165,7 +155,9 @@ class Browser:
|
||||
self._behavior = Behavior(self.url, self)
|
||||
self._behavior.start()
|
||||
else:
|
||||
self.logger.warn("Page.loadEventFired but behaviors already running url={} message={}".format(self.url, message))
|
||||
self.logger.warn("Page.loadEventFired again, perhaps original url had a meta refresh, or behaviors accidentally navigated to another page? starting behaviors again url={} message={}".format(self.url, message))
|
||||
self._behavior = Behavior(self.url, self)
|
||||
self._behavior.start()
|
||||
elif "method" in message and message["method"] == "Console.messageAdded":
|
||||
self.logger.debug("{} console.{} {}".format(websock.url,
|
||||
message["params"]["message"]["level"],
|
||||
@ -196,23 +188,24 @@ class Browser:
|
||||
class Chrome:
|
||||
logger = logging.getLogger(__module__ + "." + __qualname__)
|
||||
|
||||
def __init__(self, port, executable, browser_wait, user_data_dir):
|
||||
def __init__(self, port, executable, browser_wait, user_home_dir):
|
||||
self.port = port
|
||||
self.executable = executable
|
||||
self.browser_wait = browser_wait
|
||||
self.user_data_dir = user_data_dir
|
||||
self.user_home_dir = user_home_dir
|
||||
|
||||
# returns websocket url to chrome window with about:blank loaded
|
||||
def __enter__(self):
|
||||
new_env = os.environ.copy()
|
||||
new_env["HOME"] = self.user_home_dir
|
||||
chrome_args = [self.executable,
|
||||
"--user-data-dir={}".format(self.user_data_dir),
|
||||
"--remote-debugging-port={}".format(self.port),
|
||||
"--disable-web-sockets", "--disable-cache",
|
||||
"--window-size=1100,900", "--no-default-browser-check",
|
||||
"--disable-first-run-ui", "--no-first-run",
|
||||
"--homepage=about:blank", "about:blank"]
|
||||
self.logger.info("running {}".format(chrome_args))
|
||||
self.chrome_process = subprocess.Popen(chrome_args, start_new_session=True)
|
||||
self.chrome_process = subprocess.Popen(chrome_args, env=new_env, start_new_session=True)
|
||||
self.logger.info("chrome running, pid {}".format(self.chrome_process.pid))
|
||||
start = time.time()
|
||||
|
||||
@ -238,7 +231,29 @@ class Chrome:
|
||||
time.sleep(0.5)
|
||||
|
||||
def __exit__(self, *args):
|
||||
self.logger.info("killing chrome pid {}".format(self.chrome_process.pid))
|
||||
os.killpg(self.chrome_process.pid, signal.SIGINT)
|
||||
self.chrome_process.wait()
|
||||
timeout_sec = 60
|
||||
self.logger.info("terminating chrome pid {}".format(self.chrome_process.pid))
|
||||
|
||||
self.chrome_process.terminate()
|
||||
first_sigterm = last_sigterm = time.time()
|
||||
|
||||
while time.time() - first_sigterm < timeout_sec:
|
||||
time.sleep(0.5)
|
||||
|
||||
status = self.chrome_process.poll()
|
||||
if status is not None:
|
||||
if status == 0:
|
||||
self.logger.info("chrome pid {} exited normally".format(self.chrome_process.pid, status))
|
||||
else:
|
||||
self.logger.warn("chrome pid {} exited with nonzero status {}".format(self.chrome_process.pid, status))
|
||||
return
|
||||
|
||||
# sometimes a hung chrome process will terminate on repeated sigterms
|
||||
if time.time() - last_sigterm > 10:
|
||||
self.chrome_process.terminate()
|
||||
last_sigterm = time.time()
|
||||
|
||||
self.logger.warn("chrome pid {} still alive {} seconds after sending SIGTERM, sending SIGKILL".format(self.chrome_process.pid, timeout_sec))
|
||||
self.chrome_process.kill()
|
||||
status = self.chrome_process.wait()
|
||||
self.logger.warn("chrome pid {} reaped (status={}) after killing with SIGKILL".format(self.chrome_process.pid, status))
|
||||
|
@ -106,15 +106,19 @@ class AmqpBrowserController:
|
||||
consumer.callbacks = [self._make_callback(browser)]
|
||||
conn.drain_events(timeout=0.5)
|
||||
consumer.callbacks = None
|
||||
|
||||
# browser startup is a heavy operation, so do
|
||||
# it once every 5 seconds at most
|
||||
time.sleep(5)
|
||||
except KeyError:
|
||||
# no browsers available
|
||||
# thrown by self._browser_pool.acquire() - no browsers available
|
||||
time.sleep(0.5)
|
||||
except socket.timeout:
|
||||
# no urls in the queue
|
||||
# thrown by conn.drain_events(timeout=0.5) - no urls in the queue
|
||||
self._browser_pool.release(browser)
|
||||
|
||||
except BaseException as e:
|
||||
self.logger.error("amqp exception {}".format(e))
|
||||
self.logger.error("caught exception {}".format(e), exc_info=True)
|
||||
time.sleep(0.5)
|
||||
self.logger.error("attempting to reopen amqp connection")
|
||||
|
||||
@ -138,8 +142,9 @@ class AmqpBrowserController:
|
||||
self.logger.info('browser={} client_id={} url={}'.format(browser, client_id, url))
|
||||
try:
|
||||
browser.browse_page(url, on_request=on_request)
|
||||
finally:
|
||||
self._browser_pool.release(browser)
|
||||
except:
|
||||
self.logger.critical("problem browsing page, may have lost browser process", exc_info=True)
|
||||
|
||||
import random
|
||||
threadName = "BrowsingThread{}-{}".format(browser.chrome_port,
|
||||
|
Loading…
x
Reference in New Issue
Block a user