Mirror of https://github.com/internetarchive/brozzler.git
Merge branch 'max-claimed-sites' into qa
* max-claimed-sites:
  - new job setting max_claimed_sites
  - move time limit enforcement
  - Invalid syntax in WebsockReceiverThread._javascript_dialog_open
  - Make Browser._wait_for sleep time a variable
  - Send more compact JSON to browser
  - Remove google safebrowsing flags
  - try to get chromium 64? (#92)
  - Add chromium CLI flags to improve capture performance
Commit 5a0700b297

12 changed files with 223 additions and 61 deletions
@@ -10,9 +10,15 @@ before_install:
 install:
 - ansible-playbook --extra-vars="brozzler_pip_name=file://$TRAVIS_BUILD_DIR#egg=brozzler user=travis" --inventory-file=ansible/hosts-localhost ansible/playbook.yml
 - pip install $TRAVIS_BUILD_DIR 'warcprox>=2.4b1' pytest
+- chromium-browser --version
+- sudo apt-get update
+- sudo apt-get install --only-upgrade chromium-browser
+- chromium-browser --version
+- sudo service brozzler-worker restart
 script:
 - DISPLAY=:1 py.test -v tests
 after_failure:
+- chromium-browser --version
 - sudo cat /var/log/upstart/warcprox.log
 - sudo cat /var/log/upstart/brozzler-worker.log
 - sudo cat /var/log/upstart/pywb.log
@@ -32,6 +32,9 @@ class CrawlStopped(Exception):
 class ProxyError(Exception):
     pass

+class ReachedTimeLimit(Exception):
+    pass
+
 class ReachedLimit(Exception):
     def __init__(self, http_error=None, warcprox_meta=None, http_payload=None):
         import json
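Together with the frontier and worker hunks below, the new exception inverts the old boolean-return pattern: the frontier raises, the worker catches and marks the site finished. A minimal self-contained sketch of that contract, using plain stand-ins rather than brozzler's actual classes:

class ReachedTimeLimit(Exception):
    pass

class FakeSite:
    # stand-in carrying just the fields the time limit check reads
    def __init__(self, time_limit, active_brozzling_time):
        self.time_limit = time_limit
        self.active_brozzling_time = active_brozzling_time
        self.status = 'ACTIVE'

def enforce_time_limit(site):
    # same condition as RethinkDbFrontier.enforce_time_limit() below,
    # but raising instead of returning True/False
    if (site.time_limit and site.time_limit > 0
            and (site.active_brozzling_time or 0) > site.time_limit):
        raise ReachedTimeLimit

site = FakeSite(time_limit=10, active_brozzling_time=11)
try:
    enforce_time_limit(site)
except ReachedTimeLimit:
    # what BrozzlerWorker does in its new except clause further down
    site.status = 'FINISHED_TIME_LIMIT'
assert site.status == 'FINISHED_TIME_LIMIT'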
@@ -228,7 +228,7 @@ class WebsockReceiverThread(threading.Thread):
         self.websock.send(
             json.dumps(dict(
                 id=0, method='Page.handleJavaScriptDialog',
-                params={'accept': accept})))
+                params={'accept': accept}), separators=',:'))

     def _handle_message(self, websock, json_message):
         message = json.loads(json_message)
@@ -288,6 +288,7 @@ class Browser:
         self.websock_thread = None
         self.is_browsing = False
         self._command_id = Counter()
+        self._wait_interval = 0.5

     def __enter__(self):
         self.start()
@@ -309,12 +310,12 @@ class Browser:
                 raise BrowsingTimeout(
                     'timed out after %.1fs waiting for: %s' % (
                         elapsed, callback))
-            brozzler.sleep(0.5)
+            brozzler.sleep(self._wait_interval)

     def send_to_chrome(self, suppress_logging=False, **kwargs):
         msg_id = next(self._command_id)
         kwargs['id'] = msg_id
-        msg = json.dumps(kwargs)
+        msg = json.dumps(kwargs, separators=',:')
         logging.log(
                 brozzler.TRACE if suppress_logging else logging.DEBUG,
                 'sending message to %s: %s', self.websock, msg)
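This hunk and the `_javascript_dialog_open` hunk above make every message sent over the DevTools websocket slightly smaller by dropping the spaces `json.dumps` emits by default. The two-character string `',:'` unpacks to the same `(item_separator, key_separator)` pair as the tuple form the stdlib documents:

import json

msg = dict(id=0, method='Page.handleJavaScriptDialog', params={'accept': True})
print(json.dumps(msg))
# {"id": 0, "method": "Page.handleJavaScriptDialog", "params": {"accept": true}}
print(json.dumps(msg, separators=(',', ':')))  # equivalent to separators=',:'
# {"id":0,"method":"Page.handleJavaScriptDialog","params":{"accept":true}}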
@@ -156,6 +156,8 @@ class Chrome:
                 '--use-mock-keychain', # mac thing
                 '--user-data-dir=%s' % self._chrome_user_data_dir,
                 '--disable-background-networking',
+                '--disable-renderer-backgrounding', '--disable-hang-monitor',
+                '--disable-background-timer-throttling', '--mute-audio',
                 '--disable-web-sockets', '--disable-cache',
                 '--window-size=1100,900', '--no-default-browser-check',
                 '--disable-first-run-ui', '--no-first-run',
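Per the commit message ("Add chromium CLI flags to improve capture performance"), these flags keep background tabs from being deprioritized or throttled mid-capture. A hypothetical standalone launch with just the new flags, assuming a chromium-browser binary on PATH (brozzler's Chrome class builds a much longer command line around them):

import subprocess

new_flags = [
    '--disable-renderer-backgrounding', '--disable-hang-monitor',
    '--disable-background-timer-throttling', '--mute-audio',
]
# open a blank page with the new flags applied, then shut it down
proc = subprocess.Popen(['chromium-browser', *new_flags, 'about:blank'])
proc.terminate()
proc.wait()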
@@ -1,7 +1,7 @@
 '''
 brozzler/frontier.py - RethinkDbFrontier manages crawl jobs, sites and pages

-Copyright (C) 2014-2017 Internet Archive
+Copyright (C) 2014-2018 Internet Archive

 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
@@ -93,66 +93,116 @@ class RethinkDbFrontier:
             raise UnexpectedDbResult("expected %r to be %r in %r" % (
                 k, expected, result))

+    def _claimed_sites_by_job(self):
+        '''
+        Returns a dictionary that looks like this:
+        {<job_id>: {'claimed_sites': 2, 'max_claimed_sites': 3}, ...}
+        '''
+        # js query: r.db('brozzler').table('sites').between(['ACTIVE', r.minval], ['ACTIVE', r.maxval], {'index': 'sites_last_disclaimed'}).eqJoin('job_id', r.db('brozzler').table('jobs')).group(function(x){return x('right')('id')}).ungroup().map(function(x){return {'job_id': x('group'), 'max_claimed_sites':x('reduction')('right')('max_claimed_sites')(0), 'claimed_sites':x('reduction')('left').filter({'claimed':true}).count()}})
+        # returns results like:
+        # [{'max_claimed_sites': 2, 'claimed_sites': 0, 'job_id': 1234},
+        #  {'max_claimed_sites': 3, 'claimed_sites': 3, 'job_id': 1235}]
+        results = (
+                self.rr.table('sites')
+                .between(
+                    ['ACTIVE', r.minval], ['ACTIVE', r.maxval],
+                    index='sites_last_disclaimed')
+                .eq_join('job_id', r.db(self.rr.dbname).table('jobs'))
+                .group(lambda x: x['right']['id'])
+                .ungroup()
+                .map(lambda x: {
+                    'job_id': x['group'],
+                    'max_claimed_sites': x['reduction']['right']['max_claimed_sites'][0],
+                    'claimed_sites': x['reduction']['left'].filter({'claimed': True}).count()
+                })).run()
+        xformed = {d['job_id']: {
+                'claimed_sites': d['claimed_sites'],
+                'max_claimed_sites': d['max_claimed_sites']
+        } for d in results}
+        return xformed
+
     def claim_sites(self, n=1):
-        # XXX keep track of aggregate priority and prioritize sites accordingly?
-        while True:
-            result = (
-                self.rr.table("sites", read_mode="majority")
-                .between(
-                    ["ACTIVE", r.minval], ["ACTIVE", r.maxval],
-                    index="sites_last_disclaimed")
-                .order_by(index="sites_last_disclaimed")
-                .filter((r.row["claimed"] != True) | (
-                    r.row["last_claimed"] < r.now() - 60*60))
-                .limit(n)
-                .update(
-                    # try to avoid a race condition resulting in multiple
-                    # brozzler-workers claiming the same site
-                    # see https://github.com/rethinkdb/rethinkdb/issues/3235#issuecomment-60283038
-                    r.branch((r.row["claimed"] != True) | (
-                        r.row["last_claimed"] < r.now() - 60*60), {
-                            "claimed": True,
-                            "last_claimed": doublethink.utcnow()}, {}),
-                    return_changes=True)).run()
-            self._vet_result(
-                    result, replaced=list(range(n+1)),
-                    unchanged=list(range(n+1)))
-            sites = []
-            for i in range(result["replaced"]):
-                if result["changes"][i]["old_val"]["claimed"]:
-                    self.logger.warn(
-                            "re-claimed site that was still marked 'claimed' "
-                            "because it was last claimed a long time ago "
-                            "at %s, and presumably some error stopped it from "
-                            "being disclaimed",
-                            result["changes"][i]["old_val"]["last_claimed"])
-                site = brozzler.Site(self.rr, result["changes"][i]["new_val"])
-                sites.append(site)
-            if not sites:
-                raise brozzler.NothingToClaim
-            # XXX This is the only place we enforce time limit for now. Worker
-            # loop should probably check time limit. Maybe frontier needs a
-            # housekeeping thread to ensure that time limits get enforced in a
-            # timely fashion.
-            for site in list(sites):
-                if self._enforce_time_limit(site):
-                    sites.remove(site)
-            if sites:
-                return sites
-            # else try again
-
-    def _enforce_time_limit(self, site):
+        '''
+        What does this crazy method do? Well, let me break it down for you.
+        It claims sites for a brozzler worker to brozzle. It does this in bulk
+        to minimize contention over the "sites" table. It also respects the job
+        setting `max_claimed_sites`. To accomplish this, first it claims all
+        the currently unclaimed sites. If there are more than `n`, it puts
+        some back (disclaims them). Also if it would exceed `max_claimed_sites`
+        for a job, it disclaims some for that job. Finally it returns the
+        results, or raises `brozzler.NothingToClaim` if there is nothing to
+        claim.
+        '''
+        # need to do this before the next query
+        claimed_sites_by_job = self._claimed_sites_by_job()
+
+        temp_claimed = (
+                self.rr.table("sites", read_mode="majority")
+                .between(
+                    ["ACTIVE", r.minval], ["ACTIVE", r.maxval],
+                    index="sites_last_disclaimed")
+                .order_by(index="sites_last_disclaimed")
+                .filter((r.row["claimed"] != True) | (
+                    r.row["last_claimed"] < r.now() - 60*60))
+                .update(
+                    # try to avoid a race condition resulting in multiple
+                    # brozzler-workers claiming the same site
+                    # see https://github.com/rethinkdb/rethinkdb/issues/3235#issuecomment-60283038
+                    r.branch((r.row["claimed"] != True) | (
+                        r.row["last_claimed"] < r.now() - 60*60), {
+                            "claimed": True,
+                            "last_claimed": doublethink.utcnow()}, {}),
+                    return_changes=True)).run()
+        # XXX vet result?
+
+        claiming_these_sites = []
+        not_claiming_these_ids = []
+        for i in range(temp_claimed["replaced"]):
+            site = brozzler.Site(self.rr, temp_claimed["changes"][i]["new_val"])
+            if len(claiming_these_sites) >= n:
+                not_claiming_these_ids.append(site.id)
+                continue
+            if site.job_id in claimed_sites_by_job:
+                d = claimed_sites_by_job[site.job_id]
+                if d['claimed_sites'] + 1 > d['max_claimed_sites']:
+                    not_claiming_these_ids.append(site.id)
+                    continue
+
+            if temp_claimed["changes"][i]["old_val"]["claimed"]:
+                self.logger.warn(
+                        "re-claimed site that was still marked 'claimed' "
+                        "because it was last claimed a long time ago "
+                        "at %s, and presumably some error stopped it from "
+                        "being disclaimed",
+                        temp_claimed["changes"][i]["old_val"]["last_claimed"])
+            claiming_these_sites.append(site)
+            if site.job_id in claimed_sites_by_job:
+                claimed_sites_by_job[site.job_id]['claimed_sites'] += 1
+
+        if not_claiming_these_ids:
+            logging.trace(
+                    'not claiming %s of %s candidate sites',
+                    len(not_claiming_these_ids), temp_claimed["replaced"])
+            self.rr.table('sites')\
+                    .get_all(*not_claiming_these_ids)\
+                    .update({'claimed': False}).run()
+
+        if claiming_these_sites:
+            return claiming_these_sites
+        else:
+            raise brozzler.NothingToClaim
+
+    def enforce_time_limit(self, site):
+        '''
+        Raises `brozzler.ReachedTimeLimit` if appropriate.
+        '''
         if (site.time_limit and site.time_limit > 0
                 and (site.active_brozzling_time or 0) > site.time_limit):
             self.logger.debug(
                     "site FINISHED_TIME_LIMIT! time_limit=%s "
                     "active_brozzling_time=%s %s", site.time_limit,
                     site.active_brozzling_time, site)
-            self.finished(site, "FINISHED_TIME_LIMIT")
-            return True
-        else:
-            return False
+            raise brozzler.ReachedTimeLimit

     def claim_page(self, site, worker_id):
         # ignores the "claimed" field of the page, because only one
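For readers who don't speak ReQL: once the bulk update has tentatively claimed the candidate rows, the bookkeeping above reduces to a small in-memory partition. A database-free sketch under made-up names, where `counts` plays the role of `_claimed_sites_by_job()`'s result:

import collections

def partition_candidates(candidates, counts, n):
    # candidates: tentatively claimed sites in sites_last_disclaimed order;
    # counts: {job_id: {'claimed_sites': int, 'max_claimed_sites': int}}
    claiming, disclaim_ids = [], []
    for site in candidates:
        if len(claiming) >= n:
            disclaim_ids.append(site.id)   # worker asked for at most n
            continue
        d = counts.get(site.job_id)
        if d and d['claimed_sites'] + 1 > d['max_claimed_sites']:
            disclaim_ids.append(site.id)   # job already at its cap
            continue
        claiming.append(site)
        if d:
            d['claimed_sites'] += 1        # count this claim against the job
    return claiming, disclaim_ids

Site = collections.namedtuple('Site', ['id', 'job_id'])
sites = [Site(i, 'job1') for i in range(5)]
counts = {'job1': {'claimed_sites': 1, 'max_claimed_sites': 3}}
claiming, disclaimed = partition_candidates(sites, counts, n=4)
assert len(claiming) == 2 and len(disclaimed) == 3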
@@ -96,3 +96,7 @@ seeds:
         type: string

     <<: *multi_level_options
+
+max_claimed_sites:
+  type: integer
+
@@ -2,7 +2,7 @@
 brozzler/models.py - model classes representing jobs, sites, and pages, with
 related logic

-Copyright (C) 2014-2017 Internet Archive
+Copyright (C) 2014-2018 Internet Archive

 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.

@@ -79,11 +79,14 @@ def new_job(frontier, job_conf):
             "started": doublethink.utcnow()})
     if "id" in job_conf:
         job.id = job_conf["id"]
+    if "max_claimed_sites" in job_conf:
+        job.max_claimed_sites = job_conf["max_claimed_sites"]
     job.save()

     sites = []
     for seed_conf in job_conf["seeds"]:
         merged_conf = merge(seed_conf, job_conf)
+        merged_conf.pop("max_claimed_sites", None)
         merged_conf.pop("seeds")
         merged_conf["job_id"] = job.id
         merged_conf["seed"] = merged_conf.pop("url")
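`max_claimed_sites` thus lives only on the job record and is deliberately popped from each merged seed conf, since a per-site copy of a job-wide cap would be meaningless. A hypothetical invocation mirroring `test_max_claimed_sites` at the bottom of this diff (assumes a reachable RethinkDB):

import brozzler
import doublethink

rr = doublethink.Rethinker('localhost', db='brozzler')
frontier = brozzler.RethinkDbFrontier(rr)

job_conf = {
    'seeds': [{'url': 'http://example.com/%s' % i} for i in range(1, 6)],
    'max_claimed_sites': 3,  # job-wide cap on simultaneously brozzled sites
}
job = brozzler.new_job(frontier, job_conf)
assert job.max_claimed_sites == 3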
@@ -507,12 +507,12 @@ class BrozzlerWorker:
         site.save()
         start = time.time()
         page = None
-        self._frontier.honor_stop_request(site)
         self.logger.info(
                 "brozzling site (proxy=%r) %r",
                 self._proxy_for(site), site)
         while time.time() - start < self.SITE_SESSION_MINUTES * 60:
             site.refresh()
+            self._frontier.enforce_time_limit(site)
             self._frontier.honor_stop_request(site)
             page = self._frontier.claim_page(site, "%s:%s" % (
                 socket.gethostname(), browser.chrome.port))

@@ -540,6 +540,8 @@ class BrozzlerWorker:
             self.logger.info("no pages left for site %s", site)
         except brozzler.ReachedLimit as e:
             self._frontier.reached_limit(site, e)
+        except brozzler.ReachedTimeLimit as e:
+            self._frontier.finished(site, "FINISHED_TIME_LIMIT")
         except brozzler.CrawlStopped:
             self._frontier.finished(site, "FINISHED_STOP_REQUESTED")
         # except brozzler.browser.BrowsingAborted:
job-conf.rst (19 changes)
@@ -13,6 +13,7 @@ an example
     id: myjob
     time_limit: 60 # seconds
     ignore_robots: false
+    max_claimed_sites: 2
    warcprox_meta:
        warc-prefix: job1
        stats:

@@ -102,6 +103,17 @@ List of seeds. Each item in the list is a dictionary (associative array) which
 defines the seed. It must specify ``url`` (see below) and can additionally
 specify any of the settings of scope *seed-level*.

+``max_claimed_sites``
+---------------------
++-----------+--------+----------+---------+
+| scope     | type   | required | default |
++===========+========+==========+=========+
+| top-level | number | no       | *none*  |
++-----------+--------+----------+---------+
+Puts a cap on the number of sites belonging to a given job that can be brozzled
+simultaneously across the cluster. Addresses the problem of a job with many
+seeds starving out other jobs.
+
 ``url``
 -------
 +------------+--------+----------+---------+

@@ -113,6 +125,13 @@ The seed url.

 ``metadata``
 ------------
++-----------------------+------------+----------+---------+
+| scope                 | type       | required | default |
++=======================+============+==========+=========+
+| seed-level, top-level | dictionary | no       | *none*  |
++-----------------------+------------+----------+---------+
+Arbitrary information about the crawl job or site. Merely informative, not used
+by brozzler for anything. Could be of use to some external process.
+
 ``time_limit``
 --------------
tests/test_brozzling.py (executable file → normal file)
@@ -762,3 +762,31 @@ def test_warcprox_outage_resiliency(httpd):
     warcprox2_thread.join()
     start_service('warcprox')
+
+def test_time_limit(httpd):
+    test_id = 'test_time_limit-%s' % datetime.datetime.utcnow().isoformat()
+    rr = doublethink.Rethinker('localhost', db='brozzler')
+    frontier = brozzler.RethinkDbFrontier(rr)
+
+    # create a new job with a site that could be crawled forever
+    job_conf = {'seeds': [{
+        'url': 'http://localhost:%s/infinite/foo/' % httpd.server_port,
+        'time_limit': 20}]}
+    job = brozzler.new_job(frontier, job_conf)
+    assert job.id
+
+    sites = list(frontier.job_sites(job.id))
+    assert len(sites) == 1
+    site = sites[0]
+
+    # time limit should be enforced pretty soon
+    start = time.time()
+    while not sites[0].status.startswith(
+            'FINISHED') and time.time() - start < 120:
+        time.sleep(0.5)
+        sites[0].refresh()
+    assert sites[0].status == 'FINISHED_TIME_LIMIT'
+
+    # all sites finished so job should be finished too
+    job.refresh()
+    assert job.status == 'FINISHED'
@@ -3,7 +3,7 @@
 test_frontier.py - fairly narrow tests of frontier management, requires
 rethinkdb running on localhost

-Copyright (C) 2017 Internet Archive
+Copyright (C) 2017-2018 Internet Archive

 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.

@@ -380,7 +380,7 @@ def test_time_limit():
     assert site.starts_and_stops[1]['stop'] is None

     # time limit not reached yet
-    frontier._enforce_time_limit(site)
+    frontier.enforce_time_limit(site)

     assert site.status == 'ACTIVE'
     assert len(site.starts_and_stops) == 2

@@ -392,7 +392,7 @@ def test_time_limit():
     site.save()

     # time limit not reached yet
-    frontier._enforce_time_limit(site)
+    frontier.enforce_time_limit(site)
     assert site.status == 'ACTIVE'
     assert len(site.starts_and_stops) == 2
     assert site.starts_and_stops[1]['start']

@@ -400,7 +400,11 @@ def test_time_limit():

     site.active_brozzling_time = 0.2 # this is why the time limit will be hit

-    frontier._enforce_time_limit(site)
+    try:
+        frontier.enforce_time_limit(site)
+    except brozzler.ReachedTimeLimit:
+        frontier.finished(site, 'FINISHED_TIME_LIMIT')

     assert site.status == 'FINISHED_TIME_LIMIT'
     assert not site.claimed
     assert len(site.starts_and_stops) == 2

@@ -859,6 +863,46 @@ def test_claim_site():
     # clean up
     rr.table('sites').get(claimed_site.id).delete().run()

+def test_max_claimed_sites():
+    # max_claimed_sites is a brozzler job setting that puts a cap on the number
+    # of the job's sites that can be brozzled simultaneously across the cluster
+    rr = doublethink.Rethinker('localhost', db='ignoreme')
+    frontier = brozzler.RethinkDbFrontier(rr)
+
+    # clean slate
+    rr.table('jobs').delete().run()
+    rr.table('sites').delete().run()
+
+    job_conf = {
+        'seeds': [
+            {'url': 'http://example.com/1'},
+            {'url': 'http://example.com/2'},
+            {'url': 'http://example.com/3'},
+            {'url': 'http://example.com/4'},
+            {'url': 'http://example.com/5'},
+        ],
+        'max_claimed_sites': 3,
+    }
+
+    job = brozzler.new_job(frontier, job_conf)
+
+    assert job.id
+    assert job.max_claimed_sites == 3
+
+    sites = list(frontier.job_sites(job.id))
+    assert len(sites) == 5
+
+    claimed_sites = frontier.claim_sites(1)
+    assert len(claimed_sites) == 1
+    claimed_sites = frontier.claim_sites(3)
+    assert len(claimed_sites) == 2
+    with pytest.raises(brozzler.NothingToClaim):
+        claimed_site = frontier.claim_sites(3)
+
+    # clean slate for the next one
+    rr.table('jobs').delete().run()
+    rr.table('sites').delete().run()
+
 def test_choose_warcprox():
     rr = doublethink.Rethinker('localhost', db='ignoreme')
     svcreg = doublethink.ServiceRegistry(rr)