Wake up transaction queue when remote server comes back online (#6706)

This will be used to retry outbound transactions to a remote server if
we think it might have come back up.
This commit is contained in:
Erik Johnston 2020-01-17 10:27:19 +00:00 committed by GitHub
parent 5ce0b17e38
commit a8a50f5b57
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 135 additions and 8 deletions

1
changelog.d/6706.misc Normal file
View File

@ -0,0 +1 @@
Attempt to retry sending a transaction when we detect a remote server has come back online, rather than waiting for a transaction to be triggered by new data.

View File

@ -234,6 +234,10 @@ in which case the `<token>` must be set to `NOW`.
Used exclusively in tests Used exclusively in tests
### REMOTE_SERVER_UP (S, C)
Inform other processes that a remote server may have come back online.
See `synapse/replication/tcp/commands.py` for a detailed description and See `synapse/replication/tcp/commands.py` for a detailed description and
the format of each command. the format of each command.

View File

@ -158,6 +158,13 @@ class FederationSenderReplicationHandler(ReplicationClientHandler):
args.update(self.send_handler.stream_positions()) args.update(self.send_handler.stream_positions())
return args return args
def on_remote_server_up(self, server: str):
"""Called when get a new REMOTE_SERVER_UP command."""
# Let's wake up the transaction queue for the server in case we have
# pending stuff to send to it.
self.send_handler.wake_destination(server)
def start(config_options): def start(config_options):
try: try:
@ -205,7 +212,7 @@ class FederationSenderHandler(object):
to the federation sender. to the federation sender.
""" """
def __init__(self, hs, replication_client): def __init__(self, hs: FederationSenderServer, replication_client):
self.store = hs.get_datastore() self.store = hs.get_datastore()
self._is_mine_id = hs.is_mine_id self._is_mine_id = hs.is_mine_id
self.federation_sender = hs.get_federation_sender() self.federation_sender = hs.get_federation_sender()
@ -226,6 +233,9 @@ class FederationSenderHandler(object):
self.store.get_room_max_stream_ordering() self.store.get_room_max_stream_ordering()
) )
def wake_destination(self, server: str):
self.federation_sender.wake_destination(server)
def stream_positions(self): def stream_positions(self):
return {"federation": self.federation_position} return {"federation": self.federation_position}

View File

@ -21,6 +21,7 @@ from prometheus_client import Counter
from twisted.internet import defer from twisted.internet import defer
import synapse
import synapse.metrics import synapse.metrics
from synapse.federation.sender.per_destination_queue import PerDestinationQueue from synapse.federation.sender.per_destination_queue import PerDestinationQueue
from synapse.federation.sender.transaction_manager import TransactionManager from synapse.federation.sender.transaction_manager import TransactionManager
@ -54,7 +55,7 @@ sent_pdus_destination_dist_total = Counter(
class FederationSender(object): class FederationSender(object):
def __init__(self, hs): def __init__(self, hs: "synapse.server.HomeServer"):
self.hs = hs self.hs = hs
self.server_name = hs.hostname self.server_name = hs.hostname
@ -482,7 +483,20 @@ class FederationSender(object):
def send_device_messages(self, destination): def send_device_messages(self, destination):
if destination == self.server_name: if destination == self.server_name:
logger.info("Not sending device update to ourselves") logger.warning("Not sending device update to ourselves")
return
self._get_per_destination_queue(destination).attempt_new_transaction()
def wake_destination(self, destination: str):
"""Called when we want to retry sending transactions to a remote.
This is mainly useful if the remote server has been down and we think it
might have come back.
"""
if destination == self.server_name:
logger.warning("Not waking up ourselves")
return return
self._get_per_destination_queue(destination).attempt_new_transaction() self._get_per_destination_queue(destination).attempt_new_transaction()

View File

@ -44,6 +44,7 @@ from synapse.logging.opentracing import (
tags, tags,
whitelisted_homeserver, whitelisted_homeserver,
) )
from synapse.server import HomeServer
from synapse.types import ThirdPartyInstanceID, get_domain_from_id from synapse.types import ThirdPartyInstanceID, get_domain_from_id
from synapse.util.ratelimitutils import FederationRateLimiter from synapse.util.ratelimitutils import FederationRateLimiter
from synapse.util.versionstring import get_version_string from synapse.util.versionstring import get_version_string
@ -101,12 +102,17 @@ class NoAuthenticationError(AuthenticationError):
class Authenticator(object): class Authenticator(object):
def __init__(self, hs): def __init__(self, hs: HomeServer):
self._clock = hs.get_clock() self._clock = hs.get_clock()
self.keyring = hs.get_keyring() self.keyring = hs.get_keyring()
self.server_name = hs.hostname self.server_name = hs.hostname
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.federation_domain_whitelist = hs.config.federation_domain_whitelist self.federation_domain_whitelist = hs.config.federation_domain_whitelist
self.notifer = hs.get_notifier()
self.replication_client = None
if hs.config.worker.worker_app:
self.replication_client = hs.get_tcp_replication()
# A method just so we can pass 'self' as the authenticator to the Servlets # A method just so we can pass 'self' as the authenticator to the Servlets
async def authenticate_request(self, request, content): async def authenticate_request(self, request, content):
@ -166,6 +172,17 @@ class Authenticator(object):
try: try:
logger.info("Marking origin %r as up", origin) logger.info("Marking origin %r as up", origin)
await self.store.set_destination_retry_timings(origin, None, 0, 0) await self.store.set_destination_retry_timings(origin, None, 0, 0)
# Inform the relevant places that the remote server is back up.
self.notifer.notify_remote_server_up(origin)
if self.replication_client:
# If we're on a worker we try and inform master about this. The
# replication client doesn't hook into the notifier to avoid
# infinite loops where we send a `REMOTE_SERVER_UP` command to
# master, which then echoes it back to us which in turn pokes
# the notifier.
self.replication_client.send_remote_server_up(origin)
except Exception: except Exception:
logger.exception("Error resetting retry timings on %s", origin) logger.exception("Error resetting retry timings on %s", origin)

View File

@ -15,11 +15,13 @@
import logging import logging
from collections import namedtuple from collections import namedtuple
from typing import Callable, List
from prometheus_client import Counter from prometheus_client import Counter
from twisted.internet import defer from twisted.internet import defer
import synapse.server
from synapse.api.constants import EventTypes, Membership from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError from synapse.api.errors import AuthError
from synapse.handlers.presence import format_user_presence_state from synapse.handlers.presence import format_user_presence_state
@ -154,7 +156,7 @@ class Notifier(object):
UNUSED_STREAM_EXPIRY_MS = 10 * 60 * 1000 UNUSED_STREAM_EXPIRY_MS = 10 * 60 * 1000
def __init__(self, hs): def __init__(self, hs: "synapse.server.HomeServer"):
self.user_to_user_stream = {} self.user_to_user_stream = {}
self.room_to_user_streams = {} self.room_to_user_streams = {}
@ -164,7 +166,12 @@ class Notifier(object):
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.pending_new_room_events = [] self.pending_new_room_events = []
self.replication_callbacks = [] # Called when there are new things to stream over replication
self.replication_callbacks = [] # type: List[Callable[[], None]]
# Called when remote servers have come back online after having been
# down.
self.remote_server_up_callbacks = [] # type: List[Callable[[str], None]]
self.clock = hs.get_clock() self.clock = hs.get_clock()
self.appservice_handler = hs.get_application_service_handler() self.appservice_handler = hs.get_application_service_handler()
@ -205,7 +212,7 @@ class Notifier(object):
"synapse_notifier_users", "", [], lambda: len(self.user_to_user_stream) "synapse_notifier_users", "", [], lambda: len(self.user_to_user_stream)
) )
def add_replication_callback(self, cb): def add_replication_callback(self, cb: Callable[[], None]):
"""Add a callback that will be called when some new data is available. """Add a callback that will be called when some new data is available.
Callback is not given any arguments. It should *not* return a Deferred - if Callback is not given any arguments. It should *not* return a Deferred - if
it needs to do any asynchronous work, a background thread should be started and it needs to do any asynchronous work, a background thread should be started and
@ -213,6 +220,12 @@ class Notifier(object):
""" """
self.replication_callbacks.append(cb) self.replication_callbacks.append(cb)
def add_remote_server_up_callback(self, cb: Callable[[str], None]):
"""Add a callback that will be called when synapse detects a server
has been
"""
self.remote_server_up_callbacks.append(cb)
def on_new_room_event( def on_new_room_event(
self, event, room_stream_id, max_room_stream_id, extra_users=[] self, event, room_stream_id, max_room_stream_id, extra_users=[]
): ):
@ -522,3 +535,15 @@ class Notifier(object):
"""Notify the any replication listeners that there's a new event""" """Notify the any replication listeners that there's a new event"""
for cb in self.replication_callbacks: for cb in self.replication_callbacks:
cb() cb()
def notify_remote_server_up(self, server: str):
"""Notify any replication that a remote server has come back up
"""
# We call federation_sender directly rather than registering as a
# callback as a) we already have a reference to it and b) it introduces
# circular dependencies.
if self.federation_sender:
self.federation_sender.wake_destination(server)
for cb in self.remote_server_up_callbacks:
cb(server)

View File

@ -143,6 +143,9 @@ class ReplicationClientHandler(AbstractReplicationClientHandler):
if d: if d:
d.callback(data) d.callback(data)
def on_remote_server_up(self, server: str):
"""Called when get a new REMOTE_SERVER_UP command."""
def get_streams_to_replicate(self) -> Dict[str, int]: def get_streams_to_replicate(self) -> Dict[str, int]:
"""Called when a new connection has been established and we need to """Called when a new connection has been established and we need to
subscribe to streams. subscribe to streams.

View File

@ -387,6 +387,20 @@ class UserIpCommand(Command):
) )
class RemoteServerUpCommand(Command):
"""Sent when a worker has detected that a remote server is no longer
"down" and retry timings should be reset.
If sent from a client the server will relay to all other workers.
Format::
REMOTE_SERVER_UP <server>
"""
NAME = "REMOTE_SERVER_UP"
_COMMANDS = ( _COMMANDS = (
ServerCommand, ServerCommand,
RdataCommand, RdataCommand,
@ -401,6 +415,7 @@ _COMMANDS = (
RemovePusherCommand, RemovePusherCommand,
InvalidateCacheCommand, InvalidateCacheCommand,
UserIpCommand, UserIpCommand,
RemoteServerUpCommand,
) # type: Tuple[Type[Command], ...] ) # type: Tuple[Type[Command], ...]
# Map of command name to command type. # Map of command name to command type.
@ -414,6 +429,7 @@ VALID_SERVER_COMMANDS = (
ErrorCommand.NAME, ErrorCommand.NAME,
PingCommand.NAME, PingCommand.NAME,
SyncCommand.NAME, SyncCommand.NAME,
RemoteServerUpCommand.NAME,
) )
# The commands the client is allowed to send # The commands the client is allowed to send
@ -427,4 +443,5 @@ VALID_CLIENT_COMMANDS = (
InvalidateCacheCommand.NAME, InvalidateCacheCommand.NAME,
UserIpCommand.NAME, UserIpCommand.NAME,
ErrorCommand.NAME, ErrorCommand.NAME,
RemoteServerUpCommand.NAME,
) )

View File

@ -76,6 +76,7 @@ from synapse.replication.tcp.commands import (
PingCommand, PingCommand,
PositionCommand, PositionCommand,
RdataCommand, RdataCommand,
RemoteServerUpCommand,
ReplicateCommand, ReplicateCommand,
ServerCommand, ServerCommand,
SyncCommand, SyncCommand,
@ -460,6 +461,9 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
async def on_INVALIDATE_CACHE(self, cmd): async def on_INVALIDATE_CACHE(self, cmd):
self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys) self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)
async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand):
self.streamer.on_remote_server_up(cmd.data)
async def on_USER_IP(self, cmd): async def on_USER_IP(self, cmd):
self.streamer.on_user_ip( self.streamer.on_user_ip(
cmd.user_id, cmd.user_id,
@ -555,6 +559,9 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
def send_sync(self, data): def send_sync(self, data):
self.send_command(SyncCommand(data)) self.send_command(SyncCommand(data))
def send_remote_server_up(self, server: str):
self.send_command(RemoteServerUpCommand(server))
def on_connection_closed(self): def on_connection_closed(self):
BaseReplicationStreamProtocol.on_connection_closed(self) BaseReplicationStreamProtocol.on_connection_closed(self)
self.streamer.lost_connection(self) self.streamer.lost_connection(self)
@ -588,6 +595,11 @@ class AbstractReplicationClientHandler(metaclass=abc.ABCMeta):
"""Called when get a new SYNC command.""" """Called when get a new SYNC command."""
raise NotImplementedError() raise NotImplementedError()
@abc.abstractmethod
async def on_remote_server_up(self, server: str):
"""Called when get a new REMOTE_SERVER_UP command."""
raise NotImplementedError()
@abc.abstractmethod @abc.abstractmethod
def get_streams_to_replicate(self): def get_streams_to_replicate(self):
"""Called when a new connection has been established and we need to """Called when a new connection has been established and we need to
@ -707,6 +719,9 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
async def on_SYNC(self, cmd): async def on_SYNC(self, cmd):
self.handler.on_sync(cmd.data) self.handler.on_sync(cmd.data)
async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand):
self.handler.on_remote_server_up(cmd.data)
def replicate(self, stream_name, token): def replicate(self, stream_name, token):
"""Send the subscription request to the server """Send the subscription request to the server
""" """

View File

@ -120,6 +120,7 @@ class ReplicationStreamer(object):
self.federation_sender = hs.get_federation_sender() self.federation_sender = hs.get_federation_sender()
self.notifier.add_replication_callback(self.on_notifier_poke) self.notifier.add_replication_callback(self.on_notifier_poke)
self.notifier.add_remote_server_up_callback(self.send_remote_server_up)
# Keeps track of whether we are currently checking for updates # Keeps track of whether we are currently checking for updates
self.is_looping = False self.is_looping = False
@ -288,6 +289,14 @@ class ReplicationStreamer(object):
) )
await self._server_notices_sender.on_user_ip(user_id) await self._server_notices_sender.on_user_ip(user_id)
@measure_func("repl.on_remote_server_up")
def on_remote_server_up(self, server: str):
self.notifier.notify_remote_server_up(server)
def send_remote_server_up(self, server: str):
for conn in self.connections:
conn.send_remote_server_up(server)
def send_sync_to_all_connections(self, data): def send_sync_to_all_connections(self, data):
"""Sends a SYNC command to all clients. """Sends a SYNC command to all clients.

View File

@ -1,3 +1,5 @@
import twisted.internet
import synapse.api.auth import synapse.api.auth
import synapse.config.homeserver import synapse.config.homeserver
import synapse.federation.sender import synapse.federation.sender
@ -9,10 +11,12 @@ import synapse.handlers.deactivate_account
import synapse.handlers.device import synapse.handlers.device
import synapse.handlers.e2e_keys import synapse.handlers.e2e_keys
import synapse.handlers.message import synapse.handlers.message
import synapse.handlers.presence
import synapse.handlers.room import synapse.handlers.room
import synapse.handlers.room_member import synapse.handlers.room_member
import synapse.handlers.set_password import synapse.handlers.set_password
import synapse.http.client import synapse.http.client
import synapse.notifier
import synapse.rest.media.v1.media_repository import synapse.rest.media.v1.media_repository
import synapse.server_notices.server_notices_manager import synapse.server_notices.server_notices_manager
import synapse.server_notices.server_notices_sender import synapse.server_notices.server_notices_sender
@ -85,3 +89,11 @@ class HomeServer(object):
self, self,
) -> synapse.server_notices.server_notices_sender.ServerNoticesSender: ) -> synapse.server_notices.server_notices_sender.ServerNoticesSender:
pass pass
def get_notifier(self) -> synapse.notifier.Notifier:
pass
def get_presence_handler(self) -> synapse.handlers.presence.PresenceHandler:
pass
def get_clock(self) -> synapse.util.Clock:
pass
def get_reactor(self) -> twisted.internet.base.ReactorBase:
pass