brozzler/tests/test_units.py
2024-02-08 12:07:41 -08:00

609 lines
18 KiB
Python

#!/usr/bin/env python
"""
test_units.py - some unit tests for parts of brozzler amenable to that
Copyright (C) 2016-2017 Internet Archive
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import pytest
import http.server
import threading
import os
import brozzler
import brozzler.chrome
import brozzler.ydl
import logging
import yaml
import datetime
import requests
import tempfile
import uuid
import socket
import time
import sys
import threading
from unittest import mock
# Send INFO-and-above log records to stderr; every record is prefixed with
# timestamp, pid, level, thread name and the logger/function/file/line that
# emitted it.
logging.basicConfig(
    format=(
        "%(asctime)s %(process)d %(levelname)s %(threadName)s "
        "%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s"
    ),
    level=logging.INFO,
    stream=sys.stderr,
)
@pytest.fixture(scope="module")
def httpd(request):
    """
    Module-scoped fixture: an HTTP server for the ``htdocs`` directory that
    sits next to this file.

    The server is bound to an ephemeral port on localhost and runs in a
    background thread named "httpd". Tests read ``httpd.server_port`` to build
    urls. Finalizers shut the server down, join its thread and put the
    process working directory back where it was.
    """
    # SimpleHTTPRequestHandler always uses CWD so we have to chdir; remember
    # the previous directory so the change does not leak past this module
    orig_cwd = os.getcwd()
    os.chdir(os.path.join(os.path.dirname(__file__), "htdocs"))
    # registered first, so it runs last (finalizers run in reverse order)
    request.addfinalizer(lambda: os.chdir(orig_cwd))

    server = http.server.HTTPServer(
        ("localhost", 0), http.server.SimpleHTTPRequestHandler
    )
    server_thread = threading.Thread(name="httpd", target=server.serve_forever)
    server_thread.start()

    def fin():
        server.shutdown()
        server.server_close()
        server_thread.join()

    request.addfinalizer(fin)

    return server
def test_robots(httpd):
    """
    Basic test of robots.txt user-agent substring matching.

    The same url is checked for two sites that differ only in `user_agent`:
    the first is expected to be permitted, the second is not.
    """
    url = "http://localhost:%s/" % httpd.server_port

    good_site = brozzler.Site(None, {"seed": url, "user_agent": "im/a/GoOdbot/yep"})
    assert brozzler.is_permitted_by_robots(good_site, url)

    bad_site = brozzler.Site(None, {"seed": url, "user_agent": "im/a bAdBOt/uh huh"})
    assert not brozzler.is_permitted_by_robots(bad_site, url)
def test_robots_http_statuses():
    """
    For a range of HTTP statuses, start a throwaway server that answers every
    GET with that status and an empty body, and check that the url is still
    reported as permitted by robots.
    """
    statuses = (
        200,
        204,
        400,
        401,
        402,
        403,
        404,
        405,
        500,
        501,
        502,
        503,
        504,
        505,
    )
    for status in statuses:

        class Handler(http.server.BaseHTTPRequestHandler):
            def do_GET(self):
                # the response is written straight to the socket rather than
                # through send_response()/end_headers()
                template = (
                    "HTTP/1.1 %s Meaningless message\r\n"
                    + "Content-length: 0\r\n"
                    + "\r\n"
                )
                self.connection.sendall((template % status).encode("utf-8"))

        server = http.server.HTTPServer(("localhost", 0), Handler)
        server_thread = threading.Thread(name="httpd", target=server.serve_forever)
        server_thread.start()

        try:
            url = "http://localhost:%s/" % server.server_port
            site = brozzler.Site(None, {"seed": url})
            assert brozzler.is_permitted_by_robots(site, url)
        finally:
            # tear the server down before moving on to the next status
            server.shutdown()
            server.server_close()
            server_thread.join()
def test_robots_empty_response():
    """
    Check that a url counts as permitted when the server closes the
    connection without sending any response at all.
    """

    class Handler(http.server.BaseHTTPRequestHandler):
        def do_GET(self):
            # hang up without writing a single byte
            conn = self.connection
            conn.shutdown(socket.SHUT_RDWR)
            conn.close()

    server = http.server.HTTPServer(("localhost", 0), Handler)
    server_thread = threading.Thread(name="httpd", target=server.serve_forever)
    server_thread.start()

    try:
        url = "http://localhost:%s/" % server.server_port
        site = brozzler.Site(None, {"seed": url})
        assert brozzler.is_permitted_by_robots(site, url)
    finally:
        server.shutdown()
        server.server_close()
        server_thread.join()
def test_robots_socket_timeout():
    """
    Check that a url counts as permitted when the server does not answer
    within the robots session timeout.

    The handler blocks on an event (for up to 60 seconds) before replying,
    while `brozzler.robots._SessionRaiseOn420.timeout` is lowered to 2 for the
    duration of the check.
    """
    release_handler = threading.Event()

    class Handler(http.server.BaseHTTPRequestHandler):
        def do_GET(self):
            release_handler.wait(60)
            self.connection.sendall(b"HTTP/1.1 200 OK\r\nContent-length: 0\r\n\r\n")

    session_cls = brozzler.robots._SessionRaiseOn420
    saved_timeout = session_cls.timeout

    server = http.server.HTTPServer(("localhost", 0), Handler)
    server_thread = threading.Thread(name="httpd", target=server.serve_forever)
    server_thread.start()

    try:
        url = "http://localhost:%s/" % server.server_port
        site = brozzler.Site(None, {"seed": url})
        session_cls.timeout = 2
        assert brozzler.is_permitted_by_robots(site, url)
    finally:
        # restore the class attribute, then unblock the handler so the
        # server can shut down
        session_cls.timeout = saved_timeout
        release_handler.set()
        server.shutdown()
        server.server_close()
        server_thread.join()
def test_robots_dns_failure():
    """
    Check that a url counts as permitted when its hostname cannot exist.
    """
    # .invalid. is guaranteed nonexistent per rfc 6761
    unresolvable_url = "http://whatever.invalid./"
    site = brozzler.Site(None, {"seed": unresolvable_url})
    assert brozzler.is_permitted_by_robots(site, unresolvable_url)
def test_robots_connection_failure():
    """
    Check that a url counts as permitted when nothing accepts the connection.
    """
    # nobody listens on port 4
    unreachable_url = "http://localhost:4/"
    site = brozzler.Site(None, {"seed": unreachable_url})
    assert brozzler.is_permitted_by_robots(site, unreachable_url)
def test_scoping():
    """
    Exercise `Site.accept_reject_or_neither()` against a scope that mixes
    several rule kinds (regex, surt, string/substring match, parent url regex)
    plus one block rule.

    Expected results: True for accepted urls, False for blocked ones and None
    where no rule applies.
    """
    # raw string: the regexes below contain backslashes, which are invalid
    # escape sequences in an ordinary string literal
    test_scope = yaml.safe_load(
        r"""
        max_hops: 100
        accepts:
        - url_match: REGEX_MATCH
          value: ^.*/audio_file/.*\.mp3$
        - url_match: SURT_MATCH
          value: http://(com,vimeocdn,
        - url_match: STRING_MATCH
          value: ec-media.soundcloud.com
        - regex: ^https?://twitter\.com.*$
        - substring: facebook.com
        - regex: ^https?://(www.)?youtube.com/watch?.*$
          parent_url_regex: ^https?://(www.)?youtube.com/user/.*$
        blocks:
        - domain: twitter.com
          url_match: REGEX_MATCH
          value: ^.*lang=(?!en).*$
        """
    )

    site = brozzler.Site(
        None,
        {
            "id": 1,
            "seed": "http://example.com/foo/bar?baz=quux#monkey",
            "scope": test_scope,
        },
    )
    page = brozzler.Page(
        None, {"url": "http://example.com/foo/bar?baz=quux#monkey", "site_id": site.id}
    )

    # urls compared against the seed
    assert site.accept_reject_or_neither("http://example.com/foo/bar", page) is True
    assert site.accept_reject_or_neither("http://example.com/foo/baz", page) is None

    # REGEX_MATCH accept rule
    assert site.accept_reject_or_neither("http://foo.com/some.mp3", page) is None
    assert (
        site.accept_reject_or_neither("http://foo.com/blah/audio_file/some.mp3", page)
        is True
    )

    # SURT_MATCH accept rule: the http url matches, the https one does not
    assert (
        site.accept_reject_or_neither("http://a.b.vimeocdn.com/blahblah", page) is True
    )
    assert (
        site.accept_reject_or_neither("https://a.b.vimeocdn.com/blahblah", page) is None
    )

    # twitter is accepted by regex, but blocked for any lang other than "en"
    assert site.accept_reject_or_neither("https://twitter.com/twit", page) is True
    assert (
        site.accept_reject_or_neither("https://twitter.com/twit?lang=en", page) is True
    )
    assert (
        site.accept_reject_or_neither("https://twitter.com/twit?lang=es", page) is False
    )

    # substring accept rule
    assert (
        site.accept_reject_or_neither("https://www.facebook.com/whatevz", page) is True
    )

    # the youtube watch rule carries a parent_url_regex: no match from the
    # seed page, match from a youtube user page
    assert (
        site.accept_reject_or_neither(
            "https://www.youtube.com/watch?v=dUIn5OAPS5s", page
        )
        is None
    )

    yt_user_page = brozzler.Page(
        None,
        {
            "url": "https://www.youtube.com/user/SonoraSantaneraVEVO",
            "site_id": site.id,
            "hops_from_seed": 10,
        },
    )
    assert (
        site.accept_reject_or_neither(
            "https://www.youtube.com/watch?v=dUIn5OAPS5s", yt_user_page
        )
        is True
    )
def test_proxy_down():
    """
    Test all fetching scenarios raise `brozzler.ProxyError` when proxy is down.

    This test needs to cover every possible fetch through the proxy other than
    fetches from the browser. For that, see test_brozzling.py.

    Tests two different kinds of connection error:
    - nothing listening on the port (nobody listens on port 4 :))
    - port bound but not accepting connections
    """
    # bound but never put into listening mode; the `with` block makes sure the
    # socket is closed again when the test ends, pass or fail
    with socket.socket() as sock:
        sock.bind(("127.0.0.1", 0))

        for not_listening_proxy in (
            "127.0.0.1:4",
            "127.0.0.1:%s" % sock.getsockname()[1],
        ):
            worker = brozzler.BrozzlerWorker(frontier=None, proxy=not_listening_proxy)
            site = brozzler.Site(
                None, {"id": str(uuid.uuid4()), "seed": "http://example.com/"}
            )
            page = brozzler.Page(None, {"url": "http://example.com/"})

            # robots.txt fetch
            with pytest.raises(brozzler.ProxyError):
                brozzler.is_permitted_by_robots(
                    site, "http://example.com/", proxy=not_listening_proxy
                )

            # youtube-dl fetch
            # NOTE(review): tempdir is not handed to do_youtube_dl here;
            # presumably a leftover from an earlier signature -- confirm
            with tempfile.TemporaryDirectory(prefix="brzl-ydl-") as tempdir:
                with pytest.raises(brozzler.ProxyError):
                    brozzler.ydl.do_youtube_dl(worker, site, page)

            # raw fetch
            with pytest.raises(brozzler.ProxyError):
                worker._fetch_url(site, page=page)

            # WARCPROX_WRITE_RECORD
            with pytest.raises(brozzler.ProxyError):
                worker._warcprox_write_record(
                    warcprox_address=not_listening_proxy,
                    url="test://proxy_down/warcprox_write_record",
                    warc_type="metadata",
                    content_type="text/plain",
                    payload=b"""payload doesn't matter here""",
                )
def test_start_stop_backwards_compat():
    """
    Check that the legacy timestamp fields are folded into `starts_and_stops`.

    For sites the legacy field is "start_time"; for jobs the legacy fields are
    "started" and "finished". In every case the values are expected to show up
    in the first `starts_and_stops` entry and the legacy keys are expected to
    be absent from the resulting document.
    """
    # site without a legacy start_time: a start is filled in, stop is open
    site = brozzler.Site(None, {"seed": "http://example.com/"})
    assert len(site.starts_and_stops) == 1
    assert site.starts_and_stops[0]["start"]
    assert site.starts_and_stops[0]["stop"] is None
    assert "start_time" not in site

    # site with a legacy start_time: it becomes the start
    site = brozzler.Site(
        None,
        {"seed": "http://example.com/", "start_time": datetime.datetime(2017, 1, 1)},
    )
    assert len(site.starts_and_stops) == 1
    assert site.starts_and_stops[0]["start"] == datetime.datetime(2017, 1, 1)
    assert site.starts_and_stops[0]["stop"] is None
    assert "start_time" not in site

    # job without legacy fields
    job = brozzler.Job(None, {"seeds": [{"url": "https://example.com/"}]})
    assert job.starts_and_stops[0]["start"]
    assert job.starts_and_stops[0]["stop"] is None
    assert "started" not in job
    assert "finished" not in job

    # job with legacy started/finished: they become start/stop
    job = brozzler.Job(
        None,
        {
            "seeds": [{"url": "https://example.com/"}],
            "started": datetime.datetime(2017, 1, 1),
            "finished": datetime.datetime(2017, 1, 2),
        },
    )
    assert job.starts_and_stops[0]["start"] == datetime.datetime(2017, 1, 1)
    assert job.starts_and_stops[0]["stop"] == datetime.datetime(2017, 1, 2)
    assert "started" not in job
    assert "finished" not in job
class Exception1(Exception):
    """First marker exception injected into threads by the thread_raise tests."""
class Exception2(Exception):
    """Second marker exception injected into threads by the thread_raise tests."""
def test_thread_raise_not_accept():
    """
    Check that `brozzler.thread_raise()` does not raise anything in a thread
    that never enters a `with brozzler.thread_accept_exceptions()` block.
    """
    caught = []

    def never_accept():
        # no thread_accept_exceptions() block anywhere in here
        try:
            brozzler.sleep(2)
        except Exception as exc:
            caught.append(exc)

    worker_thread = threading.Thread(target=never_accept)
    worker_thread.start()
    brozzler.thread_raise(worker_thread, Exception1)
    worker_thread.join()

    assert not caught
def test_thread_raise_immediate():
    """
    Check that an exception raised with `brozzler.thread_raise()` reaches a
    thread promptly when that thread is inside a
    `with brozzler.thread_accept_exceptions()` block.
    """
    caught = []

    def accept_immediately():
        try:
            with brozzler.thread_accept_exceptions():
                brozzler.sleep(2)
        except Exception as exc:
            caught.append(exc)

    worker_thread = threading.Thread(target=accept_immediately)
    worker_thread.start()
    brozzler.thread_raise(worker_thread, Exception1)
    started_waiting = time.time()
    worker_thread.join()

    assert caught
    assert isinstance(caught[0], Exception1)
    # the thread must have stopped well before its 2 second sleep was over
    assert time.time() - started_waiting < 1.0
def test_thread_raise_safe_exit():
    """
    Check that the thread ends up with the first exception (`Exception1`)
    when a second `brozzler.thread_raise()` (`Exception2`) arrives shortly
    after the first one, while leaving the gate's `with` block has been
    artificially slowed down.
    """

    def delay_context_exit():
        # Temporarily replace `__exit__` on the gate's class so that leaving
        # the `with` block takes about two extra seconds.
        gate = brozzler.thread_accept_exceptions()
        orig_exit = type(gate).__exit__
        try:
            # the tuple is evaluated left to right (sleep, then the real
            # __exit__); indexing with [-1] makes the replacement return False
            type(gate).__exit__ = lambda self, et, ev, t: (
                brozzler.sleep(2),
                orig_exit(self, et, ev, t),
                False,
            )[-1]
            with brozzler.thread_accept_exceptions() as gate:
                brozzler.sleep(2)
        except Exception as e:
            nonlocal thread_caught_exception
            thread_caught_exception = e
        finally:
            # always put the real __exit__ back
            type(gate).__exit__ = orig_exit

    # test that a second thread_raise() doesn't result in an exception in
    # ThreadExceptionGate.__exit__
    thread_caught_exception = None
    th = threading.Thread(target=delay_context_exit)
    th.start()
    # presumably gives the thread time to get inside its `with` block
    time.sleep(0.2)
    brozzler.thread_raise(th, Exception1)
    # presumably lands while the slowed-down __exit__ is still running
    time.sleep(0.2)
    brozzler.thread_raise(th, Exception2)
    th.join()
    assert thread_caught_exception
    assert isinstance(thread_caught_exception, Exception1)
def test_thread_raise_pending_exception():
    """
    Check that an exception raised with `brozzler.thread_raise()` stays
    pending until the target thread enters a
    `with brozzler.thread_accept_exceptions()` block.
    """
    caught = []

    def accept_eventually():
        try:
            # not accepting exceptions yet...
            brozzler.sleep(2)
            # ...only from here on
            with brozzler.thread_accept_exceptions():
                pass
        except Exception as exc:
            caught.append(exc)

    worker_thread = threading.Thread(target=accept_eventually)
    worker_thread.start()
    brozzler.thread_raise(worker_thread, Exception1)
    started_waiting = time.time()
    worker_thread.join()

    assert isinstance(caught[0] if caught else None, Exception1)
    # the sleep outside the `with` block must not have been cut short
    assert time.time() - started_waiting > 1.0
def test_thread_raise_second_with_block():
    """
    Check that when two exceptions are raised in a thread back to back, the
    first `with brozzler.thread_accept_exceptions()` block sees `Exception1`
    and a later, second block sees `Exception2`.
    """

    def two_with_blocks():
        # first block: only Exception1 lets the thread carry on; any other
        # outcome returns early and leaves thread_caught_exception as None
        try:
            with brozzler.thread_accept_exceptions():
                time.sleep(2)
            return  # test fails
        except Exception1 as e:
            pass
        except:
            return  # fail test

        # second block: whatever is caught here is what the test inspects
        try:
            with brozzler.thread_accept_exceptions():
                brozzler.sleep(2)
        except Exception as e:
            nonlocal thread_caught_exception
            thread_caught_exception = e

    # test that second `with` block gets second exception raised during first
    # `with` block
    thread_caught_exception = None
    th = threading.Thread(target=two_with_blocks)
    th.start()
    brozzler.thread_raise(th, Exception1)
    brozzler.thread_raise(th, Exception2)
    th.join()
    assert isinstance(thread_caught_exception, Exception2)
def test_needs_browsing():
    """
    Check that `BrozzlerWorker._needs_browsing()` is false for a page whose
    recorded fetches are a 301 redirect followed by a 200 response with
    content type application/pdf.
    """
    # imported here because the module only imports http.server and would
    # otherwise rely on that import pulling in http.client as a side effect
    import http.client

    # only one test case here right now, which exposed a bug

    class ConvenientHeaders(http.client.HTTPMessage):
        """`HTTPMessage` that can be built directly from a dict of headers."""

        def __init__(self, headers):
            super().__init__()
            for k, v in headers.items():
                self.add_header(k, v)

    page = brozzler.Page(None, {"url": "http://example.com/a"})

    spy = brozzler.ydl.YoutubeDLSpy()
    spy.fetches.append(
        {
            "url": "http://example.com/a",
            "method": "HEAD",
            "response_code": 301,
            "response_headers": ConvenientHeaders({"Location": "/b"}),
        }
    )
    spy.fetches.append(
        {
            "url": "http://example.com/b",
            "method": "GET",
            "response_code": 200,
            "response_headers": ConvenientHeaders({"Content-Type": "application/pdf"}),
        }
    )

    # called unbound, with None standing in for the worker instance
    assert not brozzler.worker.BrozzlerWorker._needs_browsing(None, page, spy.fetches)
def test_seed_redirect():
    """
    Check what `Site.note_seed_redirect()` does to the site scope.

    Each case is (seed url, url the seed redirected to, expected ssurts of
    the "accepts" rules afterwards, in order).
    """
    cases = (
        # same host, http -> https
        (
            "http://foo.com/",
            "https://foo.com/a/b/c",
            ("com,foo,//http:/", "com,foo,//https:/"),
        ),
        # same host, https -> http
        (
            "https://foo.com/",
            "http://foo.com/a/b/c",
            ("com,foo,//https:/", "com,foo,//http:/"),
        ),
        # different host
        (
            "http://foo.com/",
            "https://bar.com/a/b/c",
            ("com,foo,//http:/", "com,bar,//https:/a/b/c"),
        ),
    )

    for seed, redirect_target, expected_ssurts in cases:
        site = brozzler.Site(None, {"seed": seed})
        site.note_seed_redirect(redirect_target)
        expected_scope = {"accepts": [{"ssurt": ssurt} for ssurt in expected_ssurts]}
        assert site.scope == expected_scope
def test_limit_failures():
    """
    Check the bookkeeping when brozzling a page keeps failing.

    `brozzle_page` is mocked to always raise. After each of the first two
    `brozzle_site()` calls the page is expected to have one more failed
    attempt while the site stays ACTIVE; after the third call the page's
    brozzle count is expected to be 1 and the site FINISHED.
    """
    # page and site are plain mocks carrying just the attributes set below
    page = mock.Mock()
    page.failed_attempts = None
    page.brozzle_count = 0

    site = mock.Mock()
    site.status = "ACTIVE"
    site.active_brozzling_time = 0
    site.starts_and_stops = [{"start": datetime.datetime.utcnow()}]

    # stand-in for the rethinkdb handle: every query that is run yields []
    empty_query = mock.Mock(run=mock.Mock(return_value=[]))
    limited = mock.Mock(limit=mock.Mock(return_value=empty_query))
    table = mock.Mock(between=mock.Mock(return_value=limited))

    rr = mock.Mock()
    rr.servers = [mock.Mock()]
    rr.db_list = mock.Mock(return_value=empty_query)
    rr.table_list = mock.Mock(return_value=empty_query)
    rr.table = mock.Mock(return_value=table)
    assert rr.table().between().limit().run() == []

    frontier = brozzler.RethinkDbFrontier(rr)
    frontier.enforce_time_limit = mock.Mock()
    frontier.honor_stop_request = mock.Mock()
    frontier.claim_page = mock.Mock(return_value=page)
    frontier._maybe_finish_job = mock.Mock()

    browser = mock.Mock()

    worker = brozzler.BrozzlerWorker(frontier)
    worker.brozzle_page = mock.Mock(side_effect=Exception)

    # starting point
    assert page.failed_attempts is None
    assert page.brozzle_count == 0
    assert site.status == "ACTIVE"

    # (failed_attempts, brozzle_count, site status) after each attempt
    expected_after_attempt = (
        (1, 0, "ACTIVE"),
        (2, 0, "ACTIVE"),
        (3, 1, "FINISHED"),
    )
    for failed_attempts, brozzle_count, status in expected_after_attempt:
        worker.brozzle_site(browser, site)
        assert page.failed_attempts == failed_attempts
        assert page.brozzle_count == brozzle_count
        assert site.status == status