mirror of
https://github.com/internetarchive/brozzler.git
synced 2025-02-23 16:19:49 -05:00
840 lines
33 KiB
Python
840 lines
33 KiB
Python
#!/usr/bin/env python
|
|
'''
|
|
test_cluster.py - integration tests for a brozzler cluster, expects brozzler,
|
|
warcprox, pywb, rethinkdb and other dependencies to be running already
|
|
|
|
Copyright (C) 2016-2018 Internet Archive
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
'''
|
|
|
|
import pytest
|
|
import http.server
|
|
import threading
|
|
import urllib.request
|
|
import os
|
|
import socket
|
|
import doublethink
|
|
import time
|
|
import brozzler
|
|
import datetime
|
|
import requests
|
|
import subprocess
|
|
import http.server
|
|
import logging
|
|
import warcprox
|
|
|
|
def start_service(service):
    '''
    Runs `sudo svc -u /etc/service/<service>`.

    Raises subprocess.CalledProcessError if the command exits non-zero.
    '''
    service_dir = '/etc/service/' + service
    command = ['sudo', 'svc', '-u', service_dir]
    subprocess.check_call(command)
|
|
|
|
def stop_service(service):
    '''
    Runs `sudo svc -d /etc/service/<service>`.

    Raises subprocess.CalledProcessError if the command exits non-zero.
    '''
    service_dir = '/etc/service/' + service
    command = ['sudo', 'svc', '-d', service_dir]
    subprocess.check_call(command)
|
|
|
|
@pytest.fixture(scope='module')
def httpd(request):
    '''
    Module-scoped fixture that runs a local http server for the tests.

    Files are served out of the `htdocs` directory next to this file, with a
    few synthetic urls layered on top:

    - `/site5/redirect/` and `/site9/redirect.html` answer with a 303
      redirect
    - any path starting with `/infinite/` answers with a page of relative
      links

    Returns the running `http.server.HTTPServer`; tests read its
    `server_port`. When the module's tests are done the server is shut down
    and the working directory is restored.
    '''
    # synthetic redirects: request path -> value of the Location header
    redirects = {
        '/site5/redirect/': '/site5/destination/',
        '/site9/redirect.html': '/site9/destination.html',
    }

    infinite_payload = b'''
<html>
<head>
<title>infinite site</title>
</head>
<body>
<a href='a/'>a/</a> <a href='b/'>b/</a> <a href='c/'>c/</a>
<a href='d/'>d/</a> <a href='e/'>e/</a> <a href='f/'>f/</a>
<a href='g/'>g/</a> <a href='h/'>h/</a> <a href='i/'>i/</a>
</body>
</html>
'''

    class RequestHandler(http.server.SimpleHTTPRequestHandler):
        def do_GET(self):
            if self.path in redirects:
                self.send_response(303, 'See other')
                self.send_header('Connection', 'close')
                self.send_header('Content-Length', 0)
                self.send_header('Location', redirects[self.path])
                self.end_headers()
                self.wfile.write(b'')
            elif self.path.startswith('/infinite/'):
                self.send_response(200, 'OK')
                self.send_header('Connection', 'close')
                self.send_header('Content-Length', len(infinite_payload))
                self.end_headers()
                self.wfile.write(infinite_payload)
            else:
                # everything else is a plain file under htdocs
                super().do_GET()

    # SimpleHTTPRequestHandler always uses CWD so we have to chdir; put the
    # original directory back at teardown so the change does not leak into
    # whatever runs after this module
    original_cwd = os.getcwd()
    os.chdir(os.path.join(os.path.dirname(__file__), 'htdocs'))
    request.addfinalizer(lambda: os.chdir(original_cwd))

    server = http.server.HTTPServer(('localhost', 0), RequestHandler)
    server_thread = threading.Thread(
            name='httpd', target=server.serve_forever)
    server_thread.start()

    def fin():
        server.shutdown()
        server.server_close()
        server_thread.join()
    # registered last, so it runs before the working directory is restored
    request.addfinalizer(fin)

    return server
|
|
|
|
def test_httpd(httpd):
    '''
    Tests that our http server is working as expected, and that two fetches
    of the same url return the same payload, proving it can be used to test
    deduplication.
    '''
    url = 'http://localhost:%s/site1/file1.txt' % httpd.server_port

    with urllib.request.urlopen(url) as response:
        assert response.status == 200
        payload1 = response.read()
        assert payload1

    with urllib.request.urlopen(url) as response:
        assert response.status == 200
        payload2 = response.read()
        assert payload2

    assert payload1 == payload2
|
|
|
|
def test_services_up():
    '''Check that the expected services are up and running.'''
    # rethinkdb: query the built-in 'rethinkdb' db and make sure the answer
    # looks sane
    rethinker = doublethink.Rethinker(db='rethinkdb')
    table_names = rethinker.table_list().run()
    assert len(table_names) > 10

    # the remaining services only need to be listening; if a connect fails
    # an exception is raised and the test fails
    listening_ports = (
        8000,  # warcprox
        8880,  # pywb
        8881,  # brozzler dashboard
    )
    for port in listening_ports:
        with socket.socket() as sock:
            sock.connect(('localhost', port))
|
|
|
|
def test_brozzle_site(httpd):
    '''
    Queues site1 for the brozzler-worker service to crawl, then checks the
    pages in the frontier, the rows in the captures table and playback
    through pywb.
    '''
    test_id = 'test_brozzle_site-%s' % datetime.datetime.utcnow().isoformat()
    rr = doublethink.Rethinker('localhost', db='brozzler')
    site = brozzler.Site(rr, {
        'seed': 'http://localhost:%s/site1/' % httpd.server_port,
        'warcprox_meta': {'captures-table-extra-fields':{'test_id':test_id}}})

    # the two pages we expect to be crawled
    page1 = 'http://localhost:%s/site1/' % httpd.server_port
    page2 = 'http://localhost:%s/site1/file1.txt' % httpd.server_port
    robots = 'http://localhost:%s/robots.txt' % httpd.server_port

    # so we can examine rethinkdb before it does anything
    try:
        stop_service('brozzler-worker')

        assert site.id is None
        frontier = brozzler.RethinkDbFrontier(rr)
        brozzler.new_site(frontier, site)
        assert site.id is not None
        assert len(list(frontier.site_pages(site.id))) == 1
    finally:
        start_service('brozzler-worker')

    # the site should be brozzled fairly quickly
    start = time.time()
    while site.status != 'FINISHED' and time.time() - start < 300:
        time.sleep(0.5)
        site.refresh()
    assert site.status == 'FINISHED'

    # check that we got the two pages we expected
    pages = list(frontier.site_pages(site.id))
    assert len(pages) == 2
    assert {page.url for page in pages} == {page1, page2}

    time.sleep(2)   # in case warcprox hasn't finished processing urls
    # take a look at the captures table
    captures = rr.table('captures').filter({'test_id':test_id}).run()
    captures_by_url = {
            c['url']: c for c in captures if c['http_method'] != 'HEAD'}
    assert robots in captures_by_url
    assert page1 in captures_by_url
    assert page2 in captures_by_url
    assert 'screenshot:%s' % page1 in captures_by_url
    assert 'thumbnail:%s' % page1 in captures_by_url
    # no screenshots of plaintext

    # check pywb
    t14 = captures_by_url[page2]['timestamp'].strftime('%Y%m%d%H%M%S')
    wb_url = 'http://localhost:8880/brozzler/%s/%s' % (t14, page2)
    expected_path = os.path.join(
            os.path.dirname(__file__), 'htdocs', 'site1', 'file1.txt')
    # context manager so the file handle is closed promptly
    with open(expected_path, 'rb') as f:
        expected_payload = f.read()
    assert requests.get(wb_url).content == expected_payload

    # the screenshot and the thumbnail should both play back as jpegs
    for url in ('screenshot:%s' % page1, 'thumbnail:%s' % page1):
        t14 = captures_by_url[url]['timestamp'].strftime('%Y%m%d%H%M%S')
        wb_url = 'http://localhost:8880/brozzler/%s/%s' % (t14, url)
        response = requests.get(wb_url)
        assert response.status_code == 200
        assert response.headers['content-type'] == 'image/jpeg'
|
|
|
|
def test_proxy_warcprox(httpd):
    '''Test --proxy with proxy that happens to be warcprox'''
    settings = {
        'proxy': 'localhost:8000',
        'warcprox_auto': False,
        'is_warcprox': True,
    }
    try:
        # the crawl is run by a worker created inside _test_proxy_setting,
        # so the brozzler-worker service is stopped for the duration
        stop_service('brozzler-worker')
        _test_proxy_setting(httpd, **settings)
    finally:
        start_service('brozzler-worker')
|
|
|
|
def test_proxy_non_warcprox(httpd):
    '''Test --proxy with proxy that happens not to be warcprox'''
    class DumbProxyRequestHandler(http.server.SimpleHTTPRequestHandler):
        '''
        Minimal forwarding proxy that keeps a log of the requests it
        receives in `self.server.requests`, as '<method> <path>' strings.
        '''
        def _record_request(self):
            logging.info('%s %s', self.command, self.path)
            self.server.requests.append('%s %s' % (self.command, self.path))

        def _relay_response_head(self):
            '''
            Fetches self.path and writes the status line and headers of the
            response to the client. Returns the open upstream response.
            '''
            # recorded before fetching, so the request is logged even if
            # urlopen raises
            self._record_request()
            response = urllib.request.urlopen(self.path)
            self.wfile.write(('HTTP/1.0 %s %s\r\n' % (
                response.code, response.reason)).encode('ascii'))
            for header in response.getheaders():
                self.wfile.write(('%s: %s\r\n' % (
                    header[0], header[1])).encode('ascii'))
            self.wfile.write(b'\r\n')
            return response

        def do_HEAD(self):
            self._relay_response_head().close()

        def do_GET(self):
            with self._relay_response_head() as response:
                self.copyfile(response, self.wfile)

        def do_WARCPROX_WRITE_RECORD(self):
            # recorded so that the "no WARCPROX_WRITE_RECORD" assertion
            # below actually has something to catch
            self._record_request()
            self.send_error(400)

    proxy = http.server.HTTPServer(('localhost', 0), DumbProxyRequestHandler)
    # created up front so the assertions below work even if the proxy never
    # receives a request
    proxy.requests = []
    th = threading.Thread(name='dumb-proxy', target=proxy.serve_forever)
    th.start()

    try:
        try:
            stop_service('brozzler-worker')
            _test_proxy_setting(
                    httpd, proxy='localhost:%s' % proxy.server_port,
                    warcprox_auto=False, is_warcprox=False)
        finally:
            start_service('brozzler-worker')

        assert len(proxy.requests) <= 15
        assert proxy.requests.count('GET /status') == 1
        assert ('GET http://localhost:%s/site1/' % httpd.server_port) in proxy.requests
        assert ('GET http://localhost:%s/site1/file1.txt' % httpd.server_port) in proxy.requests
        assert [req for req in proxy.requests if req.startswith('WARCPROX_WRITE_RECORD')] == []
    finally:
        # always stop the proxy, otherwise a failed assertion leaves the
        # non-daemon server thread running
        proxy.shutdown()
        proxy.server_close()
        th.join()
|
|
|
|
def test_no_proxy(httpd):
    '''Test with neither --proxy nor --warcprox-auto'''
    settings = {
        'proxy': None,
        'warcprox_auto': False,
        'is_warcprox': False,
    }
    try:
        stop_service('brozzler-worker')
        _test_proxy_setting(httpd, **settings)
    finally:
        start_service('brozzler-worker')
    # XXX how to check that no proxy was used?
|
|
|
|
def test_warcprox_auto(httpd):
    '''Test --warcprox-auto'''
    settings = {
        'proxy': None,
        'warcprox_auto': True,
        'is_warcprox': True,
    }
    try:
        stop_service('brozzler-worker')
        _test_proxy_setting(httpd, **settings)
    finally:
        start_service('brozzler-worker')
|
|
|
|
def test_proxy_conflict():
    '''
    Test that constructing a worker with both warcprox_auto and proxy set
    raises AssertionError
    '''
    with pytest.raises(AssertionError):
        brozzler.worker.BrozzlerWorker(
                None, None, warcprox_auto=True, proxy='localhost:12345')
|
|
|
|
def _test_proxy_setting(
        httpd, proxy=None, warcprox_auto=False, is_warcprox=False):
    '''
    Crawls site1 with a BrozzlerWorker created here, configured with the
    given proxy settings, and checks the outcome.

    httpd -- the test http server; its server_port is used to build urls
    proxy -- passed through as BrozzlerWorker's `proxy`
    warcprox_auto -- passed through as BrozzlerWorker's `warcprox_auto`
    is_warcprox -- if true, captures are expected in the captures table and
        in pywb; otherwise the captures table is expected to have no
        non-HEAD rows for this test
    '''
    test_id = 'test_proxy=%s_warcprox_auto=%s_is_warcprox=%s-%s' % (
            proxy, warcprox_auto, is_warcprox,
            datetime.datetime.utcnow().isoformat())

    # the two pages we expect to be crawled
    page1 = 'http://localhost:%s/site1/' % httpd.server_port
    page2 = 'http://localhost:%s/site1/file1.txt' % httpd.server_port
    robots = 'http://localhost:%s/robots.txt' % httpd.server_port

    rr = doublethink.Rethinker('localhost', db='brozzler')
    service_registry = doublethink.ServiceRegistry(rr)
    site = brozzler.Site(rr, {
        'seed': 'http://localhost:%s/site1/' % httpd.server_port,
        'warcprox_meta': {'captures-table-extra-fields':{'test_id':test_id}}})
    assert site.id is None
    frontier = brozzler.RethinkDbFrontier(rr)
    brozzler.new_site(frontier, site)
    assert site.id is not None
    assert len(list(frontier.site_pages(site.id))) == 1

    worker = brozzler.worker.BrozzlerWorker(
            frontier, service_registry, max_browsers=1,
            chrome_exe=brozzler.suggest_default_chrome_exe(),
            warcprox_auto=warcprox_auto, proxy=proxy)
    browser = worker._browser_pool.acquire()
    try:
        worker.brozzle_site(browser, site)
    finally:
        # hand the browser back even if the crawl blows up
        worker._browser_pool.release(browser)

    def check_site_state():
        assert site.status == 'FINISHED'
        if warcprox_auto:
            assert site.proxy[-5:] == ':8000'
        else:
            assert not site.proxy

    # check proxy is set
    check_site_state()
    site.refresh() # check that these things were persisted
    check_site_state()

    # check that we got the two pages we expected
    pages = list(frontier.site_pages(site.id))
    assert len(pages) == 2
    assert {page.url for page in pages} == {page1, page2}

    time.sleep(2)   # in case warcprox hasn't finished processing urls
    # take a look at the captures table
    captures = rr.table('captures').filter({'test_id':test_id}).run()
    captures_by_url = {
            c['url']: c for c in captures if c['http_method'] != 'HEAD'}
    if is_warcprox:
        assert robots in captures_by_url
        assert page1 in captures_by_url
        assert page2 in captures_by_url
        assert 'screenshot:%s' % page1 in captures_by_url
        assert 'thumbnail:%s' % page1 in captures_by_url

        # check pywb
        t14 = captures_by_url[page2]['timestamp'].strftime('%Y%m%d%H%M%S')
        wb_url = 'http://localhost:8880/brozzler/%s/%s' % (t14, page2)
        expected_path = os.path.join(
                os.path.dirname(__file__), 'htdocs', 'site1', 'file1.txt')
        # context manager so the file handle is closed promptly
        with open(expected_path, 'rb') as f:
            expected_payload = f.read()
        assert requests.get(wb_url).content == expected_payload
    else:
        assert captures_by_url == {}
|
|
|
|
def test_obey_robots(httpd):
    '''
    Crawls site1 with a user agent that robots.txt blocks, and checks that
    the seed page is marked as blocked and that robots.txt is the only url
    captured.
    '''
    test_id = 'test_obey_robots-%s' % datetime.datetime.utcnow().isoformat()
    rr = doublethink.Rethinker('localhost', db='brozzler')
    site = brozzler.Site(rr, {
        'seed': 'http://localhost:%s/site1/' % httpd.server_port,
        'user_agent': 'im a badbot',   # robots.txt blocks badbot
        'warcprox_meta': {'captures-table-extra-fields':{'test_id':test_id}}})

    # so we can examine rethinkdb before it does anything
    try:
        stop_service('brozzler-worker')

        assert site.id is None
        frontier = brozzler.RethinkDbFrontier(rr)
        brozzler.new_site(frontier, site)
        assert site.id is not None
        site_pages = list(frontier.site_pages(site.id))
        assert len(site_pages) == 1
        assert site_pages[0].url == site.seed
        assert site_pages[0].needs_robots_check
    finally:
        start_service('brozzler-worker')

    # the site should be brozzled fairly quickly
    start = time.time()
    while site.status != 'FINISHED' and time.time() - start < 300:
        time.sleep(0.5)
        site.refresh()
    assert site.status == 'FINISHED'

    # check that only the one page is in rethinkdb
    pages = list(frontier.site_pages(site.id))
    assert len(pages) == 1
    page = pages[0]
    assert page.url == 'http://localhost:%s/site1/' % httpd.server_port
    assert page.blocked_by_robots

    # take a look at the captures table
    time.sleep(2)   # in case warcprox hasn't finished processing urls
    robots_url = 'http://localhost:%s/robots.txt' % httpd.server_port
    captures = list(rr.table('captures').filter({'test_id':test_id}).run())
    assert len(captures) == 1
    assert captures[0]['url'] == robots_url

    # check pywb
    t14 = captures[0]['timestamp'].strftime('%Y%m%d%H%M%S')
    wb_url = 'http://localhost:8880/brozzler/%s/%s' % (t14, robots_url)
    expected_path = os.path.join(
            os.path.dirname(__file__), 'htdocs', 'robots.txt')
    # context manager so the file handle is closed promptly
    with open(expected_path, 'rb') as f:
        expected_payload = f.read()
    assert requests.get(
            wb_url, allow_redirects=False).content == expected_payload
|
|
|
|
def test_login(httpd):
    '''
    Crawls site2 with a username and password configured on the site, and
    checks the captures table for the login form submission along with the
    rest of the crawl.
    '''
    test_id = 'test_login-%s' % datetime.datetime.utcnow().isoformat()
    rr = doublethink.Rethinker('localhost', db='brozzler')
    site_conf = {
        'seed': 'http://localhost:%s/site2/' % httpd.server_port,
        'warcprox_meta': {'captures-table-extra-fields':{'test_id':test_id}},
        'username': 'test_username', 'password': 'test_password'}
    site = brozzler.Site(rr, site_conf)

    frontier = brozzler.RethinkDbFrontier(rr)
    brozzler.new_site(frontier, site)

    # give the crawl up to five minutes to finish
    started = time.time()
    while site.status != 'FINISHED' and time.time() - started < 300:
        time.sleep(0.5)
        site.refresh()
    assert site.status == 'FINISHED'

    # pull this test's rows from the captures table, oldest first
    time.sleep(2)   # in case warcprox hasn't finished processing urls
    robots_url = 'http://localhost:%s/robots.txt' % httpd.server_port
    query = rr.table('captures').filter({'test_id':test_id})
    captures = list(query.order_by('timestamp').run())
    meth_url = ['%s %s' % (c['http_method'], c['url']) for c in captures]

    site2_url = 'http://localhost:%s/site2/' % httpd.server_port
    login_url = 'http://localhost:%s/site2/login.html' % httpd.server_port

    # there are several forms in htdocs/site2/login.html but only one that
    # brozzler's heuristic should match and try to submit, and it has
    # action='00', so we can check for that here
    assert ('POST %s00' % site2_url) in meth_url

    # sanity check the rest of the crawl
    expected = (
        'GET %s' % robots_url,
        'GET %s' % site2_url,
        'WARCPROX_WRITE_RECORD screenshot:%s' % site2_url,
        'WARCPROX_WRITE_RECORD thumbnail:%s' % site2_url,
        'GET %s' % login_url,
        'WARCPROX_WRITE_RECORD screenshot:%s' % login_url,
        'WARCPROX_WRITE_RECORD thumbnail:%s' % login_url,
    )
    for entry in expected:
        assert entry in meth_url
|
|
|
|
def test_seed_redirect(httpd):
    '''
    Crawls a seed url that redirects, and checks the resulting pages and
    that the redirect destination was added to the site's scope.
    '''
    test_id = 'test_seed_redirect-%s' % datetime.datetime.utcnow().isoformat()
    rr = doublethink.Rethinker('localhost', db='brozzler')
    port = httpd.server_port
    seed_url = 'http://localhost:%s/site5/redirect/' % port
    destination_url = 'http://localhost:%s/site5/destination/' % port
    seed_rule = {'ssurt': 'localhost,//%s:http:/site5/redirect/' % port}
    destination_rule = {
        'ssurt': 'localhost,//%s:http:/site5/destination/' % port}

    site = brozzler.Site(rr, {
        'seed': seed_url,
        'warcprox_meta': {'captures-table-extra-fields':{'test_id':test_id}}})
    # before crawling, only the seed is in scope
    assert site.scope == {'accepts': [seed_rule]}

    frontier = brozzler.RethinkDbFrontier(rr)
    brozzler.new_site(frontier, site)
    assert site.id

    # give the crawl up to five minutes to finish
    started = time.time()
    while site.status != 'FINISHED' and time.time() - started < 300:
        time.sleep(0.5)
        site.refresh()
    assert site.status == 'FINISHED'

    # take a look at the pages table
    pages = list(frontier.site_pages(site.id))
    assert len(pages) == 2
    seed_page, linked_page = sorted(
            pages, key=lambda page: page.hops_from_seed)
    assert seed_page.hops_from_seed == 0
    assert seed_page.url == seed_url
    assert seed_page.redirect_url == destination_url
    assert linked_page.hops_from_seed == 1
    assert linked_page.url == 'http://localhost:%s/site5/destination/page2.html' % port

    # check that scope has been updated properly
    assert site.scope == {'accepts': [seed_rule, destination_rule]}
|
|
|
|
def test_hashtags(httpd):
    '''
    Crawls site7 and checks the hashtags recorded on its pages and the urls
    that ended up in the captures table.
    '''
    test_id = 'test_hashtags-%s' % datetime.datetime.utcnow().isoformat()
    rr = doublethink.Rethinker('localhost', db='brozzler')
    seed_url = 'http://localhost:%s/site7/' % httpd.server_port
    foo_url = 'http://localhost:%s/site7/foo.html' % httpd.server_port
    site = brozzler.Site(rr, {
        'seed': seed_url,
        'warcprox_meta': {'captures-table-extra-fields':{'test_id':test_id}}})

    frontier = brozzler.RethinkDbFrontier(rr)
    brozzler.new_site(frontier, site)
    assert site.id

    # give the crawl up to five minutes to finish
    started = time.time()
    while site.status != 'FINISHED' and time.time() - started < 300:
        time.sleep(0.5)
        site.refresh()
    assert site.status == 'FINISHED'

    # check that we got the pages we expected
    pages = sorted(list(frontier.site_pages(site.id)), key=lambda p: p.url)
    assert len(pages) == 2
    seed_page, foo_page = pages

    assert seed_page.url == seed_url
    assert seed_page.hops_from_seed == 0
    assert seed_page.brozzle_count == 1
    assert seed_page.outlinks['accepted'] == [foo_url]
    assert not seed_page.hashtags

    assert foo_page.url == foo_url
    assert foo_page.hops_from_seed == 1
    assert foo_page.brozzle_count == 1
    assert sorted(foo_page.hashtags) == ['#boosh','#ignored','#whee',]

    time.sleep(2)   # in case warcprox hasn't finished processing urls
    # take a look at the captures table
    captures = rr.table('captures').filter({'test_id':test_id}).run()
    captures_by_url = {
            c['url']: c for c in captures if c['http_method'] != 'HEAD'}
    expected_urls = (
        seed_url,
        foo_url,
        'http://localhost:%s/site7/whee.txt' % httpd.server_port,
        'http://localhost:%s/site7/boosh.txt' % httpd.server_port,
        'screenshot:%s' % seed_url,
        'thumbnail:%s' % seed_url,
        'screenshot:%s' % foo_url,
        'thumbnail:%s' % foo_url,
    )
    for expected_url in expected_urls:
        assert expected_url in captures_by_url
|
|
|
|
def test_redirect_hashtags(httpd):
    '''
    Crawls site9, whose seed page links to a url that redirects, and checks
    the hashtags recorded on the pages and how many times the redirecting
    url was captured.
    '''
    # prefix names this test, so its captures are not labelled as belonging
    # to test_hashtags
    test_id = 'test_redirect_hashtags-%s' % datetime.datetime.utcnow().isoformat()
    rr = doublethink.Rethinker('localhost', db='brozzler')
    seed_url = 'http://localhost:%s/site9/' % httpd.server_port
    redirect_url = 'http://localhost:%s/site9/redirect.html' % httpd.server_port
    site = brozzler.Site(rr, {
        'seed': seed_url,
        'warcprox_meta': {'captures-table-extra-fields':{'test_id':test_id}}})

    frontier = brozzler.RethinkDbFrontier(rr)
    brozzler.new_site(frontier, site)
    assert site.id

    # the site should be brozzled fairly quickly
    start = time.time()
    while site.status != 'FINISHED' and time.time() - start < 300:
        time.sleep(0.5)
        site.refresh()
    assert site.status == 'FINISHED'

    # check that we got the pages we expected
    pages = sorted(list(frontier.site_pages(site.id)), key=lambda p: p.url)
    assert len(pages) == 2
    assert pages[0].url == seed_url
    assert pages[0].hops_from_seed == 0
    assert pages[0].brozzle_count == 1
    assert pages[0].outlinks['accepted'] == [redirect_url]
    assert not pages[0].hashtags
    assert pages[1].url == redirect_url
    assert pages[1].hops_from_seed == 1
    assert pages[1].brozzle_count == 1
    assert sorted(pages[1].hashtags) == ['#hash1','#hash2',]

    time.sleep(2)   # in case warcprox hasn't finished processing urls
    # take a look at the captures table
    captures = rr.table('captures').filter({'test_id':test_id}).run()
    redirect_captures = [
            c for c in captures
            if c['url'] == redirect_url and c['http_method'] == 'GET']
    assert len(redirect_captures) == 2 # youtube-dl + browser, no hashtags

    # === expected captures ===
    #  1. GET http://localhost:41243/favicon.ico
    #  2. GET http://localhost:41243/robots.txt
    #  3. GET http://localhost:41243/site9/
    #  4. GET http://localhost:41243/site9/
    #  5. GET http://localhost:41243/site9/destination.html
    #  6. GET http://localhost:41243/site9/destination.html
    #  7. GET http://localhost:41243/site9/redirect.html
    #  8. GET http://localhost:41243/site9/redirect.html
    #  9. HEAD http://localhost:41243/site9/
    # 10. HEAD http://localhost:41243/site9/redirect.html
    # 11. WARCPROX_WRITE_RECORD screenshot:http://localhost:41243/site9/
    # 12. WARCPROX_WRITE_RECORD screenshot:http://localhost:41243/site9/redirect.html
    # 13. WARCPROX_WRITE_RECORD thumbnail:http://localhost:41243/site9/
    # 14. WARCPROX_WRITE_RECORD thumbnail:http://localhost:41243/site9/redirect.html
|
|
|
|
def test_stop_crawl(httpd):
    '''
    Tests the brozzler-stop-crawl command line entrypoint, first for a
    single site of a job and then for the job as a whole.
    '''
    rr = doublethink.Rethinker('localhost', db='brozzler')
    frontier = brozzler.RethinkDbFrontier(rr)

    # create a new job with three sites that could be crawled forever
    job_conf = {'seeds': [
        {'url': 'http://localhost:%s/infinite/foo/' % httpd.server_port},
        {'url': 'http://localhost:%s/infinite/bar/' % httpd.server_port},
        {'url': 'http://localhost:%s/infinite/baz/' % httpd.server_port}]}
    job = brozzler.new_job(frontier, job_conf)
    assert job.id

    sites = list(frontier.job_sites(job.id))
    # checked up front because sites[2] is indexed further down
    assert len(sites) == 3
    for site in sites:
        assert not site.stop_requested

    # request crawl stop for one site using the command line entrypoint
    brozzler.cli.brozzler_stop_crawl([
        'brozzler-stop-crawl', '--site=%s' % sites[0].id])
    sites[0].refresh()
    assert sites[0].stop_requested

    # stop request should be honored quickly
    start = time.time()
    while not sites[0].status.startswith(
            'FINISHED') and time.time() - start < 120:
        time.sleep(0.5)
        sites[0].refresh()
    assert sites[0].status == 'FINISHED_STOP_REQUESTED'

    # but the other sites and the job as a whole should still be crawling
    sites[1].refresh()
    assert sites[1].status == 'ACTIVE'
    sites[2].refresh()
    assert sites[2].status == 'ACTIVE'
    job.refresh()
    assert job.status == 'ACTIVE'

    # request crawl stop for the job using the command line entrypoint
    brozzler.cli.brozzler_stop_crawl([
        'brozzler-stop-crawl', '--job=%s' % job.id])
    job.refresh()
    assert job.stop_requested

    # stop request should be honored quickly
    start = time.time()
    while not job.status.startswith(
            'FINISHED') and time.time() - start < 120:
        time.sleep(0.5)
        job.refresh()
    assert job.status == 'FINISHED'

    # the other sites should also be FINISHED_STOP_REQUESTED
    sites[0].refresh()
    assert sites[0].status == 'FINISHED_STOP_REQUESTED'
    sites[1].refresh()
    assert sites[1].status == 'FINISHED_STOP_REQUESTED'
    sites[2].refresh()
    assert sites[2].status == 'FINISHED_STOP_REQUESTED'
|
|
|
|
def test_warcprox_outage_resiliency(httpd):
    '''
    Tests resiliency to warcprox outage.

    If no instances of warcprox are healthy when starting to crawl a site,
    brozzler-worker should sit there and wait until a healthy instance appears.

    If an instance goes down, sites assigned to that instance should bounce
    over to a healthy instance.

    If all instances of warcprox go down, brozzler-worker should sit and wait.
    '''
    rr = doublethink.Rethinker('localhost', db='brozzler')
    frontier = brozzler.RethinkDbFrontier(rr)
    svcreg = doublethink.ServiceRegistry(rr)

    # run two instances of warcprox
    opts = warcprox.Options()
    opts.address = '0.0.0.0'
    opts.port = 0
    opts.rethinkdb_services_url = 'rethinkdb://localhost/brozzler/services'

    warcprox1 = warcprox.controller.WarcproxController(opts)
    warcprox2 = warcprox.controller.WarcproxController(opts)
    warcprox1_thread = threading.Thread(
            target=warcprox1.run_until_shutdown, name='warcprox1')
    warcprox2_thread = threading.Thread(
            target=warcprox2.run_until_shutdown, name='warcprox2')

    # put together a site to crawl
    test_id = 'test_warcprox_death-%s' % datetime.datetime.utcnow().isoformat()
    site = brozzler.Site(rr, {
        'seed': 'http://localhost:%s/infinite/' % httpd.server_port,
        'warcprox_meta': {'captures-table-extra-fields':{'test_id':test_id}}})

    try:
        # we manage warcprox instances ourselves, so stop the one running on
        # the system, if any
        try:
            stop_service('warcprox')
        except Exception as e:
            logging.warning('problem stopping warcprox service: %s', e)

        # queue the site for brozzling
        brozzler.new_site(frontier, site)

        # check that nothing happens
        # XXX tail brozzler-worker.log or something?
        time.sleep(30)
        site.refresh()
        assert site.status == 'ACTIVE'
        assert not site.proxy
        assert len(list(frontier.site_pages(site.id))) == 1

        # start one instance of warcprox
        warcprox1_thread.start()

        # check that it started using that instance
        start = time.time()
        while not site.proxy and time.time() - start < 30:
            time.sleep(0.5)
            site.refresh()
        # `site.proxy and ...` so that a proxy that never got set is an
        # assertion failure rather than an AttributeError
        assert site.proxy and site.proxy.endswith(
                ':%s' % warcprox1.proxy.server_port)

        # check that the site accumulates pages in the frontier, confirming
        # that crawling is really happening
        start = time.time()
        while (len(list(frontier.site_pages(site.id))) <= 1
               and time.time() - start < 60):
            time.sleep(0.5)
            site.refresh()
        assert len(list(frontier.site_pages(site.id))) > 1

        # stop warcprox #1, start warcprox #2
        warcprox2_thread.start()
        warcprox1.stop.set()
        warcprox1_thread.join()

        # check that it switched over to warcprox #2
        start = time.time()
        while ((not site.proxy
                or not site.proxy.endswith(':%s' % warcprox2.proxy.server_port))
               and time.time() - start < 30):
            time.sleep(0.5)
            site.refresh()
        assert site.proxy and site.proxy.endswith(
                ':%s' % warcprox2.proxy.server_port)

        # stop warcprox #2
        warcprox2.stop.set()
        warcprox2_thread.join()

        page_count = len(list(frontier.site_pages(site.id)))
        assert page_count > 1

        # check that it is waiting for a warcprox to appear
        time.sleep(30)
        site.refresh()
        assert site.status == 'ACTIVE'
        assert not site.proxy
        assert len(list(frontier.site_pages(site.id))) == page_count

        # stop crawling the site, else it can pollute subsequent test runs
        brozzler.cli.brozzler_stop_crawl([
            'brozzler-stop-crawl', '--site=%s' % site.id])
        site.refresh()
        assert site.stop_requested

        # stop request should be honored quickly
        start = time.time()
        while not site.status.startswith(
                'FINISHED') and time.time() - start < 120:
            time.sleep(0.5)
            site.refresh()
        assert site.status == 'FINISHED_STOP_REQUESTED'
    finally:
        try:
            warcprox1.stop.set()
            warcprox2.stop.set()
            # join() raises RuntimeError on a thread that was never started,
            # which would hide the real failure if the test bailed out early
            for th in (warcprox1_thread, warcprox2_thread):
                if th.ident is not None:
                    th.join()
        finally:
            # bring the system warcprox back no matter what happened above
            start_service('warcprox')
|
|
|
|
def test_time_limit(httpd):
    '''
    Crawls an endless site with `time_limit` set to 20 on its seed, and
    checks that the site ends up FINISHED_TIME_LIMIT and the job FINISHED.
    '''
    rr = doublethink.Rethinker('localhost', db='brozzler')
    frontier = brozzler.RethinkDbFrontier(rr)

    # create a new job with one seed that could be crawled forever
    job_conf = {'seeds': [{
        'url': 'http://localhost:%s/infinite/foo/' % httpd.server_port,
        'time_limit': 20}]}
    job = brozzler.new_job(frontier, job_conf)
    assert job.id

    sites = list(frontier.job_sites(job.id))
    assert len(sites) == 1
    site = sites[0]

    # time limit should be enforced pretty soon
    start = time.time()
    while not site.status.startswith(
            'FINISHED') and time.time() - start < 120:
        time.sleep(0.5)
        site.refresh()
    assert site.status == 'FINISHED_TIME_LIMIT'

    # all sites finished so job should be finished too
    start = time.time()
    job.refresh()
    while not job.status == 'FINISHED' and time.time() - start < 10:
        time.sleep(0.5)
        job.refresh()
    assert job.status == 'FINISHED'
|
|
|
|
def test_ydl_stitching(httpd):
    '''
    Crawls site10 and checks that a 'youtube-dl:00001:...' video is listed
    in page.videos and shows up exactly once in the captures table.
    '''
    test_id = 'test_ydl_stitching-%s' % datetime.datetime.utcnow().isoformat()
    rr = doublethink.Rethinker('localhost', db='brozzler')
    frontier = brozzler.RethinkDbFrontier(rr)
    warcprox_meta = {
        'warc-prefix': 'test_ydl_stitching',
        'captures-table-extra-fields': {'test_id':test_id}}
    site = brozzler.Site(rr, {
        'seed': 'http://localhost:%s/site10/' % httpd.server_port,
        'warcprox_meta': warcprox_meta})
    brozzler.new_site(frontier, site)

    # give the crawl up to five minutes to finish
    started = time.time()
    while site.status != 'FINISHED' and time.time() - started < 300:
        time.sleep(0.5)
        site.refresh()
    assert site.status == 'FINISHED'

    # check page.videos
    pages = list(frontier.site_pages(site.id))
    assert len(pages) == 1
    page = pages[0]
    assert len(page.videos) == 6
    stitched_url = 'youtube-dl:00001:http://localhost:%s/site10/' % httpd.server_port
    expected_video = {
        'blame': 'youtube-dl',
        'content-length': 267900,
        'content-type': 'video/mp4',
        'response_code': 204,
        'url': stitched_url,
    }
    assert expected_video in page.videos

    time.sleep(2)   # in case warcprox hasn't finished processing urls
    # take a look at the captures table
    captures = list(rr.table('captures').filter({'test_id':test_id}).run())
    stitched_captures = [c for c in captures if c['url'] == stitched_url]
    assert len(stitched_captures) == 1
    capture = stitched_captures[0]
    assert capture['filename'].startswith('test_ydl_stitching')
    assert capture['content_type'] == 'video/mp4'
    assert capture['http_method'] == 'WARCPROX_WRITE_RECORD'
|