Mirror of https://github.com/internetarchive/brozzler.git, synced 2025-07-23 15:00:36 -04:00
move time limit enforcement
now it's next to stop request enforcement, which makes more sense and supports more timely action
parent b438cdd33e
commit d7512fbeb6
6 changed files with 47 additions and 17 deletions
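
As a rough illustration of where this lands, here is a condensed, self-contained sketch of the control flow the commit moves to. It is not brozzler's actual code: the Site class, the stop_requested flag, the timing numbers and the module-level helpers are stand-ins for what RethinkDbFrontier and BrozzlerWorker do in the diffs below. The point is that the time limit check now runs inside the worker's per-page loop, right beside the stop-request check, and surfaces as an exception that the loop's except chain converts into a FINISHED_* status.

    import time

    class ReachedTimeLimit(Exception):
        pass

    class CrawlStopped(Exception):
        pass

    class Site:
        # stand-in for brozzler.Site; only the fields the sketch needs
        def __init__(self, time_limit=None):
            self.time_limit = time_limit          # seconds, or None for no limit
            self.active_brozzling_time = 0.0      # accumulated active crawl time
            self.stop_requested = False           # stand-in for a stop request flag
            self.status = 'ACTIVE'

    def enforce_time_limit(site):
        # same shape as frontier.enforce_time_limit() in the diff: raise, don't return a bool
        if (site.time_limit and site.time_limit > 0
                and (site.active_brozzling_time or 0) > site.time_limit):
            raise ReachedTimeLimit

    def honor_stop_request(site):
        # stand-in for frontier.honor_stop_request()
        if site.stop_requested:
            raise CrawlStopped

    def brozzle_site(site):
        # both checks run on every iteration, so either one takes effect promptly
        try:
            while True:
                enforce_time_limit(site)
                honor_stop_request(site)
                site.active_brozzling_time += 0.01   # pretend to brozzle a page
                time.sleep(0.01)
        except ReachedTimeLimit:
            site.status = 'FINISHED_TIME_LIMIT'
        except CrawlStopped:
            site.status = 'FINISHED_STOP_REQUESTED'
        return site.status

    print(brozzle_site(Site(time_limit=0.1)))   # -> FINISHED_TIME_LIMIT
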
@@ -32,6 +32,9 @@ class CrawlStopped(Exception):
 class ProxyError(Exception):
     pass
 
+class ReachedTimeLimit(Exception):
+    pass
+
 class ReachedLimit(Exception):
     def __init__(self, http_error=None, warcprox_meta=None, http_payload=None):
         import json
@@ -131,28 +131,21 @@ class RethinkDbFrontier:
             sites.append(site)
         if not sites:
             raise brozzler.NothingToClaim
-        # XXX This is the only place we enforce time limit for now. Worker
-        # loop should probably check time limit. Maybe frontier needs a
-        # housekeeping thread to ensure that time limits get enforced in a
-        # timely fashion.
-        for site in list(sites):
-            if self._enforce_time_limit(site):
-                sites.remove(site)
         if sites:
             return sites
         # else try again
 
-    def _enforce_time_limit(self, site):
+    def enforce_time_limit(self, site):
+        '''
+        Raises `brozzler.ReachedTimeLimit` if appropriate.
+        '''
         if (site.time_limit and site.time_limit > 0
                 and (site.active_brozzling_time or 0) > site.time_limit):
             self.logger.debug(
                     "site FINISHED_TIME_LIMIT! time_limit=%s "
                     "active_brozzling_time=%s %s", site.time_limit,
                     site.active_brozzling_time, site)
-            self.finished(site, "FINISHED_TIME_LIMIT")
-            return True
-        else:
-            return False
+            raise brozzler.ReachedTimeLimit
 
     def claim_page(self, site, worker_id):
         # ignores the "claimed" field of the page, because only one
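
One design note on the hunk above: the old _enforce_time_limit() returned True or False and called finished() itself, while the renamed enforce_time_limit() only raises brozzler.ReachedTimeLimit and leaves the status bookkeeping to the caller (the worker loop in the next hunk, or a test). A tiny self-contained sketch of that calling convention, using plain stand-ins rather than a real frontier or site:

    class ReachedTimeLimit(Exception):
        pass

    def enforce_time_limit(active_brozzling_time, time_limit):
        # stand-in for frontier.enforce_time_limit(site): raise instead of returning a bool
        if time_limit and time_limit > 0 and (active_brozzling_time or 0) > time_limit:
            raise ReachedTimeLimit

    try:
        enforce_time_limit(active_brozzling_time=120, time_limit=60)
        print('keep brozzling')            # limit not reached
    except ReachedTimeLimit:
        print('FINISHED_TIME_LIMIT')       # caller records the finished status
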
@@ -506,12 +506,12 @@ class BrozzlerWorker:
             site.save()
             start = time.time()
             page = None
-            self._frontier.honor_stop_request(site)
             self.logger.info(
                     "brozzling site (proxy=%r) %r",
                     self._proxy_for(site), site)
             while time.time() - start < self.SITE_SESSION_MINUTES * 60:
                 site.refresh()
+                self._frontier.enforce_time_limit(site)
                 self._frontier.honor_stop_request(site)
                 page = self._frontier.claim_page(site, "%s:%s" % (
                     socket.gethostname(), browser.chrome.port))
@@ -539,6 +539,8 @@ class BrozzlerWorker:
             self.logger.info("no pages left for site %s", site)
         except brozzler.ReachedLimit as e:
             self._frontier.reached_limit(site, e)
+        except brozzler.ReachedTimeLimit as e:
+            self._frontier.finished(site, "FINISHED_TIME_LIMIT")
         except brozzler.CrawlStopped:
             self._frontier.finished(site, "FINISHED_STOP_REQUESTED")
         # except brozzler.browser.BrowsingAborted:
tests/test_brozzling.py: 0 additions, 0 deletions (Executable file → Normal file)
@@ -762,3 +762,31 @@ def test_warcprox_outage_resiliency(httpd):
         warcprox2_thread.join()
         start_service('warcprox')
 
+
+def test_time_limit(httpd):
+    test_id = 'test_time_limit-%s' % datetime.datetime.utcnow().isoformat()
+    rr = doublethink.Rethinker('localhost', db='brozzler')
+    frontier = brozzler.RethinkDbFrontier(rr)
+
+    # create a new job with one seed that could be crawled forever
+    job_conf = {'seeds': [{
+        'url': 'http://localhost:%s/infinite/foo/' % httpd.server_port,
+        'time_limit': 20}]}
+    job = brozzler.new_job(frontier, job_conf)
+    assert job.id
+
+    sites = list(frontier.job_sites(job.id))
+    assert len(sites) == 1
+    site = sites[0]
+
+    # time limit should be enforced pretty soon
+    start = time.time()
+    while not sites[0].status.startswith(
+            'FINISHED') and time.time() - start < 120:
+        time.sleep(0.5)
+        sites[0].refresh()
+    assert sites[0].status == 'FINISHED_TIME_LIMIT'
+
+    # all sites finished so job should be finished too
+    job.refresh()
+    assert job.status == 'FINISHED'
@@ -3,7 +3,7 @@
 test_frontier.py - fairly narrow tests of frontier management, requires
 rethinkdb running on localhost
 
-Copyright (C) 2017 Internet Archive
+Copyright (C) 2017-2018 Internet Archive
 
 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
@@ -380,7 +380,7 @@ def test_time_limit():
     assert site.starts_and_stops[1]['stop'] is None
 
     # time limit not reached yet
-    frontier._enforce_time_limit(site)
+    frontier.enforce_time_limit(site)
 
     assert site.status == 'ACTIVE'
     assert len(site.starts_and_stops) == 2
@@ -392,7 +392,7 @@ def test_time_limit():
     site.save()
 
     # time limit not reached yet
-    frontier._enforce_time_limit(site)
+    frontier.enforce_time_limit(site)
     assert site.status == 'ACTIVE'
     assert len(site.starts_and_stops) == 2
     assert site.starts_and_stops[1]['start']
@@ -400,7 +400,11 @@ def test_time_limit():
 
     site.active_brozzling_time = 0.2 # this is why the time limit will be hit
 
-    frontier._enforce_time_limit(site)
+    try:
+        frontier.enforce_time_limit(site)
+    except brozzler.ReachedTimeLimit:
+        frontier.finished(site, 'FINISHED_TIME_LIMIT')
+
     assert site.status == 'FINISHED_TIME_LIMIT'
     assert not site.claimed
     assert len(site.starts_and_stops) == 2