From 0b014eb25e8c0766d2552fae2ba366492dc16d4d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 Apr 2022 13:05:31 +0100 Subject: [PATCH] Only send out device list updates for our own users (#12465) Broke in #12365 --- changelog.d/12465.feature | 1 + synapse/handlers/device.py | 10 +++-- synapse/storage/databases/main/devices.py | 4 +- tests/federation/test_federation_sender.py | 43 +++++++++++++++++++++- tests/storage/test_devices.py | 6 +-- 5 files changed, 56 insertions(+), 8 deletions(-) create mode 100644 changelog.d/12465.feature diff --git a/changelog.d/12465.feature b/changelog.d/12465.feature new file mode 100644 index 000000000..642dea966 --- /dev/null +++ b/changelog.d/12465.feature @@ -0,0 +1 @@ +Enable processing of device list updates asynchronously. diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 958599e7b..3c0fc756d 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -649,9 +649,13 @@ class DeviceHandler(DeviceWorkerHandler): return for user_id, device_id, room_id, stream_id, opentracing_context in rows: - joined_user_ids = await self.store.get_users_in_room(room_id) - hosts = {get_domain_from_id(u) for u in joined_user_ids} - hosts.discard(self.server_name) + hosts = set() + + # Ignore any users that aren't ours + if self.hs.is_mine_id(user_id): + joined_user_ids = await self.store.get_users_in_room(room_id) + hosts = {get_domain_from_id(u) for u in joined_user_ids} + hosts.discard(self.server_name) # Check if we've already sent this update to some hosts if current_stream_id == stream_id: diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 74e4e2122..318e4df37 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -1703,7 +1703,9 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): next(stream_id_iterator), user_id, device_id, - False, + not self.hs.is_mine_id( + user_id + ), # We only need to send out update for *our* users now, encoded_context if whitelisted_homeserver(destination) else "{}", ) diff --git a/tests/federation/test_federation_sender.py b/tests/federation/test_federation_sender.py index 63ea4f9ee..91f982518 100644 --- a/tests/federation/test_federation_sender.py +++ b/tests/federation/test_federation_sender.py @@ -162,7 +162,9 @@ class FederationSenderDevicesTestCases(HomeserverTestCase): def make_homeserver(self, reactor, clock): return self.setup_test_homeserver( - federation_transport_client=Mock(spec=["send_transaction"]), + federation_transport_client=Mock( + spec=["send_transaction", "query_user_devices"] + ), ) def default_config(self): @@ -218,6 +220,45 @@ class FederationSenderDevicesTestCases(HomeserverTestCase): self.assertEqual(len(self.edus), 1) self.check_device_update_edu(self.edus.pop(0), u1, "D2", stream_id) + def test_dont_send_device_updates_for_remote_users(self): + """Check that we don't send device updates for remote users""" + + # Send the server a device list EDU for the other user, this will cause + # it to try and resync the device lists. + self.hs.get_federation_transport_client().query_user_devices.return_value = ( + defer.succeed( + { + "stream_id": "1", + "user_id": "@user2:host2", + "devices": [{"device_id": "D1"}], + } + ) + ) + + self.get_success( + self.hs.get_device_handler().device_list_updater.incoming_device_list_update( + "host2", + { + "user_id": "@user2:host2", + "device_id": "D1", + "stream_id": "1", + "prev_ids": [], + }, + ) + ) + + self.reactor.advance(1) + + # We shouldn't see an EDU for that update + self.assertEqual(self.edus, []) + + # Check that we did successfully process the inbound EDU (otherwise this + # test would pass if we failed to process the EDU) + devices = self.get_success( + self.hs.get_datastores().main.get_cached_devices_for_user("@user2:host2") + ) + self.assertIn("D1", devices) + def test_upload_signatures(self): """Uploading signatures on some devices should produce updates for that user""" diff --git a/tests/storage/test_devices.py b/tests/storage/test_devices.py index 5491fbf6d..ccc389386 100644 --- a/tests/storage/test_devices.py +++ b/tests/storage/test_devices.py @@ -118,7 +118,7 @@ class DeviceStoreTestCase(HomeserverTestCase): device_ids = ["device_id1", "device_id2"] # Add two device updates with sequential `stream_id`s - self.add_device_change("user_id", device_ids, "somehost") + self.add_device_change("@user_id:test", device_ids, "somehost") # Get all device updates ever meant for this remote now_stream_id, device_updates = self.get_success( @@ -142,7 +142,7 @@ class DeviceStoreTestCase(HomeserverTestCase): "device_id4", "device_id5", ] - self.add_device_change("user_id", device_ids, "somehost") + self.add_device_change("@user_id:test", device_ids, "somehost") # Get device updates meant for this remote next_stream_id, device_updates = self.get_success( @@ -162,7 +162,7 @@ class DeviceStoreTestCase(HomeserverTestCase): # Add some more device updates to ensure it still resumes properly device_ids = ["device_id6", "device_id7"] - self.add_device_change("user_id", device_ids, "somehost") + self.add_device_change("@user_id:test", device_ids, "somehost") # Get the next batch of device updates next_stream_id, device_updates = self.get_success(