Replace EventGrouper for ServiceQueuer to move to push-based txns. Fix tests and add stub tests for ServiceQueuer.

This commit is contained in:
Kegan Dougal 2015-03-16 13:15:40 +00:00
parent c9c444f562
commit 6279285b2a
2 changed files with 57 additions and 98 deletions

View File

@ -16,10 +16,10 @@
This module controls the reliability for application service transactions. This module controls the reliability for application service transactions.
The nominal flow through this module looks like: The nominal flow through this module looks like:
_________ __________
---ASa[e]-->| Event | 1---ASa[e]-->| Service |--> Queue ASa[f]
----ASb[e]->| Grouper |<-poll 1/s--+ 2----ASb[e]->| Queuer |
--ASa[e]--->|_________| | ASa[e,e] ASb[e] 3--ASa[f]--->|__________|-----------+ ASa[e], ASb[e]
V V
-````````- +------------+ -````````- +------------+
|````````|<--StoreTxn-|Transaction | |````````|<--StoreTxn-|Transaction |
@ -66,14 +66,14 @@ class AppServiceScheduler(object):
self.clock = clock self.clock = clock
self.store = store self.store = store
self.as_api = as_api self.as_api = as_api
self.event_grouper = _EventGrouper()
def create_recoverer(service, callback): def create_recoverer(service, callback):
return _Recoverer(clock, store, as_api, service, callback) return _Recoverer(clock, store, as_api, service, callback)
self.txn_ctrl = _TransactionController( self.txn_ctrl = _TransactionController(
clock, store, as_api, self.event_grouper, create_recoverer clock, store, as_api, create_recoverer
) )
self.queuer = _ServiceQueuer(self.txn_ctrl)
@defer.inlineCallbacks @defer.inlineCallbacks
def start(self): def start(self):
@ -86,17 +86,26 @@ class AppServiceScheduler(object):
self.txn_ctrl.start_polling() self.txn_ctrl.start_polling()
def submit_event_for_as(self, service, event): def submit_event_for_as(self, service, event):
self.event_grouper.enqueue(service, event) self.queuer.enqueue(service, event)
class _EventGrouper(object): class _ServiceQueuer(object):
"""Groups events for the same application service together. """Queues events for the same application service together, sending
transactions as soon as possible. Once a transaction is sent successfully,
this schedules any other events in the queue to run.
""" """
def __init__(self): def __init__(self, txn_ctrl):
self.groups = {} # dict of {service: [events]} self.groups = {} # dict of {service: [events]}
self.txn_ctrl = txn_ctrl
def enqueue(self, service, event): def enqueue(self, service, event):
# if nothing in queue for this service, send event immediately and add
# callbacks.
self.txn_ctrl.send(service, [event])
# else add to queue for this service
if service not in self.groups: if service not in self.groups:
self.groups[service] = [] self.groups[service] = []
self.groups[service].append(event) self.groups[service].append(event)
@ -109,23 +118,20 @@ class _EventGrouper(object):
class _TransactionController(object): class _TransactionController(object):
def __init__(self, clock, store, as_api, event_grouper, recoverer_fn): def __init__(self, clock, store, as_api, recoverer_fn):
self.clock = clock self.clock = clock
self.store = store self.store = store
self.as_api = as_api self.as_api = as_api
self.event_grouper = event_grouper
self.recoverer_fn = recoverer_fn self.recoverer_fn = recoverer_fn
# keep track of how many recoverers there are # keep track of how many recoverers there are
self.recoverers = [] self.recoverers = []
@defer.inlineCallbacks @defer.inlineCallbacks
def start_polling(self): def send(self, service, events):
try: try:
groups = self.event_grouper.drain_groups()
for service in groups:
txn = yield self.store.create_appservice_txn( txn = yield self.store.create_appservice_txn(
service=service, service=service,
events=groups[service] events=events
) )
service_is_up = yield self._is_service_up(service) service_is_up = yield self._is_service_up(service)
if service_is_up: if service_is_up:
@ -136,7 +142,6 @@ class _TransactionController(object):
self._start_recoverer(service) self._start_recoverer(service)
except Exception as e: except Exception as e:
logger.exception(e) logger.exception(e)
self.clock.call_later(1, self.start_polling)
@defer.inlineCallbacks @defer.inlineCallbacks
def on_recovered(self, recoverer): def on_recovered(self, recoverer):

View File

@ -14,7 +14,7 @@
# limitations under the License. # limitations under the License.
from synapse.appservice import ApplicationServiceState, AppServiceTransaction from synapse.appservice import ApplicationServiceState, AppServiceTransaction
from synapse.appservice.scheduler import ( from synapse.appservice.scheduler import (
_EventGrouper, _TransactionController, _Recoverer _ServiceQueuer, _TransactionController, _Recoverer
) )
from twisted.internet import defer from twisted.internet import defer
from ..utils import MockClock from ..utils import MockClock
@ -28,25 +28,21 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
self.clock = MockClock() self.clock = MockClock()
self.store = Mock() self.store = Mock()
self.as_api = Mock() self.as_api = Mock()
self.event_grouper = Mock()
self.recoverer = Mock() self.recoverer = Mock()
self.recoverer_fn = Mock(return_value=self.recoverer) self.recoverer_fn = Mock(return_value=self.recoverer)
self.txnctrl = _TransactionController( self.txnctrl = _TransactionController(
clock=self.clock, store=self.store, as_api=self.as_api, clock=self.clock, store=self.store, as_api=self.as_api,
event_grouper=self.event_grouper, recoverer_fn=self.recoverer_fn recoverer_fn=self.recoverer_fn
) )
def test_poll_single_group_service_up(self): def test_single_service_up_txn_sent(self):
# Test: The AS is up and the txn is successfully sent. # Test: The AS is up and the txn is successfully sent.
service = Mock() service = Mock()
events = [Mock(), Mock()] events = [Mock(), Mock()]
groups = {}
groups[service] = events
txn_id = "foobar" txn_id = "foobar"
txn = Mock(id=txn_id, service=service, events=events) txn = Mock(id=txn_id, service=service, events=events)
# mock methods # mock methods
self.event_grouper.drain_groups = Mock(return_value=groups)
self.store.get_appservice_state = Mock( self.store.get_appservice_state = Mock(
return_value=defer.succeed(ApplicationServiceState.UP) return_value=defer.succeed(ApplicationServiceState.UP)
) )
@ -56,7 +52,7 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
) )
# actual call # actual call
self.txnctrl.start_polling() self.txnctrl.send(service, events)
self.store.create_appservice_txn.assert_called_once_with( self.store.create_appservice_txn.assert_called_once_with(
service=service, events=events # txn made and saved service=service, events=events # txn made and saved
@ -64,15 +60,12 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
self.assertEquals(0, len(self.txnctrl.recoverers)) # no recoverer made self.assertEquals(0, len(self.txnctrl.recoverers)) # no recoverer made
txn.complete.assert_called_once_with(self.store) # txn completed txn.complete.assert_called_once_with(self.store) # txn completed
def test_poll_single_group_service_down(self): def test_single_service_down(self):
# Test: The AS is down so it shouldn't push; Recoverers will do it. # Test: The AS is down so it shouldn't push; Recoverers will do it.
# It should still make a transaction though. # It should still make a transaction though.
service = Mock() service = Mock()
events = [Mock(), Mock()] events = [Mock(), Mock()]
groups = {}
groups[service] = events
self.event_grouper.drain_groups = Mock(return_value=groups)
txn = Mock(id="idhere", service=service, events=events) txn = Mock(id="idhere", service=service, events=events)
self.store.get_appservice_state = Mock( self.store.get_appservice_state = Mock(
return_value=defer.succeed(ApplicationServiceState.DOWN) return_value=defer.succeed(ApplicationServiceState.DOWN)
@ -82,7 +75,7 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
) )
# actual call # actual call
self.txnctrl.start_polling() self.txnctrl.send(service, events)
self.store.create_appservice_txn.assert_called_once_with( self.store.create_appservice_txn.assert_called_once_with(
service=service, events=events # txn made and saved service=service, events=events # txn made and saved
@ -90,18 +83,15 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
self.assertEquals(0, txn.send.call_count) # txn not sent though self.assertEquals(0, txn.send.call_count) # txn not sent though
self.assertEquals(0, txn.complete.call_count) # or completed self.assertEquals(0, txn.complete.call_count) # or completed
def test_poll_single_group_service_up(self): def test_single_service_up_txn_not_sent(self):
# Test: The AS is up and the txn is not sent. A Recoverer is made and # Test: The AS is up and the txn is not sent. A Recoverer is made and
# started. # started.
service = Mock() service = Mock()
events = [Mock(), Mock()] events = [Mock(), Mock()]
groups = {}
groups[service] = events
txn_id = "foobar" txn_id = "foobar"
txn = Mock(id=txn_id, service=service, events=events) txn = Mock(id=txn_id, service=service, events=events)
# mock methods # mock methods
self.event_grouper.drain_groups = Mock(return_value=groups)
self.store.get_appservice_state = Mock( self.store.get_appservice_state = Mock(
return_value=defer.succeed(ApplicationServiceState.UP) return_value=defer.succeed(ApplicationServiceState.UP)
) )
@ -112,7 +102,7 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
) )
# actual call # actual call
self.txnctrl.start_polling() self.txnctrl.send(service, events)
self.store.create_appservice_txn.assert_called_once_with( self.store.create_appservice_txn.assert_called_once_with(
service=service, events=events service=service, events=events
@ -125,12 +115,6 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
service, ApplicationServiceState.DOWN # service marked as down service, ApplicationServiceState.DOWN # service marked as down
) )
def test_poll_no_groups(self):
self.as_api.push_bulk = Mock()
self.event_grouper.drain_groups = Mock(return_value={})
self.txnctrl.start_polling()
self.assertEquals(0, self.as_api.push_bulk.call_count)
class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase):
@ -205,54 +189,24 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase):
self.callback.assert_called_once_with(self.recoverer) self.callback.assert_called_once_with(self.recoverer)
class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase): class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase):
def setUp(self): def setUp(self):
self.grouper = _EventGrouper() self.txn_ctrl = Mock()
self.queuer = _ServiceQueuer(self.txn_ctrl)
def test_drain_single_event(self): def test_send_single_event_no_queue(self):
service = Mock() # Expect the event to be sent immediately.
event = Mock() pass
self.grouper.enqueue(service, event)
groups = self.grouper.drain_groups()
self.assertTrue(service in groups)
self.assertEquals([event], groups[service])
self.assertEquals(1, len(groups.keys()))
# no more events
self.assertEquals(self.grouper.drain_groups(), {})
def test_drain_multiple_events(self): def test_send_single_event_with_queue(self):
service = Mock() # - Send an event and don't resolve it just yet.
events = [Mock(), Mock(), Mock()] # - Send another event: expect send() to NOT be called.
for e in events: # - Resolve the send event
self.grouper.enqueue(service, e) # - Expect queued event to be sent
groups = self.grouper.drain_groups() pass
self.assertTrue(service in groups)
self.assertEquals(events, groups[service])
# no more events
self.assertEquals(self.grouper.drain_groups(), {})
def test_drain_multiple_services(self): def test_multiple_service_queues(self):
services = [Mock(), Mock(), Mock()] # Tests that each service has its own queue, and that they don't block
events_a = [Mock(), Mock()] # on each other.
events_b = [Mock()] pass
events_c = [Mock(), Mock(), Mock(), Mock()]
mappings = {
services[0]: events_a,
services[1]: events_b,
services[2]: events_c
}
for e in events_b:
self.grouper.enqueue(services[1], e)
for e in events_c:
self.grouper.enqueue(services[2], e)
for e in events_a:
self.grouper.enqueue(services[0], e)
groups = self.grouper.drain_groups()
for service in services:
self.assertTrue(service in groups)
self.assertEquals(mappings[service], groups[service])
self.assertEquals(3, len(groups.keys()))
# no more events
self.assertEquals(self.grouper.drain_groups(), {})