mirror of
				https://github.com/internetarchive/brozzler.git
				synced 2025-10-24 19:16:02 -04:00 
			
		
		
		
	
		
			
				
	
	
		
			1025 lines
		
	
	
	
		
			35 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			1025 lines
		
	
	
	
		
			35 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/env python
 | |
| """
 | |
| test_cluster.py - integration tests for a brozzler cluster, expects brozzler,
 | |
| warcprox, pywb, rethinkdb and other dependencies to be running already
 | |
| 
 | |
| Copyright (C) 2016-2018 Internet Archive
 | |
| 
 | |
| Licensed under the Apache License, Version 2.0 (the "License");
 | |
| you may not use this file except in compliance with the License.
 | |
| You may obtain a copy of the License at
 | |
| 
 | |
|     http://www.apache.org/licenses/LICENSE-2.0
 | |
| 
 | |
| Unless required by applicable law or agreed to in writing, software
 | |
| distributed under the License is distributed on an "AS IS" BASIS,
 | |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| See the License for the specific language governing permissions and
 | |
| limitations under the License.
 | |
| """
 | |
| 
 | |
| import datetime
 | |
| import http.server
 | |
| import os
 | |
| import socket
 | |
| import subprocess
 | |
| import threading
 | |
| import time
 | |
| import urllib.request
 | |
| 
 | |
| import doublethink
 | |
| import pytest
 | |
| import requests
 | |
| import structlog
 | |
| import warcprox
 | |
| 
 | |
| import brozzler.cli
 | |
| 
 | |
| logger = structlog.get_logger(logger_name=__name__)
 | |
| 
 | |
| 
 | |
| # https://stackoverflow.com/questions/166506/finding-local-ip-addresses-using-pythons-stdlib
 | |
| def _local_address():
 | |
|     import socket
 | |
| 
 | |
|     s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
 | |
|     try:
 | |
|         s.connect(("10.255.255.255", 1))  # ip doesn't need to be reachable
 | |
|         return s.getsockname()[0]
 | |
|     except:  # noqa: E722
 | |
|         return "127.0.0.1"
 | |
|     finally:
 | |
|         s.close()
 | |
| 
 | |
| 
 | |
| local_address = _local_address()
 | |
| 
 | |
| 
 | |
| def start_service(service):
 | |
|     subprocess.check_call(["sudo", "svc", "-u", "/etc/service/" + service])
 | |
| 
 | |
| 
 | |
| def stop_service(service):
 | |
|     subprocess.check_call(["sudo", "svc", "-d", "/etc/service/" + service])
 | |
|     while True:
 | |
|         status = subprocess.check_output(["sudo", "svstat", "/etc/service/" + service])
 | |
|         if b" down " in status:
 | |
|             break
 | |
|         time.sleep(0.5)
 | |
| 
 | |
| 
 | |
| @pytest.fixture(scope="module")
 | |
| def rethinker(request):
 | |
|     db = request.param if hasattr(request, "param") else "ignoreme"
 | |
|     servers = os.environ.get("BROZZLER_RETHINKDB_SERVERS", "localhost")
 | |
|     return doublethink.Rethinker(db=db, servers=servers.split(","))
 | |
| 
 | |
| 
 | |
| @pytest.fixture(scope="module")
 | |
| def httpd(request):
 | |
|     class RequestHandler(http.server.SimpleHTTPRequestHandler):
 | |
|         def do_POST(self):
 | |
|             logger.info(
 | |
|                 "RequestHandler.do_POST",
 | |
|                 requestline=self.requestline,
 | |
|                 headers=self.headers,
 | |
|             )
 | |
|             self.do_GET()
 | |
| 
 | |
|         def do_GET(self):
 | |
|             logger.info(
 | |
|                 "RequestHandler.do_GET",
 | |
|                 requestline=self.requestline,
 | |
|                 headers=self.headers,
 | |
|             )
 | |
|             if self.path == "/site5/redirect/":
 | |
|                 self.send_response(303, "See other")
 | |
|                 self.send_header("Connection", "close")
 | |
|                 self.send_header("Content-Length", 0)
 | |
|                 self.send_header("Location", "/site5/destination/")
 | |
|                 self.end_headers()
 | |
|                 self.wfile.write(b"")
 | |
|             elif self.path == "/site9/redirect.html":
 | |
|                 self.send_response(303, "See other")
 | |
|                 self.send_header("Connection", "close")
 | |
|                 self.send_header("Content-Length", 0)
 | |
|                 self.send_header("Location", "/site9/destination.html")
 | |
|                 self.end_headers()
 | |
|                 self.wfile.write(b"")
 | |
|             elif self.path.startswith("/infinite/"):
 | |
|                 payload = b"""
 | |
| <html>
 | |
|  <head>
 | |
|   <title>infinite site</title>
 | |
|  </head>
 | |
|  <body>
 | |
|   <a href='a/'>a/</a> <a href='b/'>b/</a> <a href='c/'>c/</a>
 | |
|   <a href='d/'>d/</a> <a href='e/'>e/</a> <a href='f/'>f/</a>
 | |
|   <a href='g/'>g/</a> <a href='h/'>h/</a> <a href='i/'>i/</a>
 | |
|  </body>
 | |
| </html>
 | |
| """
 | |
|                 self.send_response(200, "OK")
 | |
|                 self.send_header("Connection", "close")
 | |
|                 self.send_header("Content-Length", len(payload))
 | |
|                 self.end_headers()
 | |
|                 self.wfile.write(payload)
 | |
|             else:
 | |
|                 super().do_GET()
 | |
| 
 | |
|     # SimpleHTTPRequestHandler always uses CWD so we have to chdir
 | |
|     os.chdir(os.path.join(os.path.dirname(__file__), "htdocs"))
 | |
| 
 | |
|     httpd = http.server.HTTPServer((local_address, 0), RequestHandler)
 | |
|     httpd_thread = threading.Thread(name="httpd", target=httpd.serve_forever)
 | |
|     httpd_thread.start()
 | |
| 
 | |
|     def fin():
 | |
|         httpd.shutdown()
 | |
|         httpd.server_close()
 | |
|         httpd_thread.join()
 | |
| 
 | |
|     request.addfinalizer(fin)
 | |
| 
 | |
|     return httpd
 | |
| 
 | |
| 
 | |
| def make_url(httpd, rel_url):
 | |
|     return "http://%s:%s%s" % (local_address, httpd.server_port, rel_url)
 | |
| 
 | |
| 
 | |
| def test_httpd(httpd):
 | |
|     """
 | |
|     Tests that our http server is working as expected, and that two fetches
 | |
|     of the same url return the same payload, proving it can be used to test
 | |
|     deduplication.
 | |
|     """
 | |
|     payload1 = None
 | |
|     url = make_url(httpd, "/site1/file1.txt")
 | |
|     with urllib.request.urlopen(url) as response:
 | |
|         assert response.status == 200
 | |
|         payload1 = response.read()
 | |
|         assert payload1
 | |
| 
 | |
|     with urllib.request.urlopen(url) as response:
 | |
|         assert response.status == 200
 | |
|         payload2 = response.read()
 | |
|         assert payload2
 | |
| 
 | |
|     assert payload1 == payload2
 | |
| 
 | |
| 
 | |
| @pytest.mark.skip()
 | |
| def test_services_up(rethinker):
 | |
|     """Check that the expected services are up and running."""
 | |
|     # check that rethinkdb is listening and looks sane
 | |
|     rr = rethinker
 | |
|     tbls = rr.table_list().run()
 | |
|     assert len(tbls) > 10
 | |
| 
 | |
|     # check that warcprox is listening
 | |
|     with socket.socket() as s:
 | |
|         # if the connect fails an exception is raised and the test fails
 | |
|         s.connect(("localhost", 8000))
 | |
| 
 | |
|     # check that pywb is listening
 | |
|     with socket.socket() as s:
 | |
|         # if the connect fails an exception is raised and the test fails
 | |
|         s.connect(("localhost", 8880))
 | |
| 
 | |
|     # check that brozzler dashboard is listening
 | |
|     with socket.socket() as s:
 | |
|         # if the connect fails an exception is raised and the test fails
 | |
|         s.connect(("localhost", 8881))
 | |
| 
 | |
| 
 | |
| @pytest.mark.parametrize("rethinker", ["brozzler"], indirect=True)
 | |
| @pytest.mark.skip(reason="expects brozzler worker daemon running")
 | |
| def test_brozzle_site(httpd, rethinker):
 | |
|     test_id = "test_brozzle_site-%s" % datetime.datetime.utcnow().isoformat()
 | |
|     rr = rethinker
 | |
|     site = brozzler.Site(
 | |
|         rr,
 | |
|         {
 | |
|             "seed": make_url(httpd, "/site1/"),
 | |
|             "warcprox_meta": {"captures-table-extra-fields": {"test_id": test_id}},
 | |
|         },
 | |
|     )
 | |
| 
 | |
|     # the two pages we expect to be crawled
 | |
|     page1 = make_url(httpd, "/site1/")
 | |
|     page2 = make_url(httpd, "/site1/file1.txt")
 | |
|     robots = make_url(httpd, "/robots.txt")
 | |
| 
 | |
|     # so we can examine rethinkdb before it does anything
 | |
|     try:
 | |
|         stop_service("brozzler-worker")
 | |
| 
 | |
|         assert site.id is None
 | |
|         frontier = brozzler.RethinkDbFrontier(rr)
 | |
|         brozzler.new_site(frontier, site)
 | |
|         assert site.id is not None
 | |
|         assert len(list(frontier.site_pages(site.id))) == 1
 | |
|     finally:
 | |
|         start_service("brozzler-worker")
 | |
| 
 | |
|     # the site should be brozzled fairly quickly
 | |
|     start = time.time()
 | |
|     while site.status != "FINISHED" and time.time() - start < 300:
 | |
|         time.sleep(0.5)
 | |
|         site.refresh()
 | |
|     assert site.status == "FINISHED"
 | |
| 
 | |
|     # check that we got the two pages we expected
 | |
|     pages = list(frontier.site_pages(site.id))
 | |
|     assert len(pages) == 2
 | |
|     assert {page.url for page in pages} == {
 | |
|         make_url(httpd, "/site1/"),
 | |
|         make_url(httpd, "/site1/file1.txt"),
 | |
|     }
 | |
| 
 | |
|     time.sleep(2)  # in case warcprox hasn't finished processing urls
 | |
|     # take a look at the captures table
 | |
|     captures = rr.table("captures").filter({"test_id": test_id}).run()
 | |
|     captures_by_url = {c["url"]: c for c in captures if c["http_method"] != "HEAD"}
 | |
|     assert robots in captures_by_url
 | |
|     assert page1 in captures_by_url
 | |
|     assert page2 in captures_by_url
 | |
|     assert "screenshot:%s" % page1 in captures_by_url
 | |
|     assert "thumbnail:%s" % page1 in captures_by_url
 | |
|     # no screenshots of plaintext
 | |
| 
 | |
|     # check pywb
 | |
|     t14 = captures_by_url[page2]["timestamp"].strftime("%Y%m%d%H%M%S")
 | |
|     wb_url = "http://localhost:8880/brozzler/%s/%s" % (t14, page2)
 | |
|     expected_payload = open(
 | |
|         os.path.join(os.path.dirname(__file__), "htdocs", "site1", "file1.txt"), "rb"
 | |
|     ).read()
 | |
|     assert requests.get(wb_url).content == expected_payload
 | |
| 
 | |
|     url = "screenshot:%s" % page1
 | |
|     t14 = captures_by_url[url]["timestamp"].strftime("%Y%m%d%H%M%S")
 | |
|     wb_url = "http://localhost:8880/brozzler/%s/%s" % (t14, url)
 | |
|     response = requests.get(wb_url)
 | |
|     assert response.status_code == 200
 | |
|     assert response.headers["content-type"] == "image/jpeg"
 | |
| 
 | |
|     url = "thumbnail:%s" % page1
 | |
|     t14 = captures_by_url[url]["timestamp"].strftime("%Y%m%d%H%M%S")
 | |
|     wb_url = "http://localhost:8880/brozzler/%s/%s" % (t14, url)
 | |
|     response = requests.get(wb_url)
 | |
|     assert response.status_code == 200
 | |
|     assert response.headers["content-type"] == "image/jpeg"
 | |
| 
 | |
| 
 | |
| @pytest.mark.skip(reason="expects warcprox daemon running")
 | |
| def test_proxy_warcprox(httpd):
 | |
|     """Test --proxy with proxy that happens to be warcprox"""
 | |
|     try:
 | |
|         stop_service("brozzler-worker")
 | |
|         _test_proxy_setting(
 | |
|             httpd, proxy="localhost:8000", warcprox_auto=False, is_warcprox=True
 | |
|         )
 | |
|     finally:
 | |
|         start_service("brozzler-worker")
 | |
| 
 | |
| 
 | |
| @pytest.mark.skip(reason="expects warcprox daemon running")
 | |
| def test_proxy_non_warcprox(httpd):
 | |
|     """Test --proxy with proxy that happens not to be warcprox"""
 | |
| 
 | |
|     class DumbProxyRequestHandler(http.server.SimpleHTTPRequestHandler):
 | |
|         def do_HEAD(self):
 | |
|             if not hasattr(self.server, "requests"):
 | |
|                 self.server.requests = []
 | |
|             logger.info("%s %s", self.command, self.path)
 | |
|             self.server.requests.append("%s %s" % (self.command, self.path))
 | |
|             response = urllib.request.urlopen(self.path)
 | |
|             self.wfile.write(
 | |
|                 ("HTTP/1.0 %s %s\r\n" % (response.code, response.reason)).encode(
 | |
|                     "ascii"
 | |
|                 )
 | |
|             )
 | |
|             for header in response.getheaders():
 | |
|                 self.wfile.write(
 | |
|                     ("%s: %s\r\n" % (header[0], header[1])).encode("ascii")
 | |
|                 )
 | |
|             self.wfile.write(b"\r\n")
 | |
|             return response
 | |
| 
 | |
|         def do_GET(self):
 | |
|             response = self.do_HEAD()
 | |
|             self.copyfile(response, self.wfile)
 | |
| 
 | |
|         def do_WARCPROX_WRITE_RECORD(self):
 | |
|             if not hasattr(self.server, "requests"):
 | |
|                 self.server.requests = []
 | |
|             logger.info("%s %s", self.command, self.path)
 | |
|             self.send_error(400)
 | |
| 
 | |
|     proxy = http.server.HTTPServer(("localhost", 0), DumbProxyRequestHandler)
 | |
|     th = threading.Thread(name="dumb-proxy", target=proxy.serve_forever)
 | |
|     th.start()
 | |
| 
 | |
|     try:
 | |
|         stop_service("brozzler-worker")
 | |
|         _test_proxy_setting(
 | |
|             httpd,
 | |
|             proxy="localhost:%s" % proxy.server_port,
 | |
|             warcprox_auto=False,
 | |
|             is_warcprox=False,
 | |
|         )
 | |
|     finally:
 | |
|         start_service("brozzler-worker")
 | |
|     assert len(proxy.requests) <= 15
 | |
|     assert proxy.requests.count("GET /status") == 1
 | |
|     assert ("GET %s" % make_url(httpd, "/site1/")) in proxy.requests
 | |
|     assert ("GET %s" % make_url(httpd, "/site1/file1.txt")) in proxy.requests
 | |
|     assert [
 | |
|         req for req in proxy.requests if req.startswith("WARCPROX_WRITE_RECORD")
 | |
|     ] == []
 | |
| 
 | |
|     proxy.shutdown()
 | |
|     th.join()
 | |
| 
 | |
| 
 | |
| @pytest.mark.skip()
 | |
| def test_no_proxy(httpd):
 | |
|     try:
 | |
|         stop_service("brozzler-worker")
 | |
|         _test_proxy_setting(httpd, proxy=None, warcprox_auto=False, is_warcprox=False)
 | |
|     finally:
 | |
|         start_service("brozzler-worker")
 | |
|     # XXX how to check that no proxy was used?
 | |
| 
 | |
| 
 | |
| @pytest.mark.skip()
 | |
| def test_warcprox_auto(httpd):
 | |
|     """Test --warcprox-auto"""
 | |
|     try:
 | |
|         stop_service("brozzler-worker")
 | |
|         _test_proxy_setting(httpd, proxy=None, warcprox_auto=True, is_warcprox=True)
 | |
|     finally:
 | |
|         start_service("brozzler-worker")
 | |
| 
 | |
| 
 | |
| @pytest.mark.skip()
 | |
| def test_proxy_conflict():
 | |
|     with pytest.raises(AssertionError):
 | |
|         brozzler.worker.BrozzlerWorker(
 | |
|             None, None, warcprox_auto=True, proxy="localhost:12345"
 | |
|         )
 | |
| 
 | |
| 
 | |
| @pytest.mark.skip()
 | |
| @pytest.mark.parametrize("rethinker", ["brozzler"], indirect=True)
 | |
| def _test_proxy_setting(
 | |
|     httpd, rethinker, proxy=None, warcprox_auto=False, is_warcprox=False
 | |
| ):
 | |
|     test_id = "test_proxy=%s_warcprox_auto=%s_is_warcprox=%s-%s" % (
 | |
|         proxy,
 | |
|         warcprox_auto,
 | |
|         is_warcprox,
 | |
|         datetime.datetime.utcnow().isoformat(),
 | |
|     )
 | |
| 
 | |
|     # the two pages we expect to be crawled
 | |
|     page1 = make_url(httpd, "/site1/")
 | |
|     page2 = make_url(httpd, "/site1/file1.txt")
 | |
|     robots = make_url(httpd, "/robots.txt")
 | |
| 
 | |
|     rr = rethinker
 | |
|     service_registry = doublethink.ServiceRegistry(rr)
 | |
|     site = brozzler.Site(
 | |
|         rr,
 | |
|         {
 | |
|             "seed": make_url(httpd, "/site1/"),
 | |
|             "warcprox_meta": {"captures-table-extra-fields": {"test_id": test_id}},
 | |
|         },
 | |
|     )
 | |
|     assert site.id is None
 | |
|     frontier = brozzler.RethinkDbFrontier(rr)
 | |
|     brozzler.new_site(frontier, site)
 | |
|     assert site.id is not None
 | |
|     assert len(list(frontier.site_pages(site.id))) == 1
 | |
| 
 | |
|     worker = brozzler.worker.BrozzlerWorker(
 | |
|         frontier,
 | |
|         service_registry,
 | |
|         max_browsers=1,
 | |
|         chrome_exe=brozzler.suggest_default_chrome_exe(),
 | |
|         warcprox_auto=warcprox_auto,
 | |
|         proxy=proxy,
 | |
|     )
 | |
|     browser = worker._browser_pool.acquire()
 | |
|     worker.brozzle_site(browser, site)
 | |
|     worker._browser_pool.release(browser)
 | |
| 
 | |
|     # check proxy is set
 | |
|     assert site.status == "FINISHED"
 | |
|     if warcprox_auto:
 | |
|         assert site.proxy[-5:] == ":8000"
 | |
|     else:
 | |
|         assert not site.proxy
 | |
|     site.refresh()  # check that these things were persisted
 | |
|     assert site.status == "FINISHED"
 | |
|     if warcprox_auto:
 | |
|         assert site.proxy[-5:] == ":8000"
 | |
|     else:
 | |
|         assert not site.proxy
 | |
| 
 | |
|     # check that we got the two pages we expected
 | |
|     pages = list(frontier.site_pages(site.id))
 | |
|     assert len(pages) == 2
 | |
|     assert {page.url for page in pages} == {
 | |
|         make_url(httpd, "/site1/"),
 | |
|         make_url(httpd, "/site1/file1.txt"),
 | |
|     }
 | |
| 
 | |
|     time.sleep(2)  # in case warcprox hasn't finished processing urls
 | |
|     # take a look at the captures table
 | |
|     captures = rr.table("captures").filter({"test_id": test_id}).run()
 | |
|     captures_by_url = {c["url"]: c for c in captures if c["http_method"] != "HEAD"}
 | |
|     if is_warcprox:
 | |
|         assert robots in captures_by_url
 | |
|         assert page1 in captures_by_url
 | |
|         assert page2 in captures_by_url
 | |
|         assert "screenshot:%s" % page1 in captures_by_url
 | |
|         assert "thumbnail:%s" % page1 in captures_by_url
 | |
| 
 | |
|         # check pywb
 | |
|         t14 = captures_by_url[page2]["timestamp"].strftime("%Y%m%d%H%M%S")
 | |
|         wb_url = "http://localhost:8880/brozzler/%s/%s" % (t14, page2)
 | |
|         expected_payload = open(
 | |
|             os.path.join(os.path.dirname(__file__), "htdocs", "site1", "file1.txt"),
 | |
|             "rb",
 | |
|         ).read()
 | |
|         assert requests.get(wb_url).content == expected_payload
 | |
|     else:
 | |
|         assert captures_by_url == {}
 | |
| 
 | |
| 
 | |
| @pytest.mark.parametrize("rethinker", ["brozzler"], indirect=True)
 | |
| @pytest.mark.skip(reason="expects brozzler worker daemon running")
 | |
| def test_obey_robots(httpd, rethinker):
 | |
|     test_id = "test_obey_robots-%s" % datetime.datetime.utcnow().isoformat()
 | |
|     rr = rethinker
 | |
|     site = brozzler.Site(
 | |
|         rr,
 | |
|         {
 | |
|             "seed": make_url(httpd, "/site1/"),
 | |
|             "user_agent": "im a badbot",  # robots.txt blocks badbot
 | |
|             "warcprox_meta": {"captures-table-extra-fields": {"test_id": test_id}},
 | |
|         },
 | |
|     )
 | |
| 
 | |
|     # so we can examine rethinkdb before it does anything
 | |
|     try:
 | |
|         stop_service("brozzler-worker")
 | |
| 
 | |
|         assert site.id is None
 | |
|         frontier = brozzler.RethinkDbFrontier(rr)
 | |
|         brozzler.new_site(frontier, site)
 | |
|         assert site.id is not None
 | |
|         site_pages = list(frontier.site_pages(site.id))
 | |
|         assert len(site_pages) == 1
 | |
|         assert site_pages[0].url == site.seed
 | |
|         assert site_pages[0].needs_robots_check
 | |
|     finally:
 | |
|         start_service("brozzler-worker")
 | |
| 
 | |
|     # the site should be brozzled fairly quickly
 | |
|     start = time.time()
 | |
|     while site.status != "FINISHED" and time.time() - start < 300:
 | |
|         time.sleep(0.5)
 | |
|         site.refresh()
 | |
|     assert site.status == "FINISHED"
 | |
| 
 | |
|     # check that only the one page is in rethinkdb
 | |
|     pages = list(frontier.site_pages(site.id))
 | |
|     assert len(pages) == 1
 | |
|     page = pages[0]
 | |
|     assert page.url == make_url(httpd, "/site1/")
 | |
|     assert page.blocked_by_robots
 | |
| 
 | |
|     # take a look at the captures table
 | |
|     time.sleep(2)  # in case warcprox hasn't finished processing urls
 | |
|     robots_url = make_url(httpd, "/robots.txt")
 | |
|     captures = list(rr.table("captures").filter({"test_id": test_id}).run())
 | |
|     assert len(captures) == 1
 | |
|     assert captures[0]["url"] == robots_url
 | |
| 
 | |
|     # check pywb
 | |
|     t14 = captures[0]["timestamp"].strftime("%Y%m%d%H%M%S")
 | |
|     wb_url = "http://localhost:8880/brozzler/%s/%s" % (t14, robots_url)
 | |
|     expected_payload = open(
 | |
|         os.path.join(os.path.dirname(__file__), "htdocs", "robots.txt"), "rb"
 | |
|     ).read()
 | |
|     assert requests.get(wb_url, allow_redirects=False).content == expected_payload
 | |
| 
 | |
| 
 | |
| @pytest.mark.parametrize("rethinker", ["brozzler"], indirect=True)
 | |
| @pytest.mark.skip(reason="expects brozzler worker daemon running")
 | |
| def test_login(httpd, rethinker):
 | |
|     test_id = "test_login-%s" % datetime.datetime.utcnow().isoformat()
 | |
|     rr = rethinker
 | |
|     site = brozzler.Site(
 | |
|         rr,
 | |
|         {
 | |
|             "seed": make_url(httpd, "/site2/"),
 | |
|             "warcprox_meta": {"captures-table-extra-fields": {"test_id": test_id}},
 | |
|             "username": "test_username",
 | |
|             "password": "test_password",
 | |
|         },
 | |
|     )
 | |
| 
 | |
|     frontier = brozzler.RethinkDbFrontier(rr)
 | |
|     brozzler.new_site(frontier, site)
 | |
| 
 | |
|     # the site should be brozzled fairly quickly
 | |
|     start = time.time()
 | |
|     while site.status != "FINISHED" and time.time() - start < 300:
 | |
|         time.sleep(0.5)
 | |
|         site.refresh()
 | |
|     assert site.status == "FINISHED"
 | |
| 
 | |
|     # take a look at the captures table
 | |
|     time.sleep(2)  # in case warcprox hasn't finished processing urls
 | |
|     captures = list(
 | |
|         rr.table("captures").filter({"test_id": test_id}).order_by("timestamp").run()
 | |
|     )
 | |
|     meth_url = ["%s %s" % (c["http_method"], c["url"]) for c in captures]
 | |
| 
 | |
|     # there are several forms in in htdocs/site2/login.html but only one
 | |
|     # that brozzler's heuristic should match and try to submit, and it has
 | |
|     # action='00', so we can check for that here
 | |
|     assert ("POST %s" % make_url(httpd, "/site2/00")) in meth_url
 | |
| 
 | |
|     # sanity check the rest of the crawl
 | |
|     assert ("GET %s" % make_url(httpd, "/robots.txt")) in meth_url
 | |
|     assert ("GET %s" % make_url(httpd, "/site2/")) in meth_url
 | |
|     assert (
 | |
|         "WARCPROX_WRITE_RECORD screenshot:%s" % make_url(httpd, "/site2/")
 | |
|     ) in meth_url
 | |
|     assert (
 | |
|         "WARCPROX_WRITE_RECORD thumbnail:%s" % make_url(httpd, "/site2/")
 | |
|     ) in meth_url
 | |
|     assert ("GET %s" % make_url(httpd, "/site2/login.html")) in meth_url
 | |
|     assert (
 | |
|         "WARCPROX_WRITE_RECORD screenshot:%s" % make_url(httpd, "/site2/login.html")
 | |
|     ) in meth_url
 | |
|     assert (
 | |
|         "WARCPROX_WRITE_RECORD thumbnail:%s" % make_url(httpd, "/site2/login.html")
 | |
|     ) in meth_url
 | |
| 
 | |
| 
 | |
| @pytest.mark.parametrize("rethinker", ["brozzler"], indirect=True)
 | |
| @pytest.mark.skip(reason="expects brozzler worker daemon running")
 | |
| def test_seed_redirect(httpd, rethinker):
 | |
|     test_id = "test_seed_redirect-%s" % datetime.datetime.utcnow().isoformat()
 | |
|     rr = rethinker
 | |
|     seed_url = make_url(httpd, "/site5/redirect/")
 | |
|     site = brozzler.Site(
 | |
|         rr,
 | |
|         {
 | |
|             "seed": make_url(httpd, "/site5/redirect/"),
 | |
|             "warcprox_meta": {"captures-table-extra-fields": {"test_id": test_id}},
 | |
|         },
 | |
|     )
 | |
|     assert site.scope == {
 | |
|         "accepts": [
 | |
|             {
 | |
|                 "ssurt": "%s//%s:http:/site5/redirect/"
 | |
|                 % (local_address, httpd.server_port)
 | |
|             }
 | |
|         ]
 | |
|     }
 | |
| 
 | |
|     frontier = brozzler.RethinkDbFrontier(rr)
 | |
|     brozzler.new_site(frontier, site)
 | |
|     assert site.id
 | |
| 
 | |
|     # the site should be brozzled fairly quickly
 | |
|     start = time.time()
 | |
|     while site.status != "FINISHED" and time.time() - start < 300:
 | |
|         time.sleep(0.5)
 | |
|         site.refresh()
 | |
|     assert site.status == "FINISHED"
 | |
| 
 | |
|     # take a look at the pages table
 | |
|     pages = list(frontier.site_pages(site.id))
 | |
|     assert len(pages) == 2
 | |
|     pages.sort(key=lambda page: page.hops_from_seed)
 | |
|     assert pages[0].hops_from_seed == 0
 | |
|     assert pages[0].url == seed_url
 | |
|     assert pages[0].redirect_url == make_url(httpd, "/site5/destination/")
 | |
|     assert pages[1].hops_from_seed == 1
 | |
|     assert pages[1].url == make_url(httpd, "/site5/destination/page2.html")
 | |
| 
 | |
|     # check that scope has been updated properly
 | |
|     assert site.scope == {
 | |
|         "accepts": [
 | |
|             {
 | |
|                 "ssurt": "%s//%s:http:/site5/redirect/"
 | |
|                 % (local_address, httpd.server_port)
 | |
|             },
 | |
|             {
 | |
|                 "ssurt": "%s//%s:http:/site5/destination/"
 | |
|                 % (local_address, httpd.server_port)
 | |
|             },
 | |
|         ]
 | |
|     }
 | |
| 
 | |
| 
 | |
| @pytest.mark.parametrize("rethinker", ["brozzler"], indirect=True)
 | |
| @pytest.mark.skip(reason="expects brozzler worker daemon running")
 | |
| def test_hashtags(httpd, rethinker):
 | |
|     test_id = "test_hashtags-%s" % datetime.datetime.utcnow().isoformat()
 | |
|     rr = rethinker
 | |
|     seed_url = make_url(httpd, "/site7/")
 | |
|     site = brozzler.Site(
 | |
|         rr,
 | |
|         {
 | |
|             "seed": seed_url,
 | |
|             "warcprox_meta": {"captures-table-extra-fields": {"test_id": test_id}},
 | |
|         },
 | |
|     )
 | |
| 
 | |
|     frontier = brozzler.RethinkDbFrontier(rr)
 | |
|     brozzler.new_site(frontier, site)
 | |
|     assert site.id
 | |
| 
 | |
|     # the site should be brozzled fairly quickly
 | |
|     start = time.time()
 | |
|     while site.status != "FINISHED" and time.time() - start < 300:
 | |
|         time.sleep(0.5)
 | |
|         site.refresh()
 | |
|     assert site.status == "FINISHED"
 | |
| 
 | |
|     # check that we the page we expected
 | |
|     pages = sorted(list(frontier.site_pages(site.id)), key=lambda p: p.url)
 | |
|     assert len(pages) == 2
 | |
|     assert pages[0].url == seed_url
 | |
|     assert pages[0].hops_from_seed == 0
 | |
|     assert pages[0].brozzle_count == 1
 | |
|     assert pages[0].outlinks["accepted"] == [make_url(httpd, "/site7/foo.html")]
 | |
|     assert not pages[0].hashtags
 | |
|     assert pages[1].url == make_url(httpd, "/site7/foo.html")
 | |
|     assert pages[1].hops_from_seed == 1
 | |
|     assert pages[1].brozzle_count == 1
 | |
|     assert sorted(pages[1].hashtags) == [
 | |
|         "#boosh",
 | |
|         "#ignored",
 | |
|         "#whee",
 | |
|     ]
 | |
| 
 | |
|     time.sleep(2)  # in case warcprox hasn't finished processing urls
 | |
|     # take a look at the captures table
 | |
|     captures = rr.table("captures").filter({"test_id": test_id}).run()
 | |
|     captures_by_url = {c["url"]: c for c in captures if c["http_method"] != "HEAD"}
 | |
|     assert seed_url in captures_by_url
 | |
|     assert make_url(httpd, "/site7/foo.html") in captures_by_url
 | |
|     assert make_url(httpd, "/site7/whee.txt") in captures_by_url
 | |
|     assert make_url(httpd, "/site7/boosh.txt") in captures_by_url
 | |
|     assert "screenshot:%s" % seed_url in captures_by_url
 | |
|     assert "thumbnail:%s" % seed_url in captures_by_url
 | |
|     assert "screenshot:%s" % make_url(httpd, "/site7/foo.html") in captures_by_url
 | |
|     assert "thumbnail:%s" % make_url(httpd, "/site7/foo.html") in captures_by_url
 | |
| 
 | |
| 
 | |
| @pytest.mark.parametrize("rethinker", ["brozzler"], indirect=True)
 | |
| @pytest.mark.skip(reason="expects brozzler worker daemon running")
 | |
| def test_redirect_hashtags(httpd, rethinker):
 | |
|     test_id = "test_hashtags-%s" % datetime.datetime.utcnow().isoformat()
 | |
|     rr = rethinker
 | |
|     seed_url = make_url(httpd, "/site9/")
 | |
|     site = brozzler.Site(
 | |
|         rr,
 | |
|         {
 | |
|             "seed": seed_url,
 | |
|             "warcprox_meta": {"captures-table-extra-fields": {"test_id": test_id}},
 | |
|         },
 | |
|     )
 | |
| 
 | |
|     frontier = brozzler.RethinkDbFrontier(rr)
 | |
|     brozzler.new_site(frontier, site)
 | |
|     assert site.id
 | |
| 
 | |
|     # the site should be brozzled fairly quickly
 | |
|     start = time.time()
 | |
|     while site.status != "FINISHED" and time.time() - start < 300:
 | |
|         time.sleep(0.5)
 | |
|         site.refresh()
 | |
|     assert site.status == "FINISHED"
 | |
| 
 | |
|     # check that we the page we expected
 | |
|     pages = sorted(list(frontier.site_pages(site.id)), key=lambda p: p.url)
 | |
|     assert len(pages) == 2
 | |
|     assert pages[0].url == seed_url
 | |
|     assert pages[0].hops_from_seed == 0
 | |
|     assert pages[0].brozzle_count == 1
 | |
|     assert pages[0].outlinks["accepted"] == [make_url(httpd, "/site9/redirect.html")]
 | |
|     assert not pages[0].hashtags
 | |
|     assert pages[1].url == make_url(httpd, "/site9/redirect.html")
 | |
|     assert pages[1].hops_from_seed == 1
 | |
|     assert pages[1].brozzle_count == 1
 | |
|     assert sorted(pages[1].hashtags) == [
 | |
|         "#hash1",
 | |
|         "#hash2",
 | |
|     ]
 | |
| 
 | |
|     time.sleep(2)  # in case warcprox hasn't finished processing urls
 | |
|     # take a look at the captures table
 | |
|     captures = rr.table("captures").filter({"test_id": test_id}).run()
 | |
|     redirect_captures = [
 | |
|         c
 | |
|         for c in captures
 | |
|         if c["url"] == make_url(httpd, "/site9/redirect.html")
 | |
|         and c["http_method"] == "GET"
 | |
|     ]
 | |
|     assert len(redirect_captures) == 2  # youtube-dl + browser, no hashtags
 | |
| 
 | |
|     # === expected captures ===
 | |
|     #  1. GET http://localhost:41243/favicon.ico
 | |
|     #  2. GET http://localhost:41243/robots.txt
 | |
|     #  3. GET http://localhost:41243/site9/
 | |
|     #  4. GET http://localhost:41243/site9/
 | |
|     #  5. GET http://localhost:41243/site9/destination.html
 | |
|     #  6. GET http://localhost:41243/site9/destination.html
 | |
|     #  7. GET http://localhost:41243/site9/redirect.html
 | |
|     #  8. GET http://localhost:41243/site9/redirect.html
 | |
|     #  9. HEAD http://localhost:41243/site9/
 | |
|     # 10. HEAD http://localhost:41243/site9/redirect.html
 | |
|     # 11. WARCPROX_WRITE_RECORD screenshot:http://localhost:41243/site9/
 | |
|     # 12. WARCPROX_WRITE_RECORD screenshot:http://localhost:41243/site9/redirect.html
 | |
|     # 13. WARCPROX_WRITE_RECORD thumbnail:http://localhost:41243/site9/
 | |
|     # 14. WARCPROX_WRITE_RECORD thumbnail:http://localhost:41243/site9/redirect.html
 | |
| 
 | |
| 
 | |
| @pytest.mark.parametrize("rethinker", ["brozzler"], indirect=True)
 | |
| @pytest.mark.skip(reason="expects brozzler worker daemon running")
 | |
| def test_stop_crawl(httpd, rethinker):
 | |
|     rr = rethinker
 | |
|     frontier = brozzler.RethinkDbFrontier(rr)
 | |
| 
 | |
|     # create a new job with three sites that could be crawled forever
 | |
|     job_conf = {
 | |
|         "seeds": [
 | |
|             {"url": make_url(httpd, "/infinite/foo/")},
 | |
|             {"url": make_url(httpd, "/infinite/bar/")},
 | |
|             {"url": make_url(httpd, "/infinite/baz/")},
 | |
|         ]
 | |
|     }
 | |
|     job = brozzler.new_job(frontier, job_conf)
 | |
|     assert job.id
 | |
| 
 | |
|     sites = list(frontier.job_sites(job.id))
 | |
|     assert not sites[0].stop_requested
 | |
|     assert not sites[1].stop_requested
 | |
| 
 | |
|     # request crawl stop for one site using the command line entrypoint
 | |
|     brozzler.cli.brozzler_stop_crawl(["brozzler-stop-crawl", "--site=%s" % sites[0].id])
 | |
|     sites[0].refresh()
 | |
|     assert sites[0].stop_requested
 | |
| 
 | |
|     # stop request should be honored quickly
 | |
|     start = time.time()
 | |
|     while not sites[0].status.startswith("FINISHED") and time.time() - start < 120:
 | |
|         time.sleep(0.5)
 | |
|         sites[0].refresh()
 | |
|     assert sites[0].status == "FINISHED_STOP_REQUESTED"
 | |
| 
 | |
|     # but the other sites and the job as a whole should still be crawling
 | |
|     sites[1].refresh()
 | |
|     assert sites[1].status == "ACTIVE"
 | |
|     sites[2].refresh()
 | |
|     assert sites[2].status == "ACTIVE"
 | |
|     job.refresh()
 | |
|     assert job.status == "ACTIVE"
 | |
| 
 | |
|     # request crawl stop for the job using the command line entrypoint
 | |
|     brozzler.cli.brozzler_stop_crawl(["brozzler-stop-crawl", "--job=%s" % job.id])
 | |
|     job.refresh()
 | |
|     assert job.stop_requested
 | |
| 
 | |
|     # stop request should be honored quickly
 | |
|     start = time.time()
 | |
|     while not job.status.startswith("FINISHED") and time.time() - start < 120:
 | |
|         time.sleep(0.5)
 | |
|         job.refresh()
 | |
|     assert job.status == "FINISHED"
 | |
| 
 | |
|     # the other sites should also be FINISHED_STOP_REQUESTED
 | |
|     sites[0].refresh()
 | |
|     assert sites[0].status == "FINISHED_STOP_REQUESTED"
 | |
|     sites[1].refresh()
 | |
|     assert sites[1].status == "FINISHED_STOP_REQUESTED"
 | |
|     sites[2].refresh()
 | |
|     assert sites[2].status == "FINISHED_STOP_REQUESTED"
 | |
| 
 | |
| 
 | |
| @pytest.mark.parametrize("rethinker", ["brozzler"], indirect=True)
 | |
| @pytest.mark.skip(reason="expects brozzler worker daemon running")
 | |
| def test_warcprox_outage_resiliency(httpd, rethinker):
 | |
|     """
 | |
|     Tests resiliency to warcprox outage.
 | |
| 
 | |
|     If no instances of warcprox are healthy when starting to crawl a site,
 | |
|     brozzler-worker should sit there and wait until a healthy instance appears.
 | |
| 
 | |
|     If an instance goes down, sites assigned to that instance should bounce
 | |
|     over to a healthy instance.
 | |
| 
 | |
|     If all instances of warcprox go down, brozzler-worker should sit and wait.
 | |
|     """
 | |
|     rr = rethinker
 | |
|     frontier = brozzler.RethinkDbFrontier(rr)
 | |
| 
 | |
|     # run two instances of warcprox
 | |
|     opts = warcprox.Options()
 | |
|     opts.address = "0.0.0.0"
 | |
|     opts.port = 0
 | |
|     opts.rethinkdb_services_url = "rethinkdb://localhost/brozzler/services"
 | |
| 
 | |
|     warcprox1 = warcprox.controller.WarcproxController(opts)
 | |
|     warcprox2 = warcprox.controller.WarcproxController(opts)
 | |
|     warcprox1_thread = threading.Thread(
 | |
|         target=warcprox1.run_until_shutdown, name="warcprox1"
 | |
|     )
 | |
|     warcprox2_thread = threading.Thread(
 | |
|         target=warcprox2.run_until_shutdown, name="warcprox2"
 | |
|     )
 | |
| 
 | |
|     # put together a site to crawl
 | |
|     test_id = "test_warcprox_death-%s" % datetime.datetime.utcnow().isoformat()
 | |
|     site = brozzler.Site(
 | |
|         rr,
 | |
|         {
 | |
|             "seed": make_url(httpd, "/infinite/"),
 | |
|             "warcprox_meta": {"captures-table-extra-fields": {"test_id": test_id}},
 | |
|         },
 | |
|     )
 | |
| 
 | |
|     try:
 | |
|         # we manage warcprox instances ourselves, so stop the one running on
 | |
|         # the system, if any
 | |
|         try:
 | |
|             stop_service("warcprox")
 | |
|         except Exception:
 | |
|             logger.warning("problem stopping warcprox service: %s", exc_info=True)
 | |
| 
 | |
|         # queue the site for brozzling
 | |
|         brozzler.new_site(frontier, site)
 | |
| 
 | |
|         # check that nothing happens
 | |
|         # XXX tail brozzler-worker.log or something?
 | |
|         time.sleep(30)
 | |
|         site.refresh()
 | |
|         assert site.status == "ACTIVE"
 | |
|         assert not site.proxy
 | |
|         assert len(list(frontier.site_pages(site.id))) == 1
 | |
| 
 | |
|         # start one instance of warcprox
 | |
|         warcprox1_thread.start()
 | |
| 
 | |
|         # check that it started using that instance
 | |
|         start = time.time()
 | |
|         while not site.proxy and time.time() - start < 30:
 | |
|             time.sleep(0.5)
 | |
|             site.refresh()
 | |
|         assert site.proxy.endswith(":%s" % warcprox1.proxy.server_port)
 | |
| 
 | |
|         # check that the site accumulates pages in the frontier, confirming
 | |
|         # that crawling is really happening
 | |
|         start = time.time()
 | |
|         while len(list(frontier.site_pages(site.id))) <= 1 and time.time() - start < 60:
 | |
|             time.sleep(0.5)
 | |
|             site.refresh()
 | |
|         assert len(list(frontier.site_pages(site.id))) > 1
 | |
| 
 | |
|         # stop warcprox #1, start warcprox #2
 | |
|         warcprox2_thread.start()
 | |
|         warcprox1.stop.set()
 | |
|         warcprox1_thread.join()
 | |
| 
 | |
|         # check that it switched over to warcprox #2
 | |
|         start = time.time()
 | |
|         while (
 | |
|             not site.proxy
 | |
|             or not site.proxy.endswith(":%s" % warcprox2.proxy.server_port)
 | |
|         ) and time.time() - start < 30:
 | |
|             time.sleep(0.5)
 | |
|             site.refresh()
 | |
|         assert site.proxy.endswith(":%s" % warcprox2.proxy.server_port)
 | |
| 
 | |
|         # stop warcprox #2
 | |
|         warcprox2.stop.set()
 | |
|         warcprox2_thread.join()
 | |
| 
 | |
|         page_count = len(list(frontier.site_pages(site.id)))
 | |
|         assert page_count > 1
 | |
| 
 | |
|         # check that it is waiting for a warcprox to appear
 | |
|         time.sleep(30)
 | |
|         site.refresh()
 | |
|         assert site.status == "ACTIVE"
 | |
|         assert not site.proxy
 | |
|         assert len(list(frontier.site_pages(site.id))) == page_count
 | |
| 
 | |
|         # stop crawling the site, else it can pollute subsequent test runs
 | |
|         brozzler.cli.brozzler_stop_crawl(["brozzler-stop-crawl", "--site=%s" % site.id])
 | |
|         site.refresh()
 | |
|         assert site.stop_requested
 | |
| 
 | |
|         # stop request should be honored quickly
 | |
|         start = time.time()
 | |
|         while not site.status.startswith("FINISHED") and time.time() - start < 120:
 | |
|             time.sleep(0.5)
 | |
|             site.refresh()
 | |
|         assert site.status == "FINISHED_STOP_REQUESTED"
 | |
|     finally:
 | |
|         warcprox1.stop.set()
 | |
|         warcprox2.stop.set()
 | |
|         warcprox1_thread.join()
 | |
|         warcprox2_thread.join()
 | |
|         start_service("warcprox")
 | |
| 
 | |
| 
 | |
| @pytest.mark.parametrize("rethinker", ["brozzler"], indirect=True)
 | |
| @pytest.mark.skip(reason="expects brozzler worker daemon running")
 | |
| def test_time_limit(httpd, rethinker):
 | |
|     rr = rethinker
 | |
|     frontier = brozzler.RethinkDbFrontier(rr)
 | |
| 
 | |
|     # create a new job with one seed that could be crawled forever
 | |
|     job_conf = {"seeds": [{"url": make_url(httpd, "/infinite/foo/"), "time_limit": 20}]}
 | |
|     job = brozzler.new_job(frontier, job_conf)
 | |
|     assert job.id
 | |
| 
 | |
|     sites = list(frontier.job_sites(job.id))
 | |
|     assert len(sites) == 1
 | |
| 
 | |
|     # time limit should be enforced pretty soon
 | |
|     start = time.time()
 | |
|     while not sites[0].status.startswith("FINISHED") and time.time() - start < 120:
 | |
|         time.sleep(0.5)
 | |
|         sites[0].refresh()
 | |
|     assert sites[0].status == "FINISHED_TIME_LIMIT"
 | |
| 
 | |
|     # all sites finished so job should be finished too
 | |
|     start = time.time()
 | |
|     job.refresh()
 | |
|     while not job.status == "FINISHED" and time.time() - start < 10:
 | |
|         time.sleep(0.5)
 | |
|         job.refresh()
 | |
|     assert job.status == "FINISHED"
 | |
| 
 | |
| 
 | |
| @pytest.mark.parametrize("rethinker", ["brozzler"], indirect=True)
 | |
| @pytest.mark.skip(reason="expects brozzler worker daemon running")
 | |
| def test_ydl_stitching(httpd, rethinker):
 | |
|     test_id = "test_ydl_stitching-%s" % datetime.datetime.utcnow().isoformat()
 | |
|     rr = rethinker
 | |
|     frontier = brozzler.RethinkDbFrontier(rr)
 | |
|     site = brozzler.Site(
 | |
|         rr,
 | |
|         {
 | |
|             "seed": make_url(httpd, "/site10/"),
 | |
|             "warcprox_meta": {
 | |
|                 "warc-prefix": "test_ydl_stitching",
 | |
|                 "captures-table-extra-fields": {"test_id": test_id},
 | |
|             },
 | |
|         },
 | |
|     )
 | |
|     brozzler.new_site(frontier, site)
 | |
| 
 | |
|     # the site should be brozzled fairly quickly
 | |
|     start = time.time()
 | |
|     while site.status != "FINISHED" and time.time() - start < 300:
 | |
|         time.sleep(0.5)
 | |
|         site.refresh()
 | |
|     assert site.status == "FINISHED"
 | |
| 
 | |
|     # check page.videos
 | |
|     pages = list(frontier.site_pages(site.id))
 | |
|     assert len(pages) == 1
 | |
|     page = pages[0]
 | |
|     assert len(page.videos) == 6
 | |
|     stitched_url = "youtube-dl:00001:%s" % make_url(httpd, "/site10/")
 | |
|     assert {
 | |
|         "blame": "youtube-dl",
 | |
|         "content-length": 267900,
 | |
|         "content-type": "video/mp4",
 | |
|         "response_code": 204,
 | |
|         "url": stitched_url,
 | |
|     } in page.videos
 | |
| 
 | |
|     time.sleep(2)  # in case warcprox hasn't finished processing urls
 | |
|     # take a look at the captures table
 | |
|     captures = list(rr.table("captures").filter({"test_id": test_id}).run())
 | |
|     l = [c for c in captures if c["url"] == stitched_url]  # noqa: E741
 | |
|     assert len(l) == 1
 | |
|     c = l[0]
 | |
|     assert c["filename"].startswith("test_ydl_stitching")
 | |
|     assert c["content_type"] == "video/mp4"
 | |
|     assert c["http_method"] == "WARCPROX_WRITE_RECORD"
 | 
