mirror of
https://github.com/internetarchive/brozzler.git
synced 2025-02-24 08:39:59 -05:00
--warcprox-auto distribute assigned sites evenly (#78)
--warcprox-auto distribute assigned sites evenly When running with --warcprox-auto, choose the instance of warcprox with the least number of assigned sites, instead of the lowest load in the service registry. In practice we often start brozzling a whole bunch of sites at approximately the same time, and because it takes time for that to affect the "load" reported by warcprox instances, sites end up being distributed very unevenly.
This commit is contained in:
parent
9e80a3b0d3
commit
7f78c335e1
0
brozzler/cli.py
Executable file → Normal file
0
brozzler/cli.py
Executable file → Normal file
@ -129,13 +129,31 @@ class BrozzlerWorker:
|
|||||||
self._start_stop_lock = threading.Lock()
|
self._start_stop_lock = threading.Lock()
|
||||||
self._shutdown = threading.Event()
|
self._shutdown = threading.Event()
|
||||||
|
|
||||||
|
def _choose_warcprox(self):
|
||||||
|
warcproxes = self._service_registry.available_services('warcprox')
|
||||||
|
if not warcproxes:
|
||||||
|
return None
|
||||||
|
reql = self._frontier.rr.table('sites').between(
|
||||||
|
['ACTIVE', r.minval], ['ACTIVE', r.maxval],
|
||||||
|
index='sites_last_disclaimed')
|
||||||
|
active_sites = list(reql.run())
|
||||||
|
for warcprox in warcproxes:
|
||||||
|
address = '%s:%s' % (warcprox['host'], warcprox['port'])
|
||||||
|
warcprox['assigned_sites'] = len([
|
||||||
|
site for site in active_sites
|
||||||
|
if 'proxy' in site and site['proxy'] == address])
|
||||||
|
warcproxes.sort(key=lambda warcprox: (
|
||||||
|
warcprox['assigned_sites'], warcprox['load']))
|
||||||
|
# XXX make this heuristic more advanced?
|
||||||
|
return warcproxes[0]
|
||||||
|
|
||||||
def _proxy_for(self, site):
|
def _proxy_for(self, site):
|
||||||
if self._proxy:
|
if self._proxy:
|
||||||
return self._proxy
|
return self._proxy
|
||||||
elif site.proxy:
|
elif site.proxy:
|
||||||
return site.proxy
|
return site.proxy
|
||||||
elif self._warcprox_auto:
|
elif self._warcprox_auto:
|
||||||
svc = self._service_registry.available_service('warcprox')
|
svc = self._choose_warcprox()
|
||||||
if svc is None:
|
if svc is None:
|
||||||
raise brozzler.ProxyError(
|
raise brozzler.ProxyError(
|
||||||
'no available instances of warcprox in the service '
|
'no available instances of warcprox in the service '
|
||||||
|
@ -855,3 +855,78 @@ def test_claim_site():
|
|||||||
# clean up
|
# clean up
|
||||||
rr.table('sites').get(claimed_site.id).delete().run()
|
rr.table('sites').get(claimed_site.id).delete().run()
|
||||||
|
|
||||||
|
def test_choose_warcprox():
|
||||||
|
rr = doublethink.Rethinker('localhost', db='ignoreme')
|
||||||
|
svcreg = doublethink.ServiceRegistry(rr)
|
||||||
|
frontier = brozzler.RethinkDbFrontier(rr)
|
||||||
|
|
||||||
|
# avoid this of error: https://travis-ci.org/internetarchive/brozzler/jobs/330991786#L1021
|
||||||
|
rr.table('sites').wait().run()
|
||||||
|
rr.table('services').wait().run()
|
||||||
|
rr.table('sites').index_wait().run()
|
||||||
|
rr.table('services').index_wait().run()
|
||||||
|
|
||||||
|
# clean slate
|
||||||
|
rr.table('sites').delete().run()
|
||||||
|
rr.table('services').delete().run()
|
||||||
|
worker = brozzler.BrozzlerWorker(frontier, svcreg)
|
||||||
|
assert worker._choose_warcprox() is None
|
||||||
|
|
||||||
|
rr.table('services').insert({
|
||||||
|
'role': 'warcprox',
|
||||||
|
'first_heartbeat': doublethink.utcnow(),
|
||||||
|
'last_heartbeat': doublethink.utcnow(),
|
||||||
|
'host': 'host1', 'port': 8000,
|
||||||
|
'load': 0, 'ttl': 60}).run()
|
||||||
|
rr.table('services').insert({
|
||||||
|
'role': 'warcprox',
|
||||||
|
'first_heartbeat': doublethink.utcnow(),
|
||||||
|
'last_heartbeat': doublethink.utcnow(),
|
||||||
|
'host': 'host2', 'port': 8000,
|
||||||
|
'load': 0, 'ttl': 60}).run()
|
||||||
|
rr.table('services').insert({
|
||||||
|
'role': 'warcprox',
|
||||||
|
'first_heartbeat': doublethink.utcnow(),
|
||||||
|
'last_heartbeat': doublethink.utcnow(),
|
||||||
|
'host': 'host2', 'port': 8001,
|
||||||
|
'load': 0, 'ttl': 60}).run()
|
||||||
|
rr.table('services').insert({
|
||||||
|
'role': 'warcprox',
|
||||||
|
'first_heartbeat': doublethink.utcnow(),
|
||||||
|
'last_heartbeat': doublethink.utcnow(),
|
||||||
|
'host': 'host3', 'port': 8000,
|
||||||
|
'load': 0, 'ttl': 60}).run()
|
||||||
|
rr.table('services').insert({
|
||||||
|
'role': 'warcprox',
|
||||||
|
'first_heartbeat': doublethink.utcnow(),
|
||||||
|
'last_heartbeat': doublethink.utcnow(),
|
||||||
|
'host': 'host4', 'port': 8000,
|
||||||
|
'load': 1, 'ttl': 60}).run()
|
||||||
|
|
||||||
|
rr.table('sites').insert({
|
||||||
|
'proxy': 'host1:8000', 'status': 'ACTIVE',
|
||||||
|
'last_disclaimed': doublethink.utcnow()}).run()
|
||||||
|
rr.table('sites').insert({
|
||||||
|
'proxy': 'host1:8000', 'status': 'ACTIVE',
|
||||||
|
'last_disclaimed': doublethink.utcnow()}).run()
|
||||||
|
rr.table('sites').insert({
|
||||||
|
'proxy': 'host2:8000', 'status': 'ACTIVE',
|
||||||
|
'last_disclaimed': doublethink.utcnow()}).run()
|
||||||
|
rr.table('sites').insert({
|
||||||
|
'proxy': 'host2:8001', 'status': 'ACTIVE',
|
||||||
|
'last_disclaimed': doublethink.utcnow()}).run()
|
||||||
|
|
||||||
|
instance = worker._choose_warcprox()
|
||||||
|
assert instance['host'] == 'host3'
|
||||||
|
assert instance['port'] == 8000
|
||||||
|
rr.table('sites').insert({
|
||||||
|
'proxy': 'host3:8000', 'status': 'ACTIVE',
|
||||||
|
'last_disclaimed': doublethink.utcnow()}).run()
|
||||||
|
|
||||||
|
instance = worker._choose_warcprox()
|
||||||
|
assert instance['host'] == 'host4'
|
||||||
|
assert instance['port'] == 8000
|
||||||
|
|
||||||
|
# clean up
|
||||||
|
rr.table('sites').delete().run()
|
||||||
|
rr.table('services').delete().run()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user