Fix get|set_type_stream_id_for_appservice store functions (#8648)

This commit is contained in:
Will Hunt 2020-10-26 14:51:33 +00:00 committed by GitHub
parent 73d8209694
commit e8dbbcb64c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 85 additions and 13 deletions

1
changelog.d/8648.bugfix Normal file
View File

@ -0,0 +1 @@
Fix a bug introduced in v1.22.0rc1 which would cause ephemeral events to not be sent to appservices.

View File

@ -236,16 +236,16 @@ class ApplicationServicesHandler:
events = await self._handle_receipts(service) events = await self._handle_receipts(service)
if events: if events:
self.scheduler.submit_ephemeral_events_for_as(service, events) self.scheduler.submit_ephemeral_events_for_as(service, events)
await self.store.set_type_stream_id_for_appservice( await self.store.set_type_stream_id_for_appservice(
service, "read_receipt", new_token service, "read_receipt", new_token
) )
elif stream_key == "presence_key": elif stream_key == "presence_key":
events = await self._handle_presence(service, users) events = await self._handle_presence(service, users)
if events: if events:
self.scheduler.submit_ephemeral_events_for_as(service, events) self.scheduler.submit_ephemeral_events_for_as(service, events)
await self.store.set_type_stream_id_for_appservice( await self.store.set_type_stream_id_for_appservice(
service, "presence", new_token service, "presence", new_token
) )
async def _handle_typing(self, service: ApplicationService, new_token: int): async def _handle_typing(self, service: ApplicationService, new_token: int):
typing_source = self.event_sources.sources["typing"] typing_source = self.event_sources.sources["typing"]

View File

@ -369,17 +369,25 @@ class ApplicationServiceTransactionWorkerStore(
async def get_type_stream_id_for_appservice( async def get_type_stream_id_for_appservice(
self, service: ApplicationService, type: str self, service: ApplicationService, type: str
) -> int: ) -> int:
if type not in ("read_receipt", "presence"):
raise ValueError(
"Expected type to be a valid application stream id type, got %s"
% (type,)
)
def get_type_stream_id_for_appservice_txn(txn): def get_type_stream_id_for_appservice_txn(txn):
stream_id_type = "%s_stream_id" % type stream_id_type = "%s_stream_id" % type
txn.execute( txn.execute(
"SELECT ? FROM application_services_state WHERE as_id=?", # We do NOT want to escape `stream_id_type`.
(stream_id_type, service.id,), "SELECT %s FROM application_services_state WHERE as_id=?"
% stream_id_type,
(service.id,),
) )
last_txn_id = txn.fetchone() last_stream_id = txn.fetchone()
if last_txn_id is None or last_txn_id[0] is None: # no row exists if last_stream_id is None or last_stream_id[0] is None: # no row exists
return 0 return 0
else: else:
return int(last_txn_id[0]) return int(last_stream_id[0])
return await self.db_pool.runInteraction( return await self.db_pool.runInteraction(
"get_type_stream_id_for_appservice", get_type_stream_id_for_appservice_txn "get_type_stream_id_for_appservice", get_type_stream_id_for_appservice_txn
@ -388,11 +396,18 @@ class ApplicationServiceTransactionWorkerStore(
async def set_type_stream_id_for_appservice( async def set_type_stream_id_for_appservice(
self, service: ApplicationService, type: str, pos: int self, service: ApplicationService, type: str, pos: int
) -> None: ) -> None:
if type not in ("read_receipt", "presence"):
raise ValueError(
"Expected type to be a valid application stream id type, got %s"
% (type,)
)
def set_type_stream_id_for_appservice_txn(txn): def set_type_stream_id_for_appservice_txn(txn):
stream_id_type = "%s_stream_id" % type stream_id_type = "%s_stream_id" % type
txn.execute( txn.execute(
"UPDATE ? SET device_list_stream_id = ? WHERE as_id=?", "UPDATE application_services_state SET %s = ? WHERE as_id=?"
(stream_id_type, pos, service.id), % stream_id_type,
(pos, service.id),
) )
await self.db_pool.runInteraction( await self.db_pool.runInteraction(

View File

@ -410,6 +410,62 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
) )
class ApplicationServiceStoreTypeStreamIds(unittest.HomeserverTestCase):
def make_homeserver(self, reactor, clock):
hs = self.setup_test_homeserver()
return hs
def prepare(self, hs, reactor, clock):
self.service = Mock(id="foo")
self.store = self.hs.get_datastore()
self.get_success(self.store.set_appservice_state(self.service, "up"))
def test_get_type_stream_id_for_appservice_no_value(self):
value = self.get_success(
self.store.get_type_stream_id_for_appservice(self.service, "read_receipt")
)
self.assertEquals(value, 0)
value = self.get_success(
self.store.get_type_stream_id_for_appservice(self.service, "presence")
)
self.assertEquals(value, 0)
def test_get_type_stream_id_for_appservice_invalid_type(self):
self.get_failure(
self.store.get_type_stream_id_for_appservice(self.service, "foobar"),
ValueError,
)
def test_set_type_stream_id_for_appservice(self):
read_receipt_value = 1024
self.get_success(
self.store.set_type_stream_id_for_appservice(
self.service, "read_receipt", read_receipt_value
)
)
result = self.get_success(
self.store.get_type_stream_id_for_appservice(self.service, "read_receipt")
)
self.assertEqual(result, read_receipt_value)
self.get_success(
self.store.set_type_stream_id_for_appservice(
self.service, "presence", read_receipt_value
)
)
result = self.get_success(
self.store.get_type_stream_id_for_appservice(self.service, "presence")
)
self.assertEqual(result, read_receipt_value)
def test_set_type_stream_id_for_appservice_invalid_type(self):
self.get_failure(
self.store.set_type_stream_id_for_appservice(self.service, "foobar", 1024),
ValueError,
)
# required for ApplicationServiceTransactionStoreTestCase tests # required for ApplicationServiceTransactionStoreTestCase tests
class TestTransactionStore(ApplicationServiceTransactionStore, ApplicationServiceStore): class TestTransactionStore(ApplicationServiceTransactionStore, ApplicationServiceStore):
def __init__(self, database: DatabasePool, db_conn, hs): def __init__(self, database: DatabasePool, db_conn, hs):