Merge branch 'master' into qa

* master:
  fix attempt for deadlock-ish situation
  fix unclosed file warnings when running python in debug mode
  give vagrant vm a name in virtualbox
  add note to readme about browser version
  check browser version at startup
  Reinstate logging
  Fix typo and block legacy google-analytics.com/ga.js
  Use Network.setBlockedUrls instead of Debugger to block URLs
  bump dev version after PR merge
  back to dev version number
  commit for beta release
  this should fix travis build?
  fix tests
  update brozzler-easy for current warcprox api
  claim sites to brozzle in batches to reduce contention over sites table
  lengthen site session brozzling time to 15 minutes
  fix needs_browsing check
  new test test_needs_browsing
  increase timeout waiting for screenshot
  Use TCP_NODELAY in websocket connection to improve performance
Noah Levitt 2018-02-13 17:10:10 -08:00
commit df0717d072
15 changed files with 236 additions and 114 deletions

.travis.yml
View File

@@ -9,7 +9,7 @@ before_install:
- sudo pip install ansible==2.1.3.0
install:
- ansible-playbook --extra-vars="brozzler_pip_name=file://$TRAVIS_BUILD_DIR#egg=brozzler user=travis" --inventory-file=ansible/hosts-localhost ansible/playbook.yml
- pip install $TRAVIS_BUILD_DIR 'warcprox==2.3' pytest
- pip install $TRAVIS_BUILD_DIR 'warcprox>=2.4b1' pytest
script:
- DISPLAY=:1 py.test -v tests
after_failure:

README.rst
View File

@@ -22,7 +22,7 @@ Requirements
- Python 3.4 or later
- RethinkDB deployment
- Chromium or Google Chrome browser
- Chromium or Google Chrome >= version 64
Worth noting is that the browser requires a graphical environment to run. You
already have this on your laptop, but on a server it will probably require

View File

@@ -5,6 +5,7 @@
  become: true
- name: install pywb in virtualenv
  pip: name=pywb
       version=0.33.2
       virtualenv={{venv_root}}/pywb-ve34
       virtualenv_python=python3.4
       extra_args='--no-input --upgrade --pre --cache-dir=/tmp/pip-cache'

brozzler/__init__.py
View File

@@ -67,6 +67,21 @@ logging.Logger.trace = _logger_trace
logging._levelToName[TRACE] = 'TRACE'
logging._nameToLevel['TRACE'] = TRACE

# see https://github.com/internetarchive/brozzler/issues/91
def _logging_handler_handle(self, record):
    rv = self.filter(record)
    if rv:
        try:
            self.acquire()
            self.emit(record)
        finally:
            try:
                self.release()
            except:
                pass
    return rv
logging.Handler.handle = _logging_handler_handle
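
For comparison, the stock CPython implementation that this monkey-patch replaces looks roughly like the sketch below. It calls acquire() outside the try block, so an exception raised while acquiring, or one delivered between acquire() and the try, can leave the handler's lock held and block every later logging call. The patched version above moves acquire() inside the try and swallows errors from release(), which is the "fix attempt for deadlock-ish situation" named in the commit log (issue #91).

    # approximate logging.Handler.handle from the Python 3 stdlib,
    # shown only for contrast with the patched version above
    def handle(self, record):
        rv = self.filter(record)
        if rv:
            self.acquire()
            try:
                self.emit(record)
            finally:
                self.release()
        return rv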

_behaviors = None
def behaviors(behaviors_dir=None):
    """Return list of JS behaviors loaded from YAML file.

brozzler/browser.py
View File

@@ -61,6 +61,33 @@ class BrowserPool:
        self._in_use = set()
        self._lock = threading.Lock()

    def _fresh_browser(self):
        # choose available port
        sock = socket.socket()
        sock.bind(('0.0.0.0', 0))
        port = sock.getsockname()[1]
        sock.close()
        browser = Browser(port=port, **self.kwargs)
        return browser

    def acquire_multi(self, n=1):
        '''
        Returns a list of up to `n` browsers.

        Raises:
            NoBrowsersAvailable if none available
        '''
        browsers = []
        with self._lock:
            if len(self._in_use) >= self.size:
                raise NoBrowsersAvailable
            while len(self._in_use) < self.size and len(browsers) < n:
                browser = self._fresh_browser()
                browsers.append(browser)
                self._in_use.add(browser)
        return browsers

    def acquire(self):
        '''
        Returns an available instance.
@@ -74,14 +101,7 @@ class BrowserPool:
        with self._lock:
            if len(self._in_use) >= self.size:
                raise NoBrowsersAvailable
            # choose available port
            sock = socket.socket()
            sock.bind(('0.0.0.0', 0))
            port = sock.getsockname()[1]
            sock.close()
            browser = Browser(port=port, **self.kwargs)
            browser = self._fresh_browser()
            self._in_use.add(browser)
            return browser
@@ -90,6 +110,13 @@ class BrowserPool:
        with self._lock:
            self._in_use.remove(browser)

    def release_all(self, browsers):
        for browser in browsers:
            browser.stop()  # make sure
        with self._lock:
            for browser in browsers:
                self._in_use.remove(browser)

    def shutdown_now(self):
        self.logger.info(
                'shutting down browser pool (%s browsers in use)',
@@ -162,7 +189,8 @@ class WebsockReceiverThread(threading.Thread):
        # ping_timeout is used as the timeout for the call to select.select()
        # in addition to its documented purpose, and must have a value to avoid
        # hangs in certain situations
        self.websock.run_forever(ping_timeout=0.5)
        self.websock.run_forever(
                sockopt=((socket.IPPROTO_TCP, socket.TCP_NODELAY, 1),),
                ping_timeout=0.5)
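
TCP_NODELAY disables Nagle's algorithm on the websocket's underlying TCP socket, so the many small JSON frames of the Chrome DevTools protocol go out immediately instead of waiting to be coalesced with later writes; websocket-client's run_forever() applies each (level, option, value) tuple in sockopt via setsockopt(). A minimal sketch of the equivalent call on a plain socket, assuming a DevTools endpoint on port 9222 (brozzler actually connects to the ephemeral port chosen in BrowserPool._fresh_browser()):

    import socket

    sock = socket.create_connection(('127.0.0.1', 9222))  # hypothetical endpoint
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)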

    def _on_message(self, websock, message):
        try:
@@ -172,21 +200,6 @@ class WebsockReceiverThread(threading.Thread):
                    'uncaught exception in _handle_message message=%s',
                    message, exc_info=True)

    def _debugger_paused(self, message):
        # we hit the breakpoint set in start(), get rid of google analytics
        self.logger.debug('debugger paused! message=%s', message)
        scriptId = message['params']['callFrames'][0]['location']['scriptId']
        # replace script
        self.websock.send(
                json.dumps(dict(
                    id=0, method='Debugger.setScriptSource',
                    params={'scriptId': scriptId,
                            'scriptSource': 'console.log("google analytics is no more!");'})))
        # resume execution
        self.websock.send(json.dumps(dict(id=0, method='Debugger.resume')))

    def _network_response_received(self, message):
        if (message['params']['response']['status'] == 420
                and 'Warcprox-Meta' in CaseInsensitiveDict(
@@ -227,8 +240,6 @@ class WebsockReceiverThread(threading.Thread):
        elif message['method'] == 'Network.requestWillBeSent':
            if self.on_request:
                self.on_request(message)
        elif message['method'] == 'Debugger.paused':
            self._debugger_paused(message)
        elif message['method'] == 'Page.interstitialShown':
            # for AITFIVE-1529: handle http auth
            # for now, we should consider killing the browser when we receive Page.interstitialShown and
@@ -330,16 +341,14 @@ class Browser:
        self.send_to_chrome(method='Network.enable')
        self.send_to_chrome(method='Page.enable')
        self.send_to_chrome(method='Console.enable')
        self.send_to_chrome(method='Debugger.enable')
        self.send_to_chrome(method='Runtime.enable')

        # disable google analytics, see _handle_message() where breakpoint
        # is caught Debugger.paused
        # disable google analytics
        self.send_to_chrome(
                method='Debugger.setBreakpointByUrl',
                params={
                    'lineNumber': 1,
                    'urlRegex': 'https?://www.google-analytics.com/analytics.js'})
                method='Network.setBlockedURLs',
                params={'urls': ['*google-analytics.com/analytics.js',
                                 '*google-analytics.com/ga.js']}
        )
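
Network.setBlockedURLs takes a list of URL patterns in which * is a wildcard, and the browser then fails matching requests without fetching them. Unlike the Debugger.setBreakpointByUrl trick it removes, no pause/rewrite/resume round-trip is needed per page, and it also catches the legacy ga.js; the new Chromium >= 64 requirement elsewhere in this changeset presumably tracks this API change. A sketch of the raw DevTools message the send_to_chrome() call above produces, assuming command id 1 (brozzler actually assigns an incrementing id per command):

    import json

    msg = json.dumps({
        'id': 1,
        'method': 'Network.setBlockedURLs',
        'params': {'urls': ['*google-analytics.com/analytics.js',
                            '*google-analytics.com/ga.js']}})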

    def stop(self):
        '''
@@ -549,7 +558,7 @@ class Browser:
                    'problem extracting outlinks, result message: %s', message)
            return frozenset()

    def screenshot(self, timeout=30):
    def screenshot(self, timeout=90):
        self.logger.info('taking screenshot')
        self.websock_thread.expect_result(self._command_id.peek())
        msg_id = self.send_to_chrome(method='Page.captureScreenshot')

brozzler/chrome.py
View File

@@ -29,6 +29,35 @@ import signal
import sqlite3
import json
import tempfile
import sys

def check_version(chrome_exe):
    '''
    Raises SystemExit if `chrome_exe` is not a supported browser version.

    Must run in the main thread to have the desired effect.
    '''
    # mac$ /Applications/Google\ Chrome.app/Contents/MacOS/Google\ Chrome --version
    # Google Chrome 64.0.3282.140
    # mac$ /Applications/Google\ Chrome\ Canary.app/Contents/MacOS/Google\ Chrome\ Canary --version
    # Google Chrome 66.0.3341.0 canary
    # linux$ chromium-browser --version
    # Using PPAPI flash.
    #  --ppapi-flash-path=/usr/lib/adobe-flashplugin/libpepflashplayer.so --ppapi-flash-version=
    # Chromium 61.0.3163.100 Built on Ubuntu , running on Ubuntu 16.04
    cmd = [chrome_exe, '--version']
    out = subprocess.check_output(cmd, timeout=60)
    m = re.search(br'(Chromium|Google Chrome) ([\d.]+)', out)
    if not m:
        sys.exit(
                'unable to parse browser version from output of '
                '%r: %r' % (subprocess.list2cmdline(cmd), out))
    version_str = m.group(2).decode()
    major_version = int(version_str.split('.')[0])
    if major_version < 64:
        sys.exit('brozzler requires chrome/chromium version 64 or '
                 'later but %s reports version %s' % (
                     chrome_exe, version_str))

class Chrome:
    logger = logging.getLogger(__module__ + '.' + __qualname__)
@@ -279,6 +308,8 @@ class Chrome:
                        'SIGKILL', self.chrome_process.pid, status)
        finally:
            self.chrome_process.stdout.close()
            self.chrome_process.stderr.close()
            try:
                self._home_tmpdir.cleanup()
            except:

brozzler/cli.py
View File

@@ -167,6 +167,7 @@ def brozzle_page(argv=None):
    args = arg_parser.parse_args(args=argv[1:])
    configure_logging(args)
    brozzler.chrome.check_version(args.chrome_exe)

    behavior_parameters = {}
    if args.behavior_parameters:
@@ -326,6 +327,7 @@ def brozzler_worker(argv=None):
    args = arg_parser.parse_args(args=argv[1:])
    configure_logging(args)
    brozzler.chrome.check_version(args.chrome_exe)

    def dump_state(signum, frame):
        signal.signal(signal.SIGQUIT, signal.SIG_IGN)

brozzler/easy.py
View File

@@ -3,7 +3,7 @@
brozzler-easy - brozzler-worker, warcprox, pywb, and brozzler-dashboard all
working together in a single process
Copyright (C) 2016 Internet Archive
Copyright (C) 2016-2018 Internet Archive
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -122,8 +122,8 @@ class BrozzlerEasyController:
    def __init__(self, args):
        self.stop = threading.Event()
        self.args = args
        self.warcprox_controller = warcprox.main.init_controller(
            self._warcprox_args(args))
        self.warcprox_controller = warcprox.controller.WarcproxController(
            self._warcprox_opts(args))
        self.brozzler_worker = self._init_brozzler_worker(args)
        self.pywb_httpd = self._init_pywb(args)
        self.dashboard_httpd = self._init_brozzler_dashboard(args)
@@ -221,40 +221,38 @@ class BrozzlerEasyController:
        finally:
            self.shutdown()

    def _warcprox_args(self, args):
    def _warcprox_opts(self, args):
        '''
        Takes args as produced by the argument parser built by
        _build_arg_parser and builds warcprox arguments object suitable to pass
        to warcprox.main.init_controller. Copies some arguments, renames some,
        populates some with defaults appropriate for brozzler-easy, etc.
        '''
        warcprox_args = argparse.Namespace()
        warcprox_args.address = 'localhost'
        warcprox_opts = warcprox.Options()
        warcprox_opts.address = 'localhost'
        # let the OS choose an available port; discover it later using
        # sock.getsockname()[1]
        warcprox_args.port = 0
        warcprox_args.cacert = args.cacert
        warcprox_args.certs_dir = args.certs_dir
        warcprox_args.directory = args.warcs_dir
        warcprox_args.gzip = True
        warcprox_args.prefix = 'brozzler'
        warcprox_args.size = 1000 * 1000 * 1000
        warcprox_args.rollover_idle_time = 3 * 60
        warcprox_args.digest_algorithm = 'sha1'
        warcprox_args.base32 = True
        warcprox_args.stats_db_file = None
        warcprox_args.playback_port = None
        warcprox_args.playback_index_db_file = None
        warcprox_args.rethinkdb_servers = args.rethinkdb_servers
        warcprox_args.rethinkdb_db = args.rethinkdb_db
        warcprox_args.rethinkdb_big_table = True
        warcprox_args.kafka_broker_list = None
        warcprox_args.kafka_capture_feed_topic = None
        warcprox_args.queue_size = 500
        warcprox_args.max_threads = None
        warcprox_args.profile = False
        warcprox_args.onion_tor_socks_proxy = args.onion_tor_socks_proxy
        return warcprox_args
        warcprox_opts.port = 0
        warcprox_opts.cacert = args.cacert
        warcprox_opts.certs_dir = args.certs_dir
        warcprox_opts.directory = args.warcs_dir
        warcprox_opts.gzip = True
        warcprox_opts.prefix = 'brozzler'
        warcprox_opts.size = 1000 * 1000 * 1000
        warcprox_opts.rollover_idle_time = 3 * 60
        warcprox_opts.digest_algorithm = 'sha1'
        warcprox_opts.base32 = True
        warcprox_opts.stats_db_file = None
        warcprox_opts.playback_port = None
        warcprox_opts.playback_index_db_file = None
        warcprox_opts.rethinkdb_big_table_url = (
                'rethinkdb://%s/%s/captures' % (
                    args.rethinkdb_servers, args.rethinkdb_db))
        warcprox_opts.queue_size = 500
        warcprox_opts.max_threads = None
        warcprox_opts.profile = False
        warcprox_opts.onion_tor_socks_proxy = args.onion_tor_socks_proxy
        return warcprox_opts
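
As this changeset's setup.py bump to warcprox>=2.4b1 suggests, the newer warcprox collapses the separate rethinkdb_servers/rethinkdb_db/rethinkdb_big_table settings into a single rethinkdb:// URL naming the servers, database, and table in one string, which is what the code above builds. A sketch of the shape, with hypothetical hostnames:

    # rethinkdb://<server>[,<server>...]/<database>/<table>
    warcprox_opts.rethinkdb_big_table_url = (
            'rethinkdb://db0.example.org,db1.example.org/brozzler/captures')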

    def dump_state(self, signum=None, frame=None):
        state_strs = []
@@ -270,6 +268,7 @@ def main(argv=None):
    arg_parser = _build_arg_parser(argv)
    args = arg_parser.parse_args(args=argv[1:])
    brozzler.cli.configure_logging(args)
    brozzler.chrome.check_version(args.chrome_exe)

    controller = BrozzlerEasyController(args)
    signal.signal(signal.SIGTERM, lambda a,b: controller.stop.set())

brozzler/frontier.py
View File

@@ -93,7 +93,7 @@ class RethinkDbFrontier:
            raise UnexpectedDbResult("expected %r to be %r in %r" % (
                k, expected, result))

    def claim_site(self, worker_id):
    def claim_sites(self, n=1):
        # XXX keep track of aggregate priority and prioritize sites accordingly?
        while True:
            result = (
@@ -104,34 +104,43 @@ class RethinkDbFrontier:
                    .order_by(index="sites_last_disclaimed")
                    .filter((r.row["claimed"] != True) | (
                        r.row["last_claimed"] < r.now() - 60*60))
                    .limit(1)
                    .limit(n)
                    .update(
                        # try to avoid a race condition resulting in multiple
                        # brozzler-workers claiming the same site
                        # see https://github.com/rethinkdb/rethinkdb/issues/3235#issuecomment-60283038
                        r.branch((r.row["claimed"] != True) | (
                            r.row["last_claimed"] < r.now() - 60*60), {
                                "claimed": True, "last_claimed_by": worker_id,
                                "claimed": True,
                                "last_claimed": doublethink.utcnow()}, {}),
                        return_changes=True)).run()
            self._vet_result(result, replaced=[0,1], unchanged=[0,1])
            if result["replaced"] == 1:
                if result["changes"][0]["old_val"]["claimed"]:
            self._vet_result(
                    result, replaced=list(range(n+1)),
                    unchanged=list(range(n+1)))
            sites = []
            for i in range(result["replaced"]):
                if result["changes"][i]["old_val"]["claimed"]:
                    self.logger.warn(
                            "re-claimed site that was still marked 'claimed' "
                            "because it was last claimed a long time ago "
                            "at %s, and presumably some error stopped it from "
                            "being disclaimed",
                            result["changes"][0]["old_val"]["last_claimed"])
                site = brozzler.Site(self.rr, result["changes"][0]["new_val"])
            else:
                            result["changes"][i]["old_val"]["last_claimed"])
                site = brozzler.Site(self.rr, result["changes"][i]["new_val"])
                sites.append(site)
            if not sites:
                raise brozzler.NothingToClaim
            # XXX This is the only place we enforce time limit for now. Worker
            # loop should probably check time limit. Maybe frontier needs a
            # housekeeping thread to ensure that time limits get enforced in a
            # timely fashion.
            if not self._enforce_time_limit(site):
                return site
            for site in list(sites):
                if self._enforce_time_limit(site):
                    sites.remove(site)
            if sites:
                return sites
            # else try again
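
The r.branch() inside update() re-checks the claimed/last_claimed predicate at write time, because another worker may claim the same rows between the filter() and the update(); re-evaluating the condition inside the update makes check-and-claim atomic per row (see the linked rethinkdb issue). With return_changes=True the result carries old_val/new_val pairs, roughly shaped like this sketch (site documents abbreviated to the fields claim_sites() reads):

    result = {
        'replaced': 2,    # rows actually updated, i.e. sites claimed
        'unchanged': 0,
        'changes': [
            {'old_val': {'id': 'site0', 'claimed': False},
             'new_val': {'id': 'site0', 'claimed': True}},
            # old_val['claimed'] still True here: a stale claim over an
            # hour old being re-claimed, which triggers the warning above
            {'old_val': {'id': 'site1', 'claimed': True},
             'new_val': {'id': 'site1', 'claimed': True}},
        ],
    }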

    def _enforce_time_limit(self, site):
        if (site.time_limit and site.time_limit > 0

brozzler/worker.py
View File

@@ -37,6 +37,7 @@ import urlcanon
from requests.structures import CaseInsensitiveDict
import rethinkdb as r
import datetime
import urllib.parse

class ExtraHeaderAdder(urllib.request.BaseHandler):
    def __init__(self, extra_headers):
@@ -87,7 +88,9 @@ class YoutubeDLSpy(urllib.request.BaseHandler):
        final_url = url
        while final_url in redirects:
            final_url = redirects.pop(final_url)['response_headers']['location']
            txn = redirects.pop(final_url)
            final_url = urllib.parse.urljoin(
                    txn['url'], txn['response_headers']['location'])
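
The replaced line assumed the Location response header was an absolute URL; on a relative redirect such as 'Location: /b' the chain lookup then missed the next hop. Resolving the header against the redirecting request's own URL handles both cases, as the new test_needs_browsing test later in this commit exercises:

    import urllib.parse

    urllib.parse.urljoin('http://example.com/a', '/b')
    # -> 'http://example.com/b' (relative Location resolved)
    urllib.parse.urljoin('http://example.com/a', 'http://example.com/c')
    # -> 'http://example.com/c' (absolute Location passes through)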

        final_bounces = []
        for txn in self.transactions:
@@ -100,6 +103,7 @@ class BrozzlerWorker:
    logger = logging.getLogger(__module__ + "." + __qualname__)

    HEARTBEAT_INTERVAL = 20.0
    SITE_SESSION_MINUTES = 15

    def __init__(
            self, frontier, service_registry=None, max_browsers=1,
@@ -188,7 +192,7 @@ class BrozzlerWorker:
        # in case youtube-dl takes a long time, heartbeat site.last_claimed
        # to prevent another brozzler-worker from claiming the site
        try:
            if site.rr and doublethink.utcnow() - site.last_claimed > datetime.timedelta(minutes=7):
            if site.rr and doublethink.utcnow() - site.last_claimed > datetime.timedelta(minutes=self.SITE_SESSION_MINUTES):
                self.logger.debug(
                        'heartbeating site.last_claimed to prevent another '
                        'brozzler-worker claiming this site id=%r', site.id)
@@ -498,13 +502,16 @@ class BrozzlerWorker:
    def brozzle_site(self, browser, site):
        try:
            site.last_claimed_by = '%s:%s' % (
                    socket.gethostname(), browser.chrome.port)
            site.save()
            start = time.time()
            page = None
            self._frontier.honor_stop_request(site)
            self.logger.info(
                    "brozzling site (proxy=%r) %r",
                    self._proxy_for(site), site)
            while time.time() - start < 7 * 60:
            while time.time() - start < self.SITE_SESSION_MINUTES * 60:
                site.refresh()
                self._frontier.honor_stop_request(site)
                page = self._frontier.claim_page(site, "%s:%s" % (
@@ -517,8 +524,9 @@ class BrozzlerWorker:
                    page.blocked_by_robots = True
                    self._frontier.completed_page(site, page)
                else:
                    outlinks = self.brozzle_page(browser, site, page,
                        enable_youtube_dl=not self._skip_youtube_dl)
                    outlinks = self.brozzle_page(
                            browser, site, page,
                            enable_youtube_dl=not self._skip_youtube_dl)
                    self._frontier.completed_page(site, page)
                    self._frontier.scope_and_schedule_outlinks(
                            site, page, outlinks)
@@ -598,36 +606,50 @@ class BrozzlerWorker:
            if due:
                self._service_heartbeat()

    def _start_browsing_some_sites(self):
        '''
        Starts browsing some sites.

        Raises:
            NoBrowsersAvailable if none available
        '''
        browsers = self._browser_pool.acquire_multi(
                (self._browser_pool.num_available() + 1) // 2)
        try:
            sites = self._frontier.claim_sites(len(browsers))
        except:
            self._browser_pool.release_all(browsers)
            raise

        for i in range(len(browsers)):
            if i < len(sites):
                th = threading.Thread(
                        target=self._brozzle_site_thread_target,
                        args=(browsers[i], sites[i]),
                        name="BrozzlingThread:%s" % browsers[i].chrome.port,
                        daemon=True)
                with self._browsing_threads_lock:
                    self._browsing_threads.add(th)
                th.start()
            else:
                self._browser_pool.release(browsers[i])
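
The (num_available() + 1) // 2 heuristic claims roughly half of the currently idle browsers per pass, rounding up so a single idle browser still gets used; combined with claim_sites() claiming the whole batch in one RethinkDB update, this is the "claim sites to brozzle in batches to reduce contention over sites table" change from the commit log. A quick check of the arithmetic:

    for idle in (8, 5, 2, 1):
        print(idle, '->', (idle + 1) // 2)   # 8 -> 4, 5 -> 3, 2 -> 1, 1 -> 1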

    def run(self):
        self.logger.info("brozzler worker starting")
        try:
            latest_state = None
            while not self._shutdown.is_set():
                self._service_heartbeat_if_due()
                try:
                    browser = self._browser_pool.acquire()
                    try:
                        site = self._frontier.claim_site("%s:%s" % (
                            socket.gethostname(), browser.chrome.port))
                        th = threading.Thread(
                                target=self._brozzle_site_thread_target,
                                args=(browser, site),
                                name="BrozzlingThread:%s" % browser.chrome.port,
                                daemon=True)
                        with self._browsing_threads_lock:
                            self._browsing_threads.add(th)
                        th.start()
                    except:
                        self._browser_pool.release(browser)
                        raise
                    self._start_browsing_some_sites()
                except brozzler.browser.NoBrowsersAvailable:
                    if latest_state != "browsers-busy":
                        self.logger.info(
                                "all %s browsers are busy", self._max_browsers)
                        latest_state = "browsers-busy"
                    logging.trace(
                            "all %s browsers are in use", self._max_browsers)
                except brozzler.NothingToClaim:
                    pass
                    logging.trace(
                            "all active sites are already claimed by a "
                            "brozzler worker")
                time.sleep(0.5)
            self.logger.info("shutdown requested")
        except r.ReqlError as e:
            self.logger.error(

setup.py
View File

@@ -32,7 +32,7 @@ def find_package_data(package):
setuptools.setup(
        name='brozzler',
        version='1.1b12.dev279',
        version='1.1b13.dev285',
        description='Distributed web crawling with browsers',
        url='https://github.com/internetarchive/brozzler',
        author='Noah Levitt',
@@ -79,8 +79,8 @@ setuptools.setup(
        extras_require={
            'dashboard': ['flask>=0.11', 'gunicorn'],
            'easy': [
                'warcprox>=2.1b1.dev87',
                'pywb',
                'warcprox>=2.4b1.dev145',
                'pywb<2',
                'flask>=0.11',
                'gunicorn'
            ],

tests/test_cluster.py
View File

@@ -661,11 +661,10 @@ def test_warcprox_outage_resiliency(httpd):
    opts = warcprox.Options()
    opts.address = '0.0.0.0'
    opts.port = 0
    opts.rethinkdb_services_url = 'rethinkdb://localhost/brozzler/services'

    warcprox1 = warcprox.controller.WarcproxController(
            service_registry=svcreg, options=opts)
    warcprox2 = warcprox.controller.WarcproxController(
            service_registry=svcreg, options=opts)
    warcprox1 = warcprox.controller.WarcproxController(opts)
    warcprox2 = warcprox.controller.WarcproxController(opts)

    warcprox1_thread = threading.Thread(
            target=warcprox1.run_until_shutdown, name='warcprox1')
    warcprox2_thread = threading.Thread(

tests/test_frontier.py
View File

@@ -826,30 +826,34 @@ def test_claim_site():
    rr.table('sites').delete().run()  # clean slate
    with pytest.raises(brozzler.NothingToClaim):
        claimed_site = frontier.claim_site(worker_id='test_claim_site')
        claimed_site = frontier.claim_sites()

    site = brozzler.Site(rr, {'seed': 'http://example.org/'})
    brozzler.new_site(frontier, site)

    claimed_site = frontier.claim_site(worker_id='test_claim_site')
    claimed_sites = frontier.claim_sites()
    assert len(claimed_sites) == 1
    claimed_site = claimed_sites[0]
    assert claimed_site.id == site.id
    assert claimed_site.claimed
    assert claimed_site.last_claimed >= doublethink.utcnow() - datetime.timedelta(minutes=1)
    with pytest.raises(brozzler.NothingToClaim):
        claimed_site = frontier.claim_site(worker_id='test_claim_site')
        claimed_site = frontier.claim_sites()

    # site last_claimed less than 1 hour ago still not to be reclaimed
    claimed_site.last_claimed = doublethink.utcnow() - datetime.timedelta(minutes=55)
    claimed_site.save()
    with pytest.raises(brozzler.NothingToClaim):
        claimed_site = frontier.claim_site(worker_id='test_claim_site')
        claimed_site = frontier.claim_sites()

    # site last_claimed more than 1 hour ago can be reclaimed
    site = claimed_site
    claimed_site = None
    site.last_claimed = doublethink.utcnow() - datetime.timedelta(minutes=65)
    site.save()
    claimed_site = frontier.claim_site(worker_id='test_claim_site')
    claimed_sites = frontier.claim_sites()
    assert len(claimed_sites) == 1
    claimed_site = claimed_sites[0]
    assert claimed_site.id == site.id

    # clean up

tests/test_units.py
View File

@@ -310,3 +310,30 @@ def test_thread_raise_second_with_block():
    th.join()
    assert isinstance(thread_caught_exception, Exception2)

def test_needs_browsing():
    # only one test case here right now, which exposed a bug

    class ConvenientHeaders(http.client.HTTPMessage):
        def __init__(self, headers):
            http.client.HTTPMessage.__init__(self)
            for (k, v) in headers.items():
                self.add_header(k, v)

    page = brozzler.Page(None, {
        'url': 'http://example.com/a'})

    spy = brozzler.worker.YoutubeDLSpy()
    spy.transactions.append({
        'url': 'http://example.com/a',
        'method': 'HEAD',
        'status_code': 301,
        'response_headers': ConvenientHeaders({'Location': '/b'})})
    spy.transactions.append({
        'url': 'http://example.com/b',
        'method': 'GET',
        'status_code': 200,
        'response_headers': ConvenientHeaders({
            'Content-Type': 'application/pdf'})})

    assert not brozzler.worker.BrozzlerWorker._needs_browsing(None, page, spy)

vagrant/Vagrantfile vendored
View File

@@ -15,4 +15,8 @@ Vagrant.configure(2) do |config|
    ansible.inventory_path = "../ansible/hosts-vagrant"
    ansible.playbook = "../ansible/playbook.yml"
  end

  config.vm.provider 'virtualbox' do |v|
    v.name = 'brozzler-test-vm'
  end
end