Finish synapse.appservice.scheduler implementation.

With tests to assert behaviour. Not hooked up yet. Stub datastore methods
not implemented yet.
This commit is contained in:
Kegan Dougal 2015-03-06 16:09:05 +00:00
parent 7d3491c741
commit 0354659f9d
4 changed files with 186 additions and 53 deletions

View File

@ -25,6 +25,45 @@ class ApplicationServiceState(object):
UP = "up" UP = "up"
class AppServiceTransaction(object):
"""Represents an application service transaction."""
def __init__(self, service, id, events):
self.service = service
self.id = id
self.events = events
def send(self, as_api):
"""Sends this transaction using the provided AS API interface.
Args:
as_api(ApplicationServiceApi): The API to use to send.
Returns:
A Deferred which resolves to True if the transaction was sent.
"""
return as_api.push_bulk(
service=self.service,
events=self.events,
txn_id=self.id
)
def complete(self, store):
"""Completes this transaction as successful.
Marks this transaction ID on the application service and removes the
transaction contents from the database.
Args:
store: The database store to operate on.
Returns:
A Deferred which resolves to True if the transaction was completed.
"""
return store.complete_appservice_txn(
service=self.service,
txn_id=self.id
)
class ApplicationService(object): class ApplicationService(object):
"""Defines an application service. This definition is mostly what is """Defines an application service. This definition is mostly what is
provided to the /register AS API. provided to the /register AS API.

View File

@ -88,45 +88,6 @@ class AppServiceScheduler(object):
self.event_grouper.on_receive(service, event) self.event_grouper.on_receive(service, event)
class AppServiceTransaction(object):
"""Represents an application service transaction."""
def __init__(self, service, id, events):
self.service = service
self.id = id
self.events = events
def send(self, as_api):
"""Sends this transaction using the provided AS API interface.
Args:
as_api(ApplicationServiceApi): The API to use to send.
Returns:
A Deferred which resolves to True if the transaction was sent.
"""
return as_api.push_bulk(
service=self.service,
events=self.events,
txn_id=self.id
)
def complete(self, store):
"""Completes this transaction as successful.
Marks this transaction ID on the application service and removes the
transaction contents from the database.
Args:
store: The database store to operate on.
Returns:
A Deferred which resolves to True if the transaction was completed.
"""
return store.complete_appservice_txn(
service=self.service,
txn_id=self.id
)
class _EventGrouper(object): class _EventGrouper(object):
"""Groups events for the same application service together. """Groups events for the same application service together.
""" """
@ -156,14 +117,18 @@ class _TransactionController(object):
# keep track of how many recoverers there are # keep track of how many recoverers there are
self.recoverers = [] self.recoverers = []
@defer.inlineCallbacks
def start_polling(self): def start_polling(self):
groups = self.event_grouper.drain_groups() groups = self.event_grouper.drain_groups()
for service in groups: for service in groups:
txn_id = self._get_next_txn_id(service) txn = yield self.store.create_appservice_txn(
txn = AppServiceTransaction(service, txn_id, groups[service]) service=service,
self._store_txn(txn) events=groups[service]
if self._is_service_up(service): )
if txn.send(self.as_api): service_is_up = yield self._is_service_up(service)
if service_is_up:
sent = yield txn.send(self.as_api)
if sent:
txn.complete(self.store) txn.complete(self.store)
else: else:
self._start_recoverer(service) self._start_recoverer(service)
@ -207,14 +172,10 @@ class _TransactionController(object):
logger.error("Failed to apply appservice state DOWN to service %s", logger.error("Failed to apply appservice state DOWN to service %s",
service) service)
@defer.inlineCallbacks
def _is_service_up(self, service): def _is_service_up(self, service):
pass state = yield self.store.get_appservice_state(service)
defer.returnValue(state == ApplicationServiceState.UP)
def _get_next_txn_id(self, service):
pass # TODO work out the next txn_id for this service
def _store_txn(self, txn):
pass
class _Recoverer(object): class _Recoverer(object):

View File

@ -354,6 +354,16 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
""" """
pass pass
def get_appservice_state(self, service):
"""Get the application service state.
Args:
service(ApplicationService): The service whose state to set.
Returns:
A Deferred which resolves to ApplicationServiceState.
"""
pass
def set_appservice_state(self, service, state): def set_appservice_state(self, service, state):
"""Set the application service state. """Set the application service state.
@ -365,6 +375,18 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
""" """
pass pass
def create_appservice_txn(self, service, events):
"""Atomically creates a new transaction for this application service
with the given list of events.
Args:
service(ApplicationService): The service who the transaction is for.
events(list<Event>): A list of events to put in the transaction.
Returns:
ApplicationServiceTransaction: A new transaction.
"""
pass
def complete_appservice_txn(self, txn_id, service): def complete_appservice_txn(self, txn_id, service):
"""Completes an application service transaction. """Completes an application service transaction.

View File

@ -12,9 +12,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from synapse.appservice import ApplicationServiceState, AppServiceTransaction
from synapse.appservice.scheduler import ( from synapse.appservice.scheduler import (
AppServiceScheduler, AppServiceTransaction, _EventGrouper, AppServiceScheduler, _EventGrouper, _TransactionController, _Recoverer
_TransactionController, _Recoverer
) )
from twisted.internet import defer from twisted.internet import defer
from ..utils import MockClock from ..utils import MockClock
@ -22,6 +22,116 @@ from mock import Mock
from tests import unittest from tests import unittest
class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
def setUp(self):
self.clock = MockClock()
self.store = Mock()
self.as_api = Mock()
self.event_grouper = Mock()
self.recoverer = Mock()
self.recoverer_fn = Mock(return_value=self.recoverer)
self.txnctrl = _TransactionController(
clock=self.clock, store=self.store, as_api=self.as_api,
event_grouper=self.event_grouper, recoverer_fn=self.recoverer_fn
)
def test_poll_single_group_service_up(self):
# Test: The AS is up and the txn is successfully sent.
service = Mock()
events = [Mock(), Mock()]
groups = {}
groups[service] = events
txn_id = "foobar"
txn = Mock(id=txn_id, service=service, events=events)
# mock methods
self.event_grouper.drain_groups = Mock(return_value=groups)
self.store.get_appservice_state = Mock(
return_value=defer.succeed(ApplicationServiceState.UP)
)
txn.send = Mock(return_value=defer.succeed(True))
self.store.create_appservice_txn = Mock(
return_value=defer.succeed(txn)
)
# actual call
self.txnctrl.start_polling()
self.store.create_appservice_txn.assert_called_once_with(
service=service, events=events # txn made and saved
)
self.assertEquals(0, len(self.txnctrl.recoverers)) # no recoverer made
txn.complete.assert_called_once_with(self.store) # txn completed
def test_poll_single_group_service_down(self):
# Test: The AS is down so it shouldn't push; Recoverers will do it.
# It should still make a transaction though.
service = Mock()
events = [Mock(), Mock()]
groups = {}
groups[service] = events
self.event_grouper.drain_groups = Mock(return_value=groups)
txn = Mock(id="idhere", service=service, events=events)
self.store.get_appservice_state = Mock(
return_value=defer.succeed(ApplicationServiceState.DOWN)
)
self.store.create_appservice_txn = Mock(
return_value=defer.succeed(txn)
)
# actual call
self.txnctrl.start_polling()
self.store.create_appservice_txn.assert_called_once_with(
service=service, events=events # txn made and saved
)
self.assertEquals(0, txn.send.call_count) # txn not sent though
self.assertEquals(0, txn.complete.call_count) # or completed
def test_poll_single_group_service_up(self):
# Test: The AS is up and the txn is not sent. A Recoverer is made and
# started.
service = Mock()
events = [Mock(), Mock()]
groups = {}
groups[service] = events
txn_id = "foobar"
txn = Mock(id=txn_id, service=service, events=events)
# mock methods
self.event_grouper.drain_groups = Mock(return_value=groups)
self.store.get_appservice_state = Mock(
return_value=defer.succeed(ApplicationServiceState.UP)
)
self.store.set_appservice_state = Mock(return_value=defer.succeed(True))
txn.send = Mock(return_value=defer.succeed(False)) # fails to send
self.store.create_appservice_txn = Mock(
return_value=defer.succeed(txn)
)
# actual call
self.txnctrl.start_polling()
self.store.create_appservice_txn.assert_called_once_with(
service=service, events=events
)
self.assertEquals(1, self.recoverer_fn.call_count) # recoverer made
self.assertEquals(1, self.recoverer.recover.call_count) # and invoked
self.assertEquals(1, len(self.txnctrl.recoverers)) # and stored
self.assertEquals(0, txn.complete.call_count) # txn not completed
self.store.set_appservice_state.assert_called_once_with(
service, ApplicationServiceState.DOWN # service marked as down
)
def test_poll_no_groups(self):
self.as_api.push_bulk = Mock()
self.event_grouper.drain_groups = Mock(return_value={})
self.txnctrl.start_polling()
self.assertEquals(0, self.as_api.push_bulk.call_count)
class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase):
def setUp(self): def setUp(self):
@ -94,6 +204,7 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase):
self.assertEquals(1, txn.complete.call_count) self.assertEquals(1, txn.complete.call_count)
self.callback.assert_called_once_with(self.recoverer) self.callback.assert_called_once_with(self.recoverer)
class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase): class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase):
def setUp(self): def setUp(self):