diff --git a/brozzler/worker.py b/brozzler/worker.py
index 1aef258..bc5bb17 100644
--- a/brozzler/worker.py
+++ b/brozzler/worker.py
@@ -127,7 +127,7 @@ class BrozzlerWorker:
         elif self._warcprox_auto:
             svc = self._service_registry.available_service('warcprox')
             if svc is None:
-                raise Exception(
+                raise brozzler.ProxyError(
                         'no available instances of warcprox in the service '
                         'registry')
             site.proxy = '%s:%s' % (svc['host'], svc['port'])
@@ -205,6 +205,8 @@ class BrozzlerWorker:
                     'got "%s %s" response on warcprox '
                     'WARCPROX_WRITE_RECORD request (expected 204)',
                     e.getcode(), e.info())
+        except urllib.error.URLError as e:
+            raise brozzler.ProxyError('_warcprox_write_record: %s', e)
 
     def _remember_videos(self, page, ydl_spy):
         if not 'videos' in page:
@@ -415,6 +417,10 @@ class BrozzlerWorker:
     def brozzle_site(self, browser, site):
         try:
             page = None
+            self._frontier.honor_stop_request(site)
+            self.logger.info(
+                    "brozzling site (proxy=%s) %s",
+                    repr(self._proxy_for(site)), site)
             start = time.time()
             while time.time() - start < 7 * 60:
                 site.refresh()
@@ -447,6 +453,18 @@ class BrozzlerWorker:
             self._frontier.finished(site, "FINISHED_STOP_REQUESTED")
         # except brozzler.browser.BrowsingAborted:
         #     self.logger.info("{} shut down".format(browser))
+        except brozzler.ProxyError as e:
+            if self._warcprox_auto:
+                logging.error(
+                        'proxy error (site.proxy=%s), will try to choose a '
+                        'healthy instance next time site is brozzled: %s',
+                        site.proxy, e)
+                site.proxy = None
+            else:
+                # using brozzler-worker --proxy, nothing to do but try the
+                # same proxy again next time
+                logging.error(
+                        'proxy error (site.proxy=%s): %s', repr(site.proxy), e)
         except:
             self.logger.critical("unexpected exception", exc_info=True)
         finally:
@@ -506,9 +524,6 @@ class BrozzlerWorker:
             try:
                 site = self._frontier.claim_site("%s:%s" % (
                     socket.gethostname(), browser.chrome.port))
-                self.logger.info(
-                        "brozzling site (proxy=%s) %s",
-                        repr(self._proxy_for(site)), site)
                 th = threading.Thread(
                         target=self._brozzle_site_thread_target,
                         args=(browser, site),
diff --git a/setup.py b/setup.py
index fb5b62a..ff27ad1 100644
--- a/setup.py
+++ b/setup.py
@@ -32,7 +32,7 @@ def find_package_data(package):
 
 setuptools.setup(
         name='brozzler',
-        version='1.1b11.dev234',
+        version='1.1b11.dev236',
         description='Distributed web crawling with browsers',
         url='https://github.com/internetarchive/brozzler',
         author='Noah Levitt',
diff --git a/tests/test_cluster.py b/tests/test_cluster.py
index 4202fbd..59b289c 100644
--- a/tests/test_cluster.py
+++ b/tests/test_cluster.py
@@ -32,6 +32,7 @@ import requests
 import subprocess
 import http.server
 import logging
+import warcprox
 
 def start_service(service):
     subprocess.check_call(['sudo', 'service', service, 'start'])
@@ -579,3 +580,125 @@ def test_stop_crawl(httpd):
     sites[2].refresh()
     assert sites[2].status == 'FINISHED_STOP_REQUESTED'
 
+def test_warcprox_outage_resiliency(httpd):
+    '''
+    Tests resiliency to warcprox outage.
+
+    If no instances of warcprox are healthy when starting to crawl a site,
+    brozzler-worker should sit there and wait until a healthy instance appears.
+
+    If an instance goes down, sites assigned to that instance should bounce
+    over to a healthy instance.
+
+    If all instances of warcprox go down, brozzler-worker should sit and wait.
+    '''
+    rr = doublethink.Rethinker('localhost', db='brozzler')
+    frontier = brozzler.RethinkDbFrontier(rr)
+    svcreg = doublethink.ServiceRegistry(rr)
+
+    # run two instances of warcprox
+    opts = warcprox.Options()
+    opts.address = '0.0.0.0'
+    opts.port = 0
+
+    warcprox1 = warcprox.controller.WarcproxController(
+            service_registry=svcreg, options=opts)
+    warcprox2 = warcprox.controller.WarcproxController(
+            service_registry=svcreg, options=opts)
+    warcprox1_thread = threading.Thread(
+            target=warcprox1.run_until_shutdown, name='warcprox1')
+    warcprox2_thread = threading.Thread(
+            target=warcprox2.run_until_shutdown, name='warcprox2')
+
+    # put together a site to crawl
+    test_id = 'test_warcprox_death-%s' % datetime.datetime.utcnow().isoformat()
+    site = brozzler.Site(rr, {
+        'seed': 'http://localhost:%s/infinite/' % httpd.server_port,
+        'warcprox_meta': {'captures-table-extra-fields':{'test_id':test_id}}})
+
+    try:
+        # we manage warcprox instances ourselves, so stop the one running on
+        # the system, if any
+        try:
+            stop_service('warcprox')
+        except Exception as e:
+            logging.warn('problem stopping warcprox service: %s', e)
+
+        # queue the site for brozzling
+        brozzler.new_site(frontier, site)
+
+        # check that nothing happens
+        # XXX tail brozzler-worker.log or something?
+        time.sleep(30)
+        site.refresh()
+        assert site.status == 'ACTIVE'
+        assert not site.proxy
+        assert len(list(frontier.site_pages(site.id))) == 1
+
+        # start one instance of warcprox
+        warcprox1_thread.start()
+
+        # check that it started using that instance
+        start = time.time()
+        while not site.proxy and time.time() - start < 30:
+            time.sleep(0.5)
+            site.refresh()
+        assert site.proxy.endswith(':%s' % warcprox1.proxy.server_port)
+
+        # check that the site accumulates pages in the frontier, confirming
+        # that crawling is really happening
+        start = time.time()
+        while (len(list(frontier.site_pages(site.id))) <= 1
+                and time.time() - start < 60):
+            time.sleep(0.5)
+            site.refresh()
+        assert len(list(frontier.site_pages(site.id))) > 1
+
+        # stop warcprox #1, start warcprox #2
+        warcprox2_thread.start()
+        warcprox1.stop.set()
+        warcprox1_thread.join()
+
+        # check that it switched over to warcprox #2
+        start = time.time()
+        while ((not site.proxy
+                or not site.proxy.endswith(':%s' % warcprox2.proxy.server_port))
+                and time.time() - start < 30):
+            time.sleep(0.5)
+            site.refresh()
+        assert site.proxy.endswith(':%s' % warcprox2.proxy.server_port)
+
+        # stop warcprox #2
+        warcprox2.stop.set()
+        warcprox2_thread.join()
+
+        page_count = len(list(frontier.site_pages(site.id)))
+        assert page_count > 1
+
+        # check that it is waiting for a warcprox to appear
+        time.sleep(30)
+        site.refresh()
+        assert site.status == 'ACTIVE'
+        assert not site.proxy
+        assert len(list(frontier.site_pages(site.id))) == page_count
+
+        # stop crawling the site, else it can pollute subsequent test runs
+        brozzler.cli.brozzler_stop_crawl([
+            'brozzler-stop-crawl', '--site=%s' % site.id])
+        site.refresh()
+        assert site.stop_requested
+
+        # stop request should be honored quickly
+        start = time.time()
+        while not site.status.startswith(
+                'FINISHED') and time.time() - start < 120:
+            time.sleep(0.5)
+            site.refresh()
+        assert site.status == 'FINISHED_STOP_REQUESTED'
+    finally:
+        warcprox1.stop.set()
+        warcprox2.stop.set()
+        warcprox1_thread.join()
+        warcprox2_thread.join()
+        start_service('warcprox')
+
diff --git a/tests/test_units.py b/tests/test_units.py
index d9825d5..911fceb 100644
--- a/tests/test_units.py
+++ b/tests/test_units.py
@@ -146,6 +146,15 @@ def test_proxy_down():
     with pytest.raises(brozzler.ProxyError):
         worker._fetch_url(site, page)
 
+    # WARCPROX_WRITE_RECORD
+    with pytest.raises(brozzler.ProxyError):
+        worker._warcprox_write_record(
+                warcprox_address=not_listening_proxy,
+                url='test://proxy_down/warcprox_write_record',
+                warc_type='metadata',
+                content_type='text/plain',
+                payload=b'''payload doesn't matter here''')
+
 def test_start_stop_backwards_compat():
     site = brozzler.Site(None, {'seed': 'http://example.com/'})
     assert len(site.starts_and_stops) == 1
diff --git a/vagrant/run-tests.sh b/vagrant/run-tests.sh
index 122286d..978ef7d 100755
--- a/vagrant/run-tests.sh
+++ b/vagrant/run-tests.sh
@@ -17,5 +17,5 @@ vagrant ssh -- 'status warcprox ; status vnc-websock'
 
 echo
 
-vagrant ssh -- 'source /opt/brozzler-ve34/bin/activate && pip install pytest'
+vagrant ssh -- 'set -x ; source /opt/brozzler-ve34/bin/activate && pip install pytest && pip install --upgrade --pre "warcprox>=2.1b1.dev71"'
 vagrant ssh -- "source /opt/brozzler-ve34/bin/activate && DISPLAY=:1 py.test -v /brozzler/tests $@"