Add a Synapse Module for configuring presence update routing (#9491)

At the moment, if you'd like to share presence between local or remote users, those users must be sharing a room together. This isn't always the most convenient or useful situation though.

This PR adds a module to Synapse that will allow deployments to set up extra logic on where presence updates should be routed. The module must implement two methods, `get_users_for_states` and `get_interested_users`. These methods are given presence updates or user IDs and must return information that Synapse will use to grant passing presence updates around.

A method is additionally added to `ModuleApi` which allows triggering a set of users to receive the current, online presence information for all users they are considered interested in. This is the equivalent of that user receiving presence information during an initial sync. 

The goal of this module is to be fairly generic and useful for a variety of applications, with hard requirements being:

* Sending state for a specific set or all known users to a defined set of local and remote users.
* The ability to trigger an initial sync for specific users, so they receive all current state.
This commit is contained in:
Andrew Morgan 2021-04-06 14:38:30 +01:00 committed by GitHub
parent 44bb881096
commit 04819239ba
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 1283 additions and 65 deletions

View file

@ -281,6 +281,7 @@ class GenericWorkerPresence(BasePresenceHandler):
self.hs = hs
self.is_mine_id = hs.is_mine_id
self.presence_router = hs.get_presence_router()
self._presence_enabled = hs.config.use_presence
# The number of ongoing syncs on this process, by user id.
@ -395,7 +396,7 @@ class GenericWorkerPresence(BasePresenceHandler):
return _user_syncing()
async def notify_from_replication(self, states, stream_id):
parties = await get_interested_parties(self.store, states)
parties = await get_interested_parties(self.store, self.presence_router, states)
room_ids_to_states, users_to_states = parties
self.notifier.on_new_event(

View file

@ -27,6 +27,7 @@ import yaml
from netaddr import AddrFormatError, IPNetwork, IPSet
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.util.module_loader import load_module
from synapse.util.stringutils import parse_and_validate_server_name
from ._base import Config, ConfigError
@ -238,7 +239,20 @@ class ServerConfig(Config):
self.public_baseurl = config.get("public_baseurl")
# Whether to enable user presence.
self.use_presence = config.get("use_presence", True)
presence_config = config.get("presence") or {}
self.use_presence = presence_config.get("enabled")
if self.use_presence is None:
self.use_presence = config.get("use_presence", True)
# Custom presence router module
self.presence_router_module_class = None
self.presence_router_config = None
presence_router_config = presence_config.get("presence_router")
if presence_router_config:
(
self.presence_router_module_class,
self.presence_router_config,
) = load_module(presence_router_config, ("presence", "presence_router"))
# Whether to update the user directory or not. This should be set to
# false only if we are updating the user directory in a worker
@ -834,9 +848,28 @@ class ServerConfig(Config):
#
#soft_file_limit: 0
# Set to false to disable presence tracking on this homeserver.
# Presence tracking allows users to see the state (e.g online/offline)
# of other local and remote users.
#
#use_presence: false
presence:
# Uncomment to disable presence tracking on this homeserver. This option
# replaces the previous top-level 'use_presence' option.
#
#enabled: false
# Presence routers are third-party modules that can specify additional logic
# to where presence updates from users are routed.
#
presence_router:
# The custom module's class. Uncomment to use a custom presence router module.
#
#module: "my_custom_router.PresenceRouter"
# Configuration options of the custom module. Refer to your module's
# documentation for available options.
#
#config:
# example_option: 'something'
# Whether to require authentication to retrieve profile data (avatars,
# display names) of other users through the client API. Defaults to

View file

@ -0,0 +1,104 @@
# -*- coding: utf-8 -*-
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import TYPE_CHECKING, Dict, Iterable, Set, Union
from synapse.api.presence import UserPresenceState
if TYPE_CHECKING:
from synapse.server import HomeServer
class PresenceRouter:
"""
A module that the homeserver will call upon to help route user presence updates to
additional destinations. If a custom presence router is configured, calls will be
passed to that instead.
"""
ALL_USERS = "ALL"
def __init__(self, hs: "HomeServer"):
self.custom_presence_router = None
# Check whether a custom presence router module has been configured
if hs.config.presence_router_module_class:
# Initialise the module
self.custom_presence_router = hs.config.presence_router_module_class(
config=hs.config.presence_router_config, module_api=hs.get_module_api()
)
# Ensure the module has implemented the required methods
required_methods = ["get_users_for_states", "get_interested_users"]
for method_name in required_methods:
if not hasattr(self.custom_presence_router, method_name):
raise Exception(
"PresenceRouter module '%s' must implement all required methods: %s"
% (
hs.config.presence_router_module_class.__name__,
", ".join(required_methods),
)
)
async def get_users_for_states(
self,
state_updates: Iterable[UserPresenceState],
) -> Dict[str, Set[UserPresenceState]]:
"""
Given an iterable of user presence updates, determine where each one
needs to go.
Args:
state_updates: An iterable of user presence state updates.
Returns:
A dictionary of user_id -> set of UserPresenceState, indicating which
presence updates each user should receive.
"""
if self.custom_presence_router is not None:
# Ask the custom module
return await self.custom_presence_router.get_users_for_states(
state_updates=state_updates
)
# Don't include any extra destinations for presence updates
return {}
async def get_interested_users(self, user_id: str) -> Union[Set[str], ALL_USERS]:
"""
Retrieve a list of users that `user_id` is interested in receiving the
presence of. This will be in addition to those they share a room with.
Optionally, the object PresenceRouter.ALL_USERS can be returned to indicate
that this user should receive all incoming local and remote presence updates.
Note that this method will only be called for local users, but can return users
that are local or remote.
Args:
user_id: A user requesting presence updates.
Returns:
A set of user IDs to return presence updates for, or ALL_USERS to return all
known updates.
"""
if self.custom_presence_router is not None:
# Ask the custom module for interested users
return await self.custom_presence_router.get_interested_users(
user_id=user_id
)
# A custom presence router is not defined.
# Don't report any additional interested users
return set()

View file

@ -44,6 +44,7 @@ from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
from synapse.util.metrics import Measure, measure_func
if TYPE_CHECKING:
from synapse.events.presence_router import PresenceRouter
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
@ -162,6 +163,7 @@ class FederationSender(AbstractFederationSender):
self.clock = hs.get_clock()
self.is_mine_id = hs.is_mine_id
self._presence_router = None # type: Optional[PresenceRouter]
self._transaction_manager = TransactionManager(hs)
self._instance_name = hs.get_instance_name()
@ -584,7 +586,22 @@ class FederationSender(AbstractFederationSender):
"""Given a list of states populate self.pending_presence_by_dest and
poke to send a new transaction to each destination
"""
hosts_and_states = await get_interested_remotes(self.store, states, self.state)
# We pull the presence router here instead of __init__
# to prevent a dependency cycle:
#
# AuthHandler -> Notifier -> FederationSender
# -> PresenceRouter -> ModuleApi -> AuthHandler
if self._presence_router is None:
self._presence_router = self.hs.get_presence_router()
assert self._presence_router is not None
hosts_and_states = await get_interested_remotes(
self.store,
self._presence_router,
states,
self.state,
)
for destinations, states in hosts_and_states:
for destination in destinations:

View file

@ -25,7 +25,17 @@ The methods that define policy are:
import abc
import logging
from contextlib import contextmanager
from typing import TYPE_CHECKING, Dict, Iterable, List, Set, Tuple
from typing import (
TYPE_CHECKING,
Dict,
FrozenSet,
Iterable,
List,
Optional,
Set,
Tuple,
Union,
)
from prometheus_client import Counter
from typing_extensions import ContextManager
@ -34,6 +44,7 @@ import synapse.metrics
from synapse.api.constants import EventTypes, Membership, PresenceState
from synapse.api.errors import SynapseError
from synapse.api.presence import UserPresenceState
from synapse.events.presence_router import PresenceRouter
from synapse.logging.context import run_in_background
from synapse.logging.utils import log_function
from synapse.metrics import LaterGauge
@ -42,7 +53,7 @@ from synapse.state import StateHandler
from synapse.storage.databases.main import DataStore
from synapse.types import Collection, JsonDict, UserID, get_domain_from_id
from synapse.util.async_helpers import Linearizer
from synapse.util.caches.descriptors import cached
from synapse.util.caches.descriptors import _CacheContext, cached
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
@ -209,6 +220,7 @@ class PresenceHandler(BasePresenceHandler):
self.notifier = hs.get_notifier()
self.federation = hs.get_federation_sender()
self.state = hs.get_state_handler()
self.presence_router = hs.get_presence_router()
self._presence_enabled = hs.config.use_presence
federation_registry = hs.get_federation_registry()
@ -653,7 +665,7 @@ class PresenceHandler(BasePresenceHandler):
"""
stream_id, max_token = await self.store.update_presence(states)
parties = await get_interested_parties(self.store, states)
parties = await get_interested_parties(self.store, self.presence_router, states)
room_ids_to_states, users_to_states = parties
self.notifier.on_new_event(
@ -1041,7 +1053,12 @@ class PresenceEventSource:
#
# Presence -> Notifier -> PresenceEventSource -> Presence
#
# Same with get_module_api, get_presence_router
#
# AuthHandler -> Notifier -> PresenceEventSource -> ModuleApi -> AuthHandler
self.get_presence_handler = hs.get_presence_handler
self.get_module_api = hs.get_module_api
self.get_presence_router = hs.get_presence_router
self.clock = hs.get_clock()
self.store = hs.get_datastore()
self.state = hs.get_state_handler()
@ -1055,7 +1072,7 @@ class PresenceEventSource:
include_offline=True,
explicit_room_id=None,
**kwargs
):
) -> Tuple[List[UserPresenceState], int]:
# The process for getting presence events are:
# 1. Get the rooms the user is in.
# 2. Get the list of user in the rooms.
@ -1068,7 +1085,17 @@ class PresenceEventSource:
# We don't try and limit the presence updates by the current token, as
# sending down the rare duplicate is not a concern.
user_id = user.to_string()
stream_change_cache = self.store.presence_stream_cache
with Measure(self.clock, "presence.get_new_events"):
if user_id in self.get_module_api()._send_full_presence_to_local_users:
# This user has been specified by a module to receive all current, online
# user presence. Removing from_key and setting include_offline to false
# will do effectively this.
from_key = None
include_offline = False
if from_key is not None:
from_key = int(from_key)
@ -1091,59 +1118,209 @@ class PresenceEventSource:
# doesn't return. C.f. #5503.
return [], max_token
presence = self.get_presence_handler()
stream_change_cache = self.store.presence_stream_cache
# Figure out which other users this user should receive updates for
users_interested_in = await self._get_interested_in(user, explicit_room_id)
user_ids_changed = set() # type: Collection[str]
changed = None
if from_key:
changed = stream_change_cache.get_all_entities_changed(from_key)
# We have a set of users that we're interested in the presence of. We want to
# cross-reference that with the users that have actually changed their presence.
if changed is not None and len(changed) < 500:
assert isinstance(user_ids_changed, set)
# Check whether this user should see all user updates
# For small deltas, its quicker to get all changes and then
# work out if we share a room or they're in our presence list
get_updates_counter.labels("stream").inc()
for other_user_id in changed:
if other_user_id in users_interested_in:
user_ids_changed.add(other_user_id)
else:
# Too many possible updates. Find all users we can see and check
# if any of them have changed.
get_updates_counter.labels("full").inc()
if users_interested_in == PresenceRouter.ALL_USERS:
# Provide presence state for all users
presence_updates = await self._filter_all_presence_updates_for_user(
user_id, include_offline, from_key
)
if from_key:
user_ids_changed = stream_change_cache.get_entities_changed(
users_interested_in, from_key
# Remove the user from the list of users to receive all presence
if user_id in self.get_module_api()._send_full_presence_to_local_users:
self.get_module_api()._send_full_presence_to_local_users.remove(
user_id
)
return presence_updates, max_token
# Make mypy happy. users_interested_in should now be a set
assert not isinstance(users_interested_in, str)
# The set of users that we're interested in and that have had a presence update.
# We'll actually pull the presence updates for these users at the end.
interested_and_updated_users = (
set()
) # type: Union[Set[str], FrozenSet[str]]
if from_key:
# First get all users that have had a presence update
updated_users = stream_change_cache.get_all_entities_changed(from_key)
# Cross-reference users we're interested in with those that have had updates.
# Use a slightly-optimised method for processing smaller sets of updates.
if updated_users is not None and len(updated_users) < 500:
# For small deltas, it's quicker to get all changes and then
# cross-reference with the users we're interested in
get_updates_counter.labels("stream").inc()
for other_user_id in updated_users:
if other_user_id in users_interested_in:
# mypy thinks this variable could be a FrozenSet as it's possibly set
# to one in the `get_entities_changed` call below, and `add()` is not
# method on a FrozenSet. That doesn't affect us here though, as
# `interested_and_updated_users` is clearly a set() above.
interested_and_updated_users.add(other_user_id) # type: ignore
else:
user_ids_changed = users_interested_in
# Too many possible updates. Find all users we can see and check
# if any of them have changed.
get_updates_counter.labels("full").inc()
updates = await presence.current_state_for_users(user_ids_changed)
interested_and_updated_users = (
stream_change_cache.get_entities_changed(
users_interested_in, from_key
)
)
else:
# No from_key has been specified. Return the presence for all users
# this user is interested in
interested_and_updated_users = users_interested_in
if include_offline:
return (list(updates.values()), max_token)
else:
return (
[s for s in updates.values() if s.state != PresenceState.OFFLINE],
max_token,
# Retrieve the current presence state for each user
users_to_state = await self.get_presence_handler().current_state_for_users(
interested_and_updated_users
)
presence_updates = list(users_to_state.values())
# Remove the user from the list of users to receive all presence
if user_id in self.get_module_api()._send_full_presence_to_local_users:
self.get_module_api()._send_full_presence_to_local_users.remove(user_id)
if not include_offline:
# Filter out offline presence states
presence_updates = self._filter_offline_presence_state(presence_updates)
return presence_updates, max_token
async def _filter_all_presence_updates_for_user(
self,
user_id: str,
include_offline: bool,
from_key: Optional[int] = None,
) -> List[UserPresenceState]:
"""
Computes the presence updates a user should receive.
First pulls presence updates from the database. Then consults PresenceRouter
for whether any updates should be excluded by user ID.
Args:
user_id: The User ID of the user to compute presence updates for.
include_offline: Whether to include offline presence states from the results.
from_key: The minimum stream ID of updates to pull from the database
before filtering.
Returns:
A list of presence states for the given user to receive.
"""
if from_key:
# Only return updates since the last sync
updated_users = self.store.presence_stream_cache.get_all_entities_changed(
from_key
)
if not updated_users:
updated_users = []
# Get the actual presence update for each change
users_to_state = await self.get_presence_handler().current_state_for_users(
updated_users
)
presence_updates = list(users_to_state.values())
if not include_offline:
# Filter out offline states
presence_updates = self._filter_offline_presence_state(presence_updates)
else:
users_to_state = await self.store.get_presence_for_all_users(
include_offline=include_offline
)
presence_updates = list(users_to_state.values())
# TODO: This feels wildly inefficient, and it's unfortunate we need to ask the
# module for information on a number of users when we then only take the info
# for a single user
# Filter through the presence router
users_to_state_set = await self.get_presence_router().get_users_for_states(
presence_updates
)
# We only want the mapping for the syncing user
presence_updates = list(users_to_state_set[user_id])
# Return presence information for all users
return presence_updates
def _filter_offline_presence_state(
self, presence_updates: Iterable[UserPresenceState]
) -> List[UserPresenceState]:
"""Given an iterable containing user presence updates, return a list with any offline
presence states removed.
Args:
presence_updates: Presence states to filter
Returns:
A new list with any offline presence states removed.
"""
return [
update
for update in presence_updates
if update.state != PresenceState.OFFLINE
]
def get_current_key(self):
return self.store.get_current_presence_token()
@cached(num_args=2, cache_context=True)
async def _get_interested_in(self, user, explicit_room_id, cache_context):
async def _get_interested_in(
self,
user: UserID,
explicit_room_id: Optional[str] = None,
cache_context: Optional[_CacheContext] = None,
) -> Union[Set[str], str]:
"""Returns the set of users that the given user should see presence
updates for
updates for.
Args:
user: The user to retrieve presence updates for.
explicit_room_id: The users that are in the room will be returned.
Returns:
A set of user IDs to return presence updates for, or "ALL" to return all
known updates.
"""
user_id = user.to_string()
users_interested_in = set()
users_interested_in.add(user_id) # So that we receive our own presence
# cache_context isn't likely to ever be None due to the @cached decorator,
# but we can't have a non-optional argument after the optional argument
# explicit_room_id either. Assert cache_context is not None so we can use it
# without mypy complaining.
assert cache_context
# Check with the presence router whether we should poll additional users for
# their presence information
additional_users = await self.get_presence_router().get_interested_users(
user.to_string()
)
if additional_users == PresenceRouter.ALL_USERS:
# If the module requested that this user see the presence updates of *all*
# users, then simply return that instead of calculating what rooms this
# user shares
return PresenceRouter.ALL_USERS
# Add the additional users from the router
users_interested_in.update(additional_users)
# Find the users who share a room with this user
users_who_share_room = await self.store.get_users_who_share_room_with_user(
user_id, on_invalidate=cache_context.invalidate
)
@ -1314,14 +1491,15 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now):
async def get_interested_parties(
store: DataStore, states: List[UserPresenceState]
store: DataStore, presence_router: PresenceRouter, states: List[UserPresenceState]
) -> Tuple[Dict[str, List[UserPresenceState]], Dict[str, List[UserPresenceState]]]:
"""Given a list of states return which entities (rooms, users)
are interested in the given states.
Args:
store
states
store: The homeserver's data store.
presence_router: A module for augmenting the destinations for presence updates.
states: A list of incoming user presence updates.
Returns:
A 2-tuple of `(room_ids_to_states, users_to_states)`,
@ -1337,11 +1515,22 @@ async def get_interested_parties(
# Always notify self
users_to_states.setdefault(state.user_id, []).append(state)
# Ask a presence routing module for any additional parties if one
# is loaded.
router_users_to_states = await presence_router.get_users_for_states(states)
# Update the dictionaries with additional destinations and state to send
for user_id, user_states in router_users_to_states.items():
users_to_states.setdefault(user_id, []).extend(user_states)
return room_ids_to_states, users_to_states
async def get_interested_remotes(
store: DataStore, states: List[UserPresenceState], state_handler: StateHandler
store: DataStore,
presence_router: PresenceRouter,
states: List[UserPresenceState],
state_handler: StateHandler,
) -> List[Tuple[Collection[str], List[UserPresenceState]]]:
"""Given a list of presence states figure out which remote servers
should be sent which.
@ -1349,9 +1538,10 @@ async def get_interested_remotes(
All the presence states should be for local users only.
Args:
store
states
state_handler
store: The homeserver's data store.
presence_router: A module for augmenting the destinations for presence updates.
states: A list of incoming user presence updates.
state_handler:
Returns:
A list of 2-tuples of destinations and states, where for
@ -1363,7 +1553,9 @@ async def get_interested_remotes(
# First we look up the rooms each user is in (as well as any explicit
# subscriptions), then for each distinct room we look up the remote
# hosts in those rooms.
room_ids_to_states, users_to_states = await get_interested_parties(store, states)
room_ids_to_states, users_to_states = await get_interested_parties(
store, presence_router, states
)
for room_id, states in room_ids_to_states.items():
hosts = await state_handler.get_current_hosts_in_room(room_id)

View file

@ -50,11 +50,20 @@ class ModuleApi:
self._auth = hs.get_auth()
self._auth_handler = auth_handler
self._server_name = hs.hostname
self._presence_stream = hs.get_event_sources().sources["presence"]
# We expose these as properties below in order to attach a helpful docstring.
self._http_client = hs.get_simple_http_client() # type: SimpleHttpClient
self._public_room_list_manager = PublicRoomListManager(hs)
# The next time these users sync, they will receive the current presence
# state of all local users. Users are added by send_local_online_presence_to,
# and removed after a successful sync.
#
# We make this a private variable to deter modules from accessing it directly,
# though other classes in Synapse will still do so.
self._send_full_presence_to_local_users = set()
@property
def http_client(self):
"""Allows making outbound HTTP requests to remote resources.
@ -385,6 +394,47 @@ class ModuleApi:
return event
async def send_local_online_presence_to(self, users: Iterable[str]) -> None:
"""
Forces the equivalent of a presence initial_sync for a set of local or remote
users. The users will receive presence for all currently online users that they
are considered interested in.
Updates to remote users will be sent immediately, whereas local users will receive
them on their next sync attempt.
Note that this method can only be run on the main or federation_sender worker
processes.
"""
if not self._hs.should_send_federation():
raise Exception(
"send_local_online_presence_to can only be run "
"on processes that send federation",
)
for user in users:
if self._hs.is_mine_id(user):
# Modify SyncHandler._generate_sync_entry_for_presence to call
# presence_source.get_new_events with an empty `from_key` if
# that user's ID were in a list modified by ModuleApi somewhere.
# That user would then get all presence state on next incremental sync.
# Force a presence initial_sync for this user next time
self._send_full_presence_to_local_users.add(user)
else:
# Retrieve presence state for currently online users that this user
# is considered interested in
presence_events, _ = await self._presence_stream.get_new_events(
UserID.from_string(user), from_key=None, include_offline=False
)
# Send to remote destinations
await make_deferred_yieldable(
# We pull the federation sender here as we can only do so on workers
# that support sending presence
self._hs.get_federation_sender().send_presence(presence_events)
)
class PublicRoomListManager:
"""Contains methods for adding to, removing from and querying whether a room

View file

@ -51,6 +51,7 @@ from synapse.crypto import context_factory
from synapse.crypto.context_factory import RegularPolicyForHTTPS
from synapse.crypto.keyring import Keyring
from synapse.events.builder import EventBuilderFactory
from synapse.events.presence_router import PresenceRouter
from synapse.events.spamcheck import SpamChecker
from synapse.events.third_party_rules import ThirdPartyEventRules
from synapse.events.utils import EventClientSerializer
@ -425,6 +426,10 @@ class HomeServer(metaclass=abc.ABCMeta):
else:
raise Exception("Workers cannot write typing")
@cache_in_self
def get_presence_router(self) -> PresenceRouter:
return PresenceRouter(self)
@cache_in_self
def get_typing_handler(self) -> FollowerTypingHandler:
if self.config.worker.writers.typing == self.get_instance_name():