diff --git a/changelog.d/7453.bugfix b/changelog.d/7453.bugfix new file mode 100644 index 000000000..629a3d61e --- /dev/null +++ b/changelog.d/7453.bugfix @@ -0,0 +1 @@ +Fix a bug that would cause Synapse not to resync out-of-sync device lists. diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 9bd941b5a..29a19b457 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -29,6 +29,7 @@ from synapse.api.errors import ( SynapseError, ) from synapse.logging.opentracing import log_kv, set_tag, trace +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import RoomStreamToken, get_domain_from_id from synapse.util import stringutils from synapse.util.async_helpers import Linearizer @@ -535,6 +536,15 @@ class DeviceListUpdater(object): iterable=True, ) + # Attempt to resync out of sync device lists every 30s. + self._resync_retry_in_progress = False + self.clock.looping_call( + run_as_background_process, + 30 * 1000, + func=self._maybe_retry_device_resync, + desc="_maybe_retry_device_resync", + ) + @trace @defer.inlineCallbacks def incoming_device_list_update(self, origin, edu_content): @@ -679,11 +689,50 @@ class DeviceListUpdater(object): return False @defer.inlineCallbacks - def user_device_resync(self, user_id): + def _maybe_retry_device_resync(self): + """Retry to resync device lists that are out of sync, except if another retry is + in progress. + """ + if self._resync_retry_in_progress: + return + + try: + # Prevent another call of this function to retry resyncing device lists so + # we don't send too many requests. + self._resync_retry_in_progress = True + # Get all of the users that need resyncing. + need_resync = yield self.store.get_user_ids_requiring_device_list_resync() + # Iterate over the set of user IDs. + for user_id in need_resync: + # Try to resync the current user's devices list. Exception handling + # isn't necessary here, since user_device_resync catches all instances + # of "Exception" that might be raised from the federation request. This + # means that if an exception is raised by this function, it must be + # because of a database issue, which means _maybe_retry_device_resync + # probably won't be able to go much further anyway. + result = yield self.user_device_resync( + user_id=user_id, mark_failed_as_stale=False, + ) + # user_device_resync only returns a result if it managed to successfully + # resync and update the database. Updating the table of users requiring + # resync isn't necessary here as user_device_resync already does it + # (through self.store.update_remote_device_list_cache). + if result: + logger.debug( + "Successfully resynced the device list for %s" % user_id, + ) + finally: + # Allow future calls to retry resyncinc out of sync device lists. + self._resync_retry_in_progress = False + + @defer.inlineCallbacks + def user_device_resync(self, user_id, mark_failed_as_stale=True): """Fetches all devices for a user and updates the device cache with them. Args: user_id (str): The user's id whose device_list will be updated. + mark_failed_as_stale (bool): Whether to mark the user's device list as stale + if the attempt to resync failed. Returns: Deferred[dict]: a dict with device info as under the "devices" in the result of this request: @@ -694,10 +743,23 @@ class DeviceListUpdater(object): origin = get_domain_from_id(user_id) try: result = yield self.federation.query_user_devices(origin, user_id) - except (NotRetryingDestination, RequestSendFailed, HttpResponseException): - # TODO: Remember that we are now out of sync and try again - # later - logger.warning("Failed to handle device list update for %s", user_id) + except NotRetryingDestination: + if mark_failed_as_stale: + # Mark the remote user's device list as stale so we know we need to retry + # it later. + yield self.store.mark_remote_user_device_cache_as_stale(user_id) + + return + except (RequestSendFailed, HttpResponseException) as e: + logger.warning( + "Failed to handle device list update for %s: %s", user_id, e, + ) + + if mark_failed_as_stale: + # Mark the remote user's device list as stale so we know we need to retry + # it later. + yield self.store.mark_remote_user_device_cache_as_stale(user_id) + # We abort on exceptions rather than accepting the update # as otherwise synapse will 'forget' that its device list # is out of date. If we bail then we will retry the resync @@ -711,13 +773,17 @@ class DeviceListUpdater(object): logger.info(e) return except Exception as e: - # TODO: Remember that we are now out of sync and try again - # later set_tag("error", True) log_kv( {"message": "Exception raised by federation request", "exception": e} ) logger.exception("Failed to handle device list update for %s", user_id) + + if mark_failed_as_stale: + # Mark the remote user's device list as stale so we know we need to retry + # it later. + yield self.store.mark_remote_user_device_cache_as_stale(user_id) + return log_kv({"result": result}) stream_id = result["stream_id"] diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py index fe6d6ecfe..0e8378714 100644 --- a/synapse/storage/data_stores/main/devices.py +++ b/synapse/storage/data_stores/main/devices.py @@ -15,7 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import List, Tuple +from typing import List, Optional, Set, Tuple from six import iteritems @@ -649,21 +649,31 @@ class DeviceWorkerStore(SQLBaseStore): return results @defer.inlineCallbacks - def get_user_ids_requiring_device_list_resync(self, user_ids: Collection[str]): + def get_user_ids_requiring_device_list_resync( + self, user_ids: Optional[Collection[str]] = None, + ) -> Set[str]: """Given a list of remote users return the list of users that we - should resync the device lists for. + should resync the device lists for. If None is given instead of a list, + return every user that we should resync the device lists for. Returns: - Deferred[Set[str]] + The IDs of users whose device lists need resync. """ - - rows = yield self.db.simple_select_many_batch( - table="device_lists_remote_resync", - column="user_id", - iterable=user_ids, - retcols=("user_id",), - desc="get_user_ids_requiring_device_list_resync", - ) + if user_ids: + rows = yield self.db.simple_select_many_batch( + table="device_lists_remote_resync", + column="user_id", + iterable=user_ids, + retcols=("user_id",), + desc="get_user_ids_requiring_device_list_resync_with_iterable", + ) + else: + rows = yield self.db.simple_select_list( + table="device_lists_remote_resync", + keyvalues=None, + retcols=("user_id",), + desc="get_user_ids_requiring_device_list_resync", + ) return {row["user_id"] for row in rows} diff --git a/tests/test_federation.py b/tests/test_federation.py index f297de95f..13ff14863 100644 --- a/tests/test_federation.py +++ b/tests/test_federation.py @@ -6,12 +6,13 @@ from synapse.events import make_event_from_dict from synapse.logging.context import LoggingContext from synapse.types import Requester, UserID from synapse.util import Clock +from synapse.util.retryutils import NotRetryingDestination from tests import unittest from tests.server import ThreadedMemoryReactorClock, setup_test_homeserver -class MessageAcceptTests(unittest.TestCase): +class MessageAcceptTests(unittest.HomeserverTestCase): def setUp(self): self.http_client = Mock() @@ -145,3 +146,63 @@ class MessageAcceptTests(unittest.TestCase): # Make sure the invalid event isn't there extrem = maybeDeferred(self.store.get_latest_event_ids_in_room, self.room_id) self.assertEqual(self.successResultOf(extrem)[0], "$join:test.serv") + + def test_retry_device_list_resync(self): + """Tests that device lists are marked as stale if they couldn't be synced, and + that stale device lists are retried periodically. + """ + remote_user_id = "@john:test_remote" + remote_origin = "test_remote" + + # Track the number of attempts to resync the user's device list. + self.resync_attempts = 0 + + # When this function is called, increment the number of resync attempts (only if + # we're querying devices for the right user ID), then raise a + # NotRetryingDestination error to fail the resync gracefully. + def query_user_devices(destination, user_id): + if user_id == remote_user_id: + self.resync_attempts += 1 + + raise NotRetryingDestination(0, 0, destination) + + # Register the mock on the federation client. + federation_client = self.homeserver.get_federation_client() + federation_client.query_user_devices = Mock(side_effect=query_user_devices) + + # Register a mock on the store so that the incoming update doesn't fail because + # we don't share a room with the user. + store = self.homeserver.get_datastore() + store.get_rooms_for_user = Mock(return_value=["!someroom:test"]) + + # Manually inject a fake device list update. We need this update to include at + # least one prev_id so that the user's device list will need to be retried. + device_list_updater = self.homeserver.get_device_handler().device_list_updater + self.get_success( + device_list_updater.incoming_device_list_update( + origin=remote_origin, + edu_content={ + "deleted": False, + "device_display_name": "Mobile", + "device_id": "QBUAZIFURK", + "prev_id": [5], + "stream_id": 6, + "user_id": remote_user_id, + }, + ) + ) + + # Check that there was one resync attempt. + self.assertEqual(self.resync_attempts, 1) + + # Check that the resync attempt failed and caused the user's device list to be + # marked as stale. + need_resync = self.get_success( + store.get_user_ids_requiring_device_list_resync() + ) + self.assertIn(remote_user_id, need_resync) + + # Check that waiting for 30 seconds caused Synapse to retry resyncing the device + # list. + self.reactor.advance(30) + self.assertEqual(self.resync_attempts, 2)