Merge branch 'master' into qa

* master:
  implement resilience to warcprox outage, i.e. deal with brozzler.ProxyError in brozzler-worker
  have _warcprox_write_record also raise ProxyError when appropriate, and test this
This commit is contained in:
Noah Levitt 2017-04-18 17:54:21 -07:00
commit d8904dc9e7
5 changed files with 153 additions and 6 deletions

View File

@@ -127,7 +127,7 @@ class BrozzlerWorker:
elif self._warcprox_auto:
svc = self._service_registry.available_service('warcprox')
if svc is None:
raise Exception(
raise brozzler.ProxyError(
'no available instances of warcprox in the service '
'registry')
site.proxy = '%s:%s' % (svc['host'], svc['port'])
@@ -205,6 +205,8 @@ class BrozzlerWorker:
'got "%s %s" response on warcprox '
'WARCPROX_WRITE_RECORD request (expected 204)',
e.getcode(), e.info())
except urllib.error.URLError as e:
raise brozzler.ProxyError('_warcprox_write_record: %s', e)
def _remember_videos(self, page, ydl_spy):
if not 'videos' in page:
@@ -415,6 +417,10 @@ class BrozzlerWorker:
def brozzle_site(self, browser, site):
try:
page = None
self._frontier.honor_stop_request(site)
self.logger.info(
"brozzling site (proxy=%s) %s",
repr(self._proxy_for(site)), site)
start = time.time()
while time.time() - start < 7 * 60:
site.refresh()
@@ -447,6 +453,18 @@ class BrozzlerWorker:
self._frontier.finished(site, "FINISHED_STOP_REQUESTED")
# except brozzler.browser.BrowsingAborted:
# self.logger.info("{} shut down".format(browser))
except brozzler.ProxyError as e:
if self._warcprox_auto:
logging.error(
'proxy error (site.proxy=%s), will try to choose a '
'healthy instance next time site is brozzled: %s',
site.proxy, e)
site.proxy = None
else:
# using brozzler-worker --proxy, nothing to do but try the
# same proxy again next time
logging.error(
'proxy error (site.proxy=%s): %s', repr(site.proxy), e)
except:
self.logger.critical("unexpected exception", exc_info=True)
finally:
@@ -506,9 +524,6 @@ class BrozzlerWorker:
try:
site = self._frontier.claim_site("%s:%s" % (
socket.gethostname(), browser.chrome.port))
self.logger.info(
"brozzling site (proxy=%s) %s",
repr(self._proxy_for(site)), site)
th = threading.Thread(
target=self._brozzle_site_thread_target,
args=(browser, site),

View File

@@ -32,7 +32,7 @@ def find_package_data(package):
setuptools.setup(
name='brozzler',
version='1.1b11.dev234',
version='1.1b11.dev236',
description='Distributed web crawling with browsers',
url='https://github.com/internetarchive/brozzler',
author='Noah Levitt',

View File

@@ -32,6 +32,7 @@ import requests
import subprocess
import http.server
import logging
import warcprox
def start_service(service):
    """Start the named system service via `sudo service <name> start`.

    Raises subprocess.CalledProcessError if the service command fails.
    """
    cmd = ['sudo', 'service', service, 'start']
    subprocess.check_call(cmd)
@@ -579,3 +580,125 @@ def test_stop_crawl(httpd):
sites[2].refresh()
assert sites[2].status == 'FINISHED_STOP_REQUESTED'
def test_warcprox_outage_resiliency(httpd):
    '''
    Tests resiliency to warcprox outage.

    If no instances of warcprox are healthy when starting to crawl a site,
    brozzler-worker should sit there and wait until a healthy instance appears.

    If an instance goes down, sites assigned to that instance should bounce
    over to a healthy instance.

    If all instances of warcprox go down, brozzler-worker should sit and wait.
    '''
    # NOTE(review): integration test — requires a live rethinkdb and a running
    # brozzler-worker process; also spends several fixed 30s sleeps, so expect
    # a couple of minutes of wall-clock time per run.
    rr = doublethink.Rethinker('localhost', db='brozzler')
    frontier = brozzler.RethinkDbFrontier(rr)
    svcreg = doublethink.ServiceRegistry(rr)

    # run two instances of warcprox
    # port 0 lets the OS pick a free port for each instance
    opts = warcprox.Options()
    opts.address = '0.0.0.0'
    opts.port = 0

    warcprox1 = warcprox.controller.WarcproxController(
            service_registry=svcreg, options=opts)
    warcprox2 = warcprox.controller.WarcproxController(
            service_registry=svcreg, options=opts)
    warcprox1_thread = threading.Thread(
            target=warcprox1.run_until_shutdown, name='warcprox1')
    warcprox2_thread = threading.Thread(
            target=warcprox2.run_until_shutdown, name='warcprox2')

    # put together a site to crawl
    test_id = 'test_warcprox_death-%s' % datetime.datetime.utcnow().isoformat()
    site = brozzler.Site(rr, {
        'seed': 'http://localhost:%s/infinite/' % httpd.server_port,
        'warcprox_meta': {'captures-table-extra-fields':{'test_id':test_id}}})

    try:
        # we manage warcprox instances ourselves, so stop the one running on
        # the system, if any
        try:
            stop_service('warcprox')
        except Exception as e:
            # best-effort: the system service may simply not exist
            logging.warn('problem stopping warcprox service: %s', e)

        # queue the site for brozzling
        brozzler.new_site(frontier, site)

        # check that nothing happens
        # XXX tail brozzler-worker.log or something?
        time.sleep(30)
        site.refresh()
        assert site.status == 'ACTIVE'
        assert not site.proxy
        # only the seed page should be in the frontier so far
        assert len(list(frontier.site_pages(site.id))) == 1

        # start one instance of warcprox
        warcprox1_thread.start()

        # check that it started using that instance (poll up to 30s)
        start = time.time()
        while not site.proxy and time.time() - start < 30:
            time.sleep(0.5)
            site.refresh()
        assert site.proxy.endswith(':%s' % warcprox1.proxy.server_port)

        # check that the site accumulates pages in the frontier, confirming
        # that crawling is really happening
        start = time.time()
        while (len(list(frontier.site_pages(site.id))) <= 1
                and time.time() - start < 60):
            time.sleep(0.5)
            site.refresh()
        assert len(list(frontier.site_pages(site.id))) > 1

        # stop warcprox #1, start warcprox #2
        warcprox2_thread.start()
        warcprox1.stop.set()
        warcprox1_thread.join()

        # check that it switched over to warcprox #2 (poll up to 30s)
        start = time.time()
        while ((not site.proxy
                or not site.proxy.endswith(':%s' % warcprox2.proxy.server_port))
                and time.time() - start < 30):
            time.sleep(0.5)
            site.refresh()
        assert site.proxy.endswith(':%s' % warcprox2.proxy.server_port)

        # stop warcprox #2
        warcprox2.stop.set()
        warcprox2_thread.join()

        page_count = len(list(frontier.site_pages(site.id)))
        assert page_count > 1

        # check that it is waiting for a warcprox to appear: site stays
        # ACTIVE, loses its proxy assignment, and accumulates no new pages
        time.sleep(30)
        site.refresh()
        assert site.status == 'ACTIVE'
        assert not site.proxy
        assert len(list(frontier.site_pages(site.id))) == page_count

        # stop crawling the site, else it can pollute subsequent test runs
        brozzler.cli.brozzler_stop_crawl([
            'brozzler-stop-crawl', '--site=%s' % site.id])
        site.refresh()
        assert site.stop_requested

        # stop request should be honored quickly (poll up to 120s)
        start = time.time()
        while not site.status.startswith(
                'FINISHED') and time.time() - start < 120:
            time.sleep(0.5)
            site.refresh()
        assert site.status == 'FINISHED_STOP_REQUESTED'
    finally:
        # idempotent cleanup: stop both instances (stop.set() is safe even if
        # already stopped) and restore the system warcprox service
        warcprox1.stop.set()
        warcprox2.stop.set()
        warcprox1_thread.join()
        warcprox2_thread.join()
        start_service('warcprox')

View File

@@ -146,6 +146,15 @@ def test_proxy_down():
with pytest.raises(brozzler.ProxyError):
worker._fetch_url(site, page)
# WARCPROX_WRITE_RECORD
with pytest.raises(brozzler.ProxyError):
worker._warcprox_write_record(
warcprox_address=not_listening_proxy,
url='test://proxy_down/warcprox_write_record',
warc_type='metadata',
content_type='text/plain',
payload=b'''payload doesn't matter here''')
def test_start_stop_backwards_compat():
site = brozzler.Site(None, {'seed': 'http://example.com/'})
assert len(site.starts_and_stops) == 1

View File

@@ -17,5 +17,5 @@ vagrant ssh -- 'status warcprox ;
status vnc-websock'
echo
vagrant ssh -- 'source /opt/brozzler-ve34/bin/activate && pip install pytest'
vagrant ssh -- 'set -x ; source /opt/brozzler-ve34/bin/activate && pip install pytest && pip install --upgrade --pre "warcprox>=2.1b1.dev71"'
vagrant ssh -- "source /opt/brozzler-ve34/bin/activate && DISPLAY=:1 py.test -v /brozzler/tests $@"