Add ability to shard the federation sender (#7798)

This commit is contained in:
Erik Johnston 2020-07-10 18:26:36 +01:00 committed by GitHub
parent f1245dc3c0
commit f299441cc6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 670 additions and 157 deletions

1
changelog.d/7798.feature Normal file
View File

@ -0,0 +1 @@
Add experimental support for running multiple federation sender processes.

View File

@ -118,38 +118,6 @@ pid_file: DATADIR/homeserver.pid
# #
#enable_search: false #enable_search: false
# Restrict federation to the following whitelist of domains.
# N.B. we recommend also firewalling your federation listener to limit
# inbound federation traffic as early as possible, rather than relying
# purely on this application-layer restriction. If not specified, the
# default is to whitelist everything.
#
#federation_domain_whitelist:
# - lon.example.com
# - nyc.example.com
# - syd.example.com
# Prevent federation requests from being sent to the following
# blacklist IP address CIDR ranges. If this option is not specified, or
# specified with an empty list, no ip range blacklist will be enforced.
#
# As of Synapse v1.4.0 this option also affects any outbound requests to identity
# servers provided by user input.
#
# (0.0.0.0 and :: are always blacklisted, whether or not they are explicitly
# listed here, since they correspond to unroutable addresses.)
#
federation_ip_range_blacklist:
- '127.0.0.0/8'
- '10.0.0.0/8'
- '172.16.0.0/12'
- '192.168.0.0/16'
- '100.64.0.0/10'
- '169.254.0.0/16'
- '::1/128'
- 'fe80::/64'
- 'fc00::/7'
# List of ports that Synapse should listen on, their purpose and their # List of ports that Synapse should listen on, their purpose and their
# configuration. # configuration.
# #
@ -608,6 +576,39 @@ acme:
# Restrict federation to the following whitelist of domains.
# N.B. we recommend also firewalling your federation listener to limit
# inbound federation traffic as early as possible, rather than relying
# purely on this application-layer restriction. If not specified, the
# default is to whitelist everything.
#
#federation_domain_whitelist:
# - lon.example.com
# - nyc.example.com
# - syd.example.com
# Prevent federation requests from being sent to the following
# blacklist IP address CIDR ranges. If this option is not specified, or
# specified with an empty list, no ip range blacklist will be enforced.
#
# As of Synapse v1.4.0 this option also affects any outbound requests to identity
# servers provided by user input.
#
# (0.0.0.0 and :: are always blacklisted, whether or not they are explicitly
# listed here, since they correspond to unroutable addresses.)
#
federation_ip_range_blacklist:
- '127.0.0.0/8'
- '10.0.0.0/8'
- '172.16.0.0/12'
- '192.168.0.0/16'
- '100.64.0.0/10'
- '169.254.0.0/16'
- '::1/128'
- 'fe80::/64'
- 'fc00::/7'
## Caching ## ## Caching ##
# Caching can be configured through the following options. # Caching can be configured through the following options.

View File

@ -511,25 +511,7 @@ class GenericWorkerSlavedStore(
SearchWorkerStore, SearchWorkerStore,
BaseSlavedStore, BaseSlavedStore,
): ):
def __init__(self, database, db_conn, hs): pass
super(GenericWorkerSlavedStore, self).__init__(database, db_conn, hs)
# We pull out the current federation stream position now so that we
# always have a known value for the federation position in memory so
# that we don't have to bounce via a deferred once when we start the
# replication streams.
self.federation_out_pos_startup = self._get_federation_out_pos(db_conn)
def _get_federation_out_pos(self, db_conn):
sql = "SELECT stream_id FROM federation_stream_position WHERE type = ?"
sql = self.database_engine.convert_param_style(sql)
txn = db_conn.cursor()
txn.execute(sql, ("federation",))
rows = txn.fetchall()
txn.close()
return rows[0][0] if rows else -1
class GenericWorkerServer(HomeServer): class GenericWorkerServer(HomeServer):
@ -812,19 +794,11 @@ class FederationSenderHandler(object):
self.federation_sender = hs.get_federation_sender() self.federation_sender = hs.get_federation_sender()
self._hs = hs self._hs = hs
# if the worker is restarted, we want to pick up where we left off in # Stores the latest position in the federation stream we've gotten up
# the replication stream, so load the position from the database. # to. This is always set before we use it.
# self.federation_position = None
# XXX is this actually worthwhile? Whenever the master is restarted, we'll
# drop some rows anyway (which is mostly fine because we're only dropping
# typing and presence notifications). If the replication stream is
# unreliable, why do we do all this hoop-jumping to store the position in the
# database? See also https://github.com/matrix-org/synapse/issues/7535.
#
self.federation_position = self.store.federation_out_pos_startup
self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer") self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer")
self._last_ack = self.federation_position
def on_start(self): def on_start(self):
# There may be some events that are persisted but haven't been sent, # There may be some events that are persisted but haven't been sent,
@ -932,7 +906,6 @@ class FederationSenderHandler(object):
# We ACK this token over replication so that the master can drop # We ACK this token over replication so that the master can drop
# its in memory queues # its in memory queues
self._hs.get_tcp_replication().send_federation_ack(current_position) self._hs.get_tcp_replication().send_federation_ack(current_position)
self._last_ack = current_position
except Exception: except Exception:
logger.exception("Error updating federation stream position") logger.exception("Error updating federation stream position")
@ -960,7 +933,7 @@ def start(config_options):
) )
if config.worker_app == "synapse.app.appservice": if config.worker_app == "synapse.app.appservice":
if config.notify_appservices: if config.appservice.notify_appservices:
sys.stderr.write( sys.stderr.write(
"\nThe appservices must be disabled in the main synapse process" "\nThe appservices must be disabled in the main synapse process"
"\nbefore they can be run in a separate worker." "\nbefore they can be run in a separate worker."
@ -970,13 +943,13 @@ def start(config_options):
sys.exit(1) sys.exit(1)
# Force the appservice to start since they will be disabled in the main config # Force the appservice to start since they will be disabled in the main config
config.notify_appservices = True config.appservice.notify_appservices = True
else: else:
# For other worker types we force this to off. # For other worker types we force this to off.
config.notify_appservices = False config.appservice.notify_appservices = False
if config.worker_app == "synapse.app.pusher": if config.worker_app == "synapse.app.pusher":
if config.start_pushers: if config.server.start_pushers:
sys.stderr.write( sys.stderr.write(
"\nThe pushers must be disabled in the main synapse process" "\nThe pushers must be disabled in the main synapse process"
"\nbefore they can be run in a separate worker." "\nbefore they can be run in a separate worker."
@ -986,13 +959,13 @@ def start(config_options):
sys.exit(1) sys.exit(1)
# Force the pushers to start since they will be disabled in the main config # Force the pushers to start since they will be disabled in the main config
config.start_pushers = True config.server.start_pushers = True
else: else:
# For other worker types we force this to off. # For other worker types we force this to off.
config.start_pushers = False config.server.start_pushers = False
if config.worker_app == "synapse.app.user_dir": if config.worker_app == "synapse.app.user_dir":
if config.update_user_directory: if config.server.update_user_directory:
sys.stderr.write( sys.stderr.write(
"\nThe update_user_directory must be disabled in the main synapse process" "\nThe update_user_directory must be disabled in the main synapse process"
"\nbefore they can be run in a separate worker." "\nbefore they can be run in a separate worker."
@ -1002,13 +975,13 @@ def start(config_options):
sys.exit(1) sys.exit(1)
# Force the pushers to start since they will be disabled in the main config # Force the pushers to start since they will be disabled in the main config
config.update_user_directory = True config.server.update_user_directory = True
else: else:
# For other worker types we force this to off. # For other worker types we force this to off.
config.update_user_directory = False config.server.update_user_directory = False
if config.worker_app == "synapse.app.federation_sender": if config.worker_app == "synapse.app.federation_sender":
if config.send_federation: if config.federation.send_federation:
sys.stderr.write( sys.stderr.write(
"\nThe send_federation must be disabled in the main synapse process" "\nThe send_federation must be disabled in the main synapse process"
"\nbefore they can be run in a separate worker." "\nbefore they can be run in a separate worker."
@ -1018,10 +991,10 @@ def start(config_options):
sys.exit(1) sys.exit(1)
# Force the pushers to start since they will be disabled in the main config # Force the pushers to start since they will be disabled in the main config
config.send_federation = True config.federation.send_federation = True
else: else:
# For other worker types we force this to off. # For other worker types we force this to off.
config.send_federation = False config.federation.send_federation = False
synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts

View File

@ -0,0 +1,129 @@
# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from hashlib import sha256
from typing import List, Optional
import attr
from netaddr import IPSet
from ._base import Config, ConfigError
@attr.s
class ShardedFederationSendingConfig:
"""Algorithm for choosing which federation sender instance is responsible
for which destionation host.
"""
instances = attr.ib(type=List[str])
def should_send_to(self, instance_name: str, destination: str) -> bool:
"""Whether this instance is responsible for sending transcations for
the given host.
"""
# If multiple federation senders are not defined we always return true.
if not self.instances or len(self.instances) == 1:
return True
# We shard by taking the hash, modulo it by the number of federation
# senders and then checking whether this instance matches the instance
# at that index.
#
# (Technically this introduces some bias and is not entirely uniform, but
# since the hash is so large the bias is ridiculously small).
dest_hash = sha256(destination.encode("utf8")).digest()
dest_int = int.from_bytes(dest_hash, byteorder="little")
remainder = dest_int % (len(self.instances))
return self.instances[remainder] == instance_name
class FederationConfig(Config):
section = "federation"
def read_config(self, config, **kwargs):
# Whether to send federation traffic out in this process. This only
# applies to some federation traffic, and so shouldn't be used to
# "disable" federation
self.send_federation = config.get("send_federation", True)
federation_sender_instances = config.get("federation_sender_instances") or []
self.federation_shard_config = ShardedFederationSendingConfig(
federation_sender_instances
)
# FIXME: federation_domain_whitelist needs sytests
self.federation_domain_whitelist = None # type: Optional[dict]
federation_domain_whitelist = config.get("federation_domain_whitelist", None)
if federation_domain_whitelist is not None:
# turn the whitelist into a hash for speed of lookup
self.federation_domain_whitelist = {}
for domain in federation_domain_whitelist:
self.federation_domain_whitelist[domain] = True
self.federation_ip_range_blacklist = config.get(
"federation_ip_range_blacklist", []
)
# Attempt to create an IPSet from the given ranges
try:
self.federation_ip_range_blacklist = IPSet(
self.federation_ip_range_blacklist
)
# Always blacklist 0.0.0.0, ::
self.federation_ip_range_blacklist.update(["0.0.0.0", "::"])
except Exception as e:
raise ConfigError(
"Invalid range(s) provided in federation_ip_range_blacklist: %s" % e
)
def generate_config_section(self, config_dir_path, server_name, **kwargs):
return """\
# Restrict federation to the following whitelist of domains.
# N.B. we recommend also firewalling your federation listener to limit
# inbound federation traffic as early as possible, rather than relying
# purely on this application-layer restriction. If not specified, the
# default is to whitelist everything.
#
#federation_domain_whitelist:
# - lon.example.com
# - nyc.example.com
# - syd.example.com
# Prevent federation requests from being sent to the following
# blacklist IP address CIDR ranges. If this option is not specified, or
# specified with an empty list, no ip range blacklist will be enforced.
#
# As of Synapse v1.4.0 this option also affects any outbound requests to identity
# servers provided by user input.
#
# (0.0.0.0 and :: are always blacklisted, whether or not they are explicitly
# listed here, since they correspond to unroutable addresses.)
#
federation_ip_range_blacklist:
- '127.0.0.0/8'
- '10.0.0.0/8'
- '172.16.0.0/12'
- '192.168.0.0/16'
- '100.64.0.0/10'
- '169.254.0.0/16'
- '::1/128'
- 'fe80::/64'
- 'fc00::/7'
"""

View File

@ -23,6 +23,7 @@ from .cas import CasConfig
from .consent_config import ConsentConfig from .consent_config import ConsentConfig
from .database import DatabaseConfig from .database import DatabaseConfig
from .emailconfig import EmailConfig from .emailconfig import EmailConfig
from .federation import FederationConfig
from .groups import GroupsConfig from .groups import GroupsConfig
from .jwt_config import JWTConfig from .jwt_config import JWTConfig
from .key import KeyConfig from .key import KeyConfig
@ -57,6 +58,7 @@ class HomeServerConfig(RootConfig):
config_classes = [ config_classes = [
ServerConfig, ServerConfig,
TlsConfig, TlsConfig,
FederationConfig,
CacheConfig, CacheConfig,
DatabaseConfig, DatabaseConfig,
LoggingConfig, LoggingConfig,
@ -90,4 +92,5 @@ class HomeServerConfig(RootConfig):
ThirdPartyRulesConfig, ThirdPartyRulesConfig,
TracerConfig, TracerConfig,
RedisConfig, RedisConfig,
FederationConfig,
] ]

View File

@ -23,7 +23,6 @@ from typing import Any, Dict, Iterable, List, Optional
import attr import attr
import yaml import yaml
from netaddr import IPSet
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.http.endpoint import parse_and_validate_server_name from synapse.http.endpoint import parse_and_validate_server_name
@ -136,11 +135,6 @@ class ServerConfig(Config):
self.use_frozen_dicts = config.get("use_frozen_dicts", False) self.use_frozen_dicts = config.get("use_frozen_dicts", False)
self.public_baseurl = config.get("public_baseurl") self.public_baseurl = config.get("public_baseurl")
# Whether to send federation traffic out in this process. This only
# applies to some federation traffic, and so shouldn't be used to
# "disable" federation
self.send_federation = config.get("send_federation", True)
# Whether to enable user presence. # Whether to enable user presence.
self.use_presence = config.get("use_presence", True) self.use_presence = config.get("use_presence", True)
@ -263,34 +257,6 @@ class ServerConfig(Config):
# due to resource constraints # due to resource constraints
self.admin_contact = config.get("admin_contact", None) self.admin_contact = config.get("admin_contact", None)
# FIXME: federation_domain_whitelist needs sytests
self.federation_domain_whitelist = None # type: Optional[dict]
federation_domain_whitelist = config.get("federation_domain_whitelist", None)
if federation_domain_whitelist is not None:
# turn the whitelist into a hash for speed of lookup
self.federation_domain_whitelist = {}
for domain in federation_domain_whitelist:
self.federation_domain_whitelist[domain] = True
self.federation_ip_range_blacklist = config.get(
"federation_ip_range_blacklist", []
)
# Attempt to create an IPSet from the given ranges
try:
self.federation_ip_range_blacklist = IPSet(
self.federation_ip_range_blacklist
)
# Always blacklist 0.0.0.0, ::
self.federation_ip_range_blacklist.update(["0.0.0.0", "::"])
except Exception as e:
raise ConfigError(
"Invalid range(s) provided in federation_ip_range_blacklist: %s" % e
)
if self.public_baseurl is not None: if self.public_baseurl is not None:
if self.public_baseurl[-1] != "/": if self.public_baseurl[-1] != "/":
self.public_baseurl += "/" self.public_baseurl += "/"
@ -743,38 +709,6 @@ class ServerConfig(Config):
# #
#enable_search: false #enable_search: false
# Restrict federation to the following whitelist of domains.
# N.B. we recommend also firewalling your federation listener to limit
# inbound federation traffic as early as possible, rather than relying
# purely on this application-layer restriction. If not specified, the
# default is to whitelist everything.
#
#federation_domain_whitelist:
# - lon.example.com
# - nyc.example.com
# - syd.example.com
# Prevent federation requests from being sent to the following
# blacklist IP address CIDR ranges. If this option is not specified, or
# specified with an empty list, no ip range blacklist will be enforced.
#
# As of Synapse v1.4.0 this option also affects any outbound requests to identity
# servers provided by user input.
#
# (0.0.0.0 and :: are always blacklisted, whether or not they are explicitly
# listed here, since they correspond to unroutable addresses.)
#
federation_ip_range_blacklist:
- '127.0.0.0/8'
- '10.0.0.0/8'
- '172.16.0.0/12'
- '192.168.0.0/16'
- '100.64.0.0/10'
- '169.254.0.0/16'
- '::1/128'
- 'fe80::/64'
- 'fc00::/7'
# List of ports that Synapse should listen on, their purpose and their # List of ports that Synapse should listen on, their purpose and their
# configuration. # configuration.
# #

View File

@ -55,6 +55,11 @@ class FederationRemoteSendQueue(object):
self.notifier = hs.get_notifier() self.notifier = hs.get_notifier()
self.is_mine_id = hs.is_mine_id self.is_mine_id = hs.is_mine_id
# We may have multiple federation sender instances, so we need to track
# their positions separately.
self._sender_instances = hs.config.federation.federation_shard_config.instances
self._sender_positions = {}
# Pending presence map user_id -> UserPresenceState # Pending presence map user_id -> UserPresenceState
self.presence_map = {} # type: Dict[str, UserPresenceState] self.presence_map = {} # type: Dict[str, UserPresenceState]
@ -261,7 +266,14 @@ class FederationRemoteSendQueue(object):
def get_current_token(self): def get_current_token(self):
return self.pos - 1 return self.pos - 1
def federation_ack(self, token): def federation_ack(self, instance_name, token):
if self._sender_instances:
# If we have configured multiple federation sender instances we need
# to track their positions separately, and only clear the queue up
# to the token all instances have acked.
self._sender_positions[instance_name] = token
token = min(self._sender_positions.values())
self._clear_queue_before_pos(token) self._clear_queue_before_pos(token)
async def get_replication_rows( async def get_replication_rows(

View File

@ -69,6 +69,9 @@ class FederationSender(object):
self._transaction_manager = TransactionManager(hs) self._transaction_manager = TransactionManager(hs)
self._instance_name = hs.get_instance_name()
self._federation_shard_config = hs.config.federation.federation_shard_config
# map from destination to PerDestinationQueue # map from destination to PerDestinationQueue
self._per_destination_queues = {} # type: Dict[str, PerDestinationQueue] self._per_destination_queues = {} # type: Dict[str, PerDestinationQueue]
@ -191,7 +194,13 @@ class FederationSender(object):
) )
return return
destinations = set(destinations) destinations = {
d
for d in destinations
if self._federation_shard_config.should_send_to(
self._instance_name, d
)
}
if send_on_behalf_of is not None: if send_on_behalf_of is not None:
# If we are sending the event on behalf of another server # If we are sending the event on behalf of another server
@ -322,7 +331,12 @@ class FederationSender(object):
# Work out which remote servers should be poked and poke them. # Work out which remote servers should be poked and poke them.
domains = yield self.state.get_current_hosts_in_room(room_id) domains = yield self.state.get_current_hosts_in_room(room_id)
domains = [d for d in domains if d != self.server_name] domains = [
d
for d in domains
if d != self.server_name
and self._federation_shard_config.should_send_to(self._instance_name, d)
]
if not domains: if not domains:
return return
@ -427,6 +441,10 @@ class FederationSender(object):
for destination in destinations: for destination in destinations:
if destination == self.server_name: if destination == self.server_name:
continue continue
if not self._federation_shard_config.should_send_to(
self._instance_name, destination
):
continue
self._get_per_destination_queue(destination).send_presence(states) self._get_per_destination_queue(destination).send_presence(states)
@measure_func("txnqueue._process_presence") @measure_func("txnqueue._process_presence")
@ -441,6 +459,12 @@ class FederationSender(object):
for destination in destinations: for destination in destinations:
if destination == self.server_name: if destination == self.server_name:
continue continue
if not self._federation_shard_config.should_send_to(
self._instance_name, destination
):
continue
self._get_per_destination_queue(destination).send_presence(states) self._get_per_destination_queue(destination).send_presence(states)
def build_and_send_edu( def build_and_send_edu(
@ -462,6 +486,11 @@ class FederationSender(object):
logger.info("Not sending EDU to ourselves") logger.info("Not sending EDU to ourselves")
return return
if not self._federation_shard_config.should_send_to(
self._instance_name, destination
):
return
edu = Edu( edu = Edu(
origin=self.server_name, origin=self.server_name,
destination=destination, destination=destination,
@ -478,6 +507,11 @@ class FederationSender(object):
edu: edu to send edu: edu to send
key: clobbering key for this edu key: clobbering key for this edu
""" """
if not self._federation_shard_config.should_send_to(
self._instance_name, edu.destination
):
return
queue = self._get_per_destination_queue(edu.destination) queue = self._get_per_destination_queue(edu.destination)
if key: if key:
queue.send_keyed_edu(edu, key) queue.send_keyed_edu(edu, key)
@ -489,6 +523,11 @@ class FederationSender(object):
logger.warning("Not sending device update to ourselves") logger.warning("Not sending device update to ourselves")
return return
if not self._federation_shard_config.should_send_to(
self._instance_name, destination
):
return
self._get_per_destination_queue(destination).attempt_new_transaction() self._get_per_destination_queue(destination).attempt_new_transaction()
def wake_destination(self, destination: str): def wake_destination(self, destination: str):
@ -502,6 +541,11 @@ class FederationSender(object):
logger.warning("Not waking up ourselves") logger.warning("Not waking up ourselves")
return return
if not self._federation_shard_config.should_send_to(
self._instance_name, destination
):
return
self._get_per_destination_queue(destination).attempt_new_transaction() self._get_per_destination_queue(destination).attempt_new_transaction()
@staticmethod @staticmethod

View File

@ -74,6 +74,20 @@ class PerDestinationQueue(object):
self._clock = hs.get_clock() self._clock = hs.get_clock()
self._store = hs.get_datastore() self._store = hs.get_datastore()
self._transaction_manager = transaction_manager self._transaction_manager = transaction_manager
self._instance_name = hs.get_instance_name()
self._federation_shard_config = hs.config.federation.federation_shard_config
self._should_send_on_this_instance = True
if not self._federation_shard_config.should_send_to(
self._instance_name, destination
):
# We don't raise an exception here to avoid taking out any other
# processing. We have a guard in `attempt_new_transaction` that
# ensure we don't start sending stuff.
logger.error(
"Create a per destination queue for %s on wrong worker", destination,
)
self._should_send_on_this_instance = False
self._destination = destination self._destination = destination
self.transmission_loop_running = False self.transmission_loop_running = False
@ -180,6 +194,14 @@ class PerDestinationQueue(object):
logger.debug("TX [%s] Transaction already in progress", self._destination) logger.debug("TX [%s] Transaction already in progress", self._destination)
return return
if not self._should_send_on_this_instance:
# We don't raise an exception here to avoid taking out any other
# processing.
logger.error(
"Trying to start a transaction to %s on wrong worker", self._destination
)
return
logger.debug("TX [%s] Starting transaction loop", self._destination) logger.debug("TX [%s] Starting transaction loop", self._destination)
run_as_background_process( run_as_background_process(

View File

@ -293,20 +293,22 @@ class FederationAckCommand(Command):
Format:: Format::
FEDERATION_ACK <token> FEDERATION_ACK <instance_name> <token>
""" """
NAME = "FEDERATION_ACK" NAME = "FEDERATION_ACK"
def __init__(self, token): def __init__(self, instance_name, token):
self.instance_name = instance_name
self.token = token self.token = token
@classmethod @classmethod
def from_line(cls, line): def from_line(cls, line):
return cls(int(line)) instance_name, token = line.split(" ")
return cls(instance_name, int(token))
def to_line(self): def to_line(self):
return str(self.token) return "%s %s" % (self.instance_name, self.token)
class RemovePusherCommand(Command): class RemovePusherCommand(Command):

View File

@ -238,7 +238,7 @@ class ReplicationCommandHandler:
federation_ack_counter.inc() federation_ack_counter.inc()
if self._federation_sender: if self._federation_sender:
self._federation_sender.federation_ack(cmd.token) self._federation_sender.federation_ack(cmd.instance_name, cmd.token)
async def on_REMOVE_PUSHER( async def on_REMOVE_PUSHER(
self, conn: AbstractConnection, cmd: RemovePusherCommand self, conn: AbstractConnection, cmd: RemovePusherCommand
@ -527,7 +527,7 @@ class ReplicationCommandHandler:
"""Ack data for the federation stream. This allows the master to drop """Ack data for the federation stream. This allows the master to drop
data stored purely in memory. data stored purely in memory.
""" """
self.send_command(FederationAckCommand(token)) self.send_command(FederationAckCommand(self._instance_name, token))
def send_user_sync( def send_user_sync(
self, instance_id: str, user_id: str, is_syncing: bool, last_sync_ms: int self, instance_id: str, user_id: str, is_syncing: bool, last_sync_ms: int

View File

@ -0,0 +1,22 @@
/* Copyright 2020 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- We need to store the stream positions by instance in a sharded config world.
--
-- We default to master as we want the column to be NOT NULL and we correctly
-- reset the instance name to match the config each time we start up.
ALTER TABLE federation_stream_position ADD COLUMN instance_name TEXT NOT NULL DEFAULT 'master';
CREATE UNIQUE INDEX federation_stream_position_instance ON federation_stream_position(type, instance_name);

View File

@ -45,7 +45,7 @@ from twisted.internet import defer
from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.storage._base import SQLBaseStore from synapse.storage._base import SQLBaseStore
from synapse.storage.data_stores.main.events_worker import EventsWorkerStore from synapse.storage.data_stores.main.events_worker import EventsWorkerStore
from synapse.storage.database import Database from synapse.storage.database import Database, make_in_list_sql_clause
from synapse.storage.engines import PostgresEngine from synapse.storage.engines import PostgresEngine
from synapse.types import RoomStreamToken from synapse.types import RoomStreamToken
from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.caches.stream_change_cache import StreamChangeCache
@ -253,6 +253,16 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
def __init__(self, database: Database, db_conn, hs): def __init__(self, database: Database, db_conn, hs):
super(StreamWorkerStore, self).__init__(database, db_conn, hs) super(StreamWorkerStore, self).__init__(database, db_conn, hs)
self._instance_name = hs.get_instance_name()
self._send_federation = hs.should_send_federation()
self._federation_shard_config = hs.config.federation.federation_shard_config
# If we're a process that sends federation we may need to reset the
# `federation_stream_position` table to match the current sharding
# config. We don't do this now as otherwise two processes could conflict
# during startup which would cause one to die.
self._need_to_reset_federation_stream_positions = self._send_federation
events_max = self.get_room_max_stream_ordering() events_max = self.get_room_max_stream_ordering()
event_cache_prefill, min_event_val = self.db.get_cache_dict( event_cache_prefill, min_event_val = self.db.get_cache_dict(
db_conn, db_conn,
@ -793,22 +803,95 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
return upper_bound, events return upper_bound, events
def get_federation_out_pos(self, typ): async def get_federation_out_pos(self, typ: str) -> int:
return self.db.simple_select_one_onecol( if self._need_to_reset_federation_stream_positions:
await self.db.runInteraction(
"_reset_federation_positions_txn", self._reset_federation_positions_txn
)
self._need_to_reset_federation_stream_positions = False
return await self.db.simple_select_one_onecol(
table="federation_stream_position", table="federation_stream_position",
retcol="stream_id", retcol="stream_id",
keyvalues={"type": typ}, keyvalues={"type": typ, "instance_name": self._instance_name},
desc="get_federation_out_pos", desc="get_federation_out_pos",
) )
def update_federation_out_pos(self, typ, stream_id): async def update_federation_out_pos(self, typ, stream_id):
return self.db.simple_update_one( if self._need_to_reset_federation_stream_positions:
await self.db.runInteraction(
"_reset_federation_positions_txn", self._reset_federation_positions_txn
)
self._need_to_reset_federation_stream_positions = False
return await self.db.simple_update_one(
table="federation_stream_position", table="federation_stream_position",
keyvalues={"type": typ}, keyvalues={"type": typ, "instance_name": self._instance_name},
updatevalues={"stream_id": stream_id}, updatevalues={"stream_id": stream_id},
desc="update_federation_out_pos", desc="update_federation_out_pos",
) )
def _reset_federation_positions_txn(self, txn):
"""Fiddles with the `federation_stream_position` table to make it match
the configured federation sender instances during start up.
"""
# The federation sender instances may have changed, so we need to
# massage the `federation_stream_position` table to have a row per type
# per instance sending federation. If there is a mismatch we update the
# table with the correct rows using the *minimum* stream ID seen. This
# may result in resending of events/EDUs to remote servers, but that is
# preferable to dropping them.
if not self._send_federation:
return
# Pull out the configured instances. If we don't have a shard config then
# we assume that we're the only instance sending.
configured_instances = self._federation_shard_config.instances
if not configured_instances:
configured_instances = [self._instance_name]
elif self._instance_name not in configured_instances:
return
instances_in_table = self.db.simple_select_onecol_txn(
txn,
table="federation_stream_position",
keyvalues={},
retcol="instance_name",
)
if set(instances_in_table) == set(configured_instances):
# Nothing to do
return
sql = """
SELECT type, MIN(stream_id) FROM federation_stream_position
GROUP BY type
"""
txn.execute(sql)
min_positions = dict(txn) # Map from type -> min position
# Ensure we do actually have some values here
assert set(min_positions) == {"federation", "events"}
sql = """
DELETE FROM federation_stream_position
WHERE NOT (%s)
"""
clause, args = make_in_list_sql_clause(
txn.database_engine, "instance_name", configured_instances
)
txn.execute(sql % (clause,), args)
for typ, stream_id in min_positions.items():
self.db.simple_upsert_txn(
txn,
table="federation_stream_position",
keyvalues={"type": typ, "instance_name": self._instance_name},
values={"stream_id": stream_id},
)
def has_room_changed_since(self, room_id, stream_id): def has_room_changed_since(self, room_id, stream_id):
return self._events_stream_cache.has_entity_changed(room_id, stream_id) return self._events_stream_cache.has_entity_changed(room_id, stream_id)

View File

@ -32,6 +32,7 @@ class FederationAckTestCase(HomeserverTestCase):
def make_homeserver(self, reactor, clock): def make_homeserver(self, reactor, clock):
hs = self.setup_test_homeserver(homeserverToUse=GenericWorkerServer) hs = self.setup_test_homeserver(homeserverToUse=GenericWorkerServer)
return hs return hs
def test_federation_ack_sent(self): def test_federation_ack_sent(self):

View File

@ -0,0 +1,286 @@
# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from mock import Mock
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.app.generic_worker import GenericWorkerServer
from synapse.events.builder import EventBuilderFactory
from synapse.replication.http import streams
from synapse.replication.tcp.handler import ReplicationCommandHandler
from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol
from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
from synapse.rest.admin import register_servlets_for_client_rest_resource
from synapse.rest.client.v1 import login, room
from synapse.types import UserID
from tests import unittest
from tests.server import FakeTransport
logger = logging.getLogger(__name__)
class BaseStreamTestCase(unittest.HomeserverTestCase):
"""Base class for tests of the replication streams"""
servlets = [
streams.register_servlets,
]
def prepare(self, reactor, clock, hs):
# build a replication server
self.server_factory = ReplicationStreamProtocolFactory(hs)
self.streamer = hs.get_replication_streamer()
store = hs.get_datastore()
self.database = store.db
self.reactor.lookups["testserv"] = "1.2.3.4"
def default_config(self):
conf = super().default_config()
conf["send_federation"] = False
return conf
def make_worker_hs(self, extra_config={}):
config = self._get_worker_hs_config()
config.update(extra_config)
mock_federation_client = Mock(spec=["put_json"])
mock_federation_client.put_json.side_effect = lambda *_, **__: defer.succeed({})
worker_hs = self.setup_test_homeserver(
http_client=mock_federation_client,
homeserverToUse=GenericWorkerServer,
config=config,
reactor=self.reactor,
)
store = worker_hs.get_datastore()
store.db._db_pool = self.database._db_pool
repl_handler = ReplicationCommandHandler(worker_hs)
client = ClientReplicationStreamProtocol(
worker_hs, "client", "test", self.clock, repl_handler,
)
server = self.server_factory.buildProtocol(None)
client_transport = FakeTransport(server, self.reactor)
client.makeConnection(client_transport)
server_transport = FakeTransport(client, self.reactor)
server.makeConnection(server_transport)
return worker_hs
def _get_worker_hs_config(self) -> dict:
config = self.default_config()
config["worker_app"] = "synapse.app.federation_sender"
config["worker_replication_host"] = "testserv"
config["worker_replication_http_port"] = "8765"
return config
def replicate(self):
"""Tell the master side of replication that something has happened, and then
wait for the replication to occur.
"""
self.streamer.on_notifier_poke()
self.pump()
def create_room_with_remote_server(self, user, token, remote_server="other_server"):
room = self.helper.create_room_as(user, tok=token)
store = self.hs.get_datastore()
federation = self.hs.get_handlers().federation_handler
prev_event_ids = self.get_success(store.get_latest_event_ids_in_room(room))
room_version = self.get_success(store.get_room_version(room))
factory = EventBuilderFactory(self.hs)
factory.hostname = remote_server
user_id = UserID("user", remote_server).to_string()
event_dict = {
"type": EventTypes.Member,
"state_key": user_id,
"content": {"membership": Membership.JOIN},
"sender": user_id,
"room_id": room,
}
builder = factory.for_room_version(room_version, event_dict)
join_event = self.get_success(builder.build(prev_event_ids))
self.get_success(federation.on_send_join_request(remote_server, join_event))
self.replicate()
return room
class FederationSenderTestCase(BaseStreamTestCase):
servlets = [
login.register_servlets,
register_servlets_for_client_rest_resource,
room.register_servlets,
]
def test_send_event_single_sender(self):
"""Test that using a single federation sender worker correctly sends a
new event.
"""
worker_hs = self.make_worker_hs({"send_federation": True})
mock_client = worker_hs.get_http_client()
user = self.register_user("user", "pass")
token = self.login("user", "pass")
room = self.create_room_with_remote_server(user, token)
mock_client.put_json.reset_mock()
self.create_and_send_event(room, UserID.from_string(user))
self.replicate()
# Assert that the event was sent out over federation.
mock_client.put_json.assert_called()
self.assertEqual(mock_client.put_json.call_args[0][0], "other_server")
self.assertTrue(mock_client.put_json.call_args[1]["data"].get("pdus"))
def test_send_event_sharded(self):
"""Test that using two federation sender workers correctly sends
new events.
"""
worker1 = self.make_worker_hs(
{
"send_federation": True,
"worker_name": "sender1",
"federation_sender_instances": ["sender1", "sender2"],
}
)
mock_client1 = worker1.get_http_client()
worker2 = self.make_worker_hs(
{
"send_federation": True,
"worker_name": "sender2",
"federation_sender_instances": ["sender1", "sender2"],
}
)
mock_client2 = worker2.get_http_client()
user = self.register_user("user2", "pass")
token = self.login("user2", "pass")
sent_on_1 = False
sent_on_2 = False
for i in range(20):
server_name = "other_server_%d" % (i,)
room = self.create_room_with_remote_server(user, token, server_name)
mock_client1.reset_mock()
mock_client2.reset_mock()
self.create_and_send_event(room, UserID.from_string(user))
self.replicate()
if mock_client1.put_json.called:
sent_on_1 = True
mock_client2.put_json.assert_not_called()
self.assertEqual(mock_client1.put_json.call_args[0][0], server_name)
self.assertTrue(mock_client1.put_json.call_args[1]["data"].get("pdus"))
elif mock_client2.put_json.called:
sent_on_2 = True
mock_client1.put_json.assert_not_called()
self.assertEqual(mock_client2.put_json.call_args[0][0], server_name)
self.assertTrue(mock_client2.put_json.call_args[1]["data"].get("pdus"))
else:
raise AssertionError(
"Expected send transaction from one or the other sender"
)
if sent_on_1 and sent_on_2:
break
self.assertTrue(sent_on_1)
self.assertTrue(sent_on_2)
def test_send_typing_sharded(self):
"""Test that using two federation sender workers correctly sends
new typing EDUs.
"""
worker1 = self.make_worker_hs(
{
"send_federation": True,
"worker_name": "sender1",
"federation_sender_instances": ["sender1", "sender2"],
}
)
mock_client1 = worker1.get_http_client()
worker2 = self.make_worker_hs(
{
"send_federation": True,
"worker_name": "sender2",
"federation_sender_instances": ["sender1", "sender2"],
}
)
mock_client2 = worker2.get_http_client()
user = self.register_user("user3", "pass")
token = self.login("user3", "pass")
typing_handler = self.hs.get_typing_handler()
sent_on_1 = False
sent_on_2 = False
for i in range(20):
server_name = "other_server_%d" % (i,)
room = self.create_room_with_remote_server(user, token, server_name)
mock_client1.reset_mock()
mock_client2.reset_mock()
self.get_success(
typing_handler.started_typing(
target_user=UserID.from_string(user),
auth_user=UserID.from_string(user),
room_id=room,
timeout=20000,
)
)
self.replicate()
if mock_client1.put_json.called:
sent_on_1 = True
mock_client2.put_json.assert_not_called()
self.assertEqual(mock_client1.put_json.call_args[0][0], server_name)
self.assertTrue(mock_client1.put_json.call_args[1]["data"].get("edus"))
elif mock_client2.put_json.called:
sent_on_2 = True
mock_client1.put_json.assert_not_called()
self.assertEqual(mock_client2.put_json.call_args[0][0], server_name)
self.assertTrue(mock_client2.put_json.call_args[1]["data"].get("edus"))
else:
raise AssertionError(
"Expected send transaction from one or the other sender"
)
if sent_on_1 and sent_on_2:
break
self.assertTrue(sent_on_1)
self.assertTrue(sent_on_2)