mirror of
https://mau.dev/maunium/synapse.git
synced 2024-10-01 01:36:05 -04:00
Fix a bug introduced in Synapse v1.0.0 whereby device list updates would not be sent to remote homeservers if there were too many to send at once. (#11729)
Co-authored-by: Brendan Abolivier <babolivier@matrix.org>
This commit is contained in:
parent
1b1aed38e3
commit
22abfca8d9
1
changelog.d/11729.bugfix
Normal file
1
changelog.d/11729.bugfix
Normal file
@ -0,0 +1 @@
|
|||||||
|
Fix a bug introduced in Synapse v1.0.0 whereby some device list updates would not be sent to remote homeservers if there were too many to send at once.
|
@ -270,6 +270,10 @@ class DeviceWorkerStore(SQLBaseStore):
|
|||||||
# The most recent request's opentracing_context is used as the
|
# The most recent request's opentracing_context is used as the
|
||||||
# context which created the Edu.
|
# context which created the Edu.
|
||||||
|
|
||||||
|
# This is the stream ID that we will return for the consumer to resume
|
||||||
|
# following this stream later.
|
||||||
|
last_processed_stream_id = from_stream_id
|
||||||
|
|
||||||
query_map = {}
|
query_map = {}
|
||||||
cross_signing_keys_by_user = {}
|
cross_signing_keys_by_user = {}
|
||||||
for user_id, device_id, update_stream_id, update_context in updates:
|
for user_id, device_id, update_stream_id, update_context in updates:
|
||||||
@ -295,6 +299,8 @@ class DeviceWorkerStore(SQLBaseStore):
|
|||||||
if update_stream_id > previous_update_stream_id:
|
if update_stream_id > previous_update_stream_id:
|
||||||
query_map[key] = (update_stream_id, update_context)
|
query_map[key] = (update_stream_id, update_context)
|
||||||
|
|
||||||
|
last_processed_stream_id = update_stream_id
|
||||||
|
|
||||||
results = await self._get_device_update_edus_by_remote(
|
results = await self._get_device_update_edus_by_remote(
|
||||||
destination, from_stream_id, query_map
|
destination, from_stream_id, query_map
|
||||||
)
|
)
|
||||||
@ -307,7 +313,7 @@ class DeviceWorkerStore(SQLBaseStore):
|
|||||||
# FIXME: remove this when enough servers have upgraded
|
# FIXME: remove this when enough servers have upgraded
|
||||||
results.append(("org.matrix.signing_key_update", result))
|
results.append(("org.matrix.signing_key_update", result))
|
||||||
|
|
||||||
return now_stream_id, results
|
return last_processed_stream_id, results
|
||||||
|
|
||||||
def _get_device_updates_by_remote_txn(
|
def _get_device_updates_by_remote_txn(
|
||||||
self,
|
self,
|
||||||
|
@ -94,7 +94,7 @@ class DeviceStoreTestCase(HomeserverTestCase):
|
|||||||
def test_get_device_updates_by_remote(self):
|
def test_get_device_updates_by_remote(self):
|
||||||
device_ids = ["device_id1", "device_id2"]
|
device_ids = ["device_id1", "device_id2"]
|
||||||
|
|
||||||
# Add two device updates with a single stream_id
|
# Add two device updates with sequential `stream_id`s
|
||||||
self.get_success(
|
self.get_success(
|
||||||
self.store.add_device_change_to_streams("user_id", device_ids, ["somehost"])
|
self.store.add_device_change_to_streams("user_id", device_ids, ["somehost"])
|
||||||
)
|
)
|
||||||
@ -107,6 +107,54 @@ class DeviceStoreTestCase(HomeserverTestCase):
|
|||||||
# Check original device_ids are contained within these updates
|
# Check original device_ids are contained within these updates
|
||||||
self._check_devices_in_updates(device_ids, device_updates)
|
self._check_devices_in_updates(device_ids, device_updates)
|
||||||
|
|
||||||
|
def test_get_device_updates_by_remote_can_limit_properly(self):
|
||||||
|
"""
|
||||||
|
Tests that `get_device_updates_by_remote` returns an appropriate
|
||||||
|
stream_id to resume fetching from (without skipping any results).
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Add some device updates with sequential `stream_id`s
|
||||||
|
device_ids = [
|
||||||
|
"device_id1",
|
||||||
|
"device_id2",
|
||||||
|
"device_id3",
|
||||||
|
"device_id4",
|
||||||
|
"device_id5",
|
||||||
|
]
|
||||||
|
self.get_success(
|
||||||
|
self.store.add_device_change_to_streams("user_id", device_ids, ["somehost"])
|
||||||
|
)
|
||||||
|
|
||||||
|
# Get all device updates ever meant for this remote
|
||||||
|
next_stream_id, device_updates = self.get_success(
|
||||||
|
self.store.get_device_updates_by_remote("somehost", -1, limit=3)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Check the first three original device_ids are contained within these updates
|
||||||
|
self._check_devices_in_updates(device_ids[:3], device_updates)
|
||||||
|
|
||||||
|
# Get the next batch of device updates
|
||||||
|
next_stream_id, device_updates = self.get_success(
|
||||||
|
self.store.get_device_updates_by_remote("somehost", next_stream_id, limit=3)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Check the last two original device_ids are contained within these updates
|
||||||
|
self._check_devices_in_updates(device_ids[3:], device_updates)
|
||||||
|
|
||||||
|
# Add some more device updates to ensure it still resumes properly
|
||||||
|
device_ids = ["device_id6", "device_id7"]
|
||||||
|
self.get_success(
|
||||||
|
self.store.add_device_change_to_streams("user_id", device_ids, ["somehost"])
|
||||||
|
)
|
||||||
|
|
||||||
|
# Get the next batch of device updates
|
||||||
|
next_stream_id, device_updates = self.get_success(
|
||||||
|
self.store.get_device_updates_by_remote("somehost", next_stream_id, limit=3)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Check the newly-added device_ids are contained within these updates
|
||||||
|
self._check_devices_in_updates(device_ids, device_updates)
|
||||||
|
|
||||||
def _check_devices_in_updates(self, expected_device_ids, device_updates):
|
def _check_devices_in_updates(self, expected_device_ids, device_updates):
|
||||||
"""Check that an specific device ids exist in a list of device update EDUs"""
|
"""Check that an specific device ids exist in a list of device update EDUs"""
|
||||||
self.assertEqual(len(device_updates), len(expected_device_ids))
|
self.assertEqual(len(device_updates), len(expected_device_ids))
|
||||||
|
Loading…
Reference in New Issue
Block a user