Refactor the Appservice scheduler code (#5886)

Get rid of the labyrinthine `recoverer_fn` code, and clean up the startup code
(it seemed to be previously inexplicably split between
`ApplicationServiceScheduler.start` and `_Recoverer.start`).

Add some docstrings too.
This commit is contained in:
Richard van der Hoff 2019-08-20 17:42:45 +01:00 committed by GitHub
commit 72bc285669
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 68 additions and 49 deletions

1
changelog.d/5886.misc Normal file
View File

@ -0,0 +1 @@
Refactor the Appservice scheduler code.

View File

@ -70,35 +70,37 @@ class ApplicationServiceScheduler(object):
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.as_api = hs.get_application_service_api() self.as_api = hs.get_application_service_api()
def create_recoverer(service, callback): self.txn_ctrl = _TransactionController(self.clock, self.store, self.as_api)
return _Recoverer(self.clock, self.store, self.as_api, service, callback)
self.txn_ctrl = _TransactionController(
self.clock, self.store, self.as_api, create_recoverer
)
self.queuer = _ServiceQueuer(self.txn_ctrl, self.clock) self.queuer = _ServiceQueuer(self.txn_ctrl, self.clock)
@defer.inlineCallbacks @defer.inlineCallbacks
def start(self): def start(self):
logger.info("Starting appservice scheduler") logger.info("Starting appservice scheduler")
# check for any DOWN ASes and start recoverers for them. # check for any DOWN ASes and start recoverers for them.
recoverers = yield _Recoverer.start( services = yield self.store.get_appservices_by_state(
self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered ApplicationServiceState.DOWN
) )
self.txn_ctrl.add_recoverers(recoverers)
for service in services:
self.txn_ctrl.start_recoverer(service)
def submit_event_for_as(self, service, event): def submit_event_for_as(self, service, event):
self.queuer.enqueue(service, event) self.queuer.enqueue(service, event)
class _ServiceQueuer(object): class _ServiceQueuer(object):
"""Queues events for the same application service together, sending """Queue of events waiting to be sent to appservices.
transactions as soon as possible. Once a transaction is sent successfully,
this schedules any other events in the queue to run. Groups events into transactions per-appservice, and sends them on to the
TransactionController. Makes sure that we only have one transaction in flight per
appservice at a given time.
""" """
def __init__(self, txn_ctrl, clock): def __init__(self, txn_ctrl, clock):
self.queued_events = {} # dict of {service_id: [events]} self.queued_events = {} # dict of {service_id: [events]}
# the appservices which currently have a transaction in flight
self.requests_in_flight = set() self.requests_in_flight = set()
self.txn_ctrl = txn_ctrl self.txn_ctrl = txn_ctrl
self.clock = clock self.clock = clock
@ -136,13 +138,29 @@ class _ServiceQueuer(object):
class _TransactionController(object): class _TransactionController(object):
def __init__(self, clock, store, as_api, recoverer_fn): """Transaction manager.
Builds AppServiceTransactions and runs their lifecycle. Also starts a Recoverer
if a transaction fails.
(Note we have only have one of these in the homeserver.)
Args:
clock (synapse.util.Clock):
store (synapse.storage.DataStore):
as_api (synapse.appservice.api.ApplicationServiceApi):
"""
def __init__(self, clock, store, as_api):
self.clock = clock self.clock = clock
self.store = store self.store = store
self.as_api = as_api self.as_api = as_api
self.recoverer_fn = recoverer_fn
# keep track of how many recoverers there are # map from service id to recoverer instance
self.recoverers = [] self.recoverers = {}
# for UTs
self.RECOVERER_CLASS = _Recoverer
@defer.inlineCallbacks @defer.inlineCallbacks
def send(self, service, events): def send(self, service, events):
@ -154,42 +172,45 @@ class _TransactionController(object):
if sent: if sent:
yield txn.complete(self.store) yield txn.complete(self.store)
else: else:
run_in_background(self._start_recoverer, service) run_in_background(self._on_txn_fail, service)
except Exception: except Exception:
logger.exception("Error creating appservice transaction") logger.exception("Error creating appservice transaction")
run_in_background(self._start_recoverer, service) run_in_background(self._on_txn_fail, service)
@defer.inlineCallbacks @defer.inlineCallbacks
def on_recovered(self, recoverer): def on_recovered(self, recoverer):
self.recoverers.remove(recoverer)
logger.info( logger.info(
"Successfully recovered application service AS ID %s", recoverer.service.id "Successfully recovered application service AS ID %s", recoverer.service.id
) )
self.recoverers.pop(recoverer.service.id)
logger.info("Remaining active recoverers: %s", len(self.recoverers)) logger.info("Remaining active recoverers: %s", len(self.recoverers))
yield self.store.set_appservice_state( yield self.store.set_appservice_state(
recoverer.service, ApplicationServiceState.UP recoverer.service, ApplicationServiceState.UP
) )
def add_recoverers(self, recoverers):
for r in recoverers:
self.recoverers.append(r)
if len(recoverers) > 0:
logger.info("New active recoverers: %s", len(self.recoverers))
@defer.inlineCallbacks @defer.inlineCallbacks
def _start_recoverer(self, service): def _on_txn_fail(self, service):
try: try:
yield self.store.set_appservice_state(service, ApplicationServiceState.DOWN) yield self.store.set_appservice_state(service, ApplicationServiceState.DOWN)
logger.info( self.start_recoverer(service)
"Application service falling behind. Starting recoverer. AS ID %s",
service.id,
)
recoverer = self.recoverer_fn(service, self.on_recovered)
self.add_recoverers([recoverer])
recoverer.recover()
except Exception: except Exception:
logger.exception("Error starting AS recoverer") logger.exception("Error starting AS recoverer")
def start_recoverer(self, service):
"""Start a Recoverer for the given service
Args:
service (synapse.appservice.ApplicationService):
"""
logger.info("Starting recoverer for AS ID %s", service.id)
assert service.id not in self.recoverers
recoverer = self.RECOVERER_CLASS(
self.clock, self.store, self.as_api, service, self.on_recovered
)
self.recoverers[service.id] = recoverer
recoverer.recover()
logger.info("Now %i active recoverers", len(self.recoverers))
@defer.inlineCallbacks @defer.inlineCallbacks
def _is_service_up(self, service): def _is_service_up(self, service):
state = yield self.store.get_appservice_state(service) state = yield self.store.get_appservice_state(service)
@ -197,18 +218,17 @@ class _TransactionController(object):
class _Recoverer(object): class _Recoverer(object):
@staticmethod """Manages retries and backoff for a DOWN appservice.
@defer.inlineCallbacks
def start(clock, store, as_api, callback): We have one of these for each appservice which is currently considered DOWN.
services = yield store.get_appservices_by_state(ApplicationServiceState.DOWN)
recoverers = [_Recoverer(clock, store, as_api, s, callback) for s in services] Args:
for r in recoverers: clock (synapse.util.Clock):
logger.info( store (synapse.storage.DataStore):
"Starting recoverer for AS ID %s which was marked as " "DOWN", as_api (synapse.appservice.api.ApplicationServiceApi):
r.service.id, service (synapse.appservice.ApplicationService): the service we are managing
) callback (callable[_Recoverer]): called once the service recovers.
r.recover() """
return recoverers
def __init__(self, clock, store, as_api, service, callback): def __init__(self, clock, store, as_api, service, callback):
self.clock = clock self.clock = clock

View File

@ -37,11 +37,9 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
self.recoverer = Mock() self.recoverer = Mock()
self.recoverer_fn = Mock(return_value=self.recoverer) self.recoverer_fn = Mock(return_value=self.recoverer)
self.txnctrl = _TransactionController( self.txnctrl = _TransactionController(
clock=self.clock, clock=self.clock, store=self.store, as_api=self.as_api
store=self.store,
as_api=self.as_api,
recoverer_fn=self.recoverer_fn,
) )
self.txnctrl.RECOVERER_CLASS = self.recoverer_fn
def test_single_service_up_txn_sent(self): def test_single_service_up_txn_sent(self):
# Test: The AS is up and the txn is successfully sent. # Test: The AS is up and the txn is successfully sent.