Merge branch 'master' into qa

* master:
  new command line utility brozzler-stop-crawl, with tests
This commit is contained in:
Noah Levitt 2017-04-14 18:06:24 -07:00
commit 929f046ebb
7 changed files with 178 additions and 11 deletions

View File

@ -2,7 +2,7 @@
brozzler/__init__.py - __init__.py for brozzler package, contains some common
code
Copyright (C) 2014-2016 Internet Archive
Copyright (C) 2014-2017 Internet Archive
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -26,7 +26,7 @@ class ShutdownRequested(Exception):
class NothingToClaim(Exception):
pass
class CrawlJobStopped(Exception):
class CrawlStopped(Exception):
pass
class ReachedLimit(Exception):

View File

@ -635,3 +635,39 @@ def brozzler_list_captures(argv=None):
for result in results:
print(json.dumps(result, cls=Jsonner, indent=2))
def brozzler_stop_crawl(argv=None):
    '''
    Command line entry point for brozzler-stop-crawl.

    Requests a graceful stop of a whole crawl job (--job) or of a single
    site (--site) by setting the entity's `stop_requested` timestamp in
    rethinkdb; running brozzler workers notice the flag and wind the
    crawl down. Exactly one of --job/--site is required.
    '''
    argv = argv or sys.argv
    arg_parser = argparse.ArgumentParser(
            prog=os.path.basename(argv[0]),
            formatter_class=BetterArgumentDefaultsHelpFormatter)
    group = arg_parser.add_mutually_exclusive_group(required=True)
    add_rethinkdb_options(arg_parser)
    group.add_argument(
            '--job', dest='job_id', metavar='JOB_ID', help=(
                'request crawl stop for the specified job'))
    group.add_argument(
            '--site', dest='site_id', metavar='SITE_ID', help=(
                'request crawl stop for the specified site'))
    add_common_options(arg_parser, argv)
    args = arg_parser.parse_args(args=argv[1:])
    configure_logging(args)
    rr = rethinker(args)
    if args.job_id:
        # ids are integers by default but may be arbitrary strings
        try:
            job_id = int(args.job_id)
        except ValueError:
            job_id = args.job_id
        job = brozzler.Job.load(rr, job_id)
        if not job:
            # fail with a clear message instead of an AttributeError below
            logging.fatal('no such job with id=%r', job_id)
            sys.exit(1)
        job.stop_requested = doublethink.utcnow()
        job.save()
    elif args.site_id:
        try:
            site_id = int(args.site_id)
        except ValueError:
            site_id = args.site_id
        site = brozzler.Site.load(rr, site_id)
        if not site:
            logging.fatal('no such site with id=%r', site_id)
            sys.exit(1)
        site.stop_requested = doublethink.utcnow()
        site.save()

View File

@ -183,12 +183,20 @@ class RethinkDbFrontier:
for result in results:
yield brozzler.Job(self.rr, result)
def honor_stop_request(self, job_id):
"""Raises brozzler.CrawlJobStopped if stop has been requested."""
job = brozzler.Job.load(self.rr, job_id)
if job and job.get('stop_requested'):
self.logger.info("stop requested for job %s", job_id)
raise brozzler.CrawlJobStopped
def honor_stop_request(self, site):
    '''
    Raises brozzler.CrawlStopped if a stop has been requested, either
    for this site directly or for the job the site belongs to.
    '''
    site.refresh()
    site_stop = site.stop_requested
    if site_stop and site_stop <= doublethink.utcnow():
        self.logger.info("stop requested for site %s", site.id)
        raise brozzler.CrawlStopped
    if not site.job_id:
        return
    job = brozzler.Job.load(self.rr, site.job_id)
    if not job:
        return
    job_stop = job.stop_requested
    if job_stop and job_stop <= doublethink.utcnow():
        self.logger.info("stop requested for job %s", site.job_id)
        raise brozzler.CrawlStopped
def _maybe_finish_job(self, job_id):
"""Returns True if job is finished."""

View File

@ -406,7 +406,7 @@ class BrozzlerWorker:
start = time.time()
while time.time() - start < 7 * 60:
site.refresh()
self._frontier.honor_stop_request(site.job_id)
self._frontier.honor_stop_request(site)
page = self._frontier.claim_page(site, "%s:%s" % (
socket.gethostname(), browser.chrome.port))
@ -431,7 +431,7 @@ class BrozzlerWorker:
self.logger.info("no pages left for site %s", site)
except brozzler.ReachedLimit as e:
self._frontier.reached_limit(site, e)
except brozzler.CrawlJobStopped:
except brozzler.CrawlStopped:
self._frontier.finished(site, "FINISHED_STOP_REQUESTED")
# except brozzler.browser.BrowsingAborted:
# self.logger.info("{} shut down".format(browser))

View File

@ -32,7 +32,7 @@ def find_package_data(package):
setuptools.setup(
name='brozzler',
version='1.1b11.dev228',
version='1.1b11.dev229',
description='Distributed web crawling with browsers',
url='https://github.com/internetarchive/brozzler',
author='Noah Levitt',
@ -56,6 +56,7 @@ setuptools.setup(
'brozzler-list-jobs=brozzler.cli:brozzler_list_jobs',
'brozzler-list-sites=brozzler.cli:brozzler_list_sites',
'brozzler-list-pages=brozzler.cli:brozzler_list_pages',
'brozzler-stop-crawl=brozzler.cli:brozzler_stop_crawl',
'brozzler-dashboard=brozzler.dashboard:main',
'brozzler-easy=brozzler.easy:main',
'brozzler-wayback=brozzler.pywb:main',

View File

@ -50,6 +50,24 @@ def httpd(request):
self.send_header('Location', '/site5/destination/')
self.end_headers()
self.wfile.write(b'')
elif self.path.startswith('/infinite/'):
payload = b'''
<html>
<head>
<title>infinite site</title>
</head>
<body>
<a href='a/'>a/</a> <a href='b/'>b/</a> <a href='c/'>c/</a>
<a href='d/'>d/</a> <a href='e/'>e/</a> <a href='f/'>f/</a>
<a href='g/'>g/</a> <a href='h/'>h/</a> <a href='i/'>i/</a>
</body>
</html>
'''
self.send_response(200, 'OK')
self.send_header('Connection', 'close')
self.send_header('Content-Length', len(payload))
self.end_headers()
self.wfile.write(payload)
else:
super().do_GET()
@ -500,3 +518,64 @@ def test_hashtags(httpd):
assert 'screenshot:http://localhost:%s/site7/foo.html' % httpd.server_port in captures_by_url
assert 'thumbnail:http://localhost:%s/site7/foo.html' % httpd.server_port in captures_by_url
def test_stop_crawl(httpd):
    '''
    Integration test for crawl stop requests (requires live rethinkdb,
    the httpd fixture, and running brozzler workers).

    Stopping a single site should finish only that site; stopping the
    job should finish the job and all of its remaining sites.
    '''
    rr = doublethink.Rethinker('localhost', db='brozzler')
    frontier = brozzler.RethinkDbFrontier(rr)

    # create a new job with three sites that could be crawled forever
    job_conf = {'seeds': [
        {'url': 'http://localhost:%s/infinite/foo/' % httpd.server_port},
        {'url': 'http://localhost:%s/infinite/bar/' % httpd.server_port},
        {'url': 'http://localhost:%s/infinite/baz/' % httpd.server_port}]}
    job = brozzler.new_job(frontier, job_conf)
    assert job.id
    sites = list(frontier.job_sites(job.id))
    assert len(sites) == 3
    assert not sites[0].stop_requested
    assert not sites[1].stop_requested
    assert not sites[2].stop_requested

    # request crawl stop for one site using the command line entrypoint
    brozzler.cli.brozzler_stop_crawl([
        'brozzler-stop-crawl', '--site=%s' % sites[0].id])
    sites[0].refresh()
    assert sites[0].stop_requested

    # stop request should be honored quickly
    start = time.time()
    while not sites[0].status.startswith(
            'FINISHED') and time.time() - start < 120:
        time.sleep(0.5)
        sites[0].refresh()
    assert sites[0].status == 'FINISHED_STOP_REQUESTED'

    # but the other sites and the job as a whole should still be crawling
    sites[1].refresh()
    assert sites[1].status == 'ACTIVE'
    sites[2].refresh()
    assert sites[2].status == 'ACTIVE'
    job.refresh()
    assert job.status == 'ACTIVE'

    # request crawl stop for the job using the command line entrypoint
    brozzler.cli.brozzler_stop_crawl([
        'brozzler-stop-crawl', '--job=%s' % job.id])
    job.refresh()
    assert job.stop_requested

    # stop request should be honored quickly
    start = time.time()
    while not job.status.startswith(
            'FINISHED') and time.time() - start < 120:
        time.sleep(0.5)
        job.refresh()
    assert job.status == 'FINISHED'

    # the other sites should also be FINISHED_STOP_REQUESTED
    sites[0].refresh()
    assert sites[0].status == 'FINISHED_STOP_REQUESTED'
    sites[1].refresh()
    assert sites[1].status == 'FINISHED_STOP_REQUESTED'
    sites[2].refresh()
    assert sites[2].status == 'FINISHED_STOP_REQUESTED'

View File

@ -24,6 +24,8 @@ import argparse
import doublethink
import time
import datetime
import uuid
import pytest
args = argparse.Namespace()
args.log_level = logging.INFO
@ -655,3 +657,44 @@ def test_hashtag_links():
assert pages[2].hashtags == ['#buh']
assert pages[2].priority == 12
def test_honor_stop_request():
    '''
    Unit test for RethinkDbFrontier.honor_stop_request (requires live
    rethinkdb): a stop requested on either the job or the site makes
    honor_stop_request raise brozzler.CrawlStopped.
    '''
    rr = doublethink.Rethinker('localhost', db='ignoreme')
    frontier = brozzler.RethinkDbFrontier(rr)

    # 1. test stop request on job
    job_conf = {'seeds': [{'url': 'http://example.com'}]}
    job = brozzler.new_job(frontier, job_conf)
    assert job.id
    sites = list(frontier.job_sites(job.id))
    assert len(sites) == 1
    site = sites[0]
    assert site.job_id == job.id

    # does not raise exception
    frontier.honor_stop_request(site)

    # set job.stop_requested (use doublethink.utcnow() consistently for
    # timezone-aware timestamps)
    job.stop_requested = doublethink.utcnow()
    job.save()
    with pytest.raises(brozzler.CrawlStopped):
        frontier.honor_stop_request(site)

    # 2. test stop request on site
    job_conf = {'seeds': [{'url': 'http://example.com'}]}
    job = brozzler.new_job(frontier, job_conf)
    assert job.id
    sites = list(frontier.job_sites(job.id))
    assert len(sites) == 1
    site = sites[0]
    assert site.job_id == job.id

    # does not raise exception
    frontier.honor_stop_request(site)

    # set site.stop_requested
    site.stop_requested = doublethink.utcnow()
    site.save()
    with pytest.raises(brozzler.CrawlStopped):
        frontier.honor_stop_request(site)