mirror of
https://github.com/internetarchive/brozzler.git
synced 2025-06-20 04:44:12 -04:00
new command line utility brozzler-stop-crawl, with tests
This commit is contained in:
parent
fae60e9960
commit
df7734f2ca
7 changed files with 178 additions and 11 deletions
|
@ -2,7 +2,7 @@
|
||||||
brozzler/__init__.py - __init__.py for brozzler package, contains some common
|
brozzler/__init__.py - __init__.py for brozzler package, contains some common
|
||||||
code
|
code
|
||||||
|
|
||||||
Copyright (C) 2014-2016 Internet Archive
|
Copyright (C) 2014-2017 Internet Archive
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
you may not use this file except in compliance with the License.
|
you may not use this file except in compliance with the License.
|
||||||
|
@ -26,7 +26,7 @@ class ShutdownRequested(Exception):
|
||||||
class NothingToClaim(Exception):
|
class NothingToClaim(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
class CrawlJobStopped(Exception):
|
class CrawlStopped(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
class ReachedLimit(Exception):
|
class ReachedLimit(Exception):
|
||||||
|
|
|
@ -635,3 +635,39 @@ def brozzler_list_captures(argv=None):
|
||||||
for result in results:
|
for result in results:
|
||||||
print(json.dumps(result, cls=Jsonner, indent=2))
|
print(json.dumps(result, cls=Jsonner, indent=2))
|
||||||
|
|
||||||
|
def brozzler_stop_crawl(argv=None):
|
||||||
|
argv = argv or sys.argv
|
||||||
|
arg_parser = argparse.ArgumentParser(
|
||||||
|
prog=os.path.basename(argv[0]),
|
||||||
|
formatter_class=BetterArgumentDefaultsHelpFormatter)
|
||||||
|
group = arg_parser.add_mutually_exclusive_group(required=True)
|
||||||
|
add_rethinkdb_options(arg_parser)
|
||||||
|
group.add_argument(
|
||||||
|
'--job', dest='job_id', metavar='JOB_ID', help=(
|
||||||
|
'request crawl stop for the specified job'))
|
||||||
|
group.add_argument(
|
||||||
|
'--site', dest='site_id', metavar='SITE_ID', help=(
|
||||||
|
'request crawl stop for the specified site'))
|
||||||
|
add_common_options(arg_parser, argv)
|
||||||
|
|
||||||
|
args = arg_parser.parse_args(args=argv[1:])
|
||||||
|
configure_logging(args)
|
||||||
|
|
||||||
|
rr = rethinker(args)
|
||||||
|
if args.job_id:
|
||||||
|
try:
|
||||||
|
job_id = int(args.job_id)
|
||||||
|
except ValueError:
|
||||||
|
job_id = args.job_id
|
||||||
|
job = brozzler.Job.load(rr, job_id)
|
||||||
|
job.stop_requested = doublethink.utcnow()
|
||||||
|
job.save()
|
||||||
|
elif args.site_id:
|
||||||
|
try:
|
||||||
|
site_id = int(args.site_id)
|
||||||
|
except ValueError:
|
||||||
|
site_id = args.site_id
|
||||||
|
site = brozzler.Site.load(rr, site_id)
|
||||||
|
site.stop_requested = doublethink.utcnow()
|
||||||
|
site.save()
|
||||||
|
|
||||||
|
|
|
@ -183,12 +183,20 @@ class RethinkDbFrontier:
|
||||||
for result in results:
|
for result in results:
|
||||||
yield brozzler.Job(self.rr, result)
|
yield brozzler.Job(self.rr, result)
|
||||||
|
|
||||||
def honor_stop_request(self, job_id):
|
def honor_stop_request(self, site):
|
||||||
"""Raises brozzler.CrawlJobStopped if stop has been requested."""
|
"""Raises brozzler.CrawlStopped if stop has been requested."""
|
||||||
job = brozzler.Job.load(self.rr, job_id)
|
site.refresh()
|
||||||
if job and job.get('stop_requested'):
|
if (site.stop_requested
|
||||||
self.logger.info("stop requested for job %s", job_id)
|
and site.stop_requested <= doublethink.utcnow()):
|
||||||
raise brozzler.CrawlJobStopped
|
self.logger.info("stop requested for site %s", site.id)
|
||||||
|
raise brozzler.CrawlStopped
|
||||||
|
|
||||||
|
if site.job_id:
|
||||||
|
job = brozzler.Job.load(self.rr, site.job_id)
|
||||||
|
if (job and job.stop_requested
|
||||||
|
and job.stop_requested <= doublethink.utcnow()):
|
||||||
|
self.logger.info("stop requested for job %s", site.job_id)
|
||||||
|
raise brozzler.CrawlStopped
|
||||||
|
|
||||||
def _maybe_finish_job(self, job_id):
|
def _maybe_finish_job(self, job_id):
|
||||||
"""Returns True if job is finished."""
|
"""Returns True if job is finished."""
|
||||||
|
|
|
@ -406,7 +406,7 @@ class BrozzlerWorker:
|
||||||
start = time.time()
|
start = time.time()
|
||||||
while time.time() - start < 7 * 60:
|
while time.time() - start < 7 * 60:
|
||||||
site.refresh()
|
site.refresh()
|
||||||
self._frontier.honor_stop_request(site.job_id)
|
self._frontier.honor_stop_request(site)
|
||||||
page = self._frontier.claim_page(site, "%s:%s" % (
|
page = self._frontier.claim_page(site, "%s:%s" % (
|
||||||
socket.gethostname(), browser.chrome.port))
|
socket.gethostname(), browser.chrome.port))
|
||||||
|
|
||||||
|
@ -431,7 +431,7 @@ class BrozzlerWorker:
|
||||||
self.logger.info("no pages left for site %s", site)
|
self.logger.info("no pages left for site %s", site)
|
||||||
except brozzler.ReachedLimit as e:
|
except brozzler.ReachedLimit as e:
|
||||||
self._frontier.reached_limit(site, e)
|
self._frontier.reached_limit(site, e)
|
||||||
except brozzler.CrawlJobStopped:
|
except brozzler.CrawlStopped:
|
||||||
self._frontier.finished(site, "FINISHED_STOP_REQUESTED")
|
self._frontier.finished(site, "FINISHED_STOP_REQUESTED")
|
||||||
# except brozzler.browser.BrowsingAborted:
|
# except brozzler.browser.BrowsingAborted:
|
||||||
# self.logger.info("{} shut down".format(browser))
|
# self.logger.info("{} shut down".format(browser))
|
||||||
|
|
3
setup.py
3
setup.py
|
@ -32,7 +32,7 @@ def find_package_data(package):
|
||||||
|
|
||||||
setuptools.setup(
|
setuptools.setup(
|
||||||
name='brozzler',
|
name='brozzler',
|
||||||
version='1.1b11.dev228',
|
version='1.1b11.dev229',
|
||||||
description='Distributed web crawling with browsers',
|
description='Distributed web crawling with browsers',
|
||||||
url='https://github.com/internetarchive/brozzler',
|
url='https://github.com/internetarchive/brozzler',
|
||||||
author='Noah Levitt',
|
author='Noah Levitt',
|
||||||
|
@ -56,6 +56,7 @@ setuptools.setup(
|
||||||
'brozzler-list-jobs=brozzler.cli:brozzler_list_jobs',
|
'brozzler-list-jobs=brozzler.cli:brozzler_list_jobs',
|
||||||
'brozzler-list-sites=brozzler.cli:brozzler_list_sites',
|
'brozzler-list-sites=brozzler.cli:brozzler_list_sites',
|
||||||
'brozzler-list-pages=brozzler.cli:brozzler_list_pages',
|
'brozzler-list-pages=brozzler.cli:brozzler_list_pages',
|
||||||
|
'brozzler-stop-crawl=brozzler.cli:brozzler_stop_crawl',
|
||||||
'brozzler-dashboard=brozzler.dashboard:main',
|
'brozzler-dashboard=brozzler.dashboard:main',
|
||||||
'brozzler-easy=brozzler.easy:main',
|
'brozzler-easy=brozzler.easy:main',
|
||||||
'brozzler-wayback=brozzler.pywb:main',
|
'brozzler-wayback=brozzler.pywb:main',
|
||||||
|
|
|
@ -50,6 +50,24 @@ def httpd(request):
|
||||||
self.send_header('Location', '/site5/destination/')
|
self.send_header('Location', '/site5/destination/')
|
||||||
self.end_headers()
|
self.end_headers()
|
||||||
self.wfile.write(b'')
|
self.wfile.write(b'')
|
||||||
|
elif self.path.startswith('/infinite/'):
|
||||||
|
payload = b'''
|
||||||
|
<html>
|
||||||
|
<head>
|
||||||
|
<title>infinite site</title>
|
||||||
|
</head>
|
||||||
|
<body>
|
||||||
|
<a href='a/'>a/</a> <a href='b/'>b/</a> <a href='c/'>c/</a>
|
||||||
|
<a href='d/'>d/</a> <a href='e/'>e/</a> <a href='f/'>f/</a>
|
||||||
|
<a href='g/'>g/</a> <a href='h/'>h/</a> <a href='i/'>i/</a>
|
||||||
|
</body>
|
||||||
|
</html>
|
||||||
|
'''
|
||||||
|
self.send_response(200, 'OK')
|
||||||
|
self.send_header('Connection', 'close')
|
||||||
|
self.send_header('Content-Length', len(payload))
|
||||||
|
self.end_headers()
|
||||||
|
self.wfile.write(payload)
|
||||||
else:
|
else:
|
||||||
super().do_GET()
|
super().do_GET()
|
||||||
|
|
||||||
|
@ -500,3 +518,64 @@ def test_hashtags(httpd):
|
||||||
assert 'screenshot:http://localhost:%s/site7/foo.html' % httpd.server_port in captures_by_url
|
assert 'screenshot:http://localhost:%s/site7/foo.html' % httpd.server_port in captures_by_url
|
||||||
assert 'thumbnail:http://localhost:%s/site7/foo.html' % httpd.server_port in captures_by_url
|
assert 'thumbnail:http://localhost:%s/site7/foo.html' % httpd.server_port in captures_by_url
|
||||||
|
|
||||||
|
def test_stop_crawl(httpd):
|
||||||
|
test_id = 'test_stop_crawl_job-%s' % datetime.datetime.utcnow().isoformat()
|
||||||
|
rr = doublethink.Rethinker('localhost', db='brozzler')
|
||||||
|
frontier = brozzler.RethinkDbFrontier(rr)
|
||||||
|
|
||||||
|
# create a new job with three sites that could be crawled forever
|
||||||
|
job_conf = {'seeds': [
|
||||||
|
{'url': 'http://localhost:%s/infinite/foo/' % httpd.server_port},
|
||||||
|
{'url': 'http://localhost:%s/infinite/bar/' % httpd.server_port},
|
||||||
|
{'url': 'http://localhost:%s/infinite/baz/' % httpd.server_port}]}
|
||||||
|
job = brozzler.new_job(frontier, job_conf)
|
||||||
|
assert job.id
|
||||||
|
|
||||||
|
sites = list(frontier.job_sites(job.id))
|
||||||
|
assert not sites[0].stop_requested
|
||||||
|
assert not sites[1].stop_requested
|
||||||
|
|
||||||
|
# request crawl stop for one site using the command line entrypoint
|
||||||
|
brozzler.cli.brozzler_stop_crawl([
|
||||||
|
'brozzler-stop-crawl', '--site=%s' % sites[0].id])
|
||||||
|
sites[0].refresh()
|
||||||
|
assert sites[0].stop_requested
|
||||||
|
|
||||||
|
# stop request should be honored quickly
|
||||||
|
start = time.time()
|
||||||
|
while not sites[0].status.startswith(
|
||||||
|
'FINISHED') and time.time() - start < 120:
|
||||||
|
time.sleep(0.5)
|
||||||
|
sites[0].refresh()
|
||||||
|
assert sites[0].status == 'FINISHED_STOP_REQUESTED'
|
||||||
|
|
||||||
|
# but the other sites and the job as a whole should still be crawling
|
||||||
|
sites[1].refresh()
|
||||||
|
assert sites[1].status == 'ACTIVE'
|
||||||
|
sites[2].refresh()
|
||||||
|
assert sites[2].status == 'ACTIVE'
|
||||||
|
job.refresh()
|
||||||
|
assert job.status == 'ACTIVE'
|
||||||
|
|
||||||
|
# request crawl stop for the job using the command line entrypoint
|
||||||
|
brozzler.cli.brozzler_stop_crawl([
|
||||||
|
'brozzler-stop-crawl', '--job=%s' % job.id])
|
||||||
|
job.refresh()
|
||||||
|
assert job.stop_requested
|
||||||
|
|
||||||
|
# stop request should be honored quickly
|
||||||
|
start = time.time()
|
||||||
|
while not job.status.startswith(
|
||||||
|
'FINISHED') and time.time() - start < 120:
|
||||||
|
time.sleep(0.5)
|
||||||
|
job.refresh()
|
||||||
|
assert job.status == 'FINISHED'
|
||||||
|
|
||||||
|
# the other sites should also be FINISHED_STOP_REQUESTED
|
||||||
|
sites[0].refresh()
|
||||||
|
assert sites[0].status == 'FINISHED_STOP_REQUESTED'
|
||||||
|
sites[1].refresh()
|
||||||
|
assert sites[1].status == 'FINISHED_STOP_REQUESTED'
|
||||||
|
sites[2].refresh()
|
||||||
|
assert sites[2].status == 'FINISHED_STOP_REQUESTED'
|
||||||
|
|
||||||
|
|
|
@ -24,6 +24,8 @@ import argparse
|
||||||
import doublethink
|
import doublethink
|
||||||
import time
|
import time
|
||||||
import datetime
|
import datetime
|
||||||
|
import uuid
|
||||||
|
import pytest
|
||||||
|
|
||||||
args = argparse.Namespace()
|
args = argparse.Namespace()
|
||||||
args.log_level = logging.INFO
|
args.log_level = logging.INFO
|
||||||
|
@ -655,3 +657,44 @@ def test_hashtag_links():
|
||||||
assert pages[2].hashtags == ['#buh']
|
assert pages[2].hashtags == ['#buh']
|
||||||
assert pages[2].priority == 12
|
assert pages[2].priority == 12
|
||||||
|
|
||||||
|
def test_honor_stop_request():
|
||||||
|
rr = doublethink.Rethinker('localhost', db='ignoreme')
|
||||||
|
frontier = brozzler.RethinkDbFrontier(rr)
|
||||||
|
|
||||||
|
# 1. test stop request on job
|
||||||
|
job_conf = {'seeds': [{'url': 'http://example.com'}]}
|
||||||
|
job = brozzler.new_job(frontier, job_conf)
|
||||||
|
assert job.id
|
||||||
|
sites = list(frontier.job_sites(job.id))
|
||||||
|
assert len(sites) == 1
|
||||||
|
site = sites[0]
|
||||||
|
assert site.job_id == job.id
|
||||||
|
|
||||||
|
# does not raise exception
|
||||||
|
frontier.honor_stop_request(site)
|
||||||
|
|
||||||
|
# set job.stop_requested
|
||||||
|
job.stop_requested = datetime.datetime.utcnow().replace(
|
||||||
|
tzinfo=doublethink.UTC)
|
||||||
|
job.save()
|
||||||
|
with pytest.raises(brozzler.CrawlStopped):
|
||||||
|
frontier.honor_stop_request(site)
|
||||||
|
|
||||||
|
# 2. test stop request on site
|
||||||
|
job_conf = {'seeds': [{'url': 'http://example.com'}]}
|
||||||
|
job = brozzler.new_job(frontier, job_conf)
|
||||||
|
assert job.id
|
||||||
|
sites = list(frontier.job_sites(job.id))
|
||||||
|
assert len(sites) == 1
|
||||||
|
site = sites[0]
|
||||||
|
assert site.job_id == job.id
|
||||||
|
|
||||||
|
# does not raise exception
|
||||||
|
frontier.honor_stop_request(site)
|
||||||
|
|
||||||
|
# set site.stop_requested
|
||||||
|
site.stop_requested = doublethink.utcnow()
|
||||||
|
site.save()
|
||||||
|
with pytest.raises(brozzler.CrawlStopped):
|
||||||
|
frontier.honor_stop_request(site)
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue