diff --git a/brozzler/frontier.py b/brozzler/frontier.py index 06f482f..eb70f3f 100644 --- a/brozzler/frontier.py +++ b/brozzler/frontier.py @@ -1,7 +1,7 @@ ''' brozzler/frontier.py - RethinkDbFrontier manages crawl jobs, sites and pages -Copyright (C) 2014-2017 Internet Archive +Copyright (C) 2014-2018 Internet Archive Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -93,47 +93,104 @@ class RethinkDbFrontier: raise UnexpectedDbResult("expected %r to be %r in %r" % ( k, expected, result)) - def claim_sites(self, n=1): - # XXX keep track of aggregate priority and prioritize sites accordingly? - while True: - result = ( - self.rr.table("sites", read_mode="majority") - .between( - ["ACTIVE", r.minval], ["ACTIVE", r.maxval], - index="sites_last_disclaimed") - .order_by(index="sites_last_disclaimed") - .filter((r.row["claimed"] != True) | ( - r.row["last_claimed"] < r.now() - 60*60)) - .limit(n) - .update( - # try to avoid a race condition resulting in multiple - # brozzler-workers claiming the same site - # see https://github.com/rethinkdb/rethinkdb/issues/3235#issuecomment-60283038 - r.branch((r.row["claimed"] != True) | ( - r.row["last_claimed"] < r.now() - 60*60), { - "claimed": True, - "last_claimed": doublethink.utcnow()}, {}), - return_changes=True)).run() + def _claimed_sites_by_job(self): + ''' + Returns a dictionary that looks like this: + {: {'claimed_sites': 2, 'max_claimed_sites': 3}, ...} + ''' + # js query: r.db('brozzler').table('sites').between(['ACTIVE', r.minval], ['ACTIVE', r.maxval], {'index': 'sites_last_disclaimed'}).eqJoin('job_id', r.db('brozzler').table('jobs')).group(function(x){return x('right')('id')}).ungroup().map(function(x){return {'job_id': x('group'), 'max_claimed_sites':x('reduction')('right')('max_claimed_sites')(0), 'claimed_sites':x('reduction')('left').filter({'claimed':true}).count()}}) + # returns results like: + # [{'max_claimed_sites': 2, 'claimed_sites': 0, 'job_id': 1234}, + # {'max_claimed_sites': 3, 'claimed_sites': 3, 'job_id': 1235}] + results = ( + self.rr.table('sites') + .between( + ['ACTIVE', r.minval], ['ACTIVE', r.maxval], + index='sites_last_disclaimed') + .eq_join('job_id', r.db(self.rr.dbname).table('jobs')) + .group(lambda x: x['right']['id']) + .ungroup() + .map(lambda x: { + 'job_id': x['group'], + 'max_claimed_sites': x['reduction']['right']['max_claimed_sites'][0], + 'claimed_sites': x['reduction']['left'].filter({'claimed':True}).count() + })).run() + xformed = {d['job_id']: { + 'claimed_sites': d['claimed_sites'], + 'max_claimed_sites': d['max_claimed_sites'] + } for d in results} + return xformed - self._vet_result( - result, replaced=list(range(n+1)), - unchanged=list(range(n+1))) - sites = [] - for i in range(result["replaced"]): - if result["changes"][i]["old_val"]["claimed"]: - self.logger.warn( - "re-claimed site that was still marked 'claimed' " - "because it was last claimed a long time ago " - "at %s, and presumably some error stopped it from " - "being disclaimed", - result["changes"][i]["old_val"]["last_claimed"]) - site = brozzler.Site(self.rr, result["changes"][i]["new_val"]) - sites.append(site) - if not sites: - raise brozzler.NothingToClaim - if sites: - return sites - # else try again + def claim_sites(self, n=1): + ''' + What does this crazy method do? Well, let me break it down for you. + It claims sites for a brozzler worker to brozzle. It does this in bulk + to minimize contention over the "sites" table. It also respects the job + setting `max_claimed_sites`. To accomplish this, first it claims all + the currently unclaimed sites. If there are more than `n`, it puts + some back (disclaims them). Also if it would exceed `max_claimed_sites` + for a job, it disclaims some for that job. Finally it returns the + results, or raises `brozzler.NothingToClaim` if there is nothing to + claim. + ''' + # need to do this before the next query + claimed_sites_by_job = self._claimed_sites_by_job() + + temp_claimed = ( + self.rr.table("sites", read_mode="majority") + .between( + ["ACTIVE", r.minval], ["ACTIVE", r.maxval], + index="sites_last_disclaimed") + .order_by(index="sites_last_disclaimed") + .filter((r.row["claimed"] != True) | ( + r.row["last_claimed"] < r.now() - 60*60)) + .update( + # try to avoid a race condition resulting in multiple + # brozzler-workers claiming the same site + # see https://github.com/rethinkdb/rethinkdb/issues/3235#issuecomment-60283038 + r.branch((r.row["claimed"] != True) | ( + r.row["last_claimed"] < r.now() - 60*60), { + "claimed": True, + "last_claimed": doublethink.utcnow()}, {}), + return_changes=True)).run() + # XXX vet result? + + claiming_these_sites = [] + not_claiming_these_ids = [] + for i in range(temp_claimed["replaced"]): + site = brozzler.Site(self.rr, temp_claimed["changes"][i]["new_val"]) + if len(claiming_these_sites) >= n: + not_claiming_these_ids.append(site.id) + continue + if site.job_id in claimed_sites_by_job: + d = claimed_sites_by_job[site.job_id] + if d['claimed_sites'] + 1 > d['max_claimed_sites']: + not_claiming_these_ids.append(site.id) + continue + + if temp_claimed["changes"][i]["old_val"]["claimed"]: + self.logger.warn( + "re-claimed site that was still marked 'claimed' " + "because it was last claimed a long time ago " + "at %s, and presumably some error stopped it from " + "being disclaimed", + temp_claimed["changes"][i]["old_val"]["last_claimed"]) + claiming_these_sites.append(site) + if site.job_id in claimed_sites_by_job: + claimed_sites_by_job[site.job_id]['claimed_sites'] += 1 + + if not_claiming_these_ids: + logging.trace( + 'not claiming %s of %s candidate sites', + len(not_claiming_these_ids), temp_claimed["replaced"]) + self.rr.table('sites')\ + .get_all(*not_claiming_these_ids)\ + .update({'claimed': False}).run() + + if claiming_these_sites: + return claiming_these_sites + else: + raise brozzler.NothingToClaim def enforce_time_limit(self, site): ''' diff --git a/brozzler/job_schema.yaml b/brozzler/job_schema.yaml index 9a65489..14445bc 100644 --- a/brozzler/job_schema.yaml +++ b/brozzler/job_schema.yaml @@ -96,3 +96,7 @@ seeds: type: string <<: *multi_level_options + +max_claimed_sites: + type: integer + diff --git a/brozzler/model.py b/brozzler/model.py index ddcfb51..856ac13 100644 --- a/brozzler/model.py +++ b/brozzler/model.py @@ -2,7 +2,7 @@ brozzler/models.py - model classes representing jobs, sites, and pages, with related logic -Copyright (C) 2014-2017 Internet Archive +Copyright (C) 2014-2018 Internet Archive Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -79,11 +79,14 @@ def new_job(frontier, job_conf): "started": doublethink.utcnow()}) if "id" in job_conf: job.id = job_conf["id"] + if "max_claimed_sites" in job_conf: + job.max_claimed_sites = job_conf["max_claimed_sites"] job.save() sites = [] for seed_conf in job_conf["seeds"]: merged_conf = merge(seed_conf, job_conf) + merged_conf.pop("max_claimed_sites", None) merged_conf.pop("seeds") merged_conf["job_id"] = job.id merged_conf["seed"] = merged_conf.pop("url") diff --git a/job-conf.rst b/job-conf.rst index 4cedd8b..1174f1a 100644 --- a/job-conf.rst +++ b/job-conf.rst @@ -13,6 +13,7 @@ an example id: myjob time_limit: 60 # seconds ignore_robots: false + max_claimed_sites: 2 warcprox_meta: warc-prefix: job1 stats: @@ -102,6 +103,17 @@ List of seeds. Each item in the list is a dictionary (associative array) which defines the seed. It must specify ``url`` (see below) and can additionally specify any of the settings of scope *seed-level*. +``max_claimed_sites`` +--------------------- ++-----------+--------+----------+---------+ +| scope | type | required | default | ++===========+========+==========+=========+ +| top-level | number | no | *none* | ++-----------+--------+----------+---------+ +Puts a cap on the number of sites belonging to a given job that can be brozzled +simultaneously across the cluster. Addresses the problem of a job with many +seeds starving out other jobs. + ``url`` ------- +------------+--------+----------+---------+ @@ -113,6 +125,13 @@ The seed url. ``metadata`` ------------ ++-----------------------+------------+----------+---------+ +| scope | type | required | default | ++=======================+============+==========+=========+ +| seed-level, top-level | dictionary | no | *none* | ++-----------------------+------------+----------+---------+ +Arbitrary information about the crawl job or site. Merely informative, not used +by brozzler for anything. Could be of use to some external process. ``time_limit`` -------------- diff --git a/tests/test_frontier.py b/tests/test_frontier.py index 58207d2..9b80a4f 100644 --- a/tests/test_frontier.py +++ b/tests/test_frontier.py @@ -863,6 +863,46 @@ def test_claim_site(): # clean up rr.table('sites').get(claimed_site.id).delete().run() +def test_max_claimed_sites(): + # max_claimed_sites is a brozzler job setting that puts a cap on the number + # of the job's sites that can be brozzled simultaneously across the cluster + rr = doublethink.Rethinker('localhost', db='ignoreme') + frontier = brozzler.RethinkDbFrontier(rr) + + # clean slate + rr.table('jobs').delete().run() + rr.table('sites').delete().run() + + job_conf = { + 'seeds': [ + {'url': 'http://example.com/1'}, + {'url': 'http://example.com/2'}, + {'url': 'http://example.com/3'}, + {'url': 'http://example.com/4'}, + {'url': 'http://example.com/5'}, + ], + 'max_claimed_sites': 3, + } + + job = brozzler.new_job(frontier, job_conf) + + assert job.id + assert job.max_claimed_sites == 3 + + sites = list(frontier.job_sites(job.id)) + assert len(sites) == 5 + + claimed_sites = frontier.claim_sites(1) + assert len(claimed_sites) == 1 + claimed_sites = frontier.claim_sites(3) + assert len(claimed_sites) == 2 + with pytest.raises(brozzler.NothingToClaim): + claimed_site = frontier.claim_sites(3) + + # clean slate for the next one + rr.table('jobs').delete().run() + rr.table('sites').delete().run() + def test_choose_warcprox(): rr = doublethink.Rethinker('localhost', db='ignoreme') svcreg = doublethink.ServiceRegistry(rr)