From 1f5473465d4cb08239bcc97dbbbf185af6841863 Mon Sep 17 00:00:00 2001 From: reivilibre Date: Thu, 16 Mar 2023 11:44:11 +0000 Subject: [PATCH] Refresh remote profiles that have been marked as stale, in order to fill the user directory. [rei:userdirpriv] (#14756) * Scaffolding for background process to refresh profiles * Add scaffolding for background process to refresh profiles for a given server * Implement the code to select servers to refresh from * Ensure we don't build up multiple looping calls * Make `get_profile` able to respect backoffs * Add logic for refreshing users * When backing off, schedule a refresh when the backoff is over * Wake up the background processes when we receive an interesting state event * Add tests * Newsfile Signed-off-by: Olivier Wilkinson (reivilibre) * Add comment about 1<<62 --------- Signed-off-by: Olivier Wilkinson (reivilibre) --- changelog.d/14756.bugfix | 1 + synapse/handlers/profile.py | 4 +- synapse/handlers/user_directory.py | 242 ++++++++++++++++++ .../storage/databases/main/user_directory.py | 74 ++++++ tests/handlers/test_user_directory.py | 187 +++++++++++++- 5 files changed, 504 insertions(+), 4 deletions(-) create mode 100644 changelog.d/14756.bugfix diff --git a/changelog.d/14756.bugfix b/changelog.d/14756.bugfix new file mode 100644 index 000000000..12f979e9d --- /dev/null +++ b/changelog.d/14756.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug in which the user directory would assume any remote membership state events represent a profile change. \ No newline at end of file diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 4bf9a047a..9a81a77cb 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -63,7 +63,7 @@ class ProfileHandler: self._third_party_rules = hs.get_third_party_event_rules() - async def get_profile(self, user_id: str) -> JsonDict: + async def get_profile(self, user_id: str, ignore_backoff: bool = True) -> JsonDict: target_user = UserID.from_string(user_id) if self.hs.is_mine(target_user): @@ -81,7 +81,7 @@ class ProfileHandler: destination=target_user.domain, query_type="profile", args={"user_id": user_id}, - ignore_backoff=True, + ignore_backoff=ignore_backoff, ) return result except RequestSendFailed as e: diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 0815be79f..28a92d41d 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -13,15 +13,22 @@ # limitations under the License. import logging +from http import HTTPStatus from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple +from twisted.internet.interfaces import IDelayedCall + import synapse.metrics from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules, Membership +from synapse.api.errors import Codes, SynapseError from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.databases.main.user_directory import SearchResult from synapse.storage.roommember import ProfileInfo +from synapse.types import UserID from synapse.util.metrics import Measure +from synapse.util.retryutils import NotRetryingDestination +from synapse.util.stringutils import non_null_str_or_none if TYPE_CHECKING: from synapse.server import HomeServer @@ -33,6 +40,25 @@ logger = logging.getLogger(__name__) # then be coalesced such that only one /profile request is made). USER_DIRECTORY_STALE_REFRESH_TIME_MS = 60 * 1000 +# Maximum number of remote servers that we will attempt to refresh profiles for +# in one go. +MAX_SERVERS_TO_REFRESH_PROFILES_FOR_IN_ONE_GO = 5 + +# As long as we have servers to refresh (without backoff), keep adding more +# every 15 seconds. +INTERVAL_TO_ADD_MORE_SERVERS_TO_REFRESH_PROFILES = 15 + + +def calculate_time_of_next_retry(now_ts: int, retry_count: int) -> int: + """ + Calculates the time of a next retry given `now_ts` in ms and the number + of failures encountered thus far. + + Currently the sequence goes: + 1 min, 5 min, 25 min, 2 hour, 10 hour, 52 hour, 10 day, 7.75 week + """ + return now_ts + 60_000 * (5 ** min(retry_count, 7)) + class UserDirectoryHandler(StateDeltasHandler): """Handles queries and updates for the user_directory. @@ -69,12 +95,24 @@ class UserDirectoryHandler(StateDeltasHandler): self.update_user_directory = hs.config.worker.should_update_user_directory self.search_all_users = hs.config.userdirectory.user_directory_search_all_users self.spam_checker = hs.get_spam_checker() + self._hs = hs + # The current position in the current_state_delta stream self.pos: Optional[int] = None # Guard to ensure we only process deltas one at a time self._is_processing = False + # Guard to ensure we only have one process for refreshing remote profiles + self._is_refreshing_remote_profiles = False + # Handle to cancel the `call_later` of `kick_off_remote_profile_refresh_process` + self._refresh_remote_profiles_call_later: Optional[IDelayedCall] = None + + # Guard to ensure we only have one process for refreshing remote profiles + # for the given servers. + # Set of server names. + self._is_refreshing_remote_profiles_for_servers: Set[str] = set() + if self.update_user_directory: self.notifier.add_replication_callback(self.notify_new_event) @@ -82,6 +120,11 @@ class UserDirectoryHandler(StateDeltasHandler): # we start populating the user directory self.clock.call_later(0, self.notify_new_event) + # Kick off the profile refresh process on startup + self._refresh_remote_profiles_call_later = self.clock.call_later( + 10, self.kick_off_remote_profile_refresh_process + ) + async def search_users( self, user_id: str, search_term: str, limit: int ) -> SearchResult: @@ -483,6 +526,20 @@ class UserDirectoryHandler(StateDeltasHandler): next_try_at_ms=now_ts + USER_DIRECTORY_STALE_REFRESH_TIME_MS, retry_counter=0, ) + # Schedule a wake-up to refresh the user directory for this server. + # We intentionally wake up this server directly because we don't want + # other servers ahead of it in the queue to get in the way of updating + # the profile if the server only just sent us an event. + self.clock.call_later( + USER_DIRECTORY_STALE_REFRESH_TIME_MS // 1000 + 1, + self.kick_off_remote_profile_refresh_process_for_remote_server, + UserID.from_string(user_id).domain, + ) + # Schedule a wake-up to handle any backoffs that may occur in the future. + self.clock.call_later( + 2 * USER_DIRECTORY_STALE_REFRESH_TIME_MS // 1000 + 1, + self.kick_off_remote_profile_refresh_process, + ) return prev_name = prev_event.content.get("displayname") @@ -505,3 +562,188 @@ class UserDirectoryHandler(StateDeltasHandler): # Only update if something has changed, or we didn't have a previous event # in the first place. await self.store.update_profile_in_user_dir(user_id, new_name, new_avatar) + + def kick_off_remote_profile_refresh_process(self) -> None: + """Called when there may be remote users with stale profiles to be refreshed""" + if not self.update_user_directory: + return + + if self._is_refreshing_remote_profiles: + return + + if self._refresh_remote_profiles_call_later: + if self._refresh_remote_profiles_call_later.active(): + self._refresh_remote_profiles_call_later.cancel() + self._refresh_remote_profiles_call_later = None + + async def process() -> None: + try: + await self._unsafe_refresh_remote_profiles() + finally: + self._is_refreshing_remote_profiles = False + + self._is_refreshing_remote_profiles = True + run_as_background_process("user_directory.refresh_remote_profiles", process) + + async def _unsafe_refresh_remote_profiles(self) -> None: + limit = MAX_SERVERS_TO_REFRESH_PROFILES_FOR_IN_ONE_GO - len( + self._is_refreshing_remote_profiles_for_servers + ) + if limit <= 0: + # nothing to do: already refreshing the maximum number of servers + # at once. + # Come back later. + self._refresh_remote_profiles_call_later = self.clock.call_later( + INTERVAL_TO_ADD_MORE_SERVERS_TO_REFRESH_PROFILES, + self.kick_off_remote_profile_refresh_process, + ) + return + + servers_to_refresh = ( + await self.store.get_remote_servers_with_profiles_to_refresh( + now_ts=self.clock.time_msec(), limit=limit + ) + ) + + if not servers_to_refresh: + # Do we have any backing-off servers that we should try again + # for eventually? + # By setting `now` is a point in the far future, we can ask for + # which server/user is next to be refreshed, even though it is + # not actually refreshable *now*. + end_of_time = 1 << 62 + backing_off_servers = ( + await self.store.get_remote_servers_with_profiles_to_refresh( + now_ts=end_of_time, limit=1 + ) + ) + if backing_off_servers: + # Find out when the next user is refreshable and schedule a + # refresh then. + backing_off_server_name = backing_off_servers[0] + users = await self.store.get_remote_users_to_refresh_on_server( + backing_off_server_name, now_ts=end_of_time, limit=1 + ) + if not users: + return + _, _, next_try_at_ts = users[0] + self._refresh_remote_profiles_call_later = self.clock.call_later( + ((next_try_at_ts - self.clock.time_msec()) // 1000) + 2, + self.kick_off_remote_profile_refresh_process, + ) + + return + + for server_to_refresh in servers_to_refresh: + self.kick_off_remote_profile_refresh_process_for_remote_server( + server_to_refresh + ) + + self._refresh_remote_profiles_call_later = self.clock.call_later( + INTERVAL_TO_ADD_MORE_SERVERS_TO_REFRESH_PROFILES, + self.kick_off_remote_profile_refresh_process, + ) + + def kick_off_remote_profile_refresh_process_for_remote_server( + self, server_name: str + ) -> None: + """Called when there may be remote users with stale profiles to be refreshed + on the given server.""" + if not self.update_user_directory: + return + + if server_name in self._is_refreshing_remote_profiles_for_servers: + return + + async def process() -> None: + try: + await self._unsafe_refresh_remote_profiles_for_remote_server( + server_name + ) + finally: + self._is_refreshing_remote_profiles_for_servers.remove(server_name) + + self._is_refreshing_remote_profiles_for_servers.add(server_name) + run_as_background_process( + "user_directory.refresh_remote_profiles_for_remote_server", process + ) + + async def _unsafe_refresh_remote_profiles_for_remote_server( + self, server_name: str + ) -> None: + logger.info("Refreshing profiles in user directory for %s", server_name) + + while True: + # Get a handful of users to process. + next_batch = await self.store.get_remote_users_to_refresh_on_server( + server_name, now_ts=self.clock.time_msec(), limit=10 + ) + if not next_batch: + # Finished for now + return + + for user_id, retry_counter, _ in next_batch: + # Request the profile of the user. + try: + profile = await self._hs.get_profile_handler().get_profile( + user_id, ignore_backoff=False + ) + except NotRetryingDestination as e: + logger.info( + "Failed to refresh profile for %r because the destination is undergoing backoff", + user_id, + ) + # As a special-case, we back off until the destination is no longer + # backed off from. + await self.store.set_remote_user_profile_in_user_dir_stale( + user_id, + e.retry_last_ts + e.retry_interval, + retry_counter=retry_counter + 1, + ) + continue + except SynapseError as e: + if e.code == HTTPStatus.NOT_FOUND and e.errcode == Codes.NOT_FOUND: + # The profile doesn't exist. + # TODO Does this mean we should clear it from our user + # directory? + await self.store.clear_remote_user_profile_in_user_dir_stale( + user_id + ) + logger.warning( + "Refresh of remote profile %r: not found (%r)", + user_id, + e.msg, + ) + continue + + logger.warning( + "Failed to refresh profile for %r because %r", user_id, e + ) + await self.store.set_remote_user_profile_in_user_dir_stale( + user_id, + calculate_time_of_next_retry( + self.clock.time_msec(), retry_counter + 1 + ), + retry_counter=retry_counter + 1, + ) + continue + except Exception: + logger.error( + "Failed to refresh profile for %r due to unhandled exception", + user_id, + exc_info=True, + ) + await self.store.set_remote_user_profile_in_user_dir_stale( + user_id, + calculate_time_of_next_retry( + self.clock.time_msec(), retry_counter + 1 + ), + retry_counter=retry_counter + 1, + ) + continue + + await self.store.update_profile_in_user_dir( + user_id, + display_name=non_null_str_or_none(profile.get("displayname")), + avatar_url=non_null_str_or_none(profile.get("avatar_url")), + ) diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py index 9cf01b7f3..97f09b73d 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -504,6 +504,80 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): desc="set_remote_user_profile_in_user_dir_stale", ) + async def clear_remote_user_profile_in_user_dir_stale(self, user_id: str) -> None: + """ + Marks a remote user as no longer having a possibly-stale user directory profile. + + Args: + user_id: the remote user who no longer has a stale profile on this server. + """ + await self.db_pool.simple_delete( + table="user_directory_stale_remote_users", + keyvalues={"user_id": user_id}, + desc="clear_remote_user_profile_in_user_dir_stale", + ) + + async def get_remote_servers_with_profiles_to_refresh( + self, now_ts: int, limit: int + ) -> List[str]: + """ + Get a list of up to `limit` server names which have users whose + locally-cached profiles we believe to be stale + and are refreshable given the current time `now_ts` in milliseconds. + """ + + def _get_remote_servers_with_refreshable_profiles_txn( + txn: LoggingTransaction, + ) -> List[str]: + sql = """ + SELECT user_server_name + FROM user_directory_stale_remote_users + WHERE next_try_at_ts < ? + GROUP BY user_server_name + ORDER BY MIN(next_try_at_ts), user_server_name + LIMIT ? + """ + txn.execute(sql, (now_ts, limit)) + return [row[0] for row in txn] + + return await self.db_pool.runInteraction( + "get_remote_servers_with_profiles_to_refresh", + _get_remote_servers_with_refreshable_profiles_txn, + ) + + async def get_remote_users_to_refresh_on_server( + self, server_name: str, now_ts: int, limit: int + ) -> List[Tuple[str, int, int]]: + """ + Get a list of up to `limit` user IDs from the server `server_name` + whose locally-cached profiles we believe to be stale + and are refreshable given the current time `now_ts` in milliseconds. + + Returns: + tuple of: + - User ID + - Retry counter (number of failures so far) + - Time the retry is scheduled for, in milliseconds + """ + + def _get_remote_users_to_refresh_on_server_txn( + txn: LoggingTransaction, + ) -> List[Tuple[str, int, int]]: + sql = """ + SELECT user_id, retry_counter, next_try_at_ts + FROM user_directory_stale_remote_users + WHERE user_server_name = ? AND next_try_at_ts < ? + ORDER BY next_try_at_ts + LIMIT ? + """ + txn.execute(sql, (server_name, now_ts, limit)) + return cast(List[Tuple[str, int, int]], txn.fetchall()) + + return await self.db_pool.runInteraction( + "get_remote_users_to_refresh_on_server", + _get_remote_users_to_refresh_on_server_txn, + ) + async def update_profile_in_user_dir( self, user_id: str, display_name: Optional[str], avatar_url: Optional[str] ) -> None: diff --git a/tests/handlers/test_user_directory.py b/tests/handlers/test_user_directory.py index a02c1c622..da4d24082 100644 --- a/tests/handlers/test_user_directory.py +++ b/tests/handlers/test_user_directory.py @@ -19,17 +19,18 @@ from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin from synapse.api.constants import UserTypes +from synapse.api.errors import SynapseError from synapse.api.room_versions import RoomVersion, RoomVersions from synapse.appservice import ApplicationService from synapse.rest.client import login, register, room, user_directory from synapse.server import HomeServer from synapse.storage.roommember import ProfileInfo -from synapse.types import UserProfile, create_requester +from synapse.types import JsonDict, UserProfile, create_requester from synapse.util import Clock from tests import unittest from tests.storage.test_user_directory import GetUserDirectoryTables -from tests.test_utils import make_awaitable +from tests.test_utils import event_injection, make_awaitable from tests.test_utils.event_injection import inject_member_event from tests.unittest import override_config @@ -1103,3 +1104,185 @@ class TestUserDirSearchDisabled(unittest.HomeserverTestCase): ) self.assertEqual(200, channel.code, channel.result) self.assertTrue(len(channel.json_body["results"]) == 0) + + +class UserDirectoryRemoteProfileTestCase(unittest.HomeserverTestCase): + servlets = [ + login.register_servlets, + synapse.rest.admin.register_servlets, + register.register_servlets, + room.register_servlets, + ] + + def default_config(self) -> JsonDict: + config = super().default_config() + # Re-enables updating the user directory, as that functionality is needed below. + config["update_user_directory_from_worker"] = None + return config + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.store = hs.get_datastores().main + self.alice = self.register_user("alice", "alice123") + self.alice_tok = self.login("alice", "alice123") + self.user_dir_helper = GetUserDirectoryTables(self.store) + self.user_dir_handler = hs.get_user_directory_handler() + self.profile_handler = hs.get_profile_handler() + + # Cancel the startup call: in the steady-state case we can't rely on it anyway. + assert self.user_dir_handler._refresh_remote_profiles_call_later is not None + self.user_dir_handler._refresh_remote_profiles_call_later.cancel() + + def test_public_rooms_have_profiles_collected(self) -> None: + """ + In a public room, member state events are treated as reflecting the user's + real profile and they are accepted. + (The main motivation for accepting this is to prevent having to query + *every* single profile change over federation.) + """ + room_id = self.helper.create_room_as( + self.alice, is_public=True, tok=self.alice_tok + ) + self.get_success( + event_injection.inject_member_event( + self.hs, + room_id, + "@bruce:remote", + "join", + "@bruce:remote", + extra_content={ + "displayname": "Bruce!", + "avatar_url": "mxc://remote/123", + }, + ) + ) + # Sending this event makes the streams move forward after the injection... + self.helper.send(room_id, "Test", tok=self.alice_tok) + self.pump(0.1) + + profiles = self.get_success( + self.user_dir_helper.get_profiles_in_user_directory() + ) + self.assertEqual( + profiles.get("@bruce:remote"), + ProfileInfo(display_name="Bruce!", avatar_url="mxc://remote/123"), + ) + + def test_private_rooms_do_not_have_profiles_collected(self) -> None: + """ + In a private room, member state events are not pulled out and used to populate + the user directory. + """ + room_id = self.helper.create_room_as( + self.alice, is_public=False, tok=self.alice_tok + ) + self.get_success( + event_injection.inject_member_event( + self.hs, + room_id, + "@bruce:remote", + "join", + "@bruce:remote", + extra_content={ + "displayname": "super-duper bruce", + "avatar_url": "mxc://remote/456", + }, + ) + ) + # Sending this event makes the streams move forward after the injection... + self.helper.send(room_id, "Test", tok=self.alice_tok) + self.pump(0.1) + + profiles = self.get_success( + self.user_dir_helper.get_profiles_in_user_directory() + ) + self.assertNotIn("@bruce:remote", profiles) + + def test_private_rooms_have_profiles_requested(self) -> None: + """ + When a name changes in a private room, the homeserver instead requests + the user's global profile over federation. + """ + + async def get_remote_profile( + user_id: str, ignore_backoff: bool = True + ) -> JsonDict: + if user_id == "@bruce:remote": + return { + "displayname": "Sir Bruce Bruceson", + "avatar_url": "mxc://remote/789", + } + else: + raise ValueError(f"unable to fetch {user_id}") + + with patch.object(self.profile_handler, "get_profile", get_remote_profile): + # Continue from the earlier test... + self.test_private_rooms_do_not_have_profiles_collected() + + # Advance by a minute + self.reactor.advance(61.0) + + profiles = self.get_success( + self.user_dir_helper.get_profiles_in_user_directory() + ) + self.assertEqual( + profiles.get("@bruce:remote"), + ProfileInfo( + display_name="Sir Bruce Bruceson", avatar_url="mxc://remote/789" + ), + ) + + def test_profile_requests_are_retried(self) -> None: + """ + When we fail to fetch the user's profile over federation, + we try again later. + """ + has_failed_once = False + + async def get_remote_profile( + user_id: str, ignore_backoff: bool = True + ) -> JsonDict: + nonlocal has_failed_once + if user_id == "@bruce:remote": + if not has_failed_once: + has_failed_once = True + raise SynapseError(502, "temporary network problem") + + return { + "displayname": "Sir Bruce Bruceson", + "avatar_url": "mxc://remote/789", + } + else: + raise ValueError(f"unable to fetch {user_id}") + + with patch.object(self.profile_handler, "get_profile", get_remote_profile): + # Continue from the earlier test... + self.test_private_rooms_do_not_have_profiles_collected() + + # Advance by a minute + self.reactor.advance(61.0) + + # The request has already failed once + self.assertTrue(has_failed_once) + + # The profile has yet to be updated. + profiles = self.get_success( + self.user_dir_helper.get_profiles_in_user_directory() + ) + self.assertNotIn( + "@bruce:remote", + profiles, + ) + + # Advance by five minutes, after the backoff has finished + self.reactor.advance(301.0) + + # The profile should have been updated now + profiles = self.get_success( + self.user_dir_helper.get_profiles_in_user_directory() + ) + self.assertEqual( + profiles.get("@bruce:remote"), + ProfileInfo( + display_name="Sir Bruce Bruceson", avatar_url="mxc://remote/789" + ), + )