mirror of
https://github.com/internetarchive/brozzler.git
synced 2025-04-19 07:15:52 -04:00
new job setting max_claimed_sites
Puts a cap on the number of sites belonging to a given job that can be brozzled simultaneously across the cluster. Addresses the problem of a job with many seeds starving out other jobs. For AITFIVE-1578.
This commit is contained in:
parent
d7512fbeb6
commit
f26d711a89
@ -1,7 +1,7 @@
|
||||
'''
|
||||
brozzler/frontier.py - RethinkDbFrontier manages crawl jobs, sites and pages
|
||||
|
||||
Copyright (C) 2014-2017 Internet Archive
|
||||
Copyright (C) 2014-2018 Internet Archive
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
@ -93,47 +93,104 @@ class RethinkDbFrontier:
|
||||
raise UnexpectedDbResult("expected %r to be %r in %r" % (
|
||||
k, expected, result))
|
||||
|
||||
def claim_sites(self, n=1):
|
||||
# XXX keep track of aggregate priority and prioritize sites accordingly?
|
||||
while True:
|
||||
result = (
|
||||
self.rr.table("sites", read_mode="majority")
|
||||
.between(
|
||||
["ACTIVE", r.minval], ["ACTIVE", r.maxval],
|
||||
index="sites_last_disclaimed")
|
||||
.order_by(index="sites_last_disclaimed")
|
||||
.filter((r.row["claimed"] != True) | (
|
||||
r.row["last_claimed"] < r.now() - 60*60))
|
||||
.limit(n)
|
||||
.update(
|
||||
# try to avoid a race condition resulting in multiple
|
||||
# brozzler-workers claiming the same site
|
||||
# see https://github.com/rethinkdb/rethinkdb/issues/3235#issuecomment-60283038
|
||||
r.branch((r.row["claimed"] != True) | (
|
||||
r.row["last_claimed"] < r.now() - 60*60), {
|
||||
"claimed": True,
|
||||
"last_claimed": doublethink.utcnow()}, {}),
|
||||
return_changes=True)).run()
|
||||
def _claimed_sites_by_job(self):
    '''
    Tallies, per job, how many sites are currently claimed and what that
    job's cap is.

    Returns:
        dict mapping job id to a small stats dict, e.g.
        {<job_id>: {'claimed_sites': 2, 'max_claimed_sites': 3}, ...}

    NOTE(review): only jobs that currently have ACTIVE sites appear in the
    result, because the query starts from the "sites" table; sites with no
    job_id are dropped by the eq_join. Presumably every job touched here
    has `max_claimed_sites` set — if a job lacks the field, the nested
    field access below would error; confirm against job creation code.
    '''
    # Equivalent ReQL in the js shell, kept for debugging:
    # r.db('brozzler').table('sites').between(['ACTIVE', r.minval], ['ACTIVE', r.maxval], {'index': 'sites_last_disclaimed'}).eqJoin('job_id', r.db('brozzler').table('jobs')).group(function(x){return x('right')('id')}).ungroup().map(function(x){return {'job_id': x('group'), 'max_claimed_sites':x('reduction')('right')('max_claimed_sites')(0), 'claimed_sites':x('reduction')('left').filter({'claimed':true}).count()}})
    #
    # The query yields rows shaped like:
    # [{'max_claimed_sites': 2, 'claimed_sites': 0, 'job_id': 1234},
    #  {'max_claimed_sites': 3, 'claimed_sites': 3, 'job_id': 1235}]
    rows = (
        self.rr.table('sites')
        # all ACTIVE sites, via the compound status/time index
        .between(
            ['ACTIVE', r.minval], ['ACTIVE', r.maxval],
            index='sites_last_disclaimed')
        # pair each site with its job document
        .eq_join('job_id', r.db(self.rr.dbname).table('jobs'))
        # one group per job
        .group(lambda pair: pair['right']['id'])
        .ungroup()
        .map(lambda grp: {
            'job_id': grp['group'],
            # every joined row repeats the job doc; take the first copy
            'max_claimed_sites': grp['reduction']['right']['max_claimed_sites'][0],
            # count only the sites currently marked claimed
            'claimed_sites': grp['reduction']['left'].filter({'claimed':True}).count()
        })).run()
    # re-key by job id for O(1) lookups in claim_sites()
    return {
        row['job_id']: {
            'claimed_sites': row['claimed_sites'],
            'max_claimed_sites': row['max_claimed_sites'],
        }
        for row in rows}
|
||||
|
||||
self._vet_result(
|
||||
result, replaced=list(range(n+1)),
|
||||
unchanged=list(range(n+1)))
|
||||
sites = []
|
||||
for i in range(result["replaced"]):
|
||||
if result["changes"][i]["old_val"]["claimed"]:
|
||||
self.logger.warn(
|
||||
"re-claimed site that was still marked 'claimed' "
|
||||
"because it was last claimed a long time ago "
|
||||
"at %s, and presumably some error stopped it from "
|
||||
"being disclaimed",
|
||||
result["changes"][i]["old_val"]["last_claimed"])
|
||||
site = brozzler.Site(self.rr, result["changes"][i]["new_val"])
|
||||
sites.append(site)
|
||||
if not sites:
|
||||
raise brozzler.NothingToClaim
|
||||
if sites:
|
||||
return sites
|
||||
# else try again
|
||||
def claim_sites(self, n=1):
    '''
    Claims up to `n` sites for a brozzler worker to brozzle.

    Works in bulk to minimize contention over the "sites" table, and
    honors the per-job `max_claimed_sites` setting. The approach: first
    grab *every* currently claimable site in one update, then walk the
    results and hand back any excess — sites beyond `n`, or sites that
    would push their job past its `max_claimed_sites` cap — by marking
    them unclaimed again.

    Returns:
        list of up to `n` brozzler.Site claimed for this worker

    Raises:
        brozzler.NothingToClaim: if no sites could be claimed
    '''
    # snapshot per-job claim counts before the bulk update below mutates them
    claimed_sites_by_job = self._claimed_sites_by_job()

    # Bulk-claim every ACTIVE site that is unclaimed, or whose claim is
    # stale (claimed over an hour ago and presumably orphaned by an error).
    temp_claimed = (
        self.rr.table("sites", read_mode="majority")
        .between(
            ["ACTIVE", r.minval], ["ACTIVE", r.maxval],
            index="sites_last_disclaimed")
        .order_by(index="sites_last_disclaimed")
        .filter((r.row["claimed"] != True) | (
            r.row["last_claimed"] < r.now() - 60*60))
        .update(
            # re-check the predicate inside the update to try to avoid a
            # race condition resulting in multiple brozzler-workers
            # claiming the same site; see
            # https://github.com/rethinkdb/rethinkdb/issues/3235#issuecomment-60283038
            r.branch((r.row["claimed"] != True) | (
                r.row["last_claimed"] < r.now() - 60*60), {
                    "claimed": True,
                    "last_claimed": doublethink.utcnow()}, {}),
            return_changes=True)).run()
    # XXX vet result?

    claiming_these_sites = []
    not_claiming_these_ids = []
    for change in temp_claimed["changes"][:temp_claimed["replaced"]]:
        site = brozzler.Site(self.rr, change["new_val"])

        # already have enough? put this one back
        if len(claiming_these_sites) >= n:
            not_claiming_these_ids.append(site.id)
            continue

        # would exceed the job's cap? put this one back
        job_stats = claimed_sites_by_job.get(site.job_id)
        if job_stats and job_stats['claimed_sites'] >= job_stats['max_claimed_sites']:
            not_claiming_these_ids.append(site.id)
            continue

        if change["old_val"]["claimed"]:
            self.logger.warn(
                "re-claimed site that was still marked 'claimed' "
                "because it was last claimed a long time ago "
                "at %s, and presumably some error stopped it from "
                "being disclaimed",
                change["old_val"]["last_claimed"])
        claiming_these_sites.append(site)
        if job_stats:
            job_stats['claimed_sites'] += 1

    if not_claiming_these_ids:
        # logging.trace is a brozzler-added level; see brozzler/__init__.py
        logging.trace(
            'not claiming %s of %s candidate sites',
            len(not_claiming_these_ids), temp_claimed["replaced"])
        # NOTE(review): last_disclaimed is deliberately not bumped here,
        # so these sites keep their place in the claim ordering — confirm
        self.rr.table('sites')\
            .get_all(*not_claiming_these_ids)\
            .update({'claimed': False}).run()

    if not claiming_these_sites:
        raise brozzler.NothingToClaim
    return claiming_these_sites
|
||||
|
||||
def enforce_time_limit(self, site):
|
||||
'''
|
||||
|
@ -96,3 +96,7 @@ seeds:
|
||||
type: string
|
||||
|
||||
<<: *multi_level_options
|
||||
|
||||
max_claimed_sites:
|
||||
type: integer
|
||||
|
||||
|
@ -2,7 +2,7 @@
|
||||
brozzler/models.py - model classes representing jobs, sites, and pages, with
|
||||
related logic
|
||||
|
||||
Copyright (C) 2014-2017 Internet Archive
|
||||
Copyright (C) 2014-2018 Internet Archive
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
@ -79,11 +79,14 @@ def new_job(frontier, job_conf):
|
||||
"started": doublethink.utcnow()})
|
||||
if "id" in job_conf:
|
||||
job.id = job_conf["id"]
|
||||
if "max_claimed_sites" in job_conf:
|
||||
job.max_claimed_sites = job_conf["max_claimed_sites"]
|
||||
job.save()
|
||||
|
||||
sites = []
|
||||
for seed_conf in job_conf["seeds"]:
|
||||
merged_conf = merge(seed_conf, job_conf)
|
||||
merged_conf.pop("max_claimed_sites", None)
|
||||
merged_conf.pop("seeds")
|
||||
merged_conf["job_id"] = job.id
|
||||
merged_conf["seed"] = merged_conf.pop("url")
|
||||
|
19
job-conf.rst
19
job-conf.rst
@ -13,6 +13,7 @@ an example
|
||||
id: myjob
|
||||
time_limit: 60 # seconds
|
||||
ignore_robots: false
|
||||
max_claimed_sites: 2
|
||||
warcprox_meta:
|
||||
warc-prefix: job1
|
||||
stats:
|
||||
@ -102,6 +103,17 @@ List of seeds. Each item in the list is a dictionary (associative array) which
|
||||
defines the seed. It must specify ``url`` (see below) and can additionally
|
||||
specify any of the settings of scope *seed-level*.
|
||||
|
||||
``max_claimed_sites``
|
||||
---------------------
|
||||
+-----------+---------+----------+---------+
| scope     | type    | required | default |
+===========+=========+==========+=========+
| top-level | integer | no       | *none*  |
+-----------+---------+----------+---------+
|
||||
Puts a cap on the number of sites belonging to a given job that can be brozzled
|
||||
simultaneously across the cluster. Addresses the problem of a job with many
|
||||
seeds starving out other jobs.
|
||||
|
||||
``url``
|
||||
-------
|
||||
+------------+--------+----------+---------+
|
||||
@ -113,6 +125,13 @@ The seed url.
|
||||
|
||||
``metadata``
|
||||
------------
|
||||
+-----------------------+------------+----------+---------+
|
||||
| scope | type | required | default |
|
||||
+=======================+============+==========+=========+
|
||||
| seed-level, top-level | dictionary | no | *none* |
|
||||
+-----------------------+------------+----------+---------+
|
||||
Arbitrary information about the crawl job or site. Merely informative, not used
|
||||
by brozzler for anything. Could be of use to some external process.
|
||||
|
||||
``time_limit``
|
||||
--------------
|
||||
|
@ -863,6 +863,46 @@ def test_claim_site():
|
||||
# clean up
|
||||
rr.table('sites').get(claimed_site.id).delete().run()
|
||||
|
||||
def test_max_claimed_sites():
    '''
    Exercises the `max_claimed_sites` job setting, which caps how many of
    a job's sites may be brozzled simultaneously across the cluster.
    '''
    rr = doublethink.Rethinker('localhost', db='ignoreme')
    frontier = brozzler.RethinkDbFrontier(rr)

    # start from a clean slate
    rr.table('jobs').delete().run()
    rr.table('sites').delete().run()

    job_conf = {
        'seeds': [
            {'url': 'http://example.com/1'},
            {'url': 'http://example.com/2'},
            {'url': 'http://example.com/3'},
            {'url': 'http://example.com/4'},
            {'url': 'http://example.com/5'},
        ],
        'max_claimed_sites': 3,
    }
    job = brozzler.new_job(frontier, job_conf)
    assert job.id
    assert job.max_claimed_sites == 3
    assert len(list(frontier.job_sites(job.id))) == 5

    # first claim takes one of the three allowed
    assert len(frontier.claim_sites(1)) == 1
    # asking for three more only yields two, because of the cap
    assert len(frontier.claim_sites(3)) == 2
    # the cap is reached, so nothing is claimable now
    with pytest.raises(brozzler.NothingToClaim):
        frontier.claim_sites(3)

    # clean slate for the next one
    rr.table('jobs').delete().run()
    rr.table('sites').delete().run()
|
||||
|
||||
def test_choose_warcprox():
|
||||
rr = doublethink.Rethinker('localhost', db='ignoreme')
|
||||
svcreg = doublethink.ServiceRegistry(rr)
|
||||
|
Loading…
x
Reference in New Issue
Block a user