mirror of
https://github.com/internetarchive/brozzler.git
synced 2025-02-24 16:49:56 -05:00
reimplement max_claimed_sites
Other approach was too slow and caused db contention. New approach avoids (slow) rethinkdb join by max_claimed_sites job parameter to each of the job's sites. Uses rethinkdb fold() to count claimed sites and enforce max_claimed_sites within a single query.
This commit is contained in:
parent
4daac3dfc5
commit
dc00f5de32
@ -93,107 +93,60 @@ class RethinkDbFrontier:
|
|||||||
raise UnexpectedDbResult("expected %r to be %r in %r" % (
|
raise UnexpectedDbResult("expected %r to be %r in %r" % (
|
||||||
k, expected, result))
|
k, expected, result))
|
||||||
|
|
||||||
def _claimed_sites_by_job(self):
|
def claim_sites(self, n=1):
|
||||||
'''
|
result = (
|
||||||
Returns a dictionary that looks like this:
|
self.rr.table('sites').get_all(r.args(
|
||||||
{<job_id>: {'claimed_sites': 2, 'max_claimed_sites': 3}, ...}
|
r.db(self.rr.dbname).table('sites', read_mode='majority')
|
||||||
'''
|
|
||||||
# js query: r.db('brozzler').table('sites').between(['ACTIVE', r.minval], ['ACTIVE', r.maxval], {'index': 'sites_last_disclaimed'}).eqJoin('job_id', r.db('brozzler').table('jobs')).filter(function(x){return x('right').hasFields('max_claimed_sites')}).group(function(x){return x('right')('id')}).ungroup().map(function(x){return {'job_id': x('group'), 'max_claimed_sites':x('reduction')('right')('max_claimed_sites')(0), 'claimed_sites':x('reduction')('left').filter({'claimed':true}).count()}})
|
|
||||||
# returns results like:
|
|
||||||
# [{'max_claimed_sites': 2, 'claimed_sites': 0, 'job_id': 1234},
|
|
||||||
# {'max_claimed_sites': 3, 'claimed_sites': 3, 'job_id': 1235}]
|
|
||||||
results = (
|
|
||||||
self.rr.table('sites')
|
|
||||||
.between(
|
.between(
|
||||||
['ACTIVE', r.minval], ['ACTIVE', r.maxval],
|
['ACTIVE', r.minval], ['ACTIVE', r.maxval],
|
||||||
index='sites_last_disclaimed')
|
index='sites_last_disclaimed')
|
||||||
.eq_join('job_id', r.db(self.rr.dbname).table('jobs'))
|
.order_by(r.desc('claimed'), 'last_disclaimed')
|
||||||
.filter(lambda x: x['right'].has_fields('max_claimed_sites'))
|
.fold(
|
||||||
.group(lambda x: x['right']['id'])
|
{}, lambda acc, site: acc.merge(
|
||||||
.ungroup()
|
r.branch(
|
||||||
.map(lambda x: {
|
site.has_fields('job_id'),
|
||||||
'job_id': x['group'],
|
r.object(
|
||||||
'max_claimed_sites': x['reduction']['right']['max_claimed_sites'][0],
|
site['job_id'].coerce_to('string'),
|
||||||
'claimed_sites': x['reduction']['left'].filter({'claimed':True}).count()
|
acc[site['job_id'].coerce_to('string')].default(0).add(1)),
|
||||||
})).run()
|
{})),
|
||||||
|
emit=lambda acc, site, new_acc: r.branch(
|
||||||
xformed = {
|
r.and_(
|
||||||
d['job_id']: {
|
r.or_(
|
||||||
'claimed_sites': d['claimed_sites'],
|
site['claimed'].not_(),
|
||||||
'max_claimed_sites': d['max_claimed_sites']
|
site['last_claimed'].lt(r.now().sub(60*60))),
|
||||||
}
|
r.or_(
|
||||||
for d in results
|
site.has_fields('max_claimed_sites').not_(),
|
||||||
}
|
new_acc[site['job_id'].coerce_to('string')].le(site['max_claimed_sites']))),
|
||||||
return xformed
|
[site['id']], []))
|
||||||
|
.limit(n)))
|
||||||
def claim_sites(self, n=1):
|
|
||||||
'''
|
|
||||||
What does this crazy method do? Well, let me break it down for you.
|
|
||||||
It claims sites for a brozzler worker to brozzle. It does this in bulk
|
|
||||||
to minimize contention over the "sites" table. It also respects the job
|
|
||||||
setting `max_claimed_sites`. To accomplish this, first it claims all
|
|
||||||
the currently unclaimed sites. If there are more than `n`, it puts
|
|
||||||
some back (disclaims them). Also if it would exceed `max_claimed_sites`
|
|
||||||
for a job, it disclaims some for that job. Finally it returns the
|
|
||||||
results, or raises `brozzler.NothingToClaim` if there is nothing to
|
|
||||||
claim.
|
|
||||||
'''
|
|
||||||
# need to do this before the next query
|
|
||||||
claimed_sites_by_job = self._claimed_sites_by_job()
|
|
||||||
|
|
||||||
temp_claimed = (
|
|
||||||
self.rr.table("sites", read_mode="majority")
|
|
||||||
.between(
|
|
||||||
["ACTIVE", r.minval], ["ACTIVE", r.maxval],
|
|
||||||
index="sites_last_disclaimed")
|
|
||||||
.order_by(index="sites_last_disclaimed")
|
|
||||||
.filter((r.row["claimed"] != True) | (
|
|
||||||
r.row["last_claimed"] < r.now() - 60*60))
|
|
||||||
.update(
|
.update(
|
||||||
# try to avoid a race condition resulting in multiple
|
# try to avoid a race condition resulting in multiple
|
||||||
# brozzler-workers claiming the same site
|
# brozzler-workers claiming the same site
|
||||||
# see https://github.com/rethinkdb/rethinkdb/issues/3235#issuecomment-60283038
|
# see https://github.com/rethinkdb/rethinkdb/issues/3235#issuecomment-60283038
|
||||||
r.branch((r.row["claimed"] != True) | (
|
r.branch(
|
||||||
r.row["last_claimed"] < r.now() - 60*60), {
|
r.or_(
|
||||||
"claimed": True,
|
r.row['claimed'].not_(),
|
||||||
"last_claimed": doublethink.utcnow()}, {}),
|
r.row['last_claimed'].lt(r.now().sub(60*60))),
|
||||||
|
{'claimed': True, 'last_claimed': r.now()},
|
||||||
|
{}),
|
||||||
return_changes=True)).run()
|
return_changes=True)).run()
|
||||||
# XXX vet result?
|
|
||||||
|
|
||||||
claiming_these_sites = []
|
self._vet_result(
|
||||||
not_claiming_these_ids = []
|
result, replaced=list(range(n+1)),
|
||||||
for i in range(temp_claimed["replaced"]):
|
unchanged=list(range(n+1)))
|
||||||
site = brozzler.Site(self.rr, temp_claimed["changes"][i]["new_val"])
|
sites = []
|
||||||
if len(claiming_these_sites) >= n:
|
for i in range(result["replaced"]):
|
||||||
not_claiming_these_ids.append(site.id)
|
if result["changes"][i]["old_val"]["claimed"]:
|
||||||
continue
|
|
||||||
if site.job_id in claimed_sites_by_job:
|
|
||||||
d = claimed_sites_by_job[site.job_id]
|
|
||||||
if d['claimed_sites'] + 1 > d['max_claimed_sites']:
|
|
||||||
not_claiming_these_ids.append(site.id)
|
|
||||||
continue
|
|
||||||
|
|
||||||
if temp_claimed["changes"][i]["old_val"]["claimed"]:
|
|
||||||
self.logger.warn(
|
self.logger.warn(
|
||||||
"re-claimed site that was still marked 'claimed' "
|
"re-claimed site that was still marked 'claimed' "
|
||||||
"because it was last claimed a long time ago "
|
"because it was last claimed a long time ago "
|
||||||
"at %s, and presumably some error stopped it from "
|
"at %s, and presumably some error stopped it from "
|
||||||
"being disclaimed",
|
"being disclaimed",
|
||||||
temp_claimed["changes"][i]["old_val"]["last_claimed"])
|
result["changes"][i]["old_val"]["last_claimed"])
|
||||||
claiming_these_sites.append(site)
|
site = brozzler.Site(self.rr, result["changes"][i]["new_val"])
|
||||||
if site.job_id in claimed_sites_by_job:
|
sites.append(site)
|
||||||
claimed_sites_by_job[site.job_id]['claimed_sites'] += 1
|
if sites:
|
||||||
|
return sites
|
||||||
if not_claiming_these_ids:
|
|
||||||
logging.trace(
|
|
||||||
'not claiming %s of %s candidate sites',
|
|
||||||
len(not_claiming_these_ids), temp_claimed["replaced"])
|
|
||||||
self.rr.table('sites')\
|
|
||||||
.get_all(*not_claiming_these_ids)\
|
|
||||||
.update({'claimed': False}).run()
|
|
||||||
|
|
||||||
if claiming_these_sites:
|
|
||||||
return claiming_these_sites
|
|
||||||
else:
|
else:
|
||||||
raise brozzler.NothingToClaim
|
raise brozzler.NothingToClaim
|
||||||
|
|
||||||
|
@ -86,7 +86,6 @@ def new_job(frontier, job_conf):
|
|||||||
sites = []
|
sites = []
|
||||||
for seed_conf in job_conf["seeds"]:
|
for seed_conf in job_conf["seeds"]:
|
||||||
merged_conf = merge(seed_conf, job_conf)
|
merged_conf = merge(seed_conf, job_conf)
|
||||||
merged_conf.pop("max_claimed_sites", None)
|
|
||||||
merged_conf.pop("seeds")
|
merged_conf.pop("seeds")
|
||||||
merged_conf["job_id"] = job.id
|
merged_conf["job_id"] = job.id
|
||||||
merged_conf["seed"] = merged_conf.pop("url")
|
merged_conf["seed"] = merged_conf.pop("url")
|
||||||
|
Loading…
x
Reference in New Issue
Block a user