Stub out ServerNoticesSender on the workers

... and have the sync endpoints call it directly rather than obsure indirection
via PresenceHandler
This commit is contained in:
Richard van der Hoff 2018-05-22 10:57:56 +01:00
parent d5dca9a04f
commit 8810685df9
6 changed files with 66 additions and 8 deletions

View File

@ -48,6 +48,7 @@ class EventStreamHandler(BaseHandler):
self.notifier = hs.get_notifier() self.notifier = hs.get_notifier()
self.state = hs.get_state_handler() self.state = hs.get_state_handler()
self._server_notices_sender = hs.get_server_notices_sender()
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
@ -58,6 +59,10 @@ class EventStreamHandler(BaseHandler):
If `only_keys` is not None, events from keys will be sent down. If `only_keys` is not None, events from keys will be sent down.
""" """
# send any outstanding server notices to the user.
yield self._server_notices_sender.on_user_syncing(auth_user_id)
auth_user = UserID.from_string(auth_user_id) auth_user = UserID.from_string(auth_user_id)
presence_handler = self.hs.get_presence_handler() presence_handler = self.hs.get_presence_handler()

View File

@ -100,7 +100,6 @@ class PresenceHandler(object):
self.notifier = hs.get_notifier() self.notifier = hs.get_notifier()
self.federation = hs.get_federation_sender() self.federation = hs.get_federation_sender()
self.state = hs.get_state_handler() self.state = hs.get_state_handler()
self._server_notices_sender = hs.get_server_notices_sender()
federation_registry = hs.get_federation_registry() federation_registry = hs.get_federation_registry()
@ -433,9 +432,6 @@ class PresenceHandler(object):
last_user_sync_ts=self.clock.time_msec(), last_user_sync_ts=self.clock.time_msec(),
)]) )])
# send any outstanding server notices to the user.
yield self._server_notices_sender.on_user_syncing(user_id)
@defer.inlineCallbacks @defer.inlineCallbacks
def _end(): def _end():
try: try:

View File

@ -85,6 +85,7 @@ class SyncRestServlet(RestServlet):
self.clock = hs.get_clock() self.clock = hs.get_clock()
self.filtering = hs.get_filtering() self.filtering = hs.get_filtering()
self.presence_handler = hs.get_presence_handler() self.presence_handler = hs.get_presence_handler()
self._server_notices_sender = hs.get_server_notices_sender()
@defer.inlineCallbacks @defer.inlineCallbacks
def on_GET(self, request): def on_GET(self, request):
@ -149,6 +150,9 @@ class SyncRestServlet(RestServlet):
else: else:
since_token = None since_token = None
# send any outstanding server notices to the user.
yield self._server_notices_sender.on_user_syncing(user.to_string())
affect_presence = set_presence != PresenceState.OFFLINE affect_presence = set_presence != PresenceState.OFFLINE
if affect_presence: if affect_presence:

View File

@ -74,6 +74,9 @@ from synapse.rest.media.v1.media_repository import (
) )
from synapse.server_notices.server_notices_manager import ServerNoticesManager from synapse.server_notices.server_notices_manager import ServerNoticesManager
from synapse.server_notices.server_notices_sender import ServerNoticesSender from synapse.server_notices.server_notices_sender import ServerNoticesSender
from synapse.server_notices.worker_server_notices_sender import (
WorkerServerNoticesSender,
)
from synapse.state import StateHandler, StateResolutionHandler from synapse.state import StateHandler, StateResolutionHandler
from synapse.storage import DataStore from synapse.storage import DataStore
from synapse.streams.events import EventSources from synapse.streams.events import EventSources
@ -403,9 +406,13 @@ class HomeServer(object):
return FederationHandlerRegistry() return FederationHandlerRegistry()
def build_server_notices_manager(self): def build_server_notices_manager(self):
if self.config.worker_app:
raise Exception("Workers cannot send server notices")
return ServerNoticesManager(self) return ServerNoticesManager(self)
def build_server_notices_sender(self): def build_server_notices_sender(self):
if self.config.worker_app:
return WorkerServerNoticesSender(self)
return ServerNoticesSender(self) return ServerNoticesSender(self)
def remove_pusher(self, app_id, push_key, user_id): def remove_pusher(self, app_id, push_key, user_id):

View File

@ -31,9 +31,6 @@ class ServerNoticesSender(object):
def on_user_syncing(self, user_id): def on_user_syncing(self, user_id):
"""Called when the user performs a sync operation. """Called when the user performs a sync operation.
This is only called when /sync (or /events) is called on the synapse
master. In a deployment with synchrotrons, on_user_ip is called
Args: Args:
user_id (str): mxid of user who synced user_id (str): mxid of user who synced
@ -45,7 +42,7 @@ class ServerNoticesSender(object):
) )
def on_user_ip(self, user_id): def on_user_ip(self, user_id):
"""Called when a worker process saw a client request. """Called on the master when a worker process saw a client request.
Args: Args:
user_id (str): mxid user_id (str): mxid
@ -53,6 +50,9 @@ class ServerNoticesSender(object):
Returns: Returns:
Deferred Deferred
""" """
# The synchrotrons use a stubbed version of ServerNoticesSender, so
# we check for notices to send to the user in on_user_ip as well as
# in on_user_syncing
return self._consent_server_notices.maybe_send_server_notice_to_user( return self._consent_server_notices.maybe_send_server_notice_to_user(
user_id, user_id,
) )

View File

@ -0,0 +1,46 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer
class WorkerServerNoticesSender(object):
"""Stub impl of ServerNoticesSender which does nothing"""
def __init__(self, hs):
"""
Args:
hs (synapse.server.HomeServer):
"""
def on_user_syncing(self, user_id):
"""Called when the user performs a sync operation.
Args:
user_id (str): mxid of user who synced
Returns:
Deferred
"""
return defer.succeed()
def on_user_ip(self, user_id):
"""Called on the master when a worker process saw a client request.
Args:
user_id (str): mxid
Returns:
Deferred
"""
raise AssertionError("on_user_ip unexpectedly called on worker")