diff --git a/brozzler/worker.py b/brozzler/worker.py index 49a120a..ba77ba7 100644 --- a/brozzler/worker.py +++ b/brozzler/worker.py @@ -133,9 +133,10 @@ class BrozzlerWorker: warcproxes = self._service_registry.available_services('warcprox') if not warcproxes: return None - active_sites = self._frontier.rr.table('sites').between( + reql = self._frontier.rr.table('sites').between( ['ACTIVE', r.minval], ['ACTIVE', r.maxval], - index='sites_last_disclaimed').run() + index='sites_last_disclaimed') + active_sites = list(reql.run()) for warcprox in warcproxes: address = '%s:%s' % (warcprox['host'], warcprox['port']) warcprox['assigned_sites'] = len([ diff --git a/tests/test_frontier.py b/tests/test_frontier.py index b77c1e6..31be2f6 100644 --- a/tests/test_frontier.py +++ b/tests/test_frontier.py @@ -855,3 +855,71 @@ def test_claim_site(): # clean up rr.table('sites').get(claimed_site.id).delete().run() +def test_choose_warcprox(): + rr = doublethink.Rethinker('localhost', db='ignoreme') + svcreg = doublethink.ServiceRegistry(rr) + frontier = brozzler.RethinkDbFrontier(rr) + # clean slate + rr.table('sites').delete().run() + rr.table('services').delete().run() + worker = brozzler.BrozzlerWorker(frontier, svcreg) + assert worker._choose_warcprox() is None + + rr.table('services').insert({ + 'role': 'warcprox', + 'first_heartbeat': doublethink.utcnow(), + 'last_heartbeat': doublethink.utcnow(), + 'host': 'host1', 'port': 8000, + 'load': 0, 'ttl': 60}).run() + rr.table('services').insert({ + 'role': 'warcprox', + 'first_heartbeat': doublethink.utcnow(), + 'last_heartbeat': doublethink.utcnow(), + 'host': 'host2', 'port': 8000, + 'load': 0, 'ttl': 60}).run() + rr.table('services').insert({ + 'role': 'warcprox', + 'first_heartbeat': doublethink.utcnow(), + 'last_heartbeat': doublethink.utcnow(), + 'host': 'host2', 'port': 8001, + 'load': 0, 'ttl': 60}).run() + rr.table('services').insert({ + 'role': 'warcprox', + 'first_heartbeat': doublethink.utcnow(), + 'last_heartbeat': doublethink.utcnow(), + 'host': 'host3', 'port': 8000, + 'load': 0, 'ttl': 60}).run() + rr.table('services').insert({ + 'role': 'warcprox', + 'first_heartbeat': doublethink.utcnow(), + 'last_heartbeat': doublethink.utcnow(), + 'host': 'host4', 'port': 8000, + 'load': 1, 'ttl': 60}).run() + + rr.table('sites').insert({ + 'proxy': 'host1:8000', 'status': 'ACTIVE', + 'last_disclaimed': doublethink.utcnow()}).run() + rr.table('sites').insert({ + 'proxy': 'host1:8000', 'status': 'ACTIVE', + 'last_disclaimed': doublethink.utcnow()}).run() + rr.table('sites').insert({ + 'proxy': 'host2:8000', 'status': 'ACTIVE', + 'last_disclaimed': doublethink.utcnow()}).run() + rr.table('sites').insert({ + 'proxy': 'host2:8001', 'status': 'ACTIVE', + 'last_disclaimed': doublethink.utcnow()}).run() + + instance = worker._choose_warcprox() + assert instance['host'] == 'host3' + assert instance['port'] == 8000 + rr.table('sites').insert({ + 'proxy': 'host3:8000', 'status': 'ACTIVE', + 'last_disclaimed': doublethink.utcnow()}).run() + + instance = worker._choose_warcprox() + assert instance['host'] == 'host4' + assert instance['port'] == 8000 + + # clean up + rr.table('sites').delete().run() + rr.table('services').delete().run()