# # This file is licensed under the Affero General Public License (AGPL) version 3. # # Copyright 2019,2020 The Matrix.org Foundation C.I.C. # Copyright 2016 OpenMarket Ltd # Copyright (C) 2023 New Vector, Ltd # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # See the GNU Affero General Public License for more details: # . # # Originally licensed under the Apache License, Version 2.0: # . # # [This file includes modifications made by New Vector Limited] # # import logging from typing import ( TYPE_CHECKING, AbstractSet, Dict, Iterable, List, Mapping, Optional, Set, Tuple, ) from synapse.api import errors from synapse.api.constants import EduTypes, EventTypes, Membership from synapse.api.errors import ( Codes, FederationDeniedError, HttpResponseException, InvalidAPICallError, RequestSendFailed, SynapseError, ) from synapse.logging.opentracing import log_kv, set_tag, trace from synapse.metrics.background_process_metrics import ( run_as_background_process, wrap_as_background_process, ) from synapse.storage.databases.main.client_ips import DeviceLastConnectionInfo from synapse.storage.databases.main.state_deltas import StateDelta from synapse.types import ( DeviceListUpdates, JsonDict, JsonMapping, ScheduledTask, StrCollection, StreamKeyType, StreamToken, TaskStatus, UserID, get_domain_from_id, get_verify_key_from_cross_signing_key, ) from synapse.util import stringutils from synapse.util.async_helpers import Linearizer from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.cancellation import cancellable from synapse.util.metrics import measure_func from synapse.util.retryutils import ( NotRetryingDestination, filter_destinations_by_retry_limiter, ) if TYPE_CHECKING: from synapse.server import HomeServer logger = logging.getLogger(__name__) DELETE_DEVICE_MSGS_TASK_NAME = "delete_device_messages" MAX_DEVICE_DISPLAY_NAME_LEN = 100 DELETE_STALE_DEVICES_INTERVAL_MS = 24 * 60 * 60 * 1000 class DeviceWorkerHandler: device_list_updater: "DeviceListWorkerUpdater" def __init__(self, hs: "HomeServer"): self.clock = hs.get_clock() self.hs = hs self.store = hs.get_datastores().main self.notifier = hs.get_notifier() self.state = hs.get_state_handler() self._appservice_handler = hs.get_application_service_handler() self._state_storage = hs.get_storage_controllers().state self._auth_handler = hs.get_auth_handler() self._event_sources = hs.get_event_sources() self.server_name = hs.hostname self._msc3852_enabled = hs.config.experimental.msc3852_enabled self._query_appservices_for_keys = ( hs.config.experimental.msc3984_appservice_key_query ) self._task_scheduler = hs.get_task_scheduler() self.device_list_updater = DeviceListWorkerUpdater(hs) self._task_scheduler.register_action( self._delete_device_messages, DELETE_DEVICE_MSGS_TASK_NAME ) @trace async def get_devices_by_user(self, user_id: str) -> List[JsonDict]: """ Retrieve the given user's devices Args: user_id: The user ID to query for devices. Returns: info on each device """ set_tag("user_id", user_id) device_map = await self.store.get_devices_by_user(user_id) ips = await self.store.get_last_client_ip_by_device(user_id, device_id=None) devices = list(device_map.values()) for device in devices: _update_device_from_client_ips(device, ips) log_kv(device_map) return devices async def get_dehydrated_device( self, user_id: str ) -> Optional[Tuple[str, JsonDict]]: """Retrieve the information for a dehydrated device. Args: user_id: the user whose dehydrated device we are looking for Returns: a tuple whose first item is the device ID, and the second item is the dehydrated device information """ return await self.store.get_dehydrated_device(user_id) @trace async def get_device(self, user_id: str, device_id: str) -> JsonDict: """Retrieve the given device Args: user_id: The user to get the device from device_id: The device to fetch. Returns: info on the device Raises: errors.NotFoundError: if the device was not found """ device = await self.store.get_device(user_id, device_id) if device is None: raise errors.NotFoundError() ips = await self.store.get_last_client_ip_by_device(user_id, device_id) _update_device_from_client_ips(device, ips) set_tag("device", str(device)) set_tag("ips", str(ips)) return device @cancellable async def get_device_changes_in_shared_rooms( self, user_id: str, room_ids: StrCollection, from_token: StreamToken, now_token: Optional[StreamToken] = None, ) -> Set[str]: """Get the set of users whose devices have changed who share a room with the given user. """ now_device_lists_key = self.store.get_device_stream_token() if now_token: now_device_lists_key = now_token.device_list_key changed_users = await self.store.get_device_list_changes_in_rooms( room_ids, from_token.device_list_key, now_device_lists_key, ) if changed_users is not None: # We also check if the given user has changed their device. If # they're in no rooms then the above query won't include them. changed = await self.store.get_users_whose_devices_changed( from_token.device_list_key, [user_id], to_key=now_device_lists_key, ) changed_users.update(changed) return changed_users # If the DB returned None then the `from_token` is too old, so we fall # back on looking for device updates for all users. users_who_share_room = await self.store.get_users_who_share_room_with_user( user_id ) tracked_users = set(users_who_share_room) # Always tell the user about their own devices tracked_users.add(user_id) changed = await self.store.get_users_whose_devices_changed( from_token.device_list_key, tracked_users, to_key=now_device_lists_key, ) return changed @trace @measure_func("device.get_user_ids_changed") @cancellable async def get_user_ids_changed( self, user_id: str, from_token: StreamToken ) -> DeviceListUpdates: """Get list of users that have had the devices updated, or have newly joined a room, that `user_id` may be interested in. """ set_tag("user_id", user_id) set_tag("from_token", str(from_token)) now_token = self._event_sources.get_current_token() # We need to work out all the different membership changes for the user # and user they share a room with, to pass to # `generate_sync_entry_for_device_list`. See its docstring for details # on the data required. joined_room_ids = await self.store.get_rooms_for_user(user_id) # Get the set of rooms that the user has joined/left membership_changes = ( await self.store.get_current_state_delta_membership_changes_for_user( user_id, from_key=from_token.room_key, to_key=now_token.room_key ) ) # Check for newly joined or left rooms. We need to make sure that we add # to newly joined in the case membership goes from join -> leave -> join # again. newly_joined_rooms: Set[str] = set() newly_left_rooms: Set[str] = set() for change in membership_changes: # We check for changes in "joinedness", i.e. if the membership has # changed to or from JOIN. if change.membership == Membership.JOIN: if change.prev_membership != Membership.JOIN: newly_joined_rooms.add(change.room_id) newly_left_rooms.discard(change.room_id) elif change.prev_membership == Membership.JOIN: newly_joined_rooms.discard(change.room_id) newly_left_rooms.add(change.room_id) # We now work out if any other users have since joined or left the rooms # the user is currently in. # List of membership changes per room room_to_deltas: Dict[str, List[StateDelta]] = {} # The set of event IDs of membership events (so we can fetch their # associated membership). memberships_to_fetch: Set[str] = set() # TODO: Only pull out membership events? state_changes = await self.store.get_current_state_deltas_for_rooms( joined_room_ids, from_token=from_token.room_key, to_token=now_token.room_key ) for delta in state_changes: if delta.event_type != EventTypes.Member: continue room_to_deltas.setdefault(delta.room_id, []).append(delta) if delta.event_id: memberships_to_fetch.add(delta.event_id) if delta.prev_event_id: memberships_to_fetch.add(delta.prev_event_id) # Fetch all the memberships for the membership events event_id_to_memberships = await self.store.get_membership_from_event_ids( memberships_to_fetch ) joined_invited_knocked = ( Membership.JOIN, Membership.INVITE, Membership.KNOCK, ) # We now want to find any user that have newly joined/invited/knocked, # or newly left, similarly to above. newly_joined_or_invited_or_knocked_users: Set[str] = set() newly_left_users: Set[str] = set() for _, deltas in room_to_deltas.items(): for delta in deltas: # Get the prev/new memberships for the delta new_membership = None prev_membership = None if delta.event_id: m = event_id_to_memberships.get(delta.event_id) if m is not None: new_membership = m.membership if delta.prev_event_id: m = event_id_to_memberships.get(delta.prev_event_id) if m is not None: prev_membership = m.membership # Check if a user has newly joined/invited/knocked, or left. if new_membership in joined_invited_knocked: if prev_membership not in joined_invited_knocked: newly_joined_or_invited_or_knocked_users.add(delta.state_key) newly_left_users.discard(delta.state_key) elif prev_membership in joined_invited_knocked: newly_joined_or_invited_or_knocked_users.discard(delta.state_key) newly_left_users.add(delta.state_key) # Now we actually calculate the device list entry with the information # calculated above. device_list_updates = await self.generate_sync_entry_for_device_list( user_id=user_id, since_token=from_token, now_token=now_token, joined_room_ids=joined_room_ids, newly_joined_rooms=newly_joined_rooms, newly_joined_or_invited_or_knocked_users=newly_joined_or_invited_or_knocked_users, newly_left_rooms=newly_left_rooms, newly_left_users=newly_left_users, ) log_kv( { "changed": device_list_updates.changed, "left": device_list_updates.left, } ) return device_list_updates @measure_func("_generate_sync_entry_for_device_list") async def generate_sync_entry_for_device_list( self, user_id: str, since_token: StreamToken, now_token: StreamToken, joined_room_ids: AbstractSet[str], newly_joined_rooms: AbstractSet[str], newly_joined_or_invited_or_knocked_users: AbstractSet[str], newly_left_rooms: AbstractSet[str], newly_left_users: AbstractSet[str], ) -> DeviceListUpdates: """Generate the DeviceListUpdates section of sync Args: sync_result_builder newly_joined_rooms: Set of rooms user has joined since previous sync newly_joined_or_invited_or_knocked_users: Set of users that have joined, been invited to a room or are knocking on a room since previous sync. newly_left_rooms: Set of rooms user has left since previous sync newly_left_users: Set of users that have left a room we're in since previous sync """ # Take a copy since these fields will be mutated later. newly_joined_or_invited_or_knocked_users = set( newly_joined_or_invited_or_knocked_users ) newly_left_users = set(newly_left_users) # We want to figure out what user IDs the client should refetch # device keys for, and which users we aren't going to track changes # for anymore. # # For the first step we check: # a. if any users we share a room with have updated their devices, # and # b. we also check if we've joined any new rooms, or if a user has # joined a room we're in. # # For the second step we just find any users we no longer share a # room with by looking at all users that have left a room plus users # that were in a room we've left. users_that_have_changed = set() # Step 1a, check for changes in devices of users we share a room # with users_that_have_changed = await self.get_device_changes_in_shared_rooms( user_id, joined_room_ids, from_token=since_token, now_token=now_token, ) # Step 1b, check for newly joined rooms for room_id in newly_joined_rooms: joined_users = await self.store.get_users_in_room(room_id) newly_joined_or_invited_or_knocked_users.update(joined_users) # TODO: Check that these users are actually new, i.e. either they # weren't in the previous sync *or* they left and rejoined. users_that_have_changed.update(newly_joined_or_invited_or_knocked_users) user_signatures_changed = await self.store.get_users_whose_signatures_changed( user_id, since_token.device_list_key ) users_that_have_changed.update(user_signatures_changed) # Now find users that we no longer track for room_id in newly_left_rooms: left_users = await self.store.get_users_in_room(room_id) newly_left_users.update(left_users) # Remove any users that we still share a room with. left_users_rooms = await self.store.get_rooms_for_users(newly_left_users) for user_id, entries in left_users_rooms.items(): if any(rid in joined_room_ids for rid in entries): newly_left_users.discard(user_id) return DeviceListUpdates(changed=users_that_have_changed, left=newly_left_users) async def on_federation_query_user_devices(self, user_id: str) -> JsonDict: if not self.hs.is_mine(UserID.from_string(user_id)): raise SynapseError(400, "User is not hosted on this homeserver") stream_id, devices = await self.store.get_e2e_device_keys_for_federation_query( user_id ) master_key = await self.store.get_e2e_cross_signing_key(user_id, "master") self_signing_key = await self.store.get_e2e_cross_signing_key( user_id, "self_signing" ) # Check if the application services have any results. if self._query_appservices_for_keys: # Query the appservice for all devices for this user. query: Dict[str, Optional[List[str]]] = {user_id: None} # Query the appservices for any keys. appservice_results = await self._appservice_handler.query_keys(query) # Merge results, overriding anything from the database. appservice_devices = appservice_results.get("device_keys", {}).get( user_id, {} ) # Filter the database results to only those devices that the appservice has # *not* responded with. devices = [d for d in devices if d["device_id"] not in appservice_devices] # Append the appservice response by wrapping each result in another dictionary. devices.extend( {"device_id": device_id, "keys": device} for device_id, device in appservice_devices.items() ) # TODO Handle cross-signing keys. return { "user_id": user_id, "stream_id": stream_id, "devices": devices, "master_key": master_key, "self_signing_key": self_signing_key, } async def handle_room_un_partial_stated(self, room_id: str) -> None: """Handles sending appropriate device list updates in a room that has gone from partial to full state. """ # TODO(faster_joins): worker mode support # https://github.com/matrix-org/synapse/issues/12994 logger.error( "Trying handling device list state for partial join: not supported on workers." ) DEVICE_MSGS_DELETE_BATCH_LIMIT = 1000 DEVICE_MSGS_DELETE_SLEEP_MS = 100 async def _delete_device_messages( self, task: ScheduledTask, ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]: """Scheduler task to delete device messages in batch of `DEVICE_MSGS_DELETE_BATCH_LIMIT`.""" assert task.params is not None user_id = task.params["user_id"] device_id = task.params["device_id"] up_to_stream_id = task.params["up_to_stream_id"] # Delete the messages in batches to avoid too much DB load. from_stream_id = None while True: from_stream_id, _ = await self.store.delete_messages_for_device_between( user_id=user_id, device_id=device_id, from_stream_id=from_stream_id, to_stream_id=up_to_stream_id, limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT, ) if from_stream_id is None: return TaskStatus.COMPLETE, None, None await self.clock.sleep(DeviceHandler.DEVICE_MSGS_DELETE_SLEEP_MS / 1000.0) class DeviceHandler(DeviceWorkerHandler): device_list_updater: "DeviceListUpdater" def __init__(self, hs: "HomeServer"): super().__init__(hs) self.federation_sender = hs.get_federation_sender() self._account_data_handler = hs.get_account_data_handler() self._storage_controllers = hs.get_storage_controllers() self.db_pool = hs.get_datastores().main.db_pool self._dont_notify_new_devices_for = ( hs.config.registration.dont_notify_new_devices_for ) self.device_list_updater = DeviceListUpdater(hs, self) federation_registry = hs.get_federation_registry() federation_registry.register_edu_handler( EduTypes.DEVICE_LIST_UPDATE, self.device_list_updater.incoming_device_list_update, ) # Whether `_handle_new_device_update_async` is currently processing. self._handle_new_device_update_is_processing = False # If a new device update may have happened while the loop was # processing. self._handle_new_device_update_new_data = False # On start up check if there are any updates pending. hs.get_reactor().callWhenRunning(self._handle_new_device_update_async) self._delete_stale_devices_after = hs.config.server.delete_stale_devices_after # Ideally we would run this on a worker and condition this on the # "run_background_tasks_on" setting, but this would mean making the notification # of device list changes over federation work on workers, which is nontrivial. if self._delete_stale_devices_after is not None: self.clock.looping_call( run_as_background_process, DELETE_STALE_DEVICES_INTERVAL_MS, "delete_stale_devices", self._delete_stale_devices, ) def _check_device_name_length(self, name: Optional[str]) -> None: """ Checks whether a device name is longer than the maximum allowed length. Args: name: The name of the device. Raises: SynapseError: if the device name is too long. """ if name and len(name) > MAX_DEVICE_DISPLAY_NAME_LEN: raise SynapseError( 400, "Device display name is too long (max %i)" % (MAX_DEVICE_DISPLAY_NAME_LEN,), errcode=Codes.TOO_LARGE, ) async def check_device_registered( self, user_id: str, device_id: Optional[str], initial_device_display_name: Optional[str] = None, auth_provider_id: Optional[str] = None, auth_provider_session_id: Optional[str] = None, ) -> str: """ If the given device has not been registered, register it with the supplied display name. If no device_id is supplied, we make one up. Args: user_id: @user:id device_id: device id supplied by client initial_device_display_name: device display name from client auth_provider_id: The SSO IdP the user used, if any. auth_provider_session_id: The session ID (sid) got from the SSO IdP. Returns: device id (generated if none was supplied) """ self._check_device_name_length(initial_device_display_name) # Check if we should send out device lists updates for this new device. notify = user_id not in self._dont_notify_new_devices_for if device_id is not None: new_device = await self.store.store_device( user_id=user_id, device_id=device_id, initial_device_display_name=initial_device_display_name, auth_provider_id=auth_provider_id, auth_provider_session_id=auth_provider_session_id, ) if new_device: if notify: await self.notify_device_update(user_id, [device_id]) return device_id # if the device id is not specified, we'll autogen one, but loop a few # times in case of a clash. attempts = 0 while attempts < 5: new_device_id = stringutils.random_string(10).upper() new_device = await self.store.store_device( user_id=user_id, device_id=new_device_id, initial_device_display_name=initial_device_display_name, auth_provider_id=auth_provider_id, auth_provider_session_id=auth_provider_session_id, ) if new_device: if notify: await self.notify_device_update(user_id, [new_device_id]) return new_device_id attempts += 1 raise errors.StoreError(500, "Couldn't generate a device ID.") async def _delete_stale_devices(self) -> None: """Background task that deletes devices which haven't been accessed for more than a configured time period. """ # We should only be running this job if the config option is defined. assert self._delete_stale_devices_after is not None now_ms = self.clock.time_msec() since_ms = now_ms - self._delete_stale_devices_after devices = await self.store.get_local_devices_not_accessed_since(since_ms) for user_id, user_devices in devices.items(): await self.delete_devices(user_id, user_devices) @trace async def delete_all_devices_for_user( self, user_id: str, except_device_id: Optional[str] = None ) -> None: """Delete all of the user's devices Args: user_id: The user to remove all devices from except_device_id: optional device id which should not be deleted """ device_map = await self.store.get_devices_by_user(user_id) device_ids = list(device_map) if except_device_id is not None: device_ids = [d for d in device_ids if d != except_device_id] await self.delete_devices(user_id, device_ids) async def delete_devices(self, user_id: str, device_ids: List[str]) -> None: """Delete several devices Args: user_id: The user to delete devices from. device_ids: The list of device IDs to delete """ to_device_stream_id = self._event_sources.get_current_token().to_device_key try: await self.store.delete_devices(user_id, device_ids) except errors.StoreError as e: if e.code == 404: # no match set_tag("error", True) set_tag("reason", "User doesn't have that device id.") else: raise # Delete data specific to each device. Not optimised as it is not # considered as part of a critical path. for device_id in device_ids: await self._auth_handler.delete_access_tokens_for_user( user_id, device_id=device_id ) await self.store.delete_e2e_keys_by_device( user_id=user_id, device_id=device_id ) if self.hs.config.experimental.msc3890_enabled: # Remove any local notification settings for this device in accordance # with MSC3890. await self._account_data_handler.remove_account_data_for_user( user_id, f"org.matrix.msc3890.local_notification_settings.{device_id}", ) # Delete device messages asynchronously and in batches using the task scheduler # We specify an upper stream id to avoid deleting non delivered messages # if an user re-uses a device ID. await self._task_scheduler.schedule_task( DELETE_DEVICE_MSGS_TASK_NAME, resource_id=device_id, params={ "user_id": user_id, "device_id": device_id, "up_to_stream_id": to_device_stream_id, }, ) # Pushers are deleted after `delete_access_tokens_for_user` is called so that # modules using `on_logged_out` hook can use them if needed. await self.hs.get_pusherpool().remove_pushers_by_devices(user_id, device_ids) await self.notify_device_update(user_id, device_ids) async def update_device(self, user_id: str, device_id: str, content: dict) -> None: """Update the given device Args: user_id: The user to update devices of. device_id: The device to update. content: body of update request """ # Reject a new displayname which is too long. new_display_name = content.get("display_name") self._check_device_name_length(new_display_name) try: await self.store.update_device( user_id, device_id, new_display_name=new_display_name ) await self.notify_device_update(user_id, [device_id]) except errors.StoreError as e: if e.code == 404: raise errors.NotFoundError() else: raise @trace @measure_func("notify_device_update") async def notify_device_update( self, user_id: str, device_ids: StrCollection ) -> None: """Notify that a user's device(s) has changed. Pokes the notifier, and remote servers if the user is local. Args: user_id: The Matrix ID of the user who's device list has been updated. device_ids: The device IDs that have changed. """ if not device_ids: # No changes to notify about, so this is a no-op. return room_ids = await self.store.get_rooms_for_user(user_id) position = await self.store.add_device_change_to_streams( user_id, device_ids, room_ids=room_ids, ) if not position: # This should only happen if there are no updates, so we bail. return for device_id in device_ids: logger.debug( "Notifying about update %r/%r, ID: %r", user_id, device_id, position ) # specify the user ID too since the user should always get their own device list # updates, even if they aren't in any rooms. self.notifier.on_new_event( StreamKeyType.DEVICE_LIST, position, users={user_id}, rooms=room_ids ) # We may need to do some processing asynchronously for local user IDs. if self.hs.is_mine_id(user_id): self._handle_new_device_update_async() async def notify_user_signature_update( self, from_user_id: str, user_ids: List[str] ) -> None: """Notify a user that they have made new signatures of other users. Args: from_user_id: the user who made the signature user_ids: the users IDs that have new signatures """ position = await self.store.add_user_signature_change_to_streams( from_user_id, user_ids ) self.notifier.on_new_event( StreamKeyType.DEVICE_LIST, position, users=[from_user_id] ) async def store_dehydrated_device( self, user_id: str, device_id: Optional[str], device_data: JsonDict, initial_device_display_name: Optional[str] = None, keys_for_device: Optional[JsonDict] = None, ) -> str: """Store a dehydrated device for a user, optionally storing the keys associated with it as well. If the user had a previous dehydrated device, it is removed. Args: user_id: the user that we are storing the device for device_id: device id supplied by client device_data: the dehydrated device information initial_device_display_name: The display name to use for the device keys_for_device: keys for the dehydrated device Returns: device id of the dehydrated device """ device_id = await self.check_device_registered( user_id, device_id, initial_device_display_name, ) time_now = self.clock.time_msec() old_device_id = await self.store.store_dehydrated_device( user_id, device_id, device_data, time_now, keys_for_device ) if old_device_id is not None: await self.delete_devices(user_id, [old_device_id]) return device_id async def rehydrate_device( self, user_id: str, access_token: str, device_id: str ) -> dict: """Process a rehydration request from the user. Args: user_id: the user who is rehydrating the device access_token: the access token used for the request device_id: the ID of the device that will be rehydrated Returns: a dict containing {"success": True} """ success = await self.store.remove_dehydrated_device(user_id, device_id) if not success: raise errors.NotFoundError() # If the dehydrated device was successfully deleted (the device ID # matched the stored dehydrated device), then modify the access # token and refresh token to use the dehydrated device's ID and # copy the old device display name to the dehydrated device, # and destroy the old device ID old_device_id = await self.store.set_device_for_access_token( access_token, device_id ) await self.store.set_device_for_refresh_token(user_id, old_device_id, device_id) old_device = await self.store.get_device(user_id, old_device_id) if old_device is None: raise errors.NotFoundError() await self.store.update_device(user_id, device_id, old_device["display_name"]) # can't call self.delete_device because that will clobber the # access token so call the storage layer directly await self.store.delete_devices(user_id, [old_device_id]) await self.store.delete_e2e_keys_by_device( user_id=user_id, device_id=old_device_id ) # tell everyone that the old device is gone and that the dehydrated # device has a new display name await self.notify_device_update(user_id, [old_device_id, device_id]) return {"success": True} async def delete_dehydrated_device(self, user_id: str, device_id: str) -> None: """ Delete a stored dehydrated device. Args: user_id: the user_id to delete the device from device_id: id of the dehydrated device to delete """ success = await self.store.remove_dehydrated_device(user_id, device_id) if not success: raise errors.NotFoundError() await self.delete_devices(user_id, [device_id]) await self.store.delete_e2e_keys_by_device(user_id=user_id, device_id=device_id) @wrap_as_background_process("_handle_new_device_update_async") async def _handle_new_device_update_async(self) -> None: """Called when we have a new local device list update that we need to send out over federation. This happens in the background so as not to block the original request that generated the device update. """ if self._handle_new_device_update_is_processing: self._handle_new_device_update_new_data = True return self._handle_new_device_update_is_processing = True # The stream ID we processed previous iteration (if any), and the set of # hosts we've already poked about for this update. This is so that we # don't poke the same remote server about the same update repeatedly. current_stream_id = None hosts_already_sent_to: Set[str] = set() try: stream_id, room_id = await self.store.get_device_change_last_converted_pos() while True: self._handle_new_device_update_new_data = False max_stream_id = self.store.get_device_stream_token() rows = await self.store.get_uncoverted_outbound_room_pokes( stream_id, room_id ) if not rows: # If the DB returned nothing then there is nothing left to # do, *unless* a new device list update happened during the # DB query. # Advance `(stream_id, room_id)`. # `max_stream_id` comes from *before* the query for unconverted # rows, which means that any unconverted rows must have a larger # stream ID. if max_stream_id > stream_id: stream_id, room_id = max_stream_id, "" await self.store.set_device_change_last_converted_pos( stream_id, room_id ) else: assert max_stream_id == stream_id # Avoid moving `room_id` backwards. if self._handle_new_device_update_new_data: continue else: return for user_id, device_id, room_id, stream_id, opentracing_context in rows: hosts = set() # Ignore any users that aren't ours if self.hs.is_mine_id(user_id): hosts = set( await self._storage_controllers.state.get_current_hosts_in_room_or_partial_state_approximation( room_id ) ) hosts.discard(self.server_name) # For rooms with partial state, `hosts` is merely an # approximation. When we transition to a full state room, we # will have to send out device list updates to any servers we # missed. # Check if we've already sent this update to some hosts if current_stream_id == stream_id: hosts -= hosts_already_sent_to await self.store.add_device_list_outbound_pokes( user_id=user_id, device_id=device_id, room_id=room_id, hosts=hosts, context=opentracing_context, ) await self.store.mark_redundant_device_lists_pokes( user_id=user_id, device_id=device_id, room_id=room_id, converted_upto_stream_id=stream_id, ) # Notify replication that we've updated the device list stream. self.notifier.notify_replication() if hosts: logger.info( "Sending device list update notif for %r to: %r", user_id, hosts, ) await self.federation_sender.send_device_messages( hosts, immediate=False ) # TODO: when called, this isn't in a logging context. # This leads to log spam, sentry event spam, and massive # memory usage. # See https://github.com/matrix-org/synapse/issues/12552. # log_kv( # {"message": "sent device update to host", "host": host} # ) if current_stream_id != stream_id: # Clear the set of hosts we've already sent to as we're # processing a new update. hosts_already_sent_to.clear() hosts_already_sent_to.update(hosts) current_stream_id = stream_id # Advance `(stream_id, room_id)`. _, _, room_id, stream_id, _ = rows[-1] await self.store.set_device_change_last_converted_pos( stream_id, room_id ) finally: self._handle_new_device_update_is_processing = False async def handle_room_un_partial_stated(self, room_id: str) -> None: """Handles sending appropriate device list updates in a room that has gone from partial to full state. """ # We defer to the device list updater to handle pending remote device # list updates. await self.device_list_updater.handle_room_un_partial_stated(room_id) # Replay local updates. ( join_event_id, device_lists_stream_id, ) = await self.store.get_join_event_id_and_device_lists_stream_id_for_partial_state( room_id ) # Get the local device list changes that have happened in the room since # we started joining. If there are no updates there's nothing left to do. changes = await self.store.get_device_list_changes_in_room( room_id, device_lists_stream_id ) local_changes = {(u, d) for u, d in changes if self.hs.is_mine_id(u)} if not local_changes: return # Note: We have persisted the full state at this point, we just haven't # cleared the `partial_room` flag. join_state_ids = await self._state_storage.get_state_ids_for_event( join_event_id, await_full_state=False ) current_state_ids = await self.store.get_partial_current_state_ids(room_id) # Now we need to work out all servers that might have been in the room # at any point during our join. # First we look for any membership states that have changed between the # initial join and now... all_keys = set(join_state_ids) all_keys.update(current_state_ids) potentially_changed_hosts = set() for etype, state_key in all_keys: if etype != EventTypes.Member: continue prev = join_state_ids.get((etype, state_key)) current = current_state_ids.get((etype, state_key)) if prev != current: potentially_changed_hosts.add(get_domain_from_id(state_key)) # ... then we add all the hosts that are currently joined to the room... current_hosts_in_room = await self.store.get_current_hosts_in_room(room_id) potentially_changed_hosts.update(current_hosts_in_room) # ... and finally we remove any hosts that we were told about, as we # will have sent device list updates to those hosts when they happened. known_hosts_at_join = await self.store.get_partial_state_servers_at_join( room_id ) assert known_hosts_at_join is not None potentially_changed_hosts.difference_update(known_hosts_at_join) potentially_changed_hosts.discard(self.server_name) if not potentially_changed_hosts: # Nothing to do. return logger.info( "Found %d changed hosts to send device list updates to", len(potentially_changed_hosts), ) for user_id, device_id in local_changes: await self.store.add_device_list_outbound_pokes( user_id=user_id, device_id=device_id, room_id=room_id, hosts=potentially_changed_hosts, context=None, ) # Notify things that device lists need to be sent out. self.notifier.notify_replication() await self.federation_sender.send_device_messages( potentially_changed_hosts, immediate=False ) def _update_device_from_client_ips( device: JsonDict, client_ips: Mapping[Tuple[str, str], DeviceLastConnectionInfo] ) -> None: ip = client_ips.get((device["user_id"], device["device_id"])) device.update( { "last_seen_user_agent": ip.user_agent if ip else None, "last_seen_ts": ip.last_seen if ip else None, "last_seen_ip": ip.ip if ip else None, } ) class DeviceListWorkerUpdater: "Handles incoming device list updates from federation and contacts the main process over replication" def __init__(self, hs: "HomeServer"): from synapse.replication.http.devices import ( ReplicationMultiUserDevicesResyncRestServlet, ) self._multi_user_device_resync_client = ( ReplicationMultiUserDevicesResyncRestServlet.make_client(hs) ) async def multi_user_device_resync( self, user_ids: List[str], mark_failed_as_stale: bool = True ) -> Dict[str, Optional[JsonMapping]]: """ Like `user_device_resync` but operates on multiple users **from the same origin** at once. Returns: Dict from User ID to the same Dict as `user_device_resync`. """ # mark_failed_as_stale is not sent. Ensure this doesn't break expectations. assert mark_failed_as_stale if not user_ids: # Shortcut empty requests return {} return await self._multi_user_device_resync_client(user_ids=user_ids) class DeviceListUpdater(DeviceListWorkerUpdater): "Handles incoming device list updates from federation and updates the DB" def __init__(self, hs: "HomeServer", device_handler: DeviceHandler): self.store = hs.get_datastores().main self.federation = hs.get_federation_client() self.clock = hs.get_clock() self.device_handler = device_handler self._notifier = hs.get_notifier() self._remote_edu_linearizer = Linearizer(name="remote_device_list") self._resync_linearizer = Linearizer(name="remote_device_resync") # user_id -> list of updates waiting to be handled. self._pending_updates: Dict[ str, List[Tuple[str, str, Iterable[str], JsonDict]] ] = {} # Recently seen stream ids. We don't bother keeping these in the DB, # but they're useful to have them about to reduce the number of spurious # resyncs. self._seen_updates: ExpiringCache[str, Set[str]] = ExpiringCache( cache_name="device_update_edu", clock=self.clock, max_len=10000, expiry_ms=30 * 60 * 1000, iterable=True, ) # Attempt to resync out of sync device lists every 30s. self._resync_retry_in_progress = False self.clock.looping_call( run_as_background_process, 30 * 1000, func=self._maybe_retry_device_resync, desc="_maybe_retry_device_resync", ) @trace async def incoming_device_list_update( self, origin: str, edu_content: JsonDict ) -> None: """Called on incoming device list update from federation. Responsible for parsing the EDU and adding to pending updates list. """ set_tag("origin", origin) set_tag("edu_content", str(edu_content)) user_id = edu_content.pop("user_id") device_id = edu_content.pop("device_id") stream_id = str(edu_content.pop("stream_id")) # They may come as ints prev_ids = edu_content.pop("prev_id", []) if not isinstance(prev_ids, list): raise SynapseError( 400, "Device list update had an invalid 'prev_ids' field" ) prev_ids = [str(p) for p in prev_ids] # They may come as ints if get_domain_from_id(user_id) != origin: # TODO: Raise? logger.warning( "Got device list update edu for %r/%r from %r", user_id, device_id, origin, ) set_tag("error", True) log_kv( { "message": "Got a device list update edu from a user and " "device which does not match the origin of the request.", "user_id": user_id, "device_id": device_id, } ) return # Check if we are partially joining any rooms. If so we need to store # all device list updates so that we can handle them correctly once we # know who is in the room. # TODO(faster_joins): this fetches and processes a bunch of data that we don't # use. Could be replaced by a tighter query e.g. # SELECT EXISTS(SELECT 1 FROM partial_state_rooms) partial_rooms = await self.store.get_partial_state_room_resync_info() if partial_rooms: await self.store.add_remote_device_list_to_pending( user_id, device_id, ) self._notifier.notify_replication() room_ids = await self.store.get_rooms_for_user(user_id) if not room_ids: # We don't share any rooms with this user. Ignore update, as we # probably won't get any further updates. set_tag("error", True) log_kv( { "message": "Got an update from a user for which " "we don't share any rooms", "other user_id": user_id, } ) logger.warning( "Got device list update edu for %r/%r, but don't share a room", user_id, device_id, ) return logger.debug("Received device list update for %r/%r", user_id, device_id) self._pending_updates.setdefault(user_id, []).append( (device_id, stream_id, prev_ids, edu_content) ) await self._handle_device_updates(user_id) @measure_func("_incoming_device_list_update") async def _handle_device_updates(self, user_id: str) -> None: "Actually handle pending updates." async with self._remote_edu_linearizer.queue(user_id): pending_updates = self._pending_updates.pop(user_id, []) if not pending_updates: # This can happen since we batch updates return for device_id, stream_id, prev_ids, _ in pending_updates: logger.debug( "Handling update %r/%r, ID: %r, prev: %r ", user_id, device_id, stream_id, prev_ids, ) # Given a list of updates we check if we need to resync. This # happens if we've missed updates. resync = await self._need_to_do_resync(user_id, pending_updates) if logger.isEnabledFor(logging.INFO): logger.info( "Received device list update for %s, requiring resync: %s. Devices: %s", user_id, resync, ", ".join(u[0] for u in pending_updates), ) if resync: # We mark as stale up front in case we get restarted. await self.store.mark_remote_users_device_caches_as_stale([user_id]) run_as_background_process( "_maybe_retry_device_resync", self.multi_user_device_resync, [user_id], False, ) else: # Simply update the single device, since we know that is the only # change (because of the single prev_id matching the current cache) for device_id, stream_id, _, content in pending_updates: await self.store.update_remote_device_list_cache_entry( user_id, device_id, content, stream_id ) await self.device_handler.notify_device_update( user_id, [device_id for device_id, _, _, _ in pending_updates] ) self._seen_updates.setdefault(user_id, set()).update( stream_id for _, stream_id, _, _ in pending_updates ) async def _need_to_do_resync( self, user_id: str, updates: Iterable[Tuple[str, str, Iterable[str], JsonDict]] ) -> bool: """Given a list of updates for a user figure out if we need to do a full resync, or whether we have enough data that we can just apply the delta. """ seen_updates: Set[str] = self._seen_updates.get(user_id, set()) extremity = await self.store.get_device_list_last_stream_id_for_remote(user_id) logger.debug("Current extremity for %r: %r", user_id, extremity) stream_id_in_updates = set() # stream_ids in updates list for _, stream_id, prev_ids, _ in updates: if not prev_ids: # We always do a resync if there are no previous IDs return True for prev_id in prev_ids: if prev_id == extremity: continue elif prev_id in seen_updates: continue elif prev_id in stream_id_in_updates: continue else: return True stream_id_in_updates.add(stream_id) return False @trace async def _maybe_retry_device_resync(self) -> None: """Retry to resync device lists that are out of sync, except if another retry is in progress. """ if self._resync_retry_in_progress: return try: # Prevent another call of this function to retry resyncing device lists so # we don't send too many requests. self._resync_retry_in_progress = True # Get all of the users that need resyncing. need_resync = await self.store.get_user_ids_requiring_device_list_resync() # Filter out users whose host is marked as "down" up front. hosts = await filter_destinations_by_retry_limiter( {get_domain_from_id(u) for u in need_resync}, self.clock, self.store ) hosts = set(hosts) # Iterate over the set of user IDs. for user_id in need_resync: if get_domain_from_id(user_id) not in hosts: continue try: # Try to resync the current user's devices list. result = (await self.multi_user_device_resync([user_id], False))[ user_id ] # user_device_resync only returns a result if it managed to # successfully resync and update the database. Updating the table # of users requiring resync isn't necessary here as # user_device_resync already does it (through # self.store.update_remote_device_list_cache). if result: logger.debug( "Successfully resynced the device list for %s", user_id, ) except Exception as e: # If there was an issue resyncing this user, e.g. if the remote # server sent a malformed result, just log the error instead of # aborting all the subsequent resyncs. logger.debug( "Could not resync the device list for %s: %s", user_id, e, ) finally: # Allow future calls to retry resyncinc out of sync device lists. self._resync_retry_in_progress = False async def multi_user_device_resync( self, user_ids: List[str], mark_failed_as_stale: bool = True ) -> Dict[str, Optional[JsonMapping]]: """ Like `user_device_resync` but operates on multiple users **from the same origin** at once. Returns: Dict from User ID to the same Dict as `user_device_resync`. """ if not user_ids: return {} origins = {UserID.from_string(user_id).domain for user_id in user_ids} if len(origins) != 1: raise InvalidAPICallError(f"Only one origin permitted, got {origins!r}") result = {} failed = set() # TODO(Perf): Actually batch these up for user_id in user_ids: async with self._resync_linearizer.queue(user_id): ( user_result, user_failed, ) = await self._user_device_resync_returning_failed(user_id) result[user_id] = user_result if user_failed: failed.add(user_id) if mark_failed_as_stale: await self.store.mark_remote_users_device_caches_as_stale(failed) return result async def _user_device_resync_returning_failed( self, user_id: str ) -> Tuple[Optional[JsonMapping], bool]: """Fetches all devices for a user and updates the device cache with them. Args: user_id: The user's id whose device_list will be updated. Returns: - A dict with device info as under the "devices" in the result of this request: https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid None when we weren't able to fetch the device info for some reason, e.g. due to a connection problem. - True iff the resync failed and the device list should be marked as stale. """ # Check that we haven't gone and fetched the devices since we last # checked if we needed to resync these device lists. if await self.store.get_users_whose_devices_are_cached([user_id]): cached = await self.store.get_cached_devices_for_user(user_id) return cached, False logger.debug("Attempting to resync the device list for %s", user_id) log_kv({"message": "Doing resync to update device list."}) # Fetch all devices for the user. origin = get_domain_from_id(user_id) try: result = await self.federation.query_user_devices(origin, user_id) except NotRetryingDestination: return None, True except (RequestSendFailed, HttpResponseException) as e: logger.warning( "Failed to handle device list update for %s: %s", user_id, e, ) # We abort on exceptions rather than accepting the update # as otherwise synapse will 'forget' that its device list # is out of date. If we bail then we will retry the resync # next time we get a device list update for this user_id. # This makes it more likely that the device lists will # eventually become consistent. return None, True except FederationDeniedError as e: set_tag("error", True) log_kv({"reason": "FederationDeniedError"}) logger.info(e) return None, False except Exception as e: set_tag("error", True) log_kv( {"message": "Exception raised by federation request", "exception": e} ) logger.exception("Failed to handle device list update for %s", user_id) return None, True log_kv({"result": result}) stream_id = result["stream_id"] devices = result["devices"] # Get the master key and the self-signing key for this user if provided in the # response (None if not in the response). # The response will not contain the user signing key, as this key is only used by # its owner, thus it doesn't make sense to send it over federation. master_key = result.get("master_key") self_signing_key = result.get("self_signing_key") ignore_devices = False # If the remote server has more than ~1000 devices for this user # we assume that something is going horribly wrong (e.g. a bot # that logs in and creates a new device every time it tries to # send a message). Maintaining lots of devices per user in the # cache can cause serious performance issues as if this request # takes more than 60s to complete, internal replication from the # inbound federation worker to the synapse master may time out # causing the inbound federation to fail and causing the remote # server to retry, causing a DoS. So in this scenario we give # up on storing the total list of devices and only handle the # delta instead. if len(devices) > 1000: logger.warning( "Ignoring device list snapshot for %s as it has >1K devs (%d)", user_id, len(devices), ) devices = [] ignore_devices = True else: prev_stream_id = await self.store.get_device_list_last_stream_id_for_remote( user_id ) cached_devices = await self.store.get_cached_devices_for_user(user_id) # To ensure that a user with no devices is cached, we skip the resync only # if we have a stream_id from previously writing a cache entry. if prev_stream_id is not None and cached_devices == { d["device_id"]: d for d in devices }: logging.info( "Skipping device list resync for %s, as our cache matches already", user_id, ) devices = [] ignore_devices = True for device in devices: logger.debug( "Handling resync update %r/%r, ID: %r", user_id, device["device_id"], stream_id, ) if not ignore_devices: await self.store.update_remote_device_list_cache( user_id, devices, stream_id ) # mark the cache as valid, whether or not we actually processed any device # list updates. await self.store.mark_remote_user_device_cache_as_valid(user_id) device_ids = [device["device_id"] for device in devices] # Handle cross-signing keys. cross_signing_device_ids = await self.process_cross_signing_key_update( user_id, master_key, self_signing_key, ) device_ids = device_ids + cross_signing_device_ids if device_ids: await self.device_handler.notify_device_update(user_id, device_ids) # We clobber the seen updates since we've re-synced from a given # point. self._seen_updates[user_id] = {stream_id} return result, False async def process_cross_signing_key_update( self, user_id: str, master_key: Optional[JsonDict], self_signing_key: Optional[JsonDict], ) -> List[str]: """Process the given new master and self-signing key for the given remote user. Args: user_id: The ID of the user these keys are for. master_key: The dict of the cross-signing master key as returned by the remote server. self_signing_key: The dict of the cross-signing self-signing key as returned by the remote server. Return: The device IDs for the given keys. """ device_ids = [] current_keys_map = await self.store.get_e2e_cross_signing_keys_bulk([user_id]) current_keys = current_keys_map.get(user_id) or {} if master_key and master_key != current_keys.get("master"): await self.store.set_e2e_cross_signing_key(user_id, "master", master_key) _, verify_key = get_verify_key_from_cross_signing_key(master_key) # verify_key is a VerifyKey from signedjson, which uses # .version to denote the portion of the key ID after the # algorithm and colon, which is the device ID device_ids.append(verify_key.version) if self_signing_key and self_signing_key != current_keys.get("self_signing"): await self.store.set_e2e_cross_signing_key( user_id, "self_signing", self_signing_key ) _, verify_key = get_verify_key_from_cross_signing_key(self_signing_key) device_ids.append(verify_key.version) return device_ids async def handle_room_un_partial_stated(self, room_id: str) -> None: """Handles sending appropriate device list updates in a room that has gone from partial to full state. """ pending_updates = ( await self.store.get_pending_remote_device_list_updates_for_room(room_id) ) for user_id, device_id in pending_updates: logger.info( "Got pending device list update in room %s: %s / %s", room_id, user_id, device_id, ) position = await self.store.add_device_change_to_streams( user_id, [device_id], room_ids=[room_id], ) if not position: # This should only happen if there are no updates, which # shouldn't happen when we've passed in a non-empty set of # device IDs. continue self.device_handler.notifier.on_new_event( StreamKeyType.DEVICE_LIST, position, rooms=[room_id] )