#!/usr/bin/env python ''' test_cluster.py - integration tests for a brozzler cluster, expects brozzler, warcprox, pywb, rethinkdb and other dependencies to be running already Copyright (C) 2016-2018 Internet Archive Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. ''' import pytest import http.server import threading import urllib.request import os import socket import doublethink import time import brozzler import datetime import requests import subprocess import http.server import logging import warcprox def start_service(service): subprocess.check_call(['sudo', 'service', service, 'start']) def stop_service(service): subprocess.check_call(['sudo', 'service', service, 'stop']) @pytest.fixture(scope='module') def httpd(request): class RequestHandler(http.server.SimpleHTTPRequestHandler): def do_GET(self): if self.path == '/site5/redirect/': self.send_response(303, 'See other') self.send_header('Connection', 'close') self.send_header('Content-Length', 0) self.send_header('Location', '/site5/destination/') self.end_headers() self.wfile.write(b'') elif self.path == '/site9/redirect.html': self.send_response(303, 'See other') self.send_header('Connection', 'close') self.send_header('Content-Length', 0) self.send_header('Location', '/site9/destination.html') self.end_headers() self.wfile.write(b'') elif self.path.startswith('/infinite/'): payload = b''' infinite site a/ b/ c/ d/ e/ f/ g/ h/ i/ ''' self.send_response(200, 'OK') self.send_header('Connection', 'close') self.send_header('Content-Length', len(payload)) self.end_headers() self.wfile.write(payload) else: super().do_GET() # SimpleHTTPRequestHandler always uses CWD so we have to chdir os.chdir(os.path.join(os.path.dirname(__file__), 'htdocs')) httpd = http.server.HTTPServer(('localhost', 0), RequestHandler) httpd_thread = threading.Thread(name='httpd', target=httpd.serve_forever) httpd_thread.start() def fin(): httpd.shutdown() httpd.server_close() httpd_thread.join() request.addfinalizer(fin) return httpd def test_httpd(httpd): ''' Tests that our http server is working as expected, and that two fetches of the same url return the same payload, proving it can be used to test deduplication. ''' payload1 = content2 = None url = 'http://localhost:%s/site1/file1.txt' % httpd.server_port with urllib.request.urlopen(url) as response: assert response.status == 200 payload1 = response.read() assert payload1 with urllib.request.urlopen(url) as response: assert response.status == 200 payload2 = response.read() assert payload2 assert payload1 == payload2 def test_services_up(): '''Check that the expected services are up and running.''' # check that rethinkdb is listening and looks sane rr = doublethink.Rethinker(db='rethinkdb') # built-in db tbls = rr.table_list().run() assert len(tbls) > 10 # check that warcprox is listening with socket.socket() as s: # if the connect fails an exception is raised and the test fails s.connect(('localhost', 8000)) # check that pywb is listening with socket.socket() as s: # if the connect fails an exception is raised and the test fails s.connect(('localhost', 8880)) # check that brozzler dashboard is listening with socket.socket() as s: # if the connect fails an exception is raised and the test fails s.connect(('localhost', 8881)) def test_brozzle_site(httpd): test_id = 'test_brozzle_site-%s' % datetime.datetime.utcnow().isoformat() rr = doublethink.Rethinker('localhost', db='brozzler') site = brozzler.Site(rr, { 'seed': 'http://localhost:%s/site1/' % httpd.server_port, 'warcprox_meta': {'captures-table-extra-fields':{'test_id':test_id}}}) # the two pages we expect to be crawled page1 = 'http://localhost:%s/site1/' % httpd.server_port page2 = 'http://localhost:%s/site1/file1.txt' % httpd.server_port robots = 'http://localhost:%s/robots.txt' % httpd.server_port # so we can examine rethinkdb before it does anything try: stop_service('brozzler-worker') assert site.id is None frontier = brozzler.RethinkDbFrontier(rr) brozzler.new_site(frontier, site) assert site.id is not None assert len(list(frontier.site_pages(site.id))) == 1 finally: start_service('brozzler-worker') # the site should be brozzled fairly quickly start = time.time() while site.status != 'FINISHED' and time.time() - start < 300: time.sleep(0.5) site.refresh() assert site.status == 'FINISHED' # check that we got the two pages we expected pages = list(frontier.site_pages(site.id)) assert len(pages) == 2 assert {page.url for page in pages} == { 'http://localhost:%s/site1/' % httpd.server_port, 'http://localhost:%s/site1/file1.txt' % httpd.server_port} time.sleep(2) # in case warcprox hasn't finished processing urls # take a look at the captures table captures = rr.table('captures').filter({'test_id':test_id}).run() captures_by_url = { c['url']: c for c in captures if c['http_method'] != 'HEAD'} assert robots in captures_by_url assert page1 in captures_by_url assert page2 in captures_by_url assert 'screenshot:%s' % page1 in captures_by_url assert 'thumbnail:%s' % page1 in captures_by_url # no screenshots of plaintext # check pywb t14 = captures_by_url[page2]['timestamp'].strftime('%Y%m%d%H%M%S') wb_url = 'http://localhost:8880/brozzler/%s/%s' % (t14, page2) expected_payload = open(os.path.join( os.path.dirname(__file__), 'htdocs', 'site1', 'file1.txt'), 'rb').read() assert requests.get(wb_url).content == expected_payload url = 'screenshot:%s' % page1 t14 = captures_by_url[url]['timestamp'].strftime('%Y%m%d%H%M%S') wb_url = 'http://localhost:8880/brozzler/%s/%s' % (t14, url) response = requests.get(wb_url) assert response.status_code == 200 assert response.headers['content-type'] == 'image/jpeg' url = 'thumbnail:%s' % page1 t14 = captures_by_url[url]['timestamp'].strftime('%Y%m%d%H%M%S') wb_url = 'http://localhost:8880/brozzler/%s/%s' % (t14, url) response = requests.get(wb_url) assert response.status_code == 200 assert response.headers['content-type'] == 'image/jpeg' def test_proxy_warcprox(httpd): '''Test --proxy with proxy that happens to be warcprox''' try: stop_service('brozzler-worker') _test_proxy_setting( httpd, proxy='localhost:8000', warcprox_auto=False, is_warcprox=True) finally: start_service('brozzler-worker') def test_proxy_non_warcprox(httpd): '''Test --proxy with proxy that happens not to be warcprox''' class DumbProxyRequestHandler(http.server.SimpleHTTPRequestHandler): def do_HEAD(self): if not hasattr(self.server, 'requests'): self.server.requests = [] logging.info('%s %s', self.command, self.path) self.server.requests.append('%s %s' % (self.command, self.path)) response = urllib.request.urlopen(self.path) self.wfile.write(('HTTP/1.0 %s %s\r\n' % ( response.code, response.reason)).encode('ascii')) for header in response.getheaders(): self.wfile.write(('%s: %s\r\n' % ( header[0], header[1])).encode('ascii')) self.wfile.write(b'\r\n') return response def do_GET(self): response = self.do_HEAD() self.copyfile(response, self.wfile) def do_WARCPROX_WRITE_RECORD(self): if not hasattr(self.server, 'requests'): self.server.requests = [] logging.info('%s %s', self.command, self.path) self.send_error(400) proxy = http.server.HTTPServer(('localhost', 0), DumbProxyRequestHandler) th = threading.Thread(name='dumb-proxy', target=proxy.serve_forever) th.start() try: stop_service('brozzler-worker') _test_proxy_setting( httpd, proxy='localhost:%s' % proxy.server_port, warcprox_auto=False, is_warcprox=False) finally: start_service('brozzler-worker') assert len(proxy.requests) <= 15 assert proxy.requests.count('GET /status') == 1 assert ('GET http://localhost:%s/site1/' % httpd.server_port) in proxy.requests assert ('GET http://localhost:%s/site1/file1.txt' % httpd.server_port) in proxy.requests assert [req for req in proxy.requests if req.startswith('WARCPROX_WRITE_RECORD')] == [] proxy.shutdown() th.join() def test_no_proxy(httpd): try: stop_service('brozzler-worker') _test_proxy_setting( httpd, proxy=None, warcprox_auto=False, is_warcprox=False) finally: start_service('brozzler-worker') # XXX how to check that no proxy was used? def test_warcprox_auto(httpd): '''Test --warcprox-auto''' try: stop_service('brozzler-worker') _test_proxy_setting( httpd, proxy=None, warcprox_auto=True, is_warcprox=True) finally: start_service('brozzler-worker') def test_proxy_conflict(): with pytest.raises(AssertionError) as excinfo: worker = brozzler.worker.BrozzlerWorker( None, None, warcprox_auto=True, proxy='localhost:12345') def _test_proxy_setting( httpd, proxy=None, warcprox_auto=False, is_warcprox=False): test_id = 'test_proxy=%s_warcprox_auto=%s_is_warcprox=%s-%s' % ( proxy, warcprox_auto, is_warcprox, datetime.datetime.utcnow().isoformat()) # the two pages we expect to be crawled page1 = 'http://localhost:%s/site1/' % httpd.server_port page2 = 'http://localhost:%s/site1/file1.txt' % httpd.server_port robots = 'http://localhost:%s/robots.txt' % httpd.server_port rr = doublethink.Rethinker('localhost', db='brozzler') service_registry = doublethink.ServiceRegistry(rr) site = brozzler.Site(rr, { 'seed': 'http://localhost:%s/site1/' % httpd.server_port, 'warcprox_meta': {'captures-table-extra-fields':{'test_id':test_id}}}) assert site.id is None frontier = brozzler.RethinkDbFrontier(rr) brozzler.new_site(frontier, site) assert site.id is not None assert len(list(frontier.site_pages(site.id))) == 1 worker = brozzler.worker.BrozzlerWorker( frontier, service_registry, max_browsers=1, chrome_exe=brozzler.suggest_default_chrome_exe(), warcprox_auto=warcprox_auto, proxy=proxy) browser = worker._browser_pool.acquire() worker.brozzle_site(browser, site) worker._browser_pool.release(browser) # check proxy is set assert site.status == 'FINISHED' if warcprox_auto: assert site.proxy[-5:] == ':8000' else: assert not site.proxy site.refresh() # check that these things were persisted assert site.status == 'FINISHED' if warcprox_auto: assert site.proxy[-5:] == ':8000' else: assert not site.proxy # check that we got the two pages we expected pages = list(frontier.site_pages(site.id)) assert len(pages) == 2 assert {page.url for page in pages} == { 'http://localhost:%s/site1/' % httpd.server_port, 'http://localhost:%s/site1/file1.txt' % httpd.server_port} time.sleep(2) # in case warcprox hasn't finished processing urls # take a look at the captures table captures = rr.table('captures').filter({'test_id':test_id}).run() captures_by_url = { c['url']: c for c in captures if c['http_method'] != 'HEAD'} if is_warcprox: assert robots in captures_by_url assert page1 in captures_by_url assert page2 in captures_by_url assert 'screenshot:%s' % page1 in captures_by_url assert 'thumbnail:%s' % page1 in captures_by_url # check pywb t14 = captures_by_url[page2]['timestamp'].strftime('%Y%m%d%H%M%S') wb_url = 'http://localhost:8880/brozzler/%s/%s' % (t14, page2) expected_payload = open(os.path.join( os.path.dirname(__file__), 'htdocs', 'site1', 'file1.txt'), 'rb').read() assert requests.get(wb_url).content == expected_payload else: assert captures_by_url == {} def test_obey_robots(httpd): test_id = 'test_obey_robots-%s' % datetime.datetime.utcnow().isoformat() rr = doublethink.Rethinker('localhost', db='brozzler') site = brozzler.Site(rr, { 'seed': 'http://localhost:%s/site1/' % httpd.server_port, 'user_agent': 'im a badbot', # robots.txt blocks badbot 'warcprox_meta': {'captures-table-extra-fields':{'test_id':test_id}}}) # so we can examine rethinkdb before it does anything try: stop_service('brozzler-worker') assert site.id is None frontier = brozzler.RethinkDbFrontier(rr) brozzler.new_site(frontier, site) assert site.id is not None site_pages = list(frontier.site_pages(site.id)) assert len(site_pages) == 1 assert site_pages[0].url == site.seed assert site_pages[0].needs_robots_check finally: start_service('brozzler-worker') # the site should be brozzled fairly quickly start = time.time() while site.status != 'FINISHED' and time.time() - start < 300: time.sleep(0.5) site.refresh() assert site.status == 'FINISHED' # check that only the one page is in rethinkdb pages = list(frontier.site_pages(site.id)) assert len(pages) == 1 page = pages[0] assert page.url == 'http://localhost:%s/site1/' % httpd.server_port assert page.blocked_by_robots # take a look at the captures table time.sleep(2) # in case warcprox hasn't finished processing urls robots_url = 'http://localhost:%s/robots.txt' % httpd.server_port captures = list(rr.table('captures').filter({'test_id':test_id}).run()) assert len(captures) == 1 assert captures[0]['url'] == robots_url # check pywb t14 = captures[0]['timestamp'].strftime('%Y%m%d%H%M%S') wb_url = 'http://localhost:8880/brozzler/%s/%s' % (t14, robots_url) expected_payload = open(os.path.join( os.path.dirname(__file__), 'htdocs', 'robots.txt'), 'rb').read() assert requests.get( wb_url, allow_redirects=False).content == expected_payload def test_login(httpd): test_id = 'test_login-%s' % datetime.datetime.utcnow().isoformat() rr = doublethink.Rethinker('localhost', db='brozzler') site = brozzler.Site(rr, { 'seed': 'http://localhost:%s/site2/' % httpd.server_port, 'warcprox_meta': {'captures-table-extra-fields':{'test_id':test_id}}, 'username': 'test_username', 'password': 'test_password'}) frontier = brozzler.RethinkDbFrontier(rr) brozzler.new_site(frontier, site) # the site should be brozzled fairly quickly start = time.time() while site.status != 'FINISHED' and time.time() - start < 300: time.sleep(0.5) site.refresh() assert site.status == 'FINISHED' # take a look at the captures table time.sleep(2) # in case warcprox hasn't finished processing urls robots_url = 'http://localhost:%s/robots.txt' % httpd.server_port captures = list(rr.table('captures').filter( {'test_id':test_id}).order_by('timestamp').run()) meth_url = ['%s %s' % (c['http_method'], c['url']) for c in captures] # there are several forms in in htdocs/site2/login.html but only one # that brozzler's heuristic should match and try to submit, and it has # action='00', so we can check for that here assert ('POST http://localhost:%s/site2/00' % httpd.server_port) in meth_url # sanity check the rest of the crawl assert ('GET http://localhost:%s/robots.txt' % httpd.server_port) in meth_url assert ('GET http://localhost:%s/site2/' % httpd.server_port) in meth_url assert ('WARCPROX_WRITE_RECORD screenshot:http://localhost:%s/site2/' % httpd.server_port) in meth_url assert ('WARCPROX_WRITE_RECORD thumbnail:http://localhost:%s/site2/' % httpd.server_port) in meth_url assert ('GET http://localhost:%s/site2/login.html' % httpd.server_port) in meth_url assert ('WARCPROX_WRITE_RECORD screenshot:http://localhost:%s/site2/login.html' % httpd.server_port) in meth_url assert ('WARCPROX_WRITE_RECORD thumbnail:http://localhost:%s/site2/login.html' % httpd.server_port) in meth_url def test_seed_redirect(httpd): test_id = 'test_seed_redirect-%s' % datetime.datetime.utcnow().isoformat() rr = doublethink.Rethinker('localhost', db='brozzler') seed_url = 'http://localhost:%s/site5/redirect/' % httpd.server_port site = brozzler.Site(rr, { 'seed': 'http://localhost:%s/site5/redirect/' % httpd.server_port, 'warcprox_meta': {'captures-table-extra-fields':{'test_id':test_id}}}) assert site.scope == {'accepts': [{'ssurt': 'localhost,//%s:http:/site5/redirect/' % httpd.server_port}]} frontier = brozzler.RethinkDbFrontier(rr) brozzler.new_site(frontier, site) assert site.id # the site should be brozzled fairly quickly start = time.time() while site.status != 'FINISHED' and time.time() - start < 300: time.sleep(0.5) site.refresh() assert site.status == 'FINISHED' # take a look at the pages table pages = list(frontier.site_pages(site.id)) assert len(pages) == 2 pages.sort(key=lambda page: page.hops_from_seed) assert pages[0].hops_from_seed == 0 assert pages[0].url == seed_url assert pages[0].redirect_url == 'http://localhost:%s/site5/destination/' % httpd.server_port assert pages[1].hops_from_seed == 1 assert pages[1].url == 'http://localhost:%s/site5/destination/page2.html' % httpd.server_port # check that scope has been updated properly assert site.scope == {'accepts': [ {'ssurt': 'localhost,//%s:http:/site5/redirect/' % httpd.server_port}, {'ssurt': 'localhost,//%s:http:/site5/destination/' % httpd.server_port}]} def test_hashtags(httpd): test_id = 'test_hashtags-%s' % datetime.datetime.utcnow().isoformat() rr = doublethink.Rethinker('localhost', db='brozzler') seed_url = 'http://localhost:%s/site7/' % httpd.server_port site = brozzler.Site(rr, { 'seed': seed_url, 'warcprox_meta': {'captures-table-extra-fields':{'test_id':test_id}}}) frontier = brozzler.RethinkDbFrontier(rr) brozzler.new_site(frontier, site) assert site.id # the site should be brozzled fairly quickly start = time.time() while site.status != 'FINISHED' and time.time() - start < 300: time.sleep(0.5) site.refresh() assert site.status == 'FINISHED' # check that we the page we expected pages = sorted(list(frontier.site_pages(site.id)), key=lambda p: p.url) assert len(pages) == 2 assert pages[0].url == seed_url assert pages[0].hops_from_seed == 0 assert pages[0].brozzle_count == 1 assert pages[0].outlinks['accepted'] == ['http://localhost:%s/site7/foo.html' % httpd.server_port] assert not pages[0].hashtags assert pages[1].url == 'http://localhost:%s/site7/foo.html' % httpd.server_port assert pages[1].hops_from_seed == 1 assert pages[1].brozzle_count == 1 assert sorted(pages[1].hashtags) == ['#boosh','#ignored','#whee',] time.sleep(2) # in case warcprox hasn't finished processing urls # take a look at the captures table captures = rr.table('captures').filter({'test_id':test_id}).run() captures_by_url = { c['url']: c for c in captures if c['http_method'] != 'HEAD'} assert seed_url in captures_by_url assert 'http://localhost:%s/site7/foo.html' % httpd.server_port in captures_by_url assert 'http://localhost:%s/site7/whee.txt' % httpd.server_port in captures_by_url assert 'http://localhost:%s/site7/boosh.txt' % httpd.server_port in captures_by_url assert 'screenshot:%s' % seed_url in captures_by_url assert 'thumbnail:%s' % seed_url in captures_by_url assert 'screenshot:http://localhost:%s/site7/foo.html' % httpd.server_port in captures_by_url assert 'thumbnail:http://localhost:%s/site7/foo.html' % httpd.server_port in captures_by_url def test_redirect_hashtags(httpd): test_id = 'test_hashtags-%s' % datetime.datetime.utcnow().isoformat() rr = doublethink.Rethinker('localhost', db='brozzler') seed_url = 'http://localhost:%s/site9/' % httpd.server_port site = brozzler.Site(rr, { 'seed': seed_url, 'warcprox_meta': {'captures-table-extra-fields':{'test_id':test_id}}}) frontier = brozzler.RethinkDbFrontier(rr) brozzler.new_site(frontier, site) assert site.id # the site should be brozzled fairly quickly start = time.time() while site.status != 'FINISHED' and time.time() - start < 300: time.sleep(0.5) site.refresh() assert site.status == 'FINISHED' # check that we the page we expected pages = sorted(list(frontier.site_pages(site.id)), key=lambda p: p.url) assert len(pages) == 2 assert pages[0].url == seed_url assert pages[0].hops_from_seed == 0 assert pages[0].brozzle_count == 1 assert pages[0].outlinks['accepted'] == ['http://localhost:%s/site9/redirect.html' % httpd.server_port] assert not pages[0].hashtags assert pages[1].url == 'http://localhost:%s/site9/redirect.html' % httpd.server_port assert pages[1].hops_from_seed == 1 assert pages[1].brozzle_count == 1 assert sorted(pages[1].hashtags) == ['#hash1','#hash2',] time.sleep(2) # in case warcprox hasn't finished processing urls # take a look at the captures table captures = rr.table('captures').filter({'test_id':test_id}).run() redirect_captures = [c for c in captures if c['url'] == 'http://localhost:%s/site9/redirect.html' % httpd.server_port and c['http_method'] == 'GET'] assert len(redirect_captures) == 2 # youtube-dl + browser, no hashtags # === expected captures === # 1. GET http://localhost:41243/favicon.ico # 2. GET http://localhost:41243/robots.txt # 3. GET http://localhost:41243/site9/ # 4. GET http://localhost:41243/site9/ # 5. GET http://localhost:41243/site9/destination.html # 6. GET http://localhost:41243/site9/destination.html # 7. GET http://localhost:41243/site9/redirect.html # 8. GET http://localhost:41243/site9/redirect.html # 9. HEAD http://localhost:41243/site9/ # 10. HEAD http://localhost:41243/site9/redirect.html # 11. WARCPROX_WRITE_RECORD screenshot:http://localhost:41243/site9/ # 12. WARCPROX_WRITE_RECORD screenshot:http://localhost:41243/site9/redirect.html # 13. WARCPROX_WRITE_RECORD thumbnail:http://localhost:41243/site9/ # 14. WARCPROX_WRITE_RECORD thumbnail:http://localhost:41243/site9/redirect.html def test_stop_crawl(httpd): test_id = 'test_stop_crawl_job-%s' % datetime.datetime.utcnow().isoformat() rr = doublethink.Rethinker('localhost', db='brozzler') frontier = brozzler.RethinkDbFrontier(rr) # create a new job with three sites that could be crawled forever job_conf = {'seeds': [ {'url': 'http://localhost:%s/infinite/foo/' % httpd.server_port}, {'url': 'http://localhost:%s/infinite/bar/' % httpd.server_port}, {'url': 'http://localhost:%s/infinite/baz/' % httpd.server_port}]} job = brozzler.new_job(frontier, job_conf) assert job.id sites = list(frontier.job_sites(job.id)) assert not sites[0].stop_requested assert not sites[1].stop_requested # request crawl stop for one site using the command line entrypoint brozzler.cli.brozzler_stop_crawl([ 'brozzler-stop-crawl', '--site=%s' % sites[0].id]) sites[0].refresh() assert sites[0].stop_requested # stop request should be honored quickly start = time.time() while not sites[0].status.startswith( 'FINISHED') and time.time() - start < 120: time.sleep(0.5) sites[0].refresh() assert sites[0].status == 'FINISHED_STOP_REQUESTED' # but the other sites and the job as a whole should still be crawling sites[1].refresh() assert sites[1].status == 'ACTIVE' sites[2].refresh() assert sites[2].status == 'ACTIVE' job.refresh() assert job.status == 'ACTIVE' # request crawl stop for the job using the command line entrypoint brozzler.cli.brozzler_stop_crawl([ 'brozzler-stop-crawl', '--job=%s' % job.id]) job.refresh() assert job.stop_requested # stop request should be honored quickly start = time.time() while not job.status.startswith( 'FINISHED') and time.time() - start < 120: time.sleep(0.5) job.refresh() assert job.status == 'FINISHED' # the other sites should also be FINISHED_STOP_REQUESTED sites[0].refresh() assert sites[0].status == 'FINISHED_STOP_REQUESTED' sites[1].refresh() assert sites[1].status == 'FINISHED_STOP_REQUESTED' sites[2].refresh() assert sites[2].status == 'FINISHED_STOP_REQUESTED' def test_warcprox_outage_resiliency(httpd): ''' Tests resiliency to warcprox outage. If no instances of warcprox are healthy when starting to crawl a site, brozzler-worker should sit there and wait until a healthy instance appears. If an instance goes down, sites assigned to that instance should bounce over to a healthy instance. If all instances of warcprox go down, brozzler-worker should sit and wait. ''' rr = doublethink.Rethinker('localhost', db='brozzler') frontier = brozzler.RethinkDbFrontier(rr) svcreg = doublethink.ServiceRegistry(rr) # run two instances of warcprox opts = warcprox.Options() opts.address = '0.0.0.0' opts.port = 0 opts.rethinkdb_services_url = 'rethinkdb://localhost/brozzler/services' warcprox1 = warcprox.controller.WarcproxController(opts) warcprox2 = warcprox.controller.WarcproxController(opts) warcprox1_thread = threading.Thread( target=warcprox1.run_until_shutdown, name='warcprox1') warcprox2_thread = threading.Thread( target=warcprox2.run_until_shutdown, name='warcprox2') # put together a site to crawl test_id = 'test_warcprox_death-%s' % datetime.datetime.utcnow().isoformat() site = brozzler.Site(rr, { 'seed': 'http://localhost:%s/infinite/' % httpd.server_port, 'warcprox_meta': {'captures-table-extra-fields':{'test_id':test_id}}}) try: # we manage warcprox instances ourselves, so stop the one running on # the system, if any try: stop_service('warcprox') except Exception as e: logging.warn('problem stopping warcprox service: %s', e) # queue the site for brozzling brozzler.new_site(frontier, site) # check that nothing happens # XXX tail brozzler-worker.log or something? time.sleep(30) site.refresh() assert site.status == 'ACTIVE' assert not site.proxy assert len(list(frontier.site_pages(site.id))) == 1 # start one instance of warcprox warcprox1_thread.start() # check that it started using that instance start = time.time() while not site.proxy and time.time() - start < 30: time.sleep(0.5) site.refresh() assert site.proxy.endswith(':%s' % warcprox1.proxy.server_port) # check that the site accumulates pages in the frontier, confirming # that crawling is really happening start = time.time() while (len(list(frontier.site_pages(site.id))) <= 1 and time.time() - start < 60): time.sleep(0.5) site.refresh() assert len(list(frontier.site_pages(site.id))) > 1 # stop warcprox #1, start warcprox #2 warcprox2_thread.start() warcprox1.stop.set() warcprox1_thread.join() # check that it switched over to warcprox #2 start = time.time() while ((not site.proxy or not site.proxy.endswith(':%s' % warcprox2.proxy.server_port)) and time.time() - start < 30): time.sleep(0.5) site.refresh() assert site.proxy.endswith(':%s' % warcprox2.proxy.server_port) # stop warcprox #2 warcprox2.stop.set() warcprox2_thread.join() page_count = len(list(frontier.site_pages(site.id))) assert page_count > 1 # check that it is waiting for a warcprox to appear time.sleep(30) site.refresh() assert site.status == 'ACTIVE' assert not site.proxy assert len(list(frontier.site_pages(site.id))) == page_count # stop crawling the site, else it can pollute subsequent test runs brozzler.cli.brozzler_stop_crawl([ 'brozzler-stop-crawl', '--site=%s' % site.id]) site.refresh() assert site.stop_requested # stop request should be honored quickly start = time.time() while not site.status.startswith( 'FINISHED') and time.time() - start < 120: time.sleep(0.5) site.refresh() assert site.status == 'FINISHED_STOP_REQUESTED' finally: warcprox1.stop.set() warcprox2.stop.set() warcprox1_thread.join() warcprox2_thread.join() start_service('warcprox') def test_time_limit(httpd): test_id = 'test_time_limit-%s' % datetime.datetime.utcnow().isoformat() rr = doublethink.Rethinker('localhost', db='brozzler') frontier = brozzler.RethinkDbFrontier(rr) # create a new job with one seed that could be crawled forever job_conf = {'seeds': [{ 'url': 'http://localhost:%s/infinite/foo/' % httpd.server_port, 'time_limit': 20}]} job = brozzler.new_job(frontier, job_conf) assert job.id sites = list(frontier.job_sites(job.id)) assert len(sites) == 1 site = sites[0] # time limit should be enforced pretty soon start = time.time() while not sites[0].status.startswith( 'FINISHED') and time.time() - start < 120: time.sleep(0.5) sites[0].refresh() assert sites[0].status == 'FINISHED_TIME_LIMIT' # all sites finished so job should be finished too start = time.time() job.refresh() while not job.status == 'FINISHED' and time.time() - start < 10: time.sleep(0.5) job.refresh() assert job.status == 'FINISHED' def test_ydl_stitching(httpd): test_id = 'test_ydl_stitching-%s' % datetime.datetime.utcnow().isoformat() rr = doublethink.Rethinker('localhost', db='brozzler') frontier = brozzler.RethinkDbFrontier(rr) site = brozzler.Site(rr, { 'seed': 'http://localhost:%s/site10/' % httpd.server_port, 'warcprox_meta': { 'warc-prefix': 'test_ydl_stitching', 'captures-table-extra-fields': {'test_id':test_id}}}) brozzler.new_site(frontier, site) # the site should be brozzled fairly quickly start = time.time() while site.status != 'FINISHED' and time.time() - start < 300: time.sleep(0.5) site.refresh() assert site.status == 'FINISHED' # check page.videos pages = list(frontier.site_pages(site.id)) assert len(pages) == 1 page = pages[0] assert len(page.videos) == 6 stitched_url = 'youtube-dl:00001:http://localhost:%s/site10/' % httpd.server_port assert { 'blame': 'youtube-dl', 'content-length': 267900, 'content-type': 'video/mp4', 'response_code': 204, 'url': stitched_url, } in page.videos time.sleep(2) # in case warcprox hasn't finished processing urls # take a look at the captures table captures = list(rr.table('captures').filter({'test_id':test_id}).run()) l = [c for c in captures if c['url'] == stitched_url] assert len(l) == 1 c = l[0] assert c['filename'].startswith('test_ydl_stitching') assert c['content_type'] == 'video/mp4' assert c['http_method'] == 'WARCPROX_WRITE_RECORD'