diff --git a/brozzler/cli.py b/brozzler/cli.py old mode 100755 new mode 100644 diff --git a/brozzler/worker.py b/brozzler/worker.py index 4955f2c..ba77ba7 100644 --- a/brozzler/worker.py +++ b/brozzler/worker.py @@ -129,13 +129,31 @@ class BrozzlerWorker: self._start_stop_lock = threading.Lock() self._shutdown = threading.Event() + def _choose_warcprox(self): + warcproxes = self._service_registry.available_services('warcprox') + if not warcproxes: + return None + reql = self._frontier.rr.table('sites').between( + ['ACTIVE', r.minval], ['ACTIVE', r.maxval], + index='sites_last_disclaimed') + active_sites = list(reql.run()) + for warcprox in warcproxes: + address = '%s:%s' % (warcprox['host'], warcprox['port']) + warcprox['assigned_sites'] = len([ + site for site in active_sites + if 'proxy' in site and site['proxy'] == address]) + warcproxes.sort(key=lambda warcprox: ( + warcprox['assigned_sites'], warcprox['load'])) + # XXX make this heuristic more advanced? + return warcproxes[0] + def _proxy_for(self, site): if self._proxy: return self._proxy elif site.proxy: return site.proxy elif self._warcprox_auto: - svc = self._service_registry.available_service('warcprox') + svc = self._choose_warcprox() if svc is None: raise brozzler.ProxyError( 'no available instances of warcprox in the service ' diff --git a/tests/test_frontier.py b/tests/test_frontier.py index b77c1e6..fdae726 100644 --- a/tests/test_frontier.py +++ b/tests/test_frontier.py @@ -855,3 +855,78 @@ def test_claim_site(): # clean up rr.table('sites').get(claimed_site.id).delete().run() +def test_choose_warcprox(): + rr = doublethink.Rethinker('localhost', db='ignoreme') + svcreg = doublethink.ServiceRegistry(rr) + frontier = brozzler.RethinkDbFrontier(rr) + + # avoid this of error: https://travis-ci.org/internetarchive/brozzler/jobs/330991786#L1021 + rr.table('sites').wait().run() + rr.table('services').wait().run() + rr.table('sites').index_wait().run() + rr.table('services').index_wait().run() + + # clean slate + rr.table('sites').delete().run() + rr.table('services').delete().run() + worker = brozzler.BrozzlerWorker(frontier, svcreg) + assert worker._choose_warcprox() is None + + rr.table('services').insert({ + 'role': 'warcprox', + 'first_heartbeat': doublethink.utcnow(), + 'last_heartbeat': doublethink.utcnow(), + 'host': 'host1', 'port': 8000, + 'load': 0, 'ttl': 60}).run() + rr.table('services').insert({ + 'role': 'warcprox', + 'first_heartbeat': doublethink.utcnow(), + 'last_heartbeat': doublethink.utcnow(), + 'host': 'host2', 'port': 8000, + 'load': 0, 'ttl': 60}).run() + rr.table('services').insert({ + 'role': 'warcprox', + 'first_heartbeat': doublethink.utcnow(), + 'last_heartbeat': doublethink.utcnow(), + 'host': 'host2', 'port': 8001, + 'load': 0, 'ttl': 60}).run() + rr.table('services').insert({ + 'role': 'warcprox', + 'first_heartbeat': doublethink.utcnow(), + 'last_heartbeat': doublethink.utcnow(), + 'host': 'host3', 'port': 8000, + 'load': 0, 'ttl': 60}).run() + rr.table('services').insert({ + 'role': 'warcprox', + 'first_heartbeat': doublethink.utcnow(), + 'last_heartbeat': doublethink.utcnow(), + 'host': 'host4', 'port': 8000, + 'load': 1, 'ttl': 60}).run() + + rr.table('sites').insert({ + 'proxy': 'host1:8000', 'status': 'ACTIVE', + 'last_disclaimed': doublethink.utcnow()}).run() + rr.table('sites').insert({ + 'proxy': 'host1:8000', 'status': 'ACTIVE', + 'last_disclaimed': doublethink.utcnow()}).run() + rr.table('sites').insert({ + 'proxy': 'host2:8000', 'status': 'ACTIVE', + 'last_disclaimed': doublethink.utcnow()}).run() + rr.table('sites').insert({ + 'proxy': 'host2:8001', 'status': 'ACTIVE', + 'last_disclaimed': doublethink.utcnow()}).run() + + instance = worker._choose_warcprox() + assert instance['host'] == 'host3' + assert instance['port'] == 8000 + rr.table('sites').insert({ + 'proxy': 'host3:8000', 'status': 'ACTIVE', + 'last_disclaimed': doublethink.utcnow()}).run() + + instance = worker._choose_warcprox() + assert instance['host'] == 'host4' + assert instance['port'] == 8000 + + # clean up + rr.table('sites').delete().run() + rr.table('services').delete().run()