From dc00f5de321e00e057aaa334218b67ba92cea75d Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Thu, 15 Mar 2018 12:57:49 -0700 Subject: [PATCH] reimplement max_claimed_sites Other approach was too slow and caused db contention. New approach avoids (slow) rethinkdb join by max_claimed_sites job parameter to each of the job's sites. Uses rethinkdb fold() to count claimed sites and enforce max_claimed_sites within a single query. --- brozzler/frontier.py | 137 ++++++++++++++----------------------------- brozzler/model.py | 1 - 2 files changed, 45 insertions(+), 93 deletions(-) diff --git a/brozzler/frontier.py b/brozzler/frontier.py index 1160f3c..a0f8ab4 100644 --- a/brozzler/frontier.py +++ b/brozzler/frontier.py @@ -93,107 +93,60 @@ class RethinkDbFrontier: raise UnexpectedDbResult("expected %r to be %r in %r" % ( k, expected, result)) - def _claimed_sites_by_job(self): - ''' - Returns a dictionary that looks like this: - {: {'claimed_sites': 2, 'max_claimed_sites': 3}, ...} - ''' - # js query: r.db('brozzler').table('sites').between(['ACTIVE', r.minval], ['ACTIVE', r.maxval], {'index': 'sites_last_disclaimed'}).eqJoin('job_id', r.db('brozzler').table('jobs')).filter(function(x){return x('right').hasFields('max_claimed_sites')}).group(function(x){return x('right')('id')}).ungroup().map(function(x){return {'job_id': x('group'), 'max_claimed_sites':x('reduction')('right')('max_claimed_sites')(0), 'claimed_sites':x('reduction')('left').filter({'claimed':true}).count()}}) - # returns results like: - # [{'max_claimed_sites': 2, 'claimed_sites': 0, 'job_id': 1234}, - # {'max_claimed_sites': 3, 'claimed_sites': 3, 'job_id': 1235}] - results = ( - self.rr.table('sites') + def claim_sites(self, n=1): + result = ( + self.rr.table('sites').get_all(r.args( + r.db(self.rr.dbname).table('sites', read_mode='majority') .between( ['ACTIVE', r.minval], ['ACTIVE', r.maxval], index='sites_last_disclaimed') - .eq_join('job_id', r.db(self.rr.dbname).table('jobs')) - .filter(lambda x: x['right'].has_fields('max_claimed_sites')) - .group(lambda x: x['right']['id']) - .ungroup() - .map(lambda x: { - 'job_id': x['group'], - 'max_claimed_sites': x['reduction']['right']['max_claimed_sites'][0], - 'claimed_sites': x['reduction']['left'].filter({'claimed':True}).count() - })).run() + .order_by(r.desc('claimed'), 'last_disclaimed') + .fold( + {}, lambda acc, site: acc.merge( + r.branch( + site.has_fields('job_id'), + r.object( + site['job_id'].coerce_to('string'), + acc[site['job_id'].coerce_to('string')].default(0).add(1)), + {})), + emit=lambda acc, site, new_acc: r.branch( + r.and_( + r.or_( + site['claimed'].not_(), + site['last_claimed'].lt(r.now().sub(60*60))), + r.or_( + site.has_fields('max_claimed_sites').not_(), + new_acc[site['job_id'].coerce_to('string')].le(site['max_claimed_sites']))), + [site['id']], [])) + .limit(n))) + .update( + # try to avoid a race condition resulting in multiple + # brozzler-workers claiming the same site + # see https://github.com/rethinkdb/rethinkdb/issues/3235#issuecomment-60283038 + r.branch( + r.or_( + r.row['claimed'].not_(), + r.row['last_claimed'].lt(r.now().sub(60*60))), + {'claimed': True, 'last_claimed': r.now()}, + {}), + return_changes=True)).run() - xformed = { - d['job_id']: { - 'claimed_sites': d['claimed_sites'], - 'max_claimed_sites': d['max_claimed_sites'] - } - for d in results - } - return xformed - - def claim_sites(self, n=1): - ''' - What does this crazy method do? Well, let me break it down for you. - It claims sites for a brozzler worker to brozzle. It does this in bulk - to minimize contention over the "sites" table. It also respects the job - setting `max_claimed_sites`. To accomplish this, first it claims all - the currently unclaimed sites. If there are more than `n`, it puts - some back (disclaims them). Also if it would exceed `max_claimed_sites` - for a job, it disclaims some for that job. Finally it returns the - results, or raises `brozzler.NothingToClaim` if there is nothing to - claim. - ''' - # need to do this before the next query - claimed_sites_by_job = self._claimed_sites_by_job() - - temp_claimed = ( - self.rr.table("sites", read_mode="majority") - .between( - ["ACTIVE", r.minval], ["ACTIVE", r.maxval], - index="sites_last_disclaimed") - .order_by(index="sites_last_disclaimed") - .filter((r.row["claimed"] != True) | ( - r.row["last_claimed"] < r.now() - 60*60)) - .update( - # try to avoid a race condition resulting in multiple - # brozzler-workers claiming the same site - # see https://github.com/rethinkdb/rethinkdb/issues/3235#issuecomment-60283038 - r.branch((r.row["claimed"] != True) | ( - r.row["last_claimed"] < r.now() - 60*60), { - "claimed": True, - "last_claimed": doublethink.utcnow()}, {}), - return_changes=True)).run() - # XXX vet result? - - claiming_these_sites = [] - not_claiming_these_ids = [] - for i in range(temp_claimed["replaced"]): - site = brozzler.Site(self.rr, temp_claimed["changes"][i]["new_val"]) - if len(claiming_these_sites) >= n: - not_claiming_these_ids.append(site.id) - continue - if site.job_id in claimed_sites_by_job: - d = claimed_sites_by_job[site.job_id] - if d['claimed_sites'] + 1 > d['max_claimed_sites']: - not_claiming_these_ids.append(site.id) - continue - - if temp_claimed["changes"][i]["old_val"]["claimed"]: + self._vet_result( + result, replaced=list(range(n+1)), + unchanged=list(range(n+1))) + sites = [] + for i in range(result["replaced"]): + if result["changes"][i]["old_val"]["claimed"]: self.logger.warn( "re-claimed site that was still marked 'claimed' " "because it was last claimed a long time ago " "at %s, and presumably some error stopped it from " "being disclaimed", - temp_claimed["changes"][i]["old_val"]["last_claimed"]) - claiming_these_sites.append(site) - if site.job_id in claimed_sites_by_job: - claimed_sites_by_job[site.job_id]['claimed_sites'] += 1 - - if not_claiming_these_ids: - logging.trace( - 'not claiming %s of %s candidate sites', - len(not_claiming_these_ids), temp_claimed["replaced"]) - self.rr.table('sites')\ - .get_all(*not_claiming_these_ids)\ - .update({'claimed': False}).run() - - if claiming_these_sites: - return claiming_these_sites + result["changes"][i]["old_val"]["last_claimed"]) + site = brozzler.Site(self.rr, result["changes"][i]["new_val"]) + sites.append(site) + if sites: + return sites else: raise brozzler.NothingToClaim diff --git a/brozzler/model.py b/brozzler/model.py index 856ac13..9c1a60f 100644 --- a/brozzler/model.py +++ b/brozzler/model.py @@ -86,7 +86,6 @@ def new_job(frontier, job_conf): sites = [] for seed_conf in job_conf["seeds"]: merged_conf = merge(seed_conf, job_conf) - merged_conf.pop("max_claimed_sites", None) merged_conf.pop("seeds") merged_conf["job_id"] = job.id merged_conf["seed"] = merged_conf.pop("url")