Add type hints to presence handler (#9885)

This commit is contained in:
Erik Johnston 2021-04-28 11:07:47 +01:00 committed by GitHub
parent fe604a022a
commit dd2d32dcdb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 90 additions and 70 deletions

1
changelog.d/9885.misc Normal file
View File

@ -0,0 +1 @@
Add type hints to presence handler.

View File

@ -28,6 +28,7 @@ from bisect import bisect
from contextlib import contextmanager from contextlib import contextmanager
from typing import ( from typing import (
TYPE_CHECKING, TYPE_CHECKING,
Callable,
Collection, Collection,
Dict, Dict,
FrozenSet, FrozenSet,
@ -232,23 +233,23 @@ class BasePresenceHandler(abc.ABC):
""" """
async def update_external_syncs_row( async def update_external_syncs_row(
self, process_id, user_id, is_syncing, sync_time_msec self, process_id: str, user_id: str, is_syncing: bool, sync_time_msec: int
): ) -> None:
"""Update the syncing users for an external process as a delta. """Update the syncing users for an external process as a delta.
This is a no-op when presence is handled by a different worker. This is a no-op when presence is handled by a different worker.
Args: Args:
process_id (str): An identifier for the process the users are process_id: An identifier for the process the users are
syncing against. This allows synapse to process updates syncing against. This allows synapse to process updates
as user start and stop syncing against a given process. as user start and stop syncing against a given process.
user_id (str): The user who has started or stopped syncing user_id: The user who has started or stopped syncing
is_syncing (bool): Whether or not the user is now syncing is_syncing: Whether or not the user is now syncing
sync_time_msec(int): Time in ms when the user was last syncing sync_time_msec: Time in ms when the user was last syncing
""" """
pass pass
async def update_external_syncs_clear(self, process_id): async def update_external_syncs_clear(self, process_id: str) -> None:
"""Marks all users that had been marked as syncing by a given process """Marks all users that had been marked as syncing by a given process
as offline. as offline.
@ -304,7 +305,7 @@ class _NullContextManager(ContextManager[None]):
class WorkerPresenceHandler(BasePresenceHandler): class WorkerPresenceHandler(BasePresenceHandler):
def __init__(self, hs): def __init__(self, hs: "HomeServer"):
super().__init__(hs) super().__init__(hs)
self.hs = hs self.hs = hs
@ -327,7 +328,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
# user_id -> last_sync_ms. Lists the users that have stopped syncing but # user_id -> last_sync_ms. Lists the users that have stopped syncing but
# we haven't notified the presence writer of that yet # we haven't notified the presence writer of that yet
self.users_going_offline = {} self.users_going_offline = {} # type: Dict[str, int]
self._bump_active_client = ReplicationBumpPresenceActiveTime.make_client(hs) self._bump_active_client = ReplicationBumpPresenceActiveTime.make_client(hs)
self._set_state_client = ReplicationPresenceSetState.make_client(hs) self._set_state_client = ReplicationPresenceSetState.make_client(hs)
@ -346,24 +347,21 @@ class WorkerPresenceHandler(BasePresenceHandler):
self._on_shutdown, self._on_shutdown,
) )
def _on_shutdown(self): def _on_shutdown(self) -> None:
if self._presence_enabled: if self._presence_enabled:
self.hs.get_tcp_replication().send_command( self.hs.get_tcp_replication().send_command(
ClearUserSyncsCommand(self.instance_id) ClearUserSyncsCommand(self.instance_id)
) )
def send_user_sync(self, user_id, is_syncing, last_sync_ms): def send_user_sync(self, user_id: str, is_syncing: bool, last_sync_ms: int) -> None:
if self._presence_enabled: if self._presence_enabled:
self.hs.get_tcp_replication().send_user_sync( self.hs.get_tcp_replication().send_user_sync(
self.instance_id, user_id, is_syncing, last_sync_ms self.instance_id, user_id, is_syncing, last_sync_ms
) )
def mark_as_coming_online(self, user_id): def mark_as_coming_online(self, user_id: str) -> None:
"""A user has started syncing. Send a UserSync to the presence writer, """A user has started syncing. Send a UserSync to the presence writer,
unless they had recently stopped syncing. unless they had recently stopped syncing.
Args:
user_id (str)
""" """
going_offline = self.users_going_offline.pop(user_id, None) going_offline = self.users_going_offline.pop(user_id, None)
if not going_offline: if not going_offline:
@ -371,18 +369,15 @@ class WorkerPresenceHandler(BasePresenceHandler):
# were offline # were offline
self.send_user_sync(user_id, True, self.clock.time_msec()) self.send_user_sync(user_id, True, self.clock.time_msec())
def mark_as_going_offline(self, user_id): def mark_as_going_offline(self, user_id: str) -> None:
"""A user has stopped syncing. We wait before notifying the presence """A user has stopped syncing. We wait before notifying the presence
writer as its likely they'll come back soon. This allows us to avoid writer as its likely they'll come back soon. This allows us to avoid
sending a stopped syncing immediately followed by a started syncing sending a stopped syncing immediately followed by a started syncing
notification to the presence writer notification to the presence writer
Args:
user_id (str)
""" """
self.users_going_offline[user_id] = self.clock.time_msec() self.users_going_offline[user_id] = self.clock.time_msec()
def send_stop_syncing(self): def send_stop_syncing(self) -> None:
"""Check if there are any users who have stopped syncing a while ago and """Check if there are any users who have stopped syncing a while ago and
haven't come back yet. If there are poke the presence writer about them. haven't come back yet. If there are poke the presence writer about them.
""" """
@ -430,7 +425,9 @@ class WorkerPresenceHandler(BasePresenceHandler):
return _user_syncing() return _user_syncing()
async def notify_from_replication(self, states, stream_id): async def notify_from_replication(
self, states: List[UserPresenceState], stream_id: int
) -> None:
parties = await get_interested_parties(self.store, self.presence_router, states) parties = await get_interested_parties(self.store, self.presence_router, states)
room_ids_to_states, users_to_states = parties room_ids_to_states, users_to_states = parties
@ -478,7 +475,12 @@ class WorkerPresenceHandler(BasePresenceHandler):
if count > 0 if count > 0
] ]
async def set_state(self, target_user, state, ignore_status_msg=False): async def set_state(
self,
target_user: UserID,
state: JsonDict,
ignore_status_msg: bool = False,
) -> None:
"""Set the presence state of the user.""" """Set the presence state of the user."""
presence = state["presence"] presence = state["presence"]
@ -508,7 +510,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
ignore_status_msg=ignore_status_msg, ignore_status_msg=ignore_status_msg,
) )
async def bump_presence_active_time(self, user): async def bump_presence_active_time(self, user: UserID) -> None:
"""We've seen the user do something that indicates they're interacting """We've seen the user do something that indicates they're interacting
with the app. with the app.
""" """
@ -592,8 +594,8 @@ class PresenceHandler(BasePresenceHandler):
# we assume that all the sync requests on that process have stopped. # we assume that all the sync requests on that process have stopped.
# Stored as a dict from process_id to set of user_id, and a dict of # Stored as a dict from process_id to set of user_id, and a dict of
# process_id to millisecond timestamp last updated. # process_id to millisecond timestamp last updated.
self.external_process_to_current_syncs = {} # type: Dict[int, Set[str]] self.external_process_to_current_syncs = {} # type: Dict[str, Set[str]]
self.external_process_last_updated_ms = {} # type: Dict[int, int] self.external_process_last_updated_ms = {} # type: Dict[str, int]
self.external_sync_linearizer = Linearizer(name="external_sync_linearizer") self.external_sync_linearizer = Linearizer(name="external_sync_linearizer")
@ -633,7 +635,7 @@ class PresenceHandler(BasePresenceHandler):
self._event_pos = self.store.get_current_events_token() self._event_pos = self.store.get_current_events_token()
self._event_processing = False self._event_processing = False
async def _on_shutdown(self): async def _on_shutdown(self) -> None:
"""Gets called when shutting down. This lets us persist any updates that """Gets called when shutting down. This lets us persist any updates that
we haven't yet persisted, e.g. updates that only changes some internal we haven't yet persisted, e.g. updates that only changes some internal
timers. This allows changes to persist across startup without having to timers. This allows changes to persist across startup without having to
@ -662,7 +664,7 @@ class PresenceHandler(BasePresenceHandler):
) )
logger.info("Finished _on_shutdown") logger.info("Finished _on_shutdown")
async def _persist_unpersisted_changes(self): async def _persist_unpersisted_changes(self) -> None:
"""We periodically persist the unpersisted changes, as otherwise they """We periodically persist the unpersisted changes, as otherwise they
may stack up and slow down shutdown times. may stack up and slow down shutdown times.
""" """
@ -762,7 +764,7 @@ class PresenceHandler(BasePresenceHandler):
states, destinations states, destinations
) )
async def _handle_timeouts(self): async def _handle_timeouts(self) -> None:
"""Checks the presence of users that have timed out and updates as """Checks the presence of users that have timed out and updates as
appropriate. appropriate.
""" """
@ -814,7 +816,7 @@ class PresenceHandler(BasePresenceHandler):
return await self._update_states(changes) return await self._update_states(changes)
async def bump_presence_active_time(self, user): async def bump_presence_active_time(self, user: UserID) -> None:
"""We've seen the user do something that indicates they're interacting """We've seen the user do something that indicates they're interacting
with the app. with the app.
""" """
@ -911,17 +913,17 @@ class PresenceHandler(BasePresenceHandler):
return [] return []
async def update_external_syncs_row( async def update_external_syncs_row(
self, process_id, user_id, is_syncing, sync_time_msec self, process_id: str, user_id: str, is_syncing: bool, sync_time_msec: int
): ) -> None:
"""Update the syncing users for an external process as a delta. """Update the syncing users for an external process as a delta.
Args: Args:
process_id (str): An identifier for the process the users are process_id: An identifier for the process the users are
syncing against. This allows synapse to process updates syncing against. This allows synapse to process updates
as user start and stop syncing against a given process. as user start and stop syncing against a given process.
user_id (str): The user who has started or stopped syncing user_id: The user who has started or stopped syncing
is_syncing (bool): Whether or not the user is now syncing is_syncing: Whether or not the user is now syncing
sync_time_msec(int): Time in ms when the user was last syncing sync_time_msec: Time in ms when the user was last syncing
""" """
with (await self.external_sync_linearizer.queue(process_id)): with (await self.external_sync_linearizer.queue(process_id)):
prev_state = await self.current_state_for_user(user_id) prev_state = await self.current_state_for_user(user_id)
@ -958,7 +960,7 @@ class PresenceHandler(BasePresenceHandler):
self.external_process_last_updated_ms[process_id] = self.clock.time_msec() self.external_process_last_updated_ms[process_id] = self.clock.time_msec()
async def update_external_syncs_clear(self, process_id): async def update_external_syncs_clear(self, process_id: str) -> None:
"""Marks all users that had been marked as syncing by a given process """Marks all users that had been marked as syncing by a given process
as offline. as offline.
@ -979,12 +981,12 @@ class PresenceHandler(BasePresenceHandler):
) )
self.external_process_last_updated_ms.pop(process_id, None) self.external_process_last_updated_ms.pop(process_id, None)
async def current_state_for_user(self, user_id): async def current_state_for_user(self, user_id: str) -> UserPresenceState:
"""Get the current presence state for a user.""" """Get the current presence state for a user."""
res = await self.current_state_for_users([user_id]) res = await self.current_state_for_users([user_id])
return res[user_id] return res[user_id]
async def _persist_and_notify(self, states): async def _persist_and_notify(self, states: List[UserPresenceState]) -> None:
"""Persist states in the database, poke the notifier and send to """Persist states in the database, poke the notifier and send to
interested remote servers interested remote servers
""" """
@ -1005,7 +1007,7 @@ class PresenceHandler(BasePresenceHandler):
# stream (which is updated by `store.update_presence`). # stream (which is updated by `store.update_presence`).
await self.maybe_send_presence_to_interested_destinations(states) await self.maybe_send_presence_to_interested_destinations(states)
async def incoming_presence(self, origin, content): async def incoming_presence(self, origin: str, content: JsonDict) -> None:
"""Called when we receive a `m.presence` EDU from a remote server.""" """Called when we receive a `m.presence` EDU from a remote server."""
if not self._presence_enabled: if not self._presence_enabled:
return return
@ -1055,7 +1057,9 @@ class PresenceHandler(BasePresenceHandler):
federation_presence_counter.inc(len(updates)) federation_presence_counter.inc(len(updates))
await self._update_states(updates) await self._update_states(updates)
async def set_state(self, target_user, state, ignore_status_msg=False): async def set_state(
self, target_user: UserID, state: JsonDict, ignore_status_msg: bool = False
) -> None:
"""Set the presence state of the user.""" """Set the presence state of the user."""
status_msg = state.get("status_msg", None) status_msg = state.get("status_msg", None)
presence = state["presence"] presence = state["presence"]
@ -1089,7 +1093,7 @@ class PresenceHandler(BasePresenceHandler):
await self._update_states([prev_state.copy_and_replace(**new_fields)]) await self._update_states([prev_state.copy_and_replace(**new_fields)])
async def is_visible(self, observed_user, observer_user): async def is_visible(self, observed_user: UserID, observer_user: UserID) -> bool:
"""Returns whether a user can see another user's presence.""" """Returns whether a user can see another user's presence."""
observer_room_ids = await self.store.get_rooms_for_user( observer_room_ids = await self.store.get_rooms_for_user(
observer_user.to_string() observer_user.to_string()
@ -1144,7 +1148,7 @@ class PresenceHandler(BasePresenceHandler):
) )
return rows return rows
def notify_new_event(self): def notify_new_event(self) -> None:
"""Called when new events have happened. Handles users and servers """Called when new events have happened. Handles users and servers
joining rooms and require being sent presence. joining rooms and require being sent presence.
""" """
@ -1163,7 +1167,7 @@ class PresenceHandler(BasePresenceHandler):
run_as_background_process("presence.notify_new_event", _process_presence) run_as_background_process("presence.notify_new_event", _process_presence)
async def _unsafe_process(self): async def _unsafe_process(self) -> None:
# Loop round handling deltas until we're up to date # Loop round handling deltas until we're up to date
while True: while True:
with Measure(self.clock, "presence_delta"): with Measure(self.clock, "presence_delta"):
@ -1188,7 +1192,7 @@ class PresenceHandler(BasePresenceHandler):
max_pos max_pos
) )
async def _handle_state_delta(self, deltas): async def _handle_state_delta(self, deltas: List[JsonDict]) -> None:
"""Process current state deltas to find new joins that need to be """Process current state deltas to find new joins that need to be
handled. handled.
""" """
@ -1311,7 +1315,7 @@ class PresenceHandler(BasePresenceHandler):
return [remote_host], states return [remote_host], states
def should_notify(old_state, new_state): def should_notify(old_state: UserPresenceState, new_state: UserPresenceState) -> bool:
"""Decides if a presence state change should be sent to interested parties.""" """Decides if a presence state change should be sent to interested parties."""
if old_state == new_state: if old_state == new_state:
return False return False
@ -1347,7 +1351,9 @@ def should_notify(old_state, new_state):
return False return False
def format_user_presence_state(state, now, include_user_id=True): def format_user_presence_state(
state: UserPresenceState, now: int, include_user_id: bool = True
) -> JsonDict:
"""Convert UserPresenceState to a format that can be sent down to clients """Convert UserPresenceState to a format that can be sent down to clients
and to other servers. and to other servers.
@ -1385,11 +1391,11 @@ class PresenceEventSource:
@log_function @log_function
async def get_new_events( async def get_new_events(
self, self,
user, user: UserID,
from_key, from_key: Optional[int],
room_ids=None, room_ids: Optional[List[str]] = None,
include_offline=True, include_offline: bool = True,
explicit_room_id=None, explicit_room_id: Optional[str] = None,
**kwargs, **kwargs,
) -> Tuple[List[UserPresenceState], int]: ) -> Tuple[List[UserPresenceState], int]:
# The process for getting presence events are: # The process for getting presence events are:
@ -1594,7 +1600,7 @@ class PresenceEventSource:
if update.state != PresenceState.OFFLINE if update.state != PresenceState.OFFLINE
] ]
def get_current_key(self): def get_current_key(self) -> int:
return self.store.get_current_presence_token() return self.store.get_current_presence_token()
@cached(num_args=2, cache_context=True) @cached(num_args=2, cache_context=True)
@ -1654,15 +1660,20 @@ class PresenceEventSource:
return users_interested_in return users_interested_in
def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now): def handle_timeouts(
user_states: List[UserPresenceState],
is_mine_fn: Callable[[str], bool],
syncing_user_ids: Set[str],
now: int,
) -> List[UserPresenceState]:
"""Checks the presence of users that have timed out and updates as """Checks the presence of users that have timed out and updates as
appropriate. appropriate.
Args: Args:
user_states(list): List of UserPresenceState's to check. user_states: List of UserPresenceState's to check.
is_mine_fn (fn): Function that returns if a user_id is ours is_mine_fn: Function that returns if a user_id is ours
syncing_user_ids (set): Set of user_ids with active syncs. syncing_user_ids: Set of user_ids with active syncs.
now (int): Current time in ms. now: Current time in ms.
Returns: Returns:
List of UserPresenceState updates List of UserPresenceState updates
@ -1679,14 +1690,16 @@ def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now):
return list(changes.values()) return list(changes.values())
def handle_timeout(state, is_mine, syncing_user_ids, now): def handle_timeout(
state: UserPresenceState, is_mine: bool, syncing_user_ids: Set[str], now: int
) -> Optional[UserPresenceState]:
"""Checks the presence of the user to see if any of the timers have elapsed """Checks the presence of the user to see if any of the timers have elapsed
Args: Args:
state (UserPresenceState) state
is_mine (bool): Whether the user is ours is_mine: Whether the user is ours
syncing_user_ids (set): Set of user_ids with active syncs. syncing_user_ids: Set of user_ids with active syncs.
now (int): Current time in ms. now: Current time in ms.
Returns: Returns:
A UserPresenceState update or None if no update. A UserPresenceState update or None if no update.
@ -1738,23 +1751,29 @@ def handle_timeout(state, is_mine, syncing_user_ids, now):
return state if changed else None return state if changed else None
def handle_update(prev_state, new_state, is_mine, wheel_timer, now): def handle_update(
prev_state: UserPresenceState,
new_state: UserPresenceState,
is_mine: bool,
wheel_timer: WheelTimer,
now: int,
) -> Tuple[UserPresenceState, bool, bool]:
"""Given a presence update: """Given a presence update:
1. Add any appropriate timers. 1. Add any appropriate timers.
2. Check if we should notify anyone. 2. Check if we should notify anyone.
Args: Args:
prev_state (UserPresenceState) prev_state
new_state (UserPresenceState) new_state
is_mine (bool): Whether the user is ours is_mine: Whether the user is ours
wheel_timer (WheelTimer) wheel_timer
now (int): Time now in ms now: Time now in ms
Returns: Returns:
3-tuple: `(new_state, persist_and_notify, federation_ping)` where: 3-tuple: `(new_state, persist_and_notify, federation_ping)` where:
- new_state: is the state to actually persist - new_state: is the state to actually persist
- persist_and_notify (bool): whether to persist and notify people - persist_and_notify: whether to persist and notify people
- federation_ping (bool): whether we should send a ping over federation - federation_ping: whether we should send a ping over federation
""" """
user_id = new_state.user_id user_id = new_state.user_id