mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-01-23 03:11:02 -05:00
Stop the master relaying USER_SYNC for other workers (#7318)
Long story short: if we're handling presence on the current worker, we shouldn't be sending USER_SYNC commands over replication. In an attempt to figure out what is going on here, I ended up refactoring some bits of the presencehandler code, so the first 4 commits here are non-functional refactors to move this code slightly closer to sanity. (There's still plenty to do here :/). Suggest reviewing individual commits. Fixes (I hope) #7257.
This commit is contained in:
parent
841c581c40
commit
71a1abb8a1
1
changelog.d/7318.misc
Normal file
1
changelog.d/7318.misc
Normal file
@ -0,0 +1 @@
|
|||||||
|
Move catchup of replication streams logic to worker.
|
@ -196,7 +196,7 @@ Asks the server for the current position of all streams.
|
|||||||
|
|
||||||
#### USER_SYNC (C)
|
#### USER_SYNC (C)
|
||||||
|
|
||||||
A user has started or stopped syncing
|
A user has started or stopped syncing on this process.
|
||||||
|
|
||||||
#### CLEAR_USER_SYNC (C)
|
#### CLEAR_USER_SYNC (C)
|
||||||
|
|
||||||
@ -216,10 +216,6 @@ Asks the server for the current position of all streams.
|
|||||||
|
|
||||||
Inform the server a cache should be invalidated
|
Inform the server a cache should be invalidated
|
||||||
|
|
||||||
#### SYNC (S, C)
|
|
||||||
|
|
||||||
Used exclusively in tests
|
|
||||||
|
|
||||||
### REMOTE_SERVER_UP (S, C)
|
### REMOTE_SERVER_UP (S, C)
|
||||||
|
|
||||||
Inform other processes that a remote server may have come back online.
|
Inform other processes that a remote server may have come back online.
|
||||||
|
@ -97,6 +97,8 @@ class EventTypes(object):
|
|||||||
|
|
||||||
Retention = "m.room.retention"
|
Retention = "m.room.retention"
|
||||||
|
|
||||||
|
Presence = "m.presence"
|
||||||
|
|
||||||
|
|
||||||
class RejectedReason(object):
|
class RejectedReason(object):
|
||||||
AUTH_ERROR = "auth_error"
|
AUTH_ERROR = "auth_error"
|
||||||
|
@ -17,6 +17,9 @@
|
|||||||
import contextlib
|
import contextlib
|
||||||
import logging
|
import logging
|
||||||
import sys
|
import sys
|
||||||
|
from typing import Dict, Iterable
|
||||||
|
|
||||||
|
from typing_extensions import ContextManager
|
||||||
|
|
||||||
from twisted.internet import defer, reactor
|
from twisted.internet import defer, reactor
|
||||||
from twisted.web.resource import NoResource
|
from twisted.web.resource import NoResource
|
||||||
@ -38,14 +41,14 @@ from synapse.config.homeserver import HomeServerConfig
|
|||||||
from synapse.config.logger import setup_logging
|
from synapse.config.logger import setup_logging
|
||||||
from synapse.federation import send_queue
|
from synapse.federation import send_queue
|
||||||
from synapse.federation.transport.server import TransportLayerServer
|
from synapse.federation.transport.server import TransportLayerServer
|
||||||
from synapse.handlers.presence import PresenceHandler, get_interested_parties
|
from synapse.handlers.presence import BasePresenceHandler, get_interested_parties
|
||||||
from synapse.http.server import JsonResource
|
from synapse.http.server import JsonResource
|
||||||
from synapse.http.servlet import RestServlet, parse_json_object_from_request
|
from synapse.http.servlet import RestServlet, parse_json_object_from_request
|
||||||
from synapse.http.site import SynapseSite
|
from synapse.http.site import SynapseSite
|
||||||
from synapse.logging.context import LoggingContext
|
from synapse.logging.context import LoggingContext
|
||||||
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
|
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
|
||||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||||
from synapse.replication.slave.storage._base import BaseSlavedStore, __func__
|
from synapse.replication.slave.storage._base import BaseSlavedStore
|
||||||
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
|
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
|
||||||
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
|
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
|
||||||
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
|
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
|
||||||
@ -225,23 +228,32 @@ class KeyUploadServlet(RestServlet):
|
|||||||
return 200, {"one_time_key_counts": result}
|
return 200, {"one_time_key_counts": result}
|
||||||
|
|
||||||
|
|
||||||
|
class _NullContextManager(ContextManager[None]):
|
||||||
|
"""A context manager which does nothing."""
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
UPDATE_SYNCING_USERS_MS = 10 * 1000
|
UPDATE_SYNCING_USERS_MS = 10 * 1000
|
||||||
|
|
||||||
|
|
||||||
class GenericWorkerPresence(object):
|
class GenericWorkerPresence(BasePresenceHandler):
|
||||||
def __init__(self, hs):
|
def __init__(self, hs):
|
||||||
|
super().__init__(hs)
|
||||||
self.hs = hs
|
self.hs = hs
|
||||||
self.is_mine_id = hs.is_mine_id
|
self.is_mine_id = hs.is_mine_id
|
||||||
self.http_client = hs.get_simple_http_client()
|
self.http_client = hs.get_simple_http_client()
|
||||||
self.store = hs.get_datastore()
|
|
||||||
self.user_to_num_current_syncs = {}
|
self._presence_enabled = hs.config.use_presence
|
||||||
self.clock = hs.get_clock()
|
|
||||||
|
# The number of ongoing syncs on this process, by user id.
|
||||||
|
# Empty if _presence_enabled is false.
|
||||||
|
self._user_to_num_current_syncs = {} # type: Dict[str, int]
|
||||||
|
|
||||||
self.notifier = hs.get_notifier()
|
self.notifier = hs.get_notifier()
|
||||||
self.instance_id = hs.get_instance_id()
|
self.instance_id = hs.get_instance_id()
|
||||||
|
|
||||||
active_presence = self.store.take_presence_startup_info()
|
|
||||||
self.user_to_current_state = {state.user_id: state for state in active_presence}
|
|
||||||
|
|
||||||
# user_id -> last_sync_ms. Lists the users that have stopped syncing
|
# user_id -> last_sync_ms. Lists the users that have stopped syncing
|
||||||
# but we haven't notified the master of that yet
|
# but we haven't notified the master of that yet
|
||||||
self.users_going_offline = {}
|
self.users_going_offline = {}
|
||||||
@ -259,13 +271,13 @@ class GenericWorkerPresence(object):
|
|||||||
)
|
)
|
||||||
|
|
||||||
def _on_shutdown(self):
|
def _on_shutdown(self):
|
||||||
if self.hs.config.use_presence:
|
if self._presence_enabled:
|
||||||
self.hs.get_tcp_replication().send_command(
|
self.hs.get_tcp_replication().send_command(
|
||||||
ClearUserSyncsCommand(self.instance_id)
|
ClearUserSyncsCommand(self.instance_id)
|
||||||
)
|
)
|
||||||
|
|
||||||
def send_user_sync(self, user_id, is_syncing, last_sync_ms):
|
def send_user_sync(self, user_id, is_syncing, last_sync_ms):
|
||||||
if self.hs.config.use_presence:
|
if self._presence_enabled:
|
||||||
self.hs.get_tcp_replication().send_user_sync(
|
self.hs.get_tcp_replication().send_user_sync(
|
||||||
self.instance_id, user_id, is_syncing, last_sync_ms
|
self.instance_id, user_id, is_syncing, last_sync_ms
|
||||||
)
|
)
|
||||||
@ -307,28 +319,33 @@ class GenericWorkerPresence(object):
|
|||||||
# TODO Hows this supposed to work?
|
# TODO Hows this supposed to work?
|
||||||
return defer.succeed(None)
|
return defer.succeed(None)
|
||||||
|
|
||||||
get_states = __func__(PresenceHandler.get_states)
|
async def user_syncing(
|
||||||
get_state = __func__(PresenceHandler.get_state)
|
self, user_id: str, affect_presence: bool
|
||||||
current_state_for_users = __func__(PresenceHandler.current_state_for_users)
|
) -> ContextManager[None]:
|
||||||
|
"""Record that a user is syncing.
|
||||||
|
|
||||||
def user_syncing(self, user_id, affect_presence):
|
Called by the sync and events servlets to record that a user has connected to
|
||||||
if affect_presence:
|
this worker and is waiting for some events.
|
||||||
curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
|
"""
|
||||||
self.user_to_num_current_syncs[user_id] = curr_sync + 1
|
if not affect_presence or not self._presence_enabled:
|
||||||
|
return _NullContextManager()
|
||||||
|
|
||||||
|
curr_sync = self._user_to_num_current_syncs.get(user_id, 0)
|
||||||
|
self._user_to_num_current_syncs[user_id] = curr_sync + 1
|
||||||
|
|
||||||
# If we went from no in flight sync to some, notify replication
|
# If we went from no in flight sync to some, notify replication
|
||||||
if self.user_to_num_current_syncs[user_id] == 1:
|
if self._user_to_num_current_syncs[user_id] == 1:
|
||||||
self.mark_as_coming_online(user_id)
|
self.mark_as_coming_online(user_id)
|
||||||
|
|
||||||
def _end():
|
def _end():
|
||||||
# We check that the user_id is in user_to_num_current_syncs because
|
# We check that the user_id is in user_to_num_current_syncs because
|
||||||
# user_to_num_current_syncs may have been cleared if we are
|
# user_to_num_current_syncs may have been cleared if we are
|
||||||
# shutting down.
|
# shutting down.
|
||||||
if affect_presence and user_id in self.user_to_num_current_syncs:
|
if user_id in self._user_to_num_current_syncs:
|
||||||
self.user_to_num_current_syncs[user_id] -= 1
|
self._user_to_num_current_syncs[user_id] -= 1
|
||||||
|
|
||||||
# If we went from one in flight sync to non, notify replication
|
# If we went from one in flight sync to non, notify replication
|
||||||
if self.user_to_num_current_syncs[user_id] == 0:
|
if self._user_to_num_current_syncs[user_id] == 0:
|
||||||
self.mark_as_going_offline(user_id)
|
self.mark_as_going_offline(user_id)
|
||||||
|
|
||||||
@contextlib.contextmanager
|
@contextlib.contextmanager
|
||||||
@ -338,7 +355,7 @@ class GenericWorkerPresence(object):
|
|||||||
finally:
|
finally:
|
||||||
_end()
|
_end()
|
||||||
|
|
||||||
return defer.succeed(_user_syncing())
|
return _user_syncing()
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def notify_from_replication(self, states, stream_id):
|
def notify_from_replication(self, states, stream_id):
|
||||||
@ -373,15 +390,12 @@ class GenericWorkerPresence(object):
|
|||||||
stream_id = token
|
stream_id = token
|
||||||
yield self.notify_from_replication(states, stream_id)
|
yield self.notify_from_replication(states, stream_id)
|
||||||
|
|
||||||
def get_currently_syncing_users(self):
|
def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
|
||||||
if self.hs.config.use_presence:
|
|
||||||
return [
|
return [
|
||||||
user_id
|
user_id
|
||||||
for user_id, count in self.user_to_num_current_syncs.items()
|
for user_id, count in self._user_to_num_current_syncs.items()
|
||||||
if count > 0
|
if count > 0
|
||||||
]
|
]
|
||||||
else:
|
|
||||||
return set()
|
|
||||||
|
|
||||||
|
|
||||||
class GenericWorkerTyping(object):
|
class GenericWorkerTyping(object):
|
||||||
@ -625,8 +639,7 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
|
|||||||
|
|
||||||
self.store = hs.get_datastore()
|
self.store = hs.get_datastore()
|
||||||
self.typing_handler = hs.get_typing_handler()
|
self.typing_handler = hs.get_typing_handler()
|
||||||
# NB this is a SynchrotronPresence, not a normal PresenceHandler
|
self.presence_handler = hs.get_presence_handler() # type: GenericWorkerPresence
|
||||||
self.presence_handler = hs.get_presence_handler()
|
|
||||||
self.notifier = hs.get_notifier()
|
self.notifier = hs.get_notifier()
|
||||||
|
|
||||||
self.notify_pushers = hs.config.start_pushers
|
self.notify_pushers = hs.config.start_pushers
|
||||||
|
@ -19,6 +19,7 @@ import random
|
|||||||
from synapse.api.constants import EventTypes, Membership
|
from synapse.api.constants import EventTypes, Membership
|
||||||
from synapse.api.errors import AuthError, SynapseError
|
from synapse.api.errors import AuthError, SynapseError
|
||||||
from synapse.events import EventBase
|
from synapse.events import EventBase
|
||||||
|
from synapse.handlers.presence import format_user_presence_state
|
||||||
from synapse.logging.utils import log_function
|
from synapse.logging.utils import log_function
|
||||||
from synapse.types import UserID
|
from synapse.types import UserID
|
||||||
from synapse.visibility import filter_events_for_client
|
from synapse.visibility import filter_events_for_client
|
||||||
@ -97,6 +98,8 @@ class EventStreamHandler(BaseHandler):
|
|||||||
explicit_room_id=room_id,
|
explicit_room_id=room_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
time_now = self.clock.time_msec()
|
||||||
|
|
||||||
# When the user joins a new room, or another user joins a currently
|
# When the user joins a new room, or another user joins a currently
|
||||||
# joined room, we need to send down presence for those users.
|
# joined room, we need to send down presence for those users.
|
||||||
to_add = []
|
to_add = []
|
||||||
@ -112,19 +115,20 @@ class EventStreamHandler(BaseHandler):
|
|||||||
users = await self.state.get_current_users_in_room(
|
users = await self.state.get_current_users_in_room(
|
||||||
event.room_id
|
event.room_id
|
||||||
)
|
)
|
||||||
states = await presence_handler.get_states(users, as_event=True)
|
|
||||||
to_add.extend(states)
|
|
||||||
else:
|
else:
|
||||||
|
users = [event.state_key]
|
||||||
|
|
||||||
ev = await presence_handler.get_state(
|
states = await presence_handler.get_states(users)
|
||||||
UserID.from_string(event.state_key), as_event=True
|
to_add.extend(
|
||||||
|
{
|
||||||
|
"type": EventTypes.Presence,
|
||||||
|
"content": format_user_presence_state(state, time_now),
|
||||||
|
}
|
||||||
|
for state in states
|
||||||
)
|
)
|
||||||
to_add.append(ev)
|
|
||||||
|
|
||||||
events.extend(to_add)
|
events.extend(to_add)
|
||||||
|
|
||||||
time_now = self.clock.time_msec()
|
|
||||||
|
|
||||||
chunks = await self._event_serializer.serialize_events(
|
chunks = await self._event_serializer.serialize_events(
|
||||||
events,
|
events,
|
||||||
time_now,
|
time_now,
|
||||||
|
@ -381,10 +381,16 @@ class InitialSyncHandler(BaseHandler):
|
|||||||
return []
|
return []
|
||||||
|
|
||||||
states = await presence_handler.get_states(
|
states = await presence_handler.get_states(
|
||||||
[m.user_id for m in room_members], as_event=True
|
[m.user_id for m in room_members]
|
||||||
)
|
)
|
||||||
|
|
||||||
return states
|
return [
|
||||||
|
{
|
||||||
|
"type": EventTypes.Presence,
|
||||||
|
"content": format_user_presence_state(s, time_now),
|
||||||
|
}
|
||||||
|
for s in states
|
||||||
|
]
|
||||||
|
|
||||||
async def get_receipts():
|
async def get_receipts():
|
||||||
receipts = await self.store.get_linearized_receipts_for_room(
|
receipts = await self.store.get_linearized_receipts_for_room(
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
# Copyright 2014-2016 OpenMarket Ltd
|
# Copyright 2014-2016 OpenMarket Ltd
|
||||||
|
# Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
# you may not use this file except in compliance with the License.
|
# you may not use this file except in compliance with the License.
|
||||||
@ -21,10 +22,10 @@ The methods that define policy are:
|
|||||||
- PresenceHandler._handle_timeouts
|
- PresenceHandler._handle_timeouts
|
||||||
- should_notify
|
- should_notify
|
||||||
"""
|
"""
|
||||||
|
import abc
|
||||||
import logging
|
import logging
|
||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
from typing import Dict, List, Set
|
from typing import Dict, Iterable, List, Set
|
||||||
|
|
||||||
from six import iteritems, itervalues
|
from six import iteritems, itervalues
|
||||||
|
|
||||||
@ -41,7 +42,7 @@ from synapse.logging.utils import log_function
|
|||||||
from synapse.metrics import LaterGauge
|
from synapse.metrics import LaterGauge
|
||||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||||
from synapse.storage.presence import UserPresenceState
|
from synapse.storage.presence import UserPresenceState
|
||||||
from synapse.types import UserID, get_domain_from_id
|
from synapse.types import JsonDict, UserID, get_domain_from_id
|
||||||
from synapse.util.async_helpers import Linearizer
|
from synapse.util.async_helpers import Linearizer
|
||||||
from synapse.util.caches.descriptors import cached
|
from synapse.util.caches.descriptors import cached
|
||||||
from synapse.util.metrics import Measure
|
from synapse.util.metrics import Measure
|
||||||
@ -99,13 +100,106 @@ EXTERNAL_PROCESS_EXPIRY = 5 * 60 * 1000
|
|||||||
assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
|
assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
|
||||||
|
|
||||||
|
|
||||||
class PresenceHandler(object):
|
class BasePresenceHandler(abc.ABC):
|
||||||
|
"""Parts of the PresenceHandler that are shared between workers and master"""
|
||||||
|
|
||||||
def __init__(self, hs: "synapse.server.HomeServer"):
|
def __init__(self, hs: "synapse.server.HomeServer"):
|
||||||
|
self.clock = hs.get_clock()
|
||||||
|
self.store = hs.get_datastore()
|
||||||
|
|
||||||
|
active_presence = self.store.take_presence_startup_info()
|
||||||
|
self.user_to_current_state = {state.user_id: state for state in active_presence}
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
async def user_syncing(
|
||||||
|
self, user_id: str, affect_presence: bool
|
||||||
|
) -> ContextManager[None]:
|
||||||
|
"""Returns a context manager that should surround any stream requests
|
||||||
|
from the user.
|
||||||
|
|
||||||
|
This allows us to keep track of who is currently streaming and who isn't
|
||||||
|
without having to have timers outside of this module to avoid flickering
|
||||||
|
when users disconnect/reconnect.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
user_id: the user that is starting a sync
|
||||||
|
affect_presence: If false this function will be a no-op.
|
||||||
|
Useful for streams that are not associated with an actual
|
||||||
|
client that is being used by a user.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
|
||||||
|
"""Get an iterable of syncing users on this worker, to send to the presence handler
|
||||||
|
|
||||||
|
This is called when a replication connection is established. It should return
|
||||||
|
a list of user ids, which are then sent as USER_SYNC commands to inform the
|
||||||
|
process handling presence about those users.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
An iterable of user_id strings.
|
||||||
|
"""
|
||||||
|
|
||||||
|
async def get_state(self, target_user: UserID) -> UserPresenceState:
|
||||||
|
results = await self.get_states([target_user.to_string()])
|
||||||
|
return results[0]
|
||||||
|
|
||||||
|
async def get_states(
|
||||||
|
self, target_user_ids: Iterable[str]
|
||||||
|
) -> List[UserPresenceState]:
|
||||||
|
"""Get the presence state for users."""
|
||||||
|
|
||||||
|
updates_d = await self.current_state_for_users(target_user_ids)
|
||||||
|
updates = list(updates_d.values())
|
||||||
|
|
||||||
|
for user_id in set(target_user_ids) - {u.user_id for u in updates}:
|
||||||
|
updates.append(UserPresenceState.default(user_id))
|
||||||
|
|
||||||
|
return updates
|
||||||
|
|
||||||
|
async def current_state_for_users(
|
||||||
|
self, user_ids: Iterable[str]
|
||||||
|
) -> Dict[str, UserPresenceState]:
|
||||||
|
"""Get the current presence state for multiple users.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
dict: `user_id` -> `UserPresenceState`
|
||||||
|
"""
|
||||||
|
states = {
|
||||||
|
user_id: self.user_to_current_state.get(user_id, None)
|
||||||
|
for user_id in user_ids
|
||||||
|
}
|
||||||
|
|
||||||
|
missing = [user_id for user_id, state in iteritems(states) if not state]
|
||||||
|
if missing:
|
||||||
|
# There are things not in our in memory cache. Lets pull them out of
|
||||||
|
# the database.
|
||||||
|
res = await self.store.get_presence_for_users(missing)
|
||||||
|
states.update(res)
|
||||||
|
|
||||||
|
missing = [user_id for user_id, state in iteritems(states) if not state]
|
||||||
|
if missing:
|
||||||
|
new = {
|
||||||
|
user_id: UserPresenceState.default(user_id) for user_id in missing
|
||||||
|
}
|
||||||
|
states.update(new)
|
||||||
|
self.user_to_current_state.update(new)
|
||||||
|
|
||||||
|
return states
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
async def set_state(
|
||||||
|
self, target_user: UserID, state: JsonDict, ignore_status_msg: bool = False
|
||||||
|
) -> None:
|
||||||
|
"""Set the presence state of the user. """
|
||||||
|
|
||||||
|
|
||||||
|
class PresenceHandler(BasePresenceHandler):
|
||||||
|
def __init__(self, hs: "synapse.server.HomeServer"):
|
||||||
|
super().__init__(hs)
|
||||||
self.hs = hs
|
self.hs = hs
|
||||||
self.is_mine_id = hs.is_mine_id
|
self.is_mine_id = hs.is_mine_id
|
||||||
self.server_name = hs.hostname
|
self.server_name = hs.hostname
|
||||||
self.clock = hs.get_clock()
|
|
||||||
self.store = hs.get_datastore()
|
|
||||||
self.wheel_timer = WheelTimer()
|
self.wheel_timer = WheelTimer()
|
||||||
self.notifier = hs.get_notifier()
|
self.notifier = hs.get_notifier()
|
||||||
self.federation = hs.get_federation_sender()
|
self.federation = hs.get_federation_sender()
|
||||||
@ -115,13 +209,6 @@ class PresenceHandler(object):
|
|||||||
|
|
||||||
federation_registry.register_edu_handler("m.presence", self.incoming_presence)
|
federation_registry.register_edu_handler("m.presence", self.incoming_presence)
|
||||||
|
|
||||||
active_presence = self.store.take_presence_startup_info()
|
|
||||||
|
|
||||||
# A dictionary of the current state of users. This is prefilled with
|
|
||||||
# non-offline presence from the DB. We should fetch from the DB if
|
|
||||||
# we can't find a users presence in here.
|
|
||||||
self.user_to_current_state = {state.user_id: state for state in active_presence}
|
|
||||||
|
|
||||||
LaterGauge(
|
LaterGauge(
|
||||||
"synapse_handlers_presence_user_to_current_state_size",
|
"synapse_handlers_presence_user_to_current_state_size",
|
||||||
"",
|
"",
|
||||||
@ -130,7 +217,7 @@ class PresenceHandler(object):
|
|||||||
)
|
)
|
||||||
|
|
||||||
now = self.clock.time_msec()
|
now = self.clock.time_msec()
|
||||||
for state in active_presence:
|
for state in self.user_to_current_state.values():
|
||||||
self.wheel_timer.insert(
|
self.wheel_timer.insert(
|
||||||
now=now, obj=state.user_id, then=state.last_active_ts + IDLE_TIMER
|
now=now, obj=state.user_id, then=state.last_active_ts + IDLE_TIMER
|
||||||
)
|
)
|
||||||
@ -361,10 +448,18 @@ class PresenceHandler(object):
|
|||||||
|
|
||||||
timers_fired_counter.inc(len(states))
|
timers_fired_counter.inc(len(states))
|
||||||
|
|
||||||
|
syncing_user_ids = {
|
||||||
|
user_id
|
||||||
|
for user_id, count in self.user_to_num_current_syncs.items()
|
||||||
|
if count
|
||||||
|
}
|
||||||
|
for user_ids in self.external_process_to_current_syncs.values():
|
||||||
|
syncing_user_ids.update(user_ids)
|
||||||
|
|
||||||
changes = handle_timeouts(
|
changes = handle_timeouts(
|
||||||
states,
|
states,
|
||||||
is_mine_fn=self.is_mine_id,
|
is_mine_fn=self.is_mine_id,
|
||||||
syncing_user_ids=self.get_currently_syncing_users(),
|
syncing_user_ids=syncing_user_ids,
|
||||||
now=now,
|
now=now,
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -462,22 +557,9 @@ class PresenceHandler(object):
|
|||||||
|
|
||||||
return _user_syncing()
|
return _user_syncing()
|
||||||
|
|
||||||
def get_currently_syncing_users(self):
|
def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
|
||||||
"""Get the set of user ids that are currently syncing on this HS.
|
# since we are the process handling presence, there is nothing to do here.
|
||||||
Returns:
|
return []
|
||||||
set(str): A set of user_id strings.
|
|
||||||
"""
|
|
||||||
if self.hs.config.use_presence:
|
|
||||||
syncing_user_ids = {
|
|
||||||
user_id
|
|
||||||
for user_id, count in self.user_to_num_current_syncs.items()
|
|
||||||
if count
|
|
||||||
}
|
|
||||||
for user_ids in self.external_process_to_current_syncs.values():
|
|
||||||
syncing_user_ids.update(user_ids)
|
|
||||||
return syncing_user_ids
|
|
||||||
else:
|
|
||||||
return set()
|
|
||||||
|
|
||||||
async def update_external_syncs_row(
|
async def update_external_syncs_row(
|
||||||
self, process_id, user_id, is_syncing, sync_time_msec
|
self, process_id, user_id, is_syncing, sync_time_msec
|
||||||
@ -554,34 +636,6 @@ class PresenceHandler(object):
|
|||||||
res = await self.current_state_for_users([user_id])
|
res = await self.current_state_for_users([user_id])
|
||||||
return res[user_id]
|
return res[user_id]
|
||||||
|
|
||||||
async def current_state_for_users(self, user_ids):
|
|
||||||
"""Get the current presence state for multiple users.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
dict: `user_id` -> `UserPresenceState`
|
|
||||||
"""
|
|
||||||
states = {
|
|
||||||
user_id: self.user_to_current_state.get(user_id, None)
|
|
||||||
for user_id in user_ids
|
|
||||||
}
|
|
||||||
|
|
||||||
missing = [user_id for user_id, state in iteritems(states) if not state]
|
|
||||||
if missing:
|
|
||||||
# There are things not in our in memory cache. Lets pull them out of
|
|
||||||
# the database.
|
|
||||||
res = await self.store.get_presence_for_users(missing)
|
|
||||||
states.update(res)
|
|
||||||
|
|
||||||
missing = [user_id for user_id, state in iteritems(states) if not state]
|
|
||||||
if missing:
|
|
||||||
new = {
|
|
||||||
user_id: UserPresenceState.default(user_id) for user_id in missing
|
|
||||||
}
|
|
||||||
states.update(new)
|
|
||||||
self.user_to_current_state.update(new)
|
|
||||||
|
|
||||||
return states
|
|
||||||
|
|
||||||
async def _persist_and_notify(self, states):
|
async def _persist_and_notify(self, states):
|
||||||
"""Persist states in the database, poke the notifier and send to
|
"""Persist states in the database, poke the notifier and send to
|
||||||
interested remote servers
|
interested remote servers
|
||||||
@ -669,40 +723,6 @@ class PresenceHandler(object):
|
|||||||
federation_presence_counter.inc(len(updates))
|
federation_presence_counter.inc(len(updates))
|
||||||
await self._update_states(updates)
|
await self._update_states(updates)
|
||||||
|
|
||||||
async def get_state(self, target_user, as_event=False):
|
|
||||||
results = await self.get_states([target_user.to_string()], as_event=as_event)
|
|
||||||
|
|
||||||
return results[0]
|
|
||||||
|
|
||||||
async def get_states(self, target_user_ids, as_event=False):
|
|
||||||
"""Get the presence state for users.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
target_user_ids (list)
|
|
||||||
as_event (bool): Whether to format it as a client event or not.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
list
|
|
||||||
"""
|
|
||||||
|
|
||||||
updates = await self.current_state_for_users(target_user_ids)
|
|
||||||
updates = list(updates.values())
|
|
||||||
|
|
||||||
for user_id in set(target_user_ids) - {u.user_id for u in updates}:
|
|
||||||
updates.append(UserPresenceState.default(user_id))
|
|
||||||
|
|
||||||
now = self.clock.time_msec()
|
|
||||||
if as_event:
|
|
||||||
return [
|
|
||||||
{
|
|
||||||
"type": "m.presence",
|
|
||||||
"content": format_user_presence_state(state, now),
|
|
||||||
}
|
|
||||||
for state in updates
|
|
||||||
]
|
|
||||||
else:
|
|
||||||
return updates
|
|
||||||
|
|
||||||
async def set_state(self, target_user, state, ignore_status_msg=False):
|
async def set_state(self, target_user, state, ignore_status_msg=False):
|
||||||
"""Set the presence state of the user.
|
"""Set the presence state of the user.
|
||||||
"""
|
"""
|
||||||
@ -889,7 +909,7 @@ class PresenceHandler(object):
|
|||||||
user_ids = await self.state.get_current_users_in_room(room_id)
|
user_ids = await self.state.get_current_users_in_room(room_id)
|
||||||
user_ids = list(filter(self.is_mine_id, user_ids))
|
user_ids = list(filter(self.is_mine_id, user_ids))
|
||||||
|
|
||||||
states = await self.current_state_for_users(user_ids)
|
states_d = await self.current_state_for_users(user_ids)
|
||||||
|
|
||||||
# Filter out old presence, i.e. offline presence states where
|
# Filter out old presence, i.e. offline presence states where
|
||||||
# the user hasn't been active for a week. We can change this
|
# the user hasn't been active for a week. We can change this
|
||||||
@ -899,7 +919,7 @@ class PresenceHandler(object):
|
|||||||
now = self.clock.time_msec()
|
now = self.clock.time_msec()
|
||||||
states = [
|
states = [
|
||||||
state
|
state
|
||||||
for state in states.values()
|
for state in states_d.values()
|
||||||
if state.state != PresenceState.OFFLINE
|
if state.state != PresenceState.OFFLINE
|
||||||
or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000
|
or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000
|
||||||
or state.status_msg is not None
|
or state.status_msg is not None
|
||||||
|
@ -210,7 +210,10 @@ class ReplicateCommand(Command):
|
|||||||
|
|
||||||
class UserSyncCommand(Command):
|
class UserSyncCommand(Command):
|
||||||
"""Sent by the client to inform the server that a user has started or
|
"""Sent by the client to inform the server that a user has started or
|
||||||
stopped syncing. Used to calculate presence on the master.
|
stopped syncing on this process.
|
||||||
|
|
||||||
|
This is used by the process handling presence (typically the master) to
|
||||||
|
calculate who is online and who is not.
|
||||||
|
|
||||||
Includes a timestamp of when the last user sync was.
|
Includes a timestamp of when the last user sync was.
|
||||||
|
|
||||||
@ -218,7 +221,7 @@ class UserSyncCommand(Command):
|
|||||||
|
|
||||||
USER_SYNC <instance_id> <user_id> <state> <last_sync_ms>
|
USER_SYNC <instance_id> <user_id> <state> <last_sync_ms>
|
||||||
|
|
||||||
Where <state> is either "start" or "stop"
|
Where <state> is either "start" or "end"
|
||||||
"""
|
"""
|
||||||
|
|
||||||
NAME = "USER_SYNC"
|
NAME = "USER_SYNC"
|
||||||
|
@ -337,13 +337,6 @@ class ReplicationCommandHandler:
|
|||||||
if self._is_master:
|
if self._is_master:
|
||||||
self._notifier.notify_remote_server_up(cmd.data)
|
self._notifier.notify_remote_server_up(cmd.data)
|
||||||
|
|
||||||
def get_currently_syncing_users(self):
|
|
||||||
"""Get the list of currently syncing users (if any). This is called
|
|
||||||
when a connection has been established and we need to send the
|
|
||||||
currently syncing users.
|
|
||||||
"""
|
|
||||||
return self._presence_handler.get_currently_syncing_users()
|
|
||||||
|
|
||||||
def new_connection(self, connection: AbstractConnection):
|
def new_connection(self, connection: AbstractConnection):
|
||||||
"""Called when we have a new connection.
|
"""Called when we have a new connection.
|
||||||
"""
|
"""
|
||||||
@ -361,9 +354,11 @@ class ReplicationCommandHandler:
|
|||||||
if self._factory:
|
if self._factory:
|
||||||
self._factory.resetDelay()
|
self._factory.resetDelay()
|
||||||
|
|
||||||
# Tell the server if we have any users currently syncing (should only
|
# Tell the other end if we have any users currently syncing.
|
||||||
# happen on synchrotrons)
|
currently_syncing = (
|
||||||
currently_syncing = self.get_currently_syncing_users()
|
self._presence_handler.get_currently_syncing_users_for_replication()
|
||||||
|
)
|
||||||
|
|
||||||
now = self._clock.time_msec()
|
now = self._clock.time_msec()
|
||||||
for user_id in currently_syncing:
|
for user_id in currently_syncing:
|
||||||
connection.send_command(
|
connection.send_command(
|
||||||
|
@ -97,7 +97,7 @@ class HomeServer(object):
|
|||||||
pass
|
pass
|
||||||
def get_notifier(self) -> synapse.notifier.Notifier:
|
def get_notifier(self) -> synapse.notifier.Notifier:
|
||||||
pass
|
pass
|
||||||
def get_presence_handler(self) -> synapse.handlers.presence.PresenceHandler:
|
def get_presence_handler(self) -> synapse.handlers.presence.BasePresenceHandler:
|
||||||
pass
|
pass
|
||||||
def get_clock(self) -> synapse.util.Clock:
|
def get_clock(self) -> synapse.util.Clock:
|
||||||
pass
|
pass
|
||||||
|
Loading…
Reference in New Issue
Block a user