Mirror of https://github.com/internetarchive/brozzler.git (synced 2025-06-20 12:54:23 -04:00)
implement resilience to warcprox outage, i.e. deal with brozzler.ProxyError in brozzler-worker
parent 5603ff5380
commit 8256a34b4f
4 changed files with 142 additions and 6 deletions
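The diff below raises and catches brozzler.ProxyError, but the exception class itself is not defined in this commit. A minimal sketch of the assumed shape: a plain Exception subclass, with low-level proxy/connection failures translated into it at the call site (fetch_via_proxy is a hypothetical helper for illustration, not brozzler's actual API):

    import requests

    class ProxyError(Exception):
        # assumed definition: a simple marker exception for proxy failures
        pass

    def fetch_via_proxy(url, proxy):
        # hypothetical example of how a call site might convert a failure to
        # reach warcprox into the crawler-level ProxyError
        try:
            return requests.get(
                    url, proxies={'http': 'http://%s' % proxy,
                                  'https': 'http://%s' % proxy},
                    timeout=60)
        except (requests.exceptions.ProxyError,
                requests.exceptions.ConnectionError) as e:
            raise ProxyError('proxy %s is unreachable: %s' % (proxy, e))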
@@ -127,7 +127,7 @@ class BrozzlerWorker:
         elif self._warcprox_auto:
             svc = self._service_registry.available_service('warcprox')
             if svc is None:
-                raise Exception(
+                raise brozzler.ProxyError(
                         'no available instances of warcprox in the service '
                         'registry')
             site.proxy = '%s:%s' % (svc['host'], svc['port'])
@@ -417,6 +417,10 @@ class BrozzlerWorker:
     def brozzle_site(self, browser, site):
         try:
             page = None
+            self._frontier.honor_stop_request(site)
+            self.logger.info(
+                    "brozzling site (proxy=%s) %s",
+                    repr(self._proxy_for(site)), site)
             start = time.time()
             while time.time() - start < 7 * 60:
                 site.refresh()
@@ -449,6 +453,18 @@ class BrozzlerWorker:
             self._frontier.finished(site, "FINISHED_STOP_REQUESTED")
         # except brozzler.browser.BrowsingAborted:
         #     self.logger.info("{} shut down".format(browser))
+        except brozzler.ProxyError as e:
+            if self._warcprox_auto:
+                logging.error(
+                        'proxy error (site.proxy=%s), will try to choose a '
+                        'healthy instance next time site is brozzled: %s',
+                        site.proxy, e)
+                site.proxy = None
+            else:
+                # using brozzler-worker --proxy, nothing to do but try the
+                # same proxy again next time
+                logging.error(
+                        'proxy error (site.proxy=%s): %s', repr(site.proxy), e)
         except:
             self.logger.critical("unexpected exception", exc_info=True)
         finally:
@@ -508,9 +524,6 @@ class BrozzlerWorker:
             try:
                 site = self._frontier.claim_site("%s:%s" % (
                     socket.gethostname(), browser.chrome.port))
-                self.logger.info(
-                        "brozzling site (proxy=%s) %s",
-                        repr(self._proxy_for(site)), site)
                 th = threading.Thread(
                         target=self._brozzle_site_thread_target,
                         args=(browser, site),
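Taken together, the worker changes above add a simple recovery loop: a ProxyError no longer escapes as an unexpected exception; in auto mode the worker forgets the dead proxy so the next claim re-runs the service-registry lookup from the first hunk, while with a fixed --proxy it just retries the same endpoint later. A condensed sketch of that flow, where worker, site and brozzle_page are hypothetical stand-ins (not brozzler's actual API) and ProxyError is as sketched above:

    import logging

    def brozzle_once(worker, site, brozzle_page):
        try:
            # may raise ProxyError if no warcprox instance is available
            proxy = worker._proxy_for(site)
            # may raise ProxyError if warcprox goes away mid-crawl
            brozzle_page(site, proxy)
        except ProxyError as e:
            if worker._warcprox_auto:
                # forget the dead instance; a healthy one is chosen next round
                site.proxy = None
            logging.error('proxy error (site.proxy=%s): %s', site.proxy, e)
            # the site is left ACTIVE either way, so it is claimed and retried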
setup.py
@@ -32,7 +32,7 @@ def find_package_data(package):
 
 setuptools.setup(
         name='brozzler',
-        version='1.1b11.dev235',
+        version='1.1b11.dev236',
         description='Distributed web crawling with browsers',
         url='https://github.com/internetarchive/brozzler',
         author='Noah Levitt',
@@ -32,6 +32,7 @@ import requests
 import subprocess
 import http.server
 import logging
+import warcprox
 
 def start_service(service):
     subprocess.check_call(['sudo', 'service', service, 'start'])
@@ -579,3 +580,125 @@ def test_stop_crawl(httpd):
     sites[2].refresh()
     assert sites[2].status == 'FINISHED_STOP_REQUESTED'
 
+def test_warcprox_outage_resiliency(httpd):
+    '''
+    Tests resiliency to warcprox outage.
+
+    If no instances of warcprox are healthy when starting to crawl a site,
+    brozzler-worker should sit there and wait until a healthy instance appears.
+
+    If an instance goes down, sites assigned to that instance should bounce
+    over to a healthy instance.
+
+    If all instances of warcprox go down, brozzler-worker should sit and wait.
+    '''
+    rr = doublethink.Rethinker('localhost', db='brozzler')
+    frontier = brozzler.RethinkDbFrontier(rr)
+    svcreg = doublethink.ServiceRegistry(rr)
+
+    # run two instances of warcprox
+    opts = warcprox.Options()
+    opts.address = '0.0.0.0'
+    opts.port = 0
+
+    warcprox1 = warcprox.controller.WarcproxController(
+            service_registry=svcreg, options=opts)
+    warcprox2 = warcprox.controller.WarcproxController(
+            service_registry=svcreg, options=opts)
+    warcprox1_thread = threading.Thread(
+            target=warcprox1.run_until_shutdown, name='warcprox1')
+    warcprox2_thread = threading.Thread(
+            target=warcprox2.run_until_shutdown, name='warcprox2')
+
+    # put together a site to crawl
+    test_id = 'test_warcprox_death-%s' % datetime.datetime.utcnow().isoformat()
+    site = brozzler.Site(rr, {
+        'seed': 'http://localhost:%s/infinite/' % httpd.server_port,
+        'warcprox_meta': {'captures-table-extra-fields':{'test_id':test_id}}})
+
+    try:
+        # we manage warcprox instances ourselves, so stop the one running on
+        # the system, if any
+        try:
+            stop_service('warcprox')
+        except Exception as e:
+            logging.warn('problem stopping warcprox service: %s', e)
+
+        # queue the site for brozzling
+        brozzler.new_site(frontier, site)
+
+        # check that nothing happens
+        # XXX tail brozzler-worker.log or something?
+        time.sleep(30)
+        site.refresh()
+        assert site.status == 'ACTIVE'
+        assert not site.proxy
+        assert len(list(frontier.site_pages(site.id))) == 1
+
+        # start one instance of warcprox
+        warcprox1_thread.start()
+
+        # check that it started using that instance
+        start = time.time()
+        while not site.proxy and time.time() - start < 30:
+            time.sleep(0.5)
+            site.refresh()
+        assert site.proxy.endswith(':%s' % warcprox1.proxy.server_port)
+
+        # check that the site accumulates pages in the frontier, confirming
+        # that crawling is really happening
+        start = time.time()
+        while (len(list(frontier.site_pages(site.id))) <= 1
+                and time.time() - start < 60):
+            time.sleep(0.5)
+            site.refresh()
+        assert len(list(frontier.site_pages(site.id))) > 1
+
+        # stop warcprox #1, start warcprox #2
+        warcprox2_thread.start()
+        warcprox1.stop.set()
+        warcprox1_thread.join()
+
+        # check that it switched over to warcprox #2
+        start = time.time()
+        while ((not site.proxy
+                or not site.proxy.endswith(':%s' % warcprox2.proxy.server_port))
+                and time.time() - start < 30):
+            time.sleep(0.5)
+            site.refresh()
+        assert site.proxy.endswith(':%s' % warcprox2.proxy.server_port)
+
+        # stop warcprox #2
+        warcprox2.stop.set()
+        warcprox2_thread.join()
+
+        page_count = len(list(frontier.site_pages(site.id)))
+        assert page_count > 1
+
+        # check that it is waiting for a warcprox to appear
+        time.sleep(30)
+        site.refresh()
+        assert site.status == 'ACTIVE'
+        assert not site.proxy
+        assert len(list(frontier.site_pages(site.id))) == page_count
+
+        # stop crawling the site, else it can pollute subsequent test runs
+        brozzler.cli.brozzler_stop_crawl([
+            'brozzler-stop-crawl', '--site=%s' % site.id])
+        site.refresh()
+        assert site.stop_requested
+
+        # stop request should be honored quickly
+        start = time.time()
+        while not site.status.startswith(
+                'FINISHED') and time.time() - start < 120:
+            time.sleep(0.5)
+            site.refresh()
+        assert site.status == 'FINISHED_STOP_REQUESTED'
+    finally:
+        warcprox1.stop.set()
+        warcprox2.stop.set()
+        warcprox1_thread.join()
+        warcprox2_thread.join()
+        start_service('warcprox')
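The new test leans on one idiom throughout: poll a condition every half second until it holds or a wall-clock deadline passes, then assert. If it helps while reading the assertions above, here is the same pattern factored into a standalone helper (wait_until is hypothetical, not part of the test suite):

    import time

    def wait_until(predicate, timeout=30, poll=0.5):
        '''Poll predicate() until it returns truthy or timeout seconds pass.'''
        start = time.time()
        while not predicate() and time.time() - start < timeout:
            time.sleep(poll)
        return predicate()

For example, the "switched over to warcprox #2" check is equivalent to asserting wait_until() on a predicate that refreshes the site and checks whether site.proxy ends with warcprox2's port.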
@@ -17,5 +17,5 @@ vagrant ssh -- 'status warcprox ;
         status vnc-websock'
 echo
 
-vagrant ssh -- 'source /opt/brozzler-ve34/bin/activate && pip install pytest'
+vagrant ssh -- 'set -x ; source /opt/brozzler-ve34/bin/activate && pip install pytest && pip install --upgrade --pre "warcprox>=2.1b1.dev71"'
 vagrant ssh -- "source /opt/brozzler-ve34/bin/activate && DISPLAY=:1 py.test -v /brozzler/tests $@"