#!/usr/bin/env python
"""
test_cluster.py - integration tests for a brozzler cluster, expects brozzler,
warcprox, pywb, rethinkdb and other dependencies to be running already

Copyright (C) 2016-2018 Internet Archive

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import pytest
import http.server
import threading
import urllib.request
import os
import socket
import doublethink
import time
import brozzler
import datetime
import requests
import subprocess
import logging
import sys
import warcprox

# https://stackoverflow.com/questions/166506/finding-local-ip-addresses-using-pythons-stdlib
def _local_address():
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        s.connect(("10.255.255.255", 1)) # ip doesn't need to be reachable
        return s.getsockname()[0]
    except Exception:
        return "127.0.0.1"
    finally:
        s.close()

local_address = _local_address()

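# start_service()/stop_service() assume the cluster services (brozzler-worker,
# warcprox, etc.) are supervised by daemontools: "svc -u" brings a service up,
# "svc -d" takes it down, and "svstat" reports its current status.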
def start_service(service):
    subprocess.check_call(["sudo", "svc", "-u", "/etc/service/" + service])

def stop_service(service):
    subprocess.check_call(["sudo", "svc", "-d", "/etc/service/" + service])
    while True:
        status = subprocess.check_output(["sudo", "svstat", "/etc/service/" + service])
        if b" down " in status:
            break
        time.sleep(0.5)

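# module-scoped test web server: serves the static files under tests/htdocs and
# special-cases a few paths (the /site5 and /site9 redirects, and an /infinite/
# link farm used by the stop/time-limit/outage tests)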
@pytest.fixture(scope="module")
def httpd(request):
class RequestHandler(http.server.SimpleHTTPRequestHandler):
def do_POST(self):
logging.info("\n%s\n%s", self.requestline, self.headers)
self.do_GET()
def do_GET(self):
logging.info("\n%s\n%s", self.requestline, self.headers)
if self.path == "/site5/redirect/":
self.send_response(303, "See other")
self.send_header("Connection", "close")
self.send_header("Content-Length", 0)
self.send_header("Location", "/site5/destination/")
self.end_headers()
self.wfile.write(b"")
elif self.path == "/site9/redirect.html":
self.send_response(303, "See other")
self.send_header("Connection", "close")
self.send_header("Content-Length", 0)
self.send_header("Location", "/site9/destination.html")
self.end_headers()
self.wfile.write(b"")
elif self.path.startswith("/infinite/"):
payload = b"""
<html>
<head>
<title>infinite site</title>
</head>
<body>
<a href='a/'>a/</a> <a href='b/'>b/</a> <a href='c/'>c/</a>
<a href='d/'>d/</a> <a href='e/'>e/</a> <a href='f/'>f/</a>
<a href='g/'>g/</a> <a href='h/'>h/</a> <a href='i/'>i/</a>
</body>
</html>
"""
self.send_response(200, "OK")
self.send_header("Connection", "close")
self.send_header("Content-Length", len(payload))
self.end_headers()
self.wfile.write(payload)
else:
super().do_GET()
# SimpleHTTPRequestHandler always uses CWD so we have to chdir
os.chdir(os.path.join(os.path.dirname(__file__), "htdocs"))
httpd = http.server.HTTPServer((local_address, 0), RequestHandler)
httpd_thread = threading.Thread(name="httpd", target=httpd.serve_forever)
httpd_thread.start()
def fin():
httpd.shutdown()
httpd.server_close()
httpd_thread.join()
request.addfinalizer(fin)
return httpd
def make_url(httpd, rel_url):
    return "http://%s:%s%s" % (local_address, httpd.server_port, rel_url)

def test_httpd(httpd):
    """
    Tests that our http server is working as expected, and that two fetches
    of the same url return the same payload, proving it can be used to test
    deduplication.
    """
    payload1 = payload2 = None
    url = make_url(httpd, "/site1/file1.txt")
    with urllib.request.urlopen(url) as response:
        assert response.status == 200
        payload1 = response.read()
        assert payload1

    with urllib.request.urlopen(url) as response:
        assert response.status == 200
        payload2 = response.read()
        assert payload2

    assert payload1 == payload2

def test_services_up():
    """Check that the expected services are up and running."""
    # check that rethinkdb is listening and looks sane
    rr = doublethink.Rethinker(db="rethinkdb") # built-in db
    tbls = rr.table_list().run()
    assert len(tbls) > 10

    # check that warcprox is listening
    with socket.socket() as s:
        # if the connect fails an exception is raised and the test fails
        s.connect(("localhost", 8000))

    # check that pywb is listening
    with socket.socket() as s:
        # if the connect fails an exception is raised and the test fails
        s.connect(("localhost", 8880))

    # check that brozzler dashboard is listening
    with socket.socket() as s:
        # if the connect fails an exception is raised and the test fails
        s.connect(("localhost", 8881))

def test_brozzle_site(httpd):
    test_id = "test_brozzle_site-%s" % datetime.datetime.utcnow().isoformat()
    rr = doublethink.Rethinker("localhost", db="brozzler")
    site = brozzler.Site(
        rr,
        {
            "seed": make_url(httpd, "/site1/"),
            "warcprox_meta": {"captures-table-extra-fields": {"test_id": test_id}},
        },
    )

    # the two pages we expect to be crawled
    page1 = make_url(httpd, "/site1/")
    page2 = make_url(httpd, "/site1/file1.txt")
    robots = make_url(httpd, "/robots.txt")

    # so we can examine rethinkdb before it does anything
    try:
        stop_service("brozzler-worker")

        assert site.id is None
        frontier = brozzler.RethinkDbFrontier(rr)
        brozzler.new_site(frontier, site)
        assert site.id is not None
        assert len(list(frontier.site_pages(site.id))) == 1
    finally:
        start_service("brozzler-worker")

    # the site should be brozzled fairly quickly
    start = time.time()
    while site.status != "FINISHED" and time.time() - start < 300:
        time.sleep(0.5)
        site.refresh()
    assert site.status == "FINISHED"

    # check that we got the two pages we expected
    pages = list(frontier.site_pages(site.id))
    assert len(pages) == 2
    assert {page.url for page in pages} == {
        make_url(httpd, "/site1/"),
        make_url(httpd, "/site1/file1.txt"),
    }

    time.sleep(2) # in case warcprox hasn't finished processing urls

    # take a look at the captures table
    captures = rr.table("captures").filter({"test_id": test_id}).run()
    captures_by_url = {c["url"]: c for c in captures if c["http_method"] != "HEAD"}
    assert robots in captures_by_url
    assert page1 in captures_by_url
    assert page2 in captures_by_url
    assert "screenshot:%s" % page1 in captures_by_url
    assert "thumbnail:%s" % page1 in captures_by_url
    # no screenshots of plaintext

    # check pywb
    t14 = captures_by_url[page2]["timestamp"].strftime("%Y%m%d%H%M%S")
    wb_url = "http://localhost:8880/brozzler/%s/%s" % (t14, page2)
    expected_payload = open(
        os.path.join(os.path.dirname(__file__), "htdocs", "site1", "file1.txt"), "rb"
    ).read()
    assert requests.get(wb_url).content == expected_payload

    url = "screenshot:%s" % page1
    t14 = captures_by_url[url]["timestamp"].strftime("%Y%m%d%H%M%S")
    wb_url = "http://localhost:8880/brozzler/%s/%s" % (t14, url)
    response = requests.get(wb_url)
    assert response.status_code == 200
    assert response.headers["content-type"] == "image/jpeg"

    url = "thumbnail:%s" % page1
    t14 = captures_by_url[url]["timestamp"].strftime("%Y%m%d%H%M%S")
    wb_url = "http://localhost:8880/brozzler/%s/%s" % (t14, url)
    response = requests.get(wb_url)
    assert response.status_code == 200
    assert response.headers["content-type"] == "image/jpeg"

def test_proxy_warcprox(httpd):
    """Test --proxy with proxy that happens to be warcprox"""
    try:
        stop_service("brozzler-worker")
        _test_proxy_setting(
            httpd, proxy="localhost:8000", warcprox_auto=False, is_warcprox=True
        )
    finally:
        start_service("brozzler-worker")

def test_proxy_non_warcprox(httpd):
    """Test --proxy with proxy that happens not to be warcprox"""

    class DumbProxyRequestHandler(http.server.SimpleHTTPRequestHandler):
        def do_HEAD(self):
            if not hasattr(self.server, "requests"):
                self.server.requests = []
            logging.info("%s %s", self.command, self.path)
            self.server.requests.append("%s %s" % (self.command, self.path))
            response = urllib.request.urlopen(self.path)
            self.wfile.write(
                ("HTTP/1.0 %s %s\r\n" % (response.code, response.reason)).encode(
                    "ascii"
                )
            )
            for header in response.getheaders():
                self.wfile.write(
                    ("%s: %s\r\n" % (header[0], header[1])).encode("ascii")
                )
            self.wfile.write(b"\r\n")
            return response

        def do_GET(self):
            response = self.do_HEAD()
            self.copyfile(response, self.wfile)

        def do_WARCPROX_WRITE_RECORD(self):
            if not hasattr(self.server, "requests"):
                self.server.requests = []
            logging.info("%s %s", self.command, self.path)
            self.send_error(400)

    proxy = http.server.HTTPServer(("localhost", 0), DumbProxyRequestHandler)
    th = threading.Thread(name="dumb-proxy", target=proxy.serve_forever)
    th.start()

    try:
        stop_service("brozzler-worker")
        _test_proxy_setting(
            httpd,
            proxy="localhost:%s" % proxy.server_port,
            warcprox_auto=False,
            is_warcprox=False,
        )
    finally:
        start_service("brozzler-worker")

    assert len(proxy.requests) <= 15
    assert proxy.requests.count("GET /status") == 1
    assert ("GET %s" % make_url(httpd, "/site1/")) in proxy.requests
    assert ("GET %s" % make_url(httpd, "/site1/file1.txt")) in proxy.requests
    assert [
        req for req in proxy.requests if req.startswith("WARCPROX_WRITE_RECORD")
    ] == []

    proxy.shutdown()
    th.join()

def test_no_proxy(httpd):
    try:
        stop_service("brozzler-worker")
        _test_proxy_setting(httpd, proxy=None, warcprox_auto=False, is_warcprox=False)
    finally:
        start_service("brozzler-worker")
    # XXX how to check that no proxy was used?

def test_warcprox_auto(httpd):
    """Test --warcprox-auto"""
    try:
        stop_service("brozzler-worker")
        _test_proxy_setting(httpd, proxy=None, warcprox_auto=True, is_warcprox=True)
    finally:
        start_service("brozzler-worker")

def test_proxy_conflict():
    with pytest.raises(AssertionError) as excinfo:
        worker = brozzler.worker.BrozzlerWorker(
            None, None, warcprox_auto=True, proxy="localhost:12345"
        )

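# helper for the proxy tests above: the callers stop the brozzler-worker
# service first, then this runs a BrozzlerWorker in-process so that the proxy
# configuration under test is exactly the one exercised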
def _test_proxy_setting(httpd, proxy=None, warcprox_auto=False, is_warcprox=False):
    test_id = "test_proxy=%s_warcprox_auto=%s_is_warcprox=%s-%s" % (
        proxy,
        warcprox_auto,
        is_warcprox,
        datetime.datetime.utcnow().isoformat(),
    )

    # the two pages we expect to be crawled
    page1 = make_url(httpd, "/site1/")
    page2 = make_url(httpd, "/site1/file1.txt")
    robots = make_url(httpd, "/robots.txt")

    rr = doublethink.Rethinker("localhost", db="brozzler")
    service_registry = doublethink.ServiceRegistry(rr)
    site = brozzler.Site(
        rr,
        {
            "seed": make_url(httpd, "/site1/"),
            "warcprox_meta": {"captures-table-extra-fields": {"test_id": test_id}},
        },
    )
    assert site.id is None
    frontier = brozzler.RethinkDbFrontier(rr)
    brozzler.new_site(frontier, site)
    assert site.id is not None
    assert len(list(frontier.site_pages(site.id))) == 1

    worker = brozzler.worker.BrozzlerWorker(
        frontier,
        service_registry,
        max_browsers=1,
        chrome_exe=brozzler.suggest_default_chrome_exe(),
        warcprox_auto=warcprox_auto,
        proxy=proxy,
    )
    browser = worker._browser_pool.acquire()
    worker.brozzle_site(browser, site)
    worker._browser_pool.release(browser)

    # check proxy is set
    assert site.status == "FINISHED"
    if warcprox_auto:
        assert site.proxy[-5:] == ":8000"
    else:
        assert not site.proxy
    site.refresh() # check that these things were persisted
    assert site.status == "FINISHED"
    if warcprox_auto:
        assert site.proxy[-5:] == ":8000"
    else:
        assert not site.proxy

    # check that we got the two pages we expected
    pages = list(frontier.site_pages(site.id))
    assert len(pages) == 2
    assert {page.url for page in pages} == {
        make_url(httpd, "/site1/"),
        make_url(httpd, "/site1/file1.txt"),
    }

    time.sleep(2) # in case warcprox hasn't finished processing urls

    # take a look at the captures table
    captures = rr.table("captures").filter({"test_id": test_id}).run()
    captures_by_url = {c["url"]: c for c in captures if c["http_method"] != "HEAD"}
    if is_warcprox:
        assert robots in captures_by_url
        assert page1 in captures_by_url
        assert page2 in captures_by_url
        assert "screenshot:%s" % page1 in captures_by_url
        assert "thumbnail:%s" % page1 in captures_by_url

        # check pywb
        t14 = captures_by_url[page2]["timestamp"].strftime("%Y%m%d%H%M%S")
        wb_url = "http://localhost:8880/brozzler/%s/%s" % (t14, page2)
        expected_payload = open(
            os.path.join(os.path.dirname(__file__), "htdocs", "site1", "file1.txt"),
            "rb",
        ).read()
        assert requests.get(wb_url).content == expected_payload
    else:
        assert captures_by_url == {}

def test_obey_robots(httpd):
    test_id = "test_obey_robots-%s" % datetime.datetime.utcnow().isoformat()
    rr = doublethink.Rethinker("localhost", db="brozzler")
    site = brozzler.Site(
        rr,
        {
            "seed": make_url(httpd, "/site1/"),
            "user_agent": "im a badbot", # robots.txt blocks badbot
            "warcprox_meta": {"captures-table-extra-fields": {"test_id": test_id}},
        },
    )

    # so we can examine rethinkdb before it does anything
    try:
        stop_service("brozzler-worker")

        assert site.id is None
        frontier = brozzler.RethinkDbFrontier(rr)
        brozzler.new_site(frontier, site)
        assert site.id is not None
        site_pages = list(frontier.site_pages(site.id))
        assert len(site_pages) == 1
        assert site_pages[0].url == site.seed
        assert site_pages[0].needs_robots_check
    finally:
        start_service("brozzler-worker")

    # the site should be brozzled fairly quickly
    start = time.time()
    while site.status != "FINISHED" and time.time() - start < 300:
        time.sleep(0.5)
        site.refresh()
    assert site.status == "FINISHED"

    # check that only the one page is in rethinkdb
    pages = list(frontier.site_pages(site.id))
    assert len(pages) == 1
    page = pages[0]
    assert page.url == make_url(httpd, "/site1/")
    assert page.blocked_by_robots

    # take a look at the captures table
    time.sleep(2) # in case warcprox hasn't finished processing urls
    robots_url = make_url(httpd, "/robots.txt")
    captures = list(rr.table("captures").filter({"test_id": test_id}).run())
    assert len(captures) == 1
    assert captures[0]["url"] == robots_url

    # check pywb
    t14 = captures[0]["timestamp"].strftime("%Y%m%d%H%M%S")
    wb_url = "http://localhost:8880/brozzler/%s/%s" % (t14, robots_url)
    expected_payload = open(
        os.path.join(os.path.dirname(__file__), "htdocs", "robots.txt"), "rb"
    ).read()
    assert requests.get(wb_url, allow_redirects=False).content == expected_payload

def test_login(httpd):
    test_id = "test_login-%s" % datetime.datetime.utcnow().isoformat()
    rr = doublethink.Rethinker("localhost", db="brozzler")
    site = brozzler.Site(
        rr,
        {
            "seed": make_url(httpd, "/site2/"),
            "warcprox_meta": {"captures-table-extra-fields": {"test_id": test_id}},
            "username": "test_username",
            "password": "test_password",
        },
    )
    frontier = brozzler.RethinkDbFrontier(rr)
    brozzler.new_site(frontier, site)

    # the site should be brozzled fairly quickly
    start = time.time()
    while site.status != "FINISHED" and time.time() - start < 300:
        time.sleep(0.5)
        site.refresh()
    assert site.status == "FINISHED"

    # take a look at the captures table
    time.sleep(2) # in case warcprox hasn't finished processing urls
    robots_url = make_url(httpd, "/robots.txt")
    captures = list(
        rr.table("captures").filter({"test_id": test_id}).order_by("timestamp").run()
    )
    meth_url = ["%s %s" % (c["http_method"], c["url"]) for c in captures]

    # there are several forms in htdocs/site2/login.html but only one
    # that brozzler's heuristic should match and try to submit, and it has
    # action='00', so we can check for that here
    assert ("POST %s" % make_url(httpd, "/site2/00")) in meth_url

    # sanity check the rest of the crawl
    assert ("GET %s" % make_url(httpd, "/robots.txt")) in meth_url
    assert ("GET %s" % make_url(httpd, "/site2/")) in meth_url
    assert (
        "WARCPROX_WRITE_RECORD screenshot:%s" % make_url(httpd, "/site2/")
    ) in meth_url
    assert (
        "WARCPROX_WRITE_RECORD thumbnail:%s" % make_url(httpd, "/site2/")
    ) in meth_url
    assert ("GET %s" % make_url(httpd, "/site2/login.html")) in meth_url
    assert (
        "WARCPROX_WRITE_RECORD screenshot:%s" % make_url(httpd, "/site2/login.html")
    ) in meth_url
    assert (
        "WARCPROX_WRITE_RECORD thumbnail:%s" % make_url(httpd, "/site2/login.html")
    ) in meth_url

def test_seed_redirect(httpd):
    test_id = "test_seed_redirect-%s" % datetime.datetime.utcnow().isoformat()
    rr = doublethink.Rethinker("localhost", db="brozzler")
    seed_url = make_url(httpd, "/site5/redirect/")
    site = brozzler.Site(
        rr,
        {
            "seed": make_url(httpd, "/site5/redirect/"),
            "warcprox_meta": {"captures-table-extra-fields": {"test_id": test_id}},
        },
    )
    assert site.scope == {
        "accepts": [
            {
                "ssurt": "%s//%s:http:/site5/redirect/"
                % (local_address, httpd.server_port)
            }
        ]
    }

    frontier = brozzler.RethinkDbFrontier(rr)
    brozzler.new_site(frontier, site)
    assert site.id

    # the site should be brozzled fairly quickly
    start = time.time()
    while site.status != "FINISHED" and time.time() - start < 300:
        time.sleep(0.5)
        site.refresh()
    assert site.status == "FINISHED"

    # take a look at the pages table
    pages = list(frontier.site_pages(site.id))
    assert len(pages) == 2
    pages.sort(key=lambda page: page.hops_from_seed)
    assert pages[0].hops_from_seed == 0
    assert pages[0].url == seed_url
    assert pages[0].redirect_url == make_url(httpd, "/site5/destination/")
    assert pages[1].hops_from_seed == 1
    assert pages[1].url == make_url(httpd, "/site5/destination/page2.html")

    # check that scope has been updated properly
    assert site.scope == {
        "accepts": [
            {
                "ssurt": "%s//%s:http:/site5/redirect/"
                % (local_address, httpd.server_port)
            },
            {
                "ssurt": "%s//%s:http:/site5/destination/"
                % (local_address, httpd.server_port)
            },
        ]
    }

def test_hashtags(httpd):
    test_id = "test_hashtags-%s" % datetime.datetime.utcnow().isoformat()
    rr = doublethink.Rethinker("localhost", db="brozzler")
    seed_url = make_url(httpd, "/site7/")
    site = brozzler.Site(
        rr,
        {
            "seed": seed_url,
            "warcprox_meta": {"captures-table-extra-fields": {"test_id": test_id}},
        },
    )
    frontier = brozzler.RethinkDbFrontier(rr)
    brozzler.new_site(frontier, site)
    assert site.id

    # the site should be brozzled fairly quickly
    start = time.time()
    while site.status != "FINISHED" and time.time() - start < 300:
        time.sleep(0.5)
        site.refresh()
    assert site.status == "FINISHED"

    # check that we got the pages we expected
    pages = sorted(list(frontier.site_pages(site.id)), key=lambda p: p.url)
    assert len(pages) == 2
    assert pages[0].url == seed_url
    assert pages[0].hops_from_seed == 0
    assert pages[0].brozzle_count == 1
    assert pages[0].outlinks["accepted"] == [make_url(httpd, "/site7/foo.html")]
    assert not pages[0].hashtags
    assert pages[1].url == make_url(httpd, "/site7/foo.html")
    assert pages[1].hops_from_seed == 1
    assert pages[1].brozzle_count == 1
    assert sorted(pages[1].hashtags) == [
        "#boosh",
        "#ignored",
        "#whee",
    ]

    time.sleep(2) # in case warcprox hasn't finished processing urls

    # take a look at the captures table
    captures = rr.table("captures").filter({"test_id": test_id}).run()
    captures_by_url = {c["url"]: c for c in captures if c["http_method"] != "HEAD"}
    assert seed_url in captures_by_url
    assert make_url(httpd, "/site7/foo.html") in captures_by_url
    assert make_url(httpd, "/site7/whee.txt") in captures_by_url
    assert make_url(httpd, "/site7/boosh.txt") in captures_by_url
    assert "screenshot:%s" % seed_url in captures_by_url
    assert "thumbnail:%s" % seed_url in captures_by_url
    assert "screenshot:%s" % make_url(httpd, "/site7/foo.html") in captures_by_url
    assert "thumbnail:%s" % make_url(httpd, "/site7/foo.html") in captures_by_url

def test_redirect_hashtags(httpd):
    test_id = "test_redirect_hashtags-%s" % datetime.datetime.utcnow().isoformat()
    rr = doublethink.Rethinker("localhost", db="brozzler")
    seed_url = make_url(httpd, "/site9/")
    site = brozzler.Site(
        rr,
        {
            "seed": seed_url,
            "warcprox_meta": {"captures-table-extra-fields": {"test_id": test_id}},
        },
    )
    frontier = brozzler.RethinkDbFrontier(rr)
    brozzler.new_site(frontier, site)
    assert site.id

    # the site should be brozzled fairly quickly
    start = time.time()
    while site.status != "FINISHED" and time.time() - start < 300:
        time.sleep(0.5)
        site.refresh()
    assert site.status == "FINISHED"

    # check that we got the pages we expected
    pages = sorted(list(frontier.site_pages(site.id)), key=lambda p: p.url)
    assert len(pages) == 2
    assert pages[0].url == seed_url
    assert pages[0].hops_from_seed == 0
    assert pages[0].brozzle_count == 1
    assert pages[0].outlinks["accepted"] == [make_url(httpd, "/site9/redirect.html")]
    assert not pages[0].hashtags
    assert pages[1].url == make_url(httpd, "/site9/redirect.html")
    assert pages[1].hops_from_seed == 1
    assert pages[1].brozzle_count == 1
    assert sorted(pages[1].hashtags) == [
        "#hash1",
        "#hash2",
    ]

    time.sleep(2) # in case warcprox hasn't finished processing urls

    # take a look at the captures table
    captures = rr.table("captures").filter({"test_id": test_id}).run()
    redirect_captures = [
        c
        for c in captures
        if c["url"] == make_url(httpd, "/site9/redirect.html")
        and c["http_method"] == "GET"
    ]
    assert len(redirect_captures) == 2 # youtube-dl + browser, no hashtags

    # === expected captures ===
    # 1. GET http://localhost:41243/favicon.ico
    # 2. GET http://localhost:41243/robots.txt
    # 3. GET http://localhost:41243/site9/
    # 4. GET http://localhost:41243/site9/
    # 5. GET http://localhost:41243/site9/destination.html
    # 6. GET http://localhost:41243/site9/destination.html
    # 7. GET http://localhost:41243/site9/redirect.html
    # 8. GET http://localhost:41243/site9/redirect.html
    # 9. HEAD http://localhost:41243/site9/
    # 10. HEAD http://localhost:41243/site9/redirect.html
    # 11. WARCPROX_WRITE_RECORD screenshot:http://localhost:41243/site9/
    # 12. WARCPROX_WRITE_RECORD screenshot:http://localhost:41243/site9/redirect.html
    # 13. WARCPROX_WRITE_RECORD thumbnail:http://localhost:41243/site9/
    # 14. WARCPROX_WRITE_RECORD thumbnail:http://localhost:41243/site9/redirect.html

def test_stop_crawl(httpd):
    test_id = "test_stop_crawl_job-%s" % datetime.datetime.utcnow().isoformat()
    rr = doublethink.Rethinker("localhost", db="brozzler")
    frontier = brozzler.RethinkDbFrontier(rr)

    # create a new job with three sites that could be crawled forever
    job_conf = {
        "seeds": [
            {"url": make_url(httpd, "/infinite/foo/")},
            {"url": make_url(httpd, "/infinite/bar/")},
            {"url": make_url(httpd, "/infinite/baz/")},
        ]
    }
    job = brozzler.new_job(frontier, job_conf)
    assert job.id
    sites = list(frontier.job_sites(job.id))
    assert not sites[0].stop_requested
    assert not sites[1].stop_requested

    # request crawl stop for one site using the command line entrypoint
    brozzler.cli.brozzler_stop_crawl(["brozzler-stop-crawl", "--site=%s" % sites[0].id])
    sites[0].refresh()
    assert sites[0].stop_requested

    # stop request should be honored quickly
    start = time.time()
    while not sites[0].status.startswith("FINISHED") and time.time() - start < 120:
        time.sleep(0.5)
        sites[0].refresh()
    assert sites[0].status == "FINISHED_STOP_REQUESTED"

    # but the other sites and the job as a whole should still be crawling
    sites[1].refresh()
    assert sites[1].status == "ACTIVE"
    sites[2].refresh()
    assert sites[2].status == "ACTIVE"
    job.refresh()
    assert job.status == "ACTIVE"

    # request crawl stop for the job using the command line entrypoint
    brozzler.cli.brozzler_stop_crawl(["brozzler-stop-crawl", "--job=%s" % job.id])
    job.refresh()
    assert job.stop_requested

    # stop request should be honored quickly
    start = time.time()
    while not job.status.startswith("FINISHED") and time.time() - start < 120:
        time.sleep(0.5)
        job.refresh()
    assert job.status == "FINISHED"

    # the other sites should also be FINISHED_STOP_REQUESTED
    sites[0].refresh()
    assert sites[0].status == "FINISHED_STOP_REQUESTED"
    sites[1].refresh()
    assert sites[1].status == "FINISHED_STOP_REQUESTED"
    sites[2].refresh()
    assert sites[2].status == "FINISHED_STOP_REQUESTED"

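# note: the test below manages its own warcprox instances (registered with the
# rethinkdb service registry) rather than using the system warcprox service, so
# that it can take them up and down at will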
def test_warcprox_outage_resiliency(httpd):
    """
    Tests resiliency to warcprox outage.

    If no instances of warcprox are healthy when starting to crawl a site,
    brozzler-worker should sit there and wait until a healthy instance appears.

    If an instance goes down, sites assigned to that instance should bounce
    over to a healthy instance.

    If all instances of warcprox go down, brozzler-worker should sit and wait.
    """
    rr = doublethink.Rethinker("localhost", db="brozzler")
    frontier = brozzler.RethinkDbFrontier(rr)
    svcreg = doublethink.ServiceRegistry(rr)

    # run two instances of warcprox
    opts = warcprox.Options()
    opts.address = "0.0.0.0"
    opts.port = 0
    opts.rethinkdb_services_url = "rethinkdb://localhost/brozzler/services"

    warcprox1 = warcprox.controller.WarcproxController(opts)
    warcprox2 = warcprox.controller.WarcproxController(opts)
    warcprox1_thread = threading.Thread(
        target=warcprox1.run_until_shutdown, name="warcprox1"
    )
    warcprox2_thread = threading.Thread(
        target=warcprox2.run_until_shutdown, name="warcprox2"
    )

    # put together a site to crawl
    test_id = "test_warcprox_death-%s" % datetime.datetime.utcnow().isoformat()
    site = brozzler.Site(
        rr,
        {
            "seed": make_url(httpd, "/infinite/"),
            "warcprox_meta": {"captures-table-extra-fields": {"test_id": test_id}},
        },
    )

    try:
        # we manage warcprox instances ourselves, so stop the one running on
        # the system, if any
        try:
            stop_service("warcprox")
        except Exception as e:
            logging.warning("problem stopping warcprox service: %s", e)

        # queue the site for brozzling
        brozzler.new_site(frontier, site)

        # check that nothing happens
        # XXX tail brozzler-worker.log or something?
        time.sleep(30)
        site.refresh()
        assert site.status == "ACTIVE"
        assert not site.proxy
        assert len(list(frontier.site_pages(site.id))) == 1

        # start one instance of warcprox
        warcprox1_thread.start()

        # check that it started using that instance
        start = time.time()
        while not site.proxy and time.time() - start < 30:
            time.sleep(0.5)
            site.refresh()
        assert site.proxy.endswith(":%s" % warcprox1.proxy.server_port)

        # check that the site accumulates pages in the frontier, confirming
        # that crawling is really happening
        start = time.time()
        while len(list(frontier.site_pages(site.id))) <= 1 and time.time() - start < 60:
            time.sleep(0.5)
            site.refresh()
        assert len(list(frontier.site_pages(site.id))) > 1

        # stop warcprox #1, start warcprox #2
        warcprox2_thread.start()
        warcprox1.stop.set()
        warcprox1_thread.join()

        # check that it switched over to warcprox #2
        start = time.time()
        while (
            not site.proxy
            or not site.proxy.endswith(":%s" % warcprox2.proxy.server_port)
        ) and time.time() - start < 30:
            time.sleep(0.5)
            site.refresh()
        assert site.proxy.endswith(":%s" % warcprox2.proxy.server_port)

        # stop warcprox #2
        warcprox2.stop.set()
        warcprox2_thread.join()
        page_count = len(list(frontier.site_pages(site.id)))
        assert page_count > 1

        # check that it is waiting for a warcprox to appear
        time.sleep(30)
        site.refresh()
        assert site.status == "ACTIVE"
        assert not site.proxy
        assert len(list(frontier.site_pages(site.id))) == page_count

        # stop crawling the site, else it can pollute subsequent test runs
        brozzler.cli.brozzler_stop_crawl(["brozzler-stop-crawl", "--site=%s" % site.id])
        site.refresh()
        assert site.stop_requested

        # stop request should be honored quickly
        start = time.time()
        while not site.status.startswith("FINISHED") and time.time() - start < 120:
            time.sleep(0.5)
            site.refresh()
        assert site.status == "FINISHED_STOP_REQUESTED"
    finally:
        warcprox1.stop.set()
        warcprox2.stop.set()
        warcprox1_thread.join()
        warcprox2_thread.join()
        start_service("warcprox")

def test_time_limit(httpd):
    test_id = "test_time_limit-%s" % datetime.datetime.utcnow().isoformat()
    rr = doublethink.Rethinker("localhost", db="brozzler")
    frontier = brozzler.RethinkDbFrontier(rr)

    # create a new job with one seed that could be crawled forever
    job_conf = {"seeds": [{"url": make_url(httpd, "/infinite/foo/"), "time_limit": 20}]}
    job = brozzler.new_job(frontier, job_conf)
    assert job.id
    sites = list(frontier.job_sites(job.id))
    assert len(sites) == 1
    site = sites[0]

    # time limit should be enforced pretty soon
    start = time.time()
    while not sites[0].status.startswith("FINISHED") and time.time() - start < 120:
        time.sleep(0.5)
        sites[0].refresh()
    assert sites[0].status == "FINISHED_TIME_LIMIT"

    # all sites finished so job should be finished too
    start = time.time()
    job.refresh()
    while not job.status == "FINISHED" and time.time() - start < 10:
        time.sleep(0.5)
        job.refresh()
    assert job.status == "FINISHED"

def test_ydl_stitching(httpd):
    test_id = "test_ydl_stitching-%s" % datetime.datetime.utcnow().isoformat()
    rr = doublethink.Rethinker("localhost", db="brozzler")
    frontier = brozzler.RethinkDbFrontier(rr)
    site = brozzler.Site(
        rr,
        {
            "seed": make_url(httpd, "/site10/"),
            "warcprox_meta": {
                "warc-prefix": "test_ydl_stitching",
                "captures-table-extra-fields": {"test_id": test_id},
            },
        },
    )
    brozzler.new_site(frontier, site)

    # the site should be brozzled fairly quickly
    start = time.time()
    while site.status != "FINISHED" and time.time() - start < 300:
        time.sleep(0.5)
        site.refresh()
    assert site.status == "FINISHED"

    # check page.videos
    pages = list(frontier.site_pages(site.id))
    assert len(pages) == 1
    page = pages[0]
    assert len(page.videos) == 6
    stitched_url = "youtube-dl:00001:%s" % make_url(httpd, "/site10/")
    assert {
        "blame": "youtube-dl",
        "content-length": 267900,
        "content-type": "video/mp4",
        "response_code": 204,
        "url": stitched_url,
    } in page.videos

    time.sleep(2) # in case warcprox hasn't finished processing urls

    # take a look at the captures table
    captures = list(rr.table("captures").filter({"test_id": test_id}).run())
    stitched_captures = [c for c in captures if c["url"] == stitched_url]
    assert len(stitched_captures) == 1
    capture = stitched_captures[0]
    assert capture["filename"].startswith("test_ydl_stitching")
    assert capture["content_type"] == "video/mp4"
    assert capture["http_method"] == "WARCPROX_WRITE_RECORD"