mirror of https://github.com/internetarchive/brozzler.git, synced 2025-04-21 08:06:27 -04:00
Merge branch 'master' into qa
* master:
  fix oversight including username/password in site config when starting a new job
  restore BrozzlerWorker built-in support for managing its own thread
  restore handling of 420 Reached limit, with a rudimentary test
  add import missing from test
  restore support for on_response and on_request, with an automated test for on_response
commit 4e7f9f8690
@@ -119,6 +119,10 @@ class WebsockReceiverThread(threading.Thread):
        self.is_open = False
        self.got_page_load_event = None
        self.reached_limit = None

        self.on_request = None
        self.on_response = None

        self._result_messages = {}
@@ -178,14 +182,39 @@ class WebsockReceiverThread(threading.Thread):
        # resume execution
        self.websock.send(json.dumps(dict(id=0, method='Debugger.resume')))

    def _network_response_received(self, message):
        if (message['params']['response']['status'] == 420
                and 'Warcprox-Meta' in CaseInsensitiveDict(
                    message['params']['response']['headers'])):
            if not self.reached_limit:
                warcprox_meta = json.loads(CaseInsensitiveDict(
                    message['params']['response']['headers'])['Warcprox-Meta'])
                self.reached_limit = brozzler.ReachedLimit(
                        warcprox_meta=warcprox_meta)
                self.logger.info('reached limit %s', self.reached_limit)
                brozzler.thread_raise(
                        self.calling_thread, brozzler.ReachedLimit)
            else:
                self.logger.info(
                        'reached limit but self.reached_limit is already set, '
                        'assuming the calling thread is already handling this %s',
                        self.reached_limit)
        if self.on_response:
            self.on_response(message)

    def _handle_message(self, websock, json_message):
        message = json.loads(json_message)
        if 'method' in message:
            if message['method'] == 'Page.loadEventFired':
                self.got_page_load_event = datetime.datetime.utcnow()
            elif message['method'] == 'Network.responseReceived':
                self._network_response_received(message)
            elif message['method'] == 'Network.requestWillBeSent':
                if self.on_request:
                    self.on_request(message)
            elif message['method'] == 'Debugger.paused':
                self._debugger_paused(message)
            elif message['method'] == 'Inspector.targetCrashed':
                self.logger.error(
                        '''chrome tab went "aw snap" or "he's dead jim"!''')
                brozzler.thread_raise(self.calling_thread, BrowsingException)
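A note on the 420 handling above: the Warcprox-Meta response header is looked up case-insensitively and carries JSON describing which limit was reached. A minimal standalone sketch of that lookup, assuming CaseInsensitiveDict comes from requests.structures (as used in this module) and using a made-up DevTools Network.responseReceived payload:

import json
from requests.structures import CaseInsensitiveDict

# hypothetical Network.responseReceived message, only the fields the check reads
message = {'params': {'response': {
        'status': 420,
        'headers': {'warcprox-meta':
            '{"reached-limit": {"test_limits_bucket/total/urls": 0}}'}}}}

headers = CaseInsensitiveDict(message['params']['response']['headers'])
if message['params']['response']['status'] == 420 and 'Warcprox-Meta' in headers:
    # the lookup succeeds despite the lower-case header key in the message
    warcprox_meta = json.loads(headers['Warcprox-Meta'])
    print(warcprox_meta['reached-limit'])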
@@ -375,6 +404,10 @@ class Browser:
        if self.is_browsing:
            raise BrowsingException('browser is already busy browsing a page')
        self.is_browsing = True
        if on_request:
            self.websock_thread.on_request = on_request
        if on_response:
            self.websock_thread.on_response = on_response
        try:
            self.navigate_to_page(
                    page_url, extra_headers=extra_headers,

@@ -397,11 +430,17 @@ class Browser:
            ## outlinks += retrieve_outlinks (60 sec)
            final_page_url = self.url()
            return final_page_url, outlinks
        except brozzler.ReachedLimit:
            # websock_thread has stashed the ReachedLimit exception with
            # more information, raise that one
            raise self.websock_thread.reached_limit
        except websocket.WebSocketConnectionClosedException as e:
            self.logger.error('websocket closed, did chrome die?')
            raise BrowsingException(e)
        finally:
            self.is_browsing = False
            self.websock_thread.on_request = None
            self.websock_thread.on_response = None

    def navigate_to_page(
            self, page_url, extra_headers=None, user_agent=None, timeout=300):
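The on_request and on_response hooks restored here are plain callables that receive the raw DevTools message dict, and browse_page() clears them again in its finally block. A usage sketch under the same assumptions the new tests make (a local Chrome/Chromium located by brozzler.suggest_default_chrome_exe(); the URL is a placeholder):

import brozzler

def print_response(msg):
    # msg is the raw Network.responseReceived event from the websocket thread
    print(msg['params']['response']['status'], msg['params']['response']['url'])

chrome_exe = brozzler.suggest_default_chrome_exe()
with brozzler.Browser(chrome_exe=chrome_exe) as browser:
    # the hooks apply only to this call; they are reset to None afterwards
    final_url, outlinks = browser.browse_page(
            'http://localhost:8000/site3/page.html', on_response=print_response)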
@@ -75,8 +75,6 @@ def new_job(frontier, job_conf):
    sites = []
    for seed_conf in job_conf["seeds"]:
        merged_conf = merge(seed_conf, job_conf)
        if "login" in merged_conf and "metadata" in merged_conf:
            merged_conf["metadata"]["login"] = merged_conf["login"]
        site = brozzler.Site(
                job_id=job.id, seed=merged_conf["url"],
                scope=merged_conf.get("scope"),

@@ -89,7 +87,9 @@ def new_job(frontier, job_conf):
                metadata=merged_conf.get("metadata"),
                remember_outlinks=merged_conf.get("remember_outlinks"),
                user_agent=merged_conf.get("user_agent"),
                behavior_parameters=merged_conf.get("behavior_parameters"))
                behavior_parameters=merged_conf.get("behavior_parameters"),
                username=merged_conf.get("username"),
                password=merged_conf.get("password"))
        sites.append(site)

    # insert all the sites into database before the job
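With username and password now passed through to brozzler.Site, per-seed credentials in the job configuration take effect. A hypothetical minimal job_conf dict, shaped the way new_job(frontier, job_conf) reads it (only the keys relevant here; the 'id' field and URL are illustrative, and real job configs carry more fields and are normally written in YAML):

# hypothetical job configuration, for illustration only
job_conf = {
    'id': 'example-login-job',
    'seeds': [{
        'url': 'https://login.example.com/',
        'username': 'archiveuser',   # merged into the Site by new_job (see above)
        'password': 'hunter2',
    }],
}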
@@ -109,6 +109,9 @@ class BrozzlerWorker:
        self._browsing_threads = set()
        self._browsing_threads_lock = threading.Lock()

        self._thread = None
        self._start_stop_lock = threading.Lock()

    def _proxy(self, site):
        if site.proxy:
            return site.proxy

@@ -458,3 +461,28 @@ class BrozzlerWorker:
        for th in thredz:
            th.join()

    def start(self):
        with self._start_stop_lock:
            if self._thread:
                self.logger.warn(
                        'ignoring start request because self._thread is '
                        'not None')
                return
            self._thread = threading.Thread(
                    target=self.run, name="BrozzlerWorker")
            self._thread.start()

    def shutdown_now(self):
        self.stop()

    def stop(self):
        with self._start_stop_lock:
            if self._thread and self._thread.is_alive():
                self.logger.info("brozzler worker shutting down")
                brozzler.thread_raise(self._thread, brozzler.ShutdownRequested)
                self._thread.join()
                self._thread = None

    def is_alive(self):
        return self._thread and self._thread.is_alive()
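The restored thread management wraps the worker's run() loop in a start/stop lifecycle. A usage sketch, assuming worker is an already-constructed BrozzlerWorker (its constructor arguments, such as the frontier, are outside this diff):

import time

worker.start()           # spawns the "BrozzlerWorker" thread running self.run
assert worker.is_alive()

time.sleep(30)           # let it claim and brozzle sites for a while (arbitrary)

worker.shutdown_now()    # same as stop(): raises ShutdownRequested in the thread and joins it
assert not worker.is_alive()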
setup.py
@@ -32,7 +32,7 @@ def find_package_data(package):

setuptools.setup(
        name='brozzler',
        version='1.1b9.dev154',
        version='1.1b9.dev158',
        description='Distributed web crawling with browsers',
        url='https://github.com/internetarchive/brozzler',
        author='Noah Levitt',
tests/htdocs/site3/brozzler.svg (symbolic link)
@@ -0,0 +1 @@
../../../brozzler/dashboard/static/brozzler.svg
tests/htdocs/site3/page.html (new file)
@@ -0,0 +1,9 @@
<html>
<head>
<title> some simple html </title>
</head>
<body>
<h1>an image</h1>
<img src='brozzler.svg'>
</body>
</html>
@@ -19,9 +19,110 @@ limitations under the License.

import pytest
import brozzler
import brozzler.cli
import logging
import os
import http.server
import threading
import argparse
import urllib.request
import json

args = argparse.Namespace()
args.log_level = logging.INFO
brozzler.cli._configure_logging(args)

WARCPROX_META_420 = {
    'stats': {
        'test_limits_bucket': {
            'total': {'urls': 0, 'wire_bytes': 0},
            'new': {'urls': 0, 'wire_bytes': 0},
            'revisit': {'urls': 0, 'wire_bytes': 0},
            'bucket': 'test_limits_bucket'
        }
    },
    'reached-limit': {'test_limits_bucket/total/urls': 0}
}

@pytest.fixture(scope='module')
def httpd(request):
    class RequestHandler(http.server.SimpleHTTPRequestHandler):
        def do_GET(self):
            if self.path == '/420':
                self.send_response(420, 'Reached limit')
                self.send_header('Connection', 'close')
                self.send_header('Warcprox-Meta', json.dumps(WARCPROX_META_420))
                payload = b'request rejected by warcprox: reached limit test_limits_bucket/total/urls=0\n'
                self.send_header('Content-Type', 'text/plain;charset=utf-8')
                self.send_header('Content-Length', len(payload))
                self.end_headers()
                self.wfile.write(payload)
            else:
                super().do_GET()

    # SimpleHTTPRequestHandler always uses CWD so we have to chdir
    os.chdir(os.path.join(os.path.dirname(__file__), 'htdocs'))

    httpd = http.server.HTTPServer(('localhost', 0), RequestHandler)
    httpd_thread = threading.Thread(name='httpd', target=httpd.serve_forever)
    httpd_thread.start()

    def fin():
        httpd.shutdown()
        httpd.server_close()
        httpd_thread.join()
    request.addfinalizer(fin)

    return httpd

def test_httpd(httpd):
    '''
    Tests that our http server is working as expected, and that two fetches
    of the same url return the same payload, proving it can be used to test
    deduplication.
    '''
    payload1 = payload2 = None
    url = 'http://localhost:%s/site1/file1.txt' % httpd.server_port
    with urllib.request.urlopen(url) as response:
        assert response.status == 200
        payload1 = response.read()
        assert payload1

    with urllib.request.urlopen(url) as response:
        assert response.status == 200
        payload2 = response.read()
        assert payload2

    assert payload1 == payload2

    url = 'http://localhost:%s/420' % httpd.server_port
    with pytest.raises(urllib.error.HTTPError) as excinfo:
        urllib.request.urlopen(url)
    assert excinfo.value.getcode() == 420

def test_aw_snap_hes_dead_jim():
    chrome_exe = brozzler.suggest_default_chrome_exe()
    with brozzler.Browser(chrome_exe=chrome_exe) as browser:
        with pytest.raises(brozzler.BrowsingException):
            browser.browse_page('chrome://crash')

def test_on_response(httpd):
    response_urls = []
    def on_response(msg):
        response_urls.append(msg['params']['response']['url'])

    chrome_exe = brozzler.suggest_default_chrome_exe()
    url = 'http://localhost:%s/site3/page.html' % httpd.server_port
    with brozzler.Browser(chrome_exe=chrome_exe) as browser:
        browser.browse_page(url, on_response=on_response)
        browser.browse_page(url)
    assert response_urls[0] == 'http://localhost:%s/site3/page.html' % httpd.server_port
    assert response_urls[1] == 'http://localhost:%s/site3/brozzler.svg' % httpd.server_port
    assert response_urls[2] == 'http://localhost:%s/favicon.ico' % httpd.server_port

def test_420(httpd):
    chrome_exe = brozzler.suggest_default_chrome_exe()
    url = 'http://localhost:%s/420' % httpd.server_port
    with brozzler.Browser(chrome_exe=chrome_exe) as browser:
        with pytest.raises(brozzler.ReachedLimit) as excinfo:
            browser.browse_page(url)
        assert excinfo.value.warcprox_meta == WARCPROX_META_420