From 7f78c335e1cc61f5e6db983e9dfa608357e1ea49 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Fri, 19 Jan 2018 14:54:33 -0800 Subject: [PATCH] --warcprox-auto distribute assigned sites evenly (#78) --warcprox-auto distribute assigned sites evenly When running with --warcprox-auto, choose the instance of warcprox with the least number of assigned sites, instead of the lowest load in the service registry. In practice we often start brozzling a whole bunch of sites at approximately the same time, and because it takes time for that to affect the "load" reported by warcprox instances, sites end up being distributed very unevenly. --- brozzler/cli.py | 0 brozzler/worker.py | 20 ++++++++++- tests/test_frontier.py | 75 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 94 insertions(+), 1 deletion(-) mode change 100755 => 100644 brozzler/cli.py diff --git a/brozzler/cli.py b/brozzler/cli.py old mode 100755 new mode 100644 diff --git a/brozzler/worker.py b/brozzler/worker.py index 4955f2c..ba77ba7 100644 --- a/brozzler/worker.py +++ b/brozzler/worker.py @@ -129,13 +129,31 @@ class BrozzlerWorker: self._start_stop_lock = threading.Lock() self._shutdown = threading.Event() + def _choose_warcprox(self): + warcproxes = self._service_registry.available_services('warcprox') + if not warcproxes: + return None + reql = self._frontier.rr.table('sites').between( + ['ACTIVE', r.minval], ['ACTIVE', r.maxval], + index='sites_last_disclaimed') + active_sites = list(reql.run()) + for warcprox in warcproxes: + address = '%s:%s' % (warcprox['host'], warcprox['port']) + warcprox['assigned_sites'] = len([ + site for site in active_sites + if 'proxy' in site and site['proxy'] == address]) + warcproxes.sort(key=lambda warcprox: ( + warcprox['assigned_sites'], warcprox['load'])) + # XXX make this heuristic more advanced? + return warcproxes[0] + def _proxy_for(self, site): if self._proxy: return self._proxy elif site.proxy: return site.proxy elif self._warcprox_auto: - svc = self._service_registry.available_service('warcprox') + svc = self._choose_warcprox() if svc is None: raise brozzler.ProxyError( 'no available instances of warcprox in the service ' diff --git a/tests/test_frontier.py b/tests/test_frontier.py index b77c1e6..fdae726 100644 --- a/tests/test_frontier.py +++ b/tests/test_frontier.py @@ -855,3 +855,78 @@ def test_claim_site(): # clean up rr.table('sites').get(claimed_site.id).delete().run() +def test_choose_warcprox(): + rr = doublethink.Rethinker('localhost', db='ignoreme') + svcreg = doublethink.ServiceRegistry(rr) + frontier = brozzler.RethinkDbFrontier(rr) + + # avoid this of error: https://travis-ci.org/internetarchive/brozzler/jobs/330991786#L1021 + rr.table('sites').wait().run() + rr.table('services').wait().run() + rr.table('sites').index_wait().run() + rr.table('services').index_wait().run() + + # clean slate + rr.table('sites').delete().run() + rr.table('services').delete().run() + worker = brozzler.BrozzlerWorker(frontier, svcreg) + assert worker._choose_warcprox() is None + + rr.table('services').insert({ + 'role': 'warcprox', + 'first_heartbeat': doublethink.utcnow(), + 'last_heartbeat': doublethink.utcnow(), + 'host': 'host1', 'port': 8000, + 'load': 0, 'ttl': 60}).run() + rr.table('services').insert({ + 'role': 'warcprox', + 'first_heartbeat': doublethink.utcnow(), + 'last_heartbeat': doublethink.utcnow(), + 'host': 'host2', 'port': 8000, + 'load': 0, 'ttl': 60}).run() + rr.table('services').insert({ + 'role': 'warcprox', + 'first_heartbeat': doublethink.utcnow(), + 'last_heartbeat': doublethink.utcnow(), + 'host': 'host2', 'port': 8001, + 'load': 0, 'ttl': 60}).run() + rr.table('services').insert({ + 'role': 'warcprox', + 'first_heartbeat': doublethink.utcnow(), + 'last_heartbeat': doublethink.utcnow(), + 'host': 'host3', 'port': 8000, + 'load': 0, 'ttl': 60}).run() + rr.table('services').insert({ + 'role': 'warcprox', + 'first_heartbeat': doublethink.utcnow(), + 'last_heartbeat': doublethink.utcnow(), + 'host': 'host4', 'port': 8000, + 'load': 1, 'ttl': 60}).run() + + rr.table('sites').insert({ + 'proxy': 'host1:8000', 'status': 'ACTIVE', + 'last_disclaimed': doublethink.utcnow()}).run() + rr.table('sites').insert({ + 'proxy': 'host1:8000', 'status': 'ACTIVE', + 'last_disclaimed': doublethink.utcnow()}).run() + rr.table('sites').insert({ + 'proxy': 'host2:8000', 'status': 'ACTIVE', + 'last_disclaimed': doublethink.utcnow()}).run() + rr.table('sites').insert({ + 'proxy': 'host2:8001', 'status': 'ACTIVE', + 'last_disclaimed': doublethink.utcnow()}).run() + + instance = worker._choose_warcprox() + assert instance['host'] == 'host3' + assert instance['port'] == 8000 + rr.table('sites').insert({ + 'proxy': 'host3:8000', 'status': 'ACTIVE', + 'last_disclaimed': doublethink.utcnow()}).run() + + instance = worker._choose_warcprox() + assert instance['host'] == 'host4' + assert instance['port'] == 8000 + + # clean up + rr.table('sites').delete().run() + rr.table('services').delete().run()