Fix catchup-on-reconnect for the Federation Stream (#7374)

looks like we managed to break this during the refactorathon.
This commit is contained in:
Richard van der Hoff 2020-05-05 14:15:57 +01:00 committed by GitHub
parent 8123b2f909
commit d5aa7d93ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 158 additions and 48 deletions

1
changelog.d/7374.misc Normal file
View File

@ -0,0 +1 @@
Move catchup of replication streams logic to worker.

View File

@ -31,6 +31,7 @@ Events are replicated via a separate events stream.
import logging import logging
from collections import namedtuple from collections import namedtuple
from typing import List, Tuple
from six import iteritems from six import iteritems
@ -69,7 +70,11 @@ class FederationRemoteSendQueue(object):
self.edus = SortedDict() # stream position -> Edu self.edus = SortedDict() # stream position -> Edu
# stream ID for the next entry into presence_changed/keyed_edu_changed/edus.
self.pos = 1 self.pos = 1
# map from stream ID to the time that stream entry was generated, so that we
# can clear out entries after a while
self.pos_time = SortedDict() self.pos_time = SortedDict()
# EVERYTHING IS SAD. In particular, python only makes new scopes when # EVERYTHING IS SAD. In particular, python only makes new scopes when
@ -250,19 +255,23 @@ class FederationRemoteSendQueue(object):
self._clear_queue_before_pos(token) self._clear_queue_before_pos(token)
async def get_replication_rows( async def get_replication_rows(
self, from_token, to_token, limit, federation_ack=None self, instance_name: str, from_token: int, to_token: int, target_row_count: int
): ) -> Tuple[List[Tuple[int, Tuple]], int, bool]:
"""Get rows to be sent over federation between the two tokens """Get rows to be sent over federation between the two tokens
Args: Args:
from_token (int) instance_name: the name of the current process
to_token(int) from_token: the previous stream token: the starting point for fetching the
limit (int) updates
federation_ack (int): Optional. The position where the worker is to_token: the new stream token: the point to get updates up to
explicitly acknowledged it has handled. Allows us to drop target_row_count: a target for the number of rows to be returned.
data from before that point
Returns: a triplet `(updates, new_last_token, limited)`, where:
* `updates` is a list of `(token, row)` entries.
* `new_last_token` is the new position in stream.
* `limited` is whether there are more updates to fetch.
""" """
# TODO: Handle limit. # TODO: Handle target_row_count.
# To handle restarts where we wrap around # To handle restarts where we wrap around
if from_token > self.pos: if from_token > self.pos:
@ -270,12 +279,7 @@ class FederationRemoteSendQueue(object):
# list of tuple(int, BaseFederationRow), where the first is the position # list of tuple(int, BaseFederationRow), where the first is the position
# of the federation stream. # of the federation stream.
rows = [] rows = [] # type: List[Tuple[int, BaseFederationRow]]
# There should be only one reader, so lets delete everything its
# acknowledged its seen.
if federation_ack:
self._clear_queue_before_pos(federation_ack)
# Fetch changed presence # Fetch changed presence
i = self.presence_changed.bisect_right(from_token) i = self.presence_changed.bisect_right(from_token)
@ -332,7 +336,11 @@ class FederationRemoteSendQueue(object):
# Sort rows based on pos # Sort rows based on pos
rows.sort() rows.sort()
return [(pos, row.TypeId, row.to_data()) for pos, row in rows] return (
[(pos, (row.TypeId, row.to_data())) for pos, row in rows],
to_token,
False,
)
class BaseFederationRow(object): class BaseFederationRow(object):

View File

@ -14,7 +14,7 @@
# limitations under the License. # limitations under the License.
import logging import logging
from typing import Dict, Hashable, Iterable, List, Optional, Set from typing import Dict, Hashable, Iterable, List, Optional, Set, Tuple
from six import itervalues from six import itervalues
@ -498,14 +498,16 @@ class FederationSender(object):
self._get_per_destination_queue(destination).attempt_new_transaction() self._get_per_destination_queue(destination).attempt_new_transaction()
def get_current_token(self) -> int: @staticmethod
def get_current_token() -> int:
# Dummy implementation for case where federation sender isn't offloaded # Dummy implementation for case where federation sender isn't offloaded
# to a worker. # to a worker.
return 0 return 0
@staticmethod
async def get_replication_rows( async def get_replication_rows(
self, from_token, to_token, limit, federation_ack=None instance_name: str, from_token: int, to_token: int, target_row_count: int
): ) -> Tuple[List[Tuple[int, Tuple]], int, bool]:
# Dummy implementation for case where federation sender isn't offloaded # Dummy implementation for case where federation sender isn't offloaded
# to a worker. # to a worker.
return [] return [], 0, False

View File

@ -15,11 +15,10 @@
# limitations under the License. # limitations under the License.
import datetime import datetime
import logging import logging
from typing import Dict, Hashable, Iterable, List, Tuple from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Tuple
from prometheus_client import Counter from prometheus_client import Counter
import synapse.server
from synapse.api.errors import ( from synapse.api.errors import (
FederationDeniedError, FederationDeniedError,
HttpResponseException, HttpResponseException,
@ -34,6 +33,9 @@ from synapse.storage.presence import UserPresenceState
from synapse.types import ReadReceipt from synapse.types import ReadReceipt
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
if TYPE_CHECKING:
import synapse.server
# This is defined in the Matrix spec and enforced by the receiver. # This is defined in the Matrix spec and enforced by the receiver.
MAX_EDUS_PER_TRANSACTION = 100 MAX_EDUS_PER_TRANSACTION = 100

View File

@ -13,11 +13,10 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logging import logging
from typing import List from typing import TYPE_CHECKING, List
from canonicaljson import json from canonicaljson import json
import synapse.server
from synapse.api.errors import HttpResponseException from synapse.api.errors import HttpResponseException
from synapse.events import EventBase from synapse.events import EventBase
from synapse.federation.persistence import TransactionActions from synapse.federation.persistence import TransactionActions
@ -31,6 +30,9 @@ from synapse.logging.opentracing import (
) )
from synapse.util.metrics import measure_func from synapse.util.metrics import measure_func
if TYPE_CHECKING:
import synapse.server
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)

View File

@ -80,7 +80,7 @@ class ReplicationStreamer(object):
for stream in STREAMS_MAP.values(): for stream in STREAMS_MAP.values():
if stream == FederationStream and hs.config.send_federation: if stream == FederationStream and hs.config.send_federation:
# We only support federation stream if federation sending # We only support federation stream if federation sending
# hase been disabled on the master. # has been disabled on the master.
continue continue
self.streams.append(stream(hs)) self.streams.append(stream(hs))

View File

@ -104,7 +104,8 @@ class Stream(object):
implemented by subclasses. implemented by subclasses.
current_token_function is called to get the current token of the underlying current_token_function is called to get the current token of the underlying
stream. stream. It is only meaningful on the process that is the source of the
replication stream (ie, usually the master).
update_function is called to get updates for this stream between a pair of update_function is called to get updates for this stream between a pair of
stream tokens. See the UpdateFunction type definition for more info. stream tokens. See the UpdateFunction type definition for more info.

View File

@ -15,7 +15,7 @@
# limitations under the License. # limitations under the License.
from collections import namedtuple from collections import namedtuple
from synapse.replication.tcp.streams._base import Stream, db_query_to_update_function from synapse.replication.tcp.streams._base import Stream, make_http_update_function
class FederationStream(Stream): class FederationStream(Stream):
@ -35,21 +35,33 @@ class FederationStream(Stream):
ROW_TYPE = FederationStreamRow ROW_TYPE = FederationStreamRow
def __init__(self, hs): def __init__(self, hs):
# Not all synapse instances will have a federation sender instance, if hs.config.worker_app is None:
# whether that's a `FederationSender` or a `FederationRemoteSendQueue`, # master process: get updates from the FederationRemoteSendQueue.
# so we stub the stream out when that is the case. # (if the master is configured to send federation itself, federation_sender
if hs.config.worker_app is None or hs.should_send_federation(): # will be a real FederationSender, which has stubs for current_token and
# get_replication_rows.)
federation_sender = hs.get_federation_sender() federation_sender = hs.get_federation_sender()
current_token = federation_sender.get_current_token current_token = federation_sender.get_current_token
update_function = db_query_to_update_function( update_function = federation_sender.get_replication_rows
federation_sender.get_replication_rows
) elif hs.should_send_federation():
# federation sender: Query master process
update_function = make_http_update_function(hs, self.NAME)
current_token = self._stub_current_token
else: else:
current_token = lambda: 0 # other worker: stub out the update function (we're not interested in
# any updates so when we get a POSITION we do nothing)
update_function = self._stub_update_function update_function = self._stub_update_function
current_token = self._stub_current_token
super().__init__(hs.get_instance_name(), current_token, update_function) super().__init__(hs.get_instance_name(), current_token, update_function)
@staticmethod
def _stub_current_token():
# dummy current-token method for use on workers
return 0
@staticmethod @staticmethod
async def _stub_update_function(instance_name, from_token, upto_token, limit): async def _stub_update_function(instance_name, from_token, upto_token, limit):
return [], upto_token, False return [], upto_token, False

View File

@ -27,6 +27,7 @@ from synapse.app.generic_worker import (
GenericWorkerServer, GenericWorkerServer,
) )
from synapse.http.site import SynapseRequest from synapse.http.site import SynapseRequest
from synapse.replication.http import streams
from synapse.replication.tcp.handler import ReplicationCommandHandler from synapse.replication.tcp.handler import ReplicationCommandHandler
from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol
from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
@ -42,6 +43,10 @@ logger = logging.getLogger(__name__)
class BaseStreamTestCase(unittest.HomeserverTestCase): class BaseStreamTestCase(unittest.HomeserverTestCase):
"""Base class for tests of the replication streams""" """Base class for tests of the replication streams"""
servlets = [
streams.register_servlets,
]
def prepare(self, reactor, clock, hs): def prepare(self, reactor, clock, hs):
# build a replication server # build a replication server
server_factory = ReplicationStreamProtocolFactory(hs) server_factory = ReplicationStreamProtocolFactory(hs)
@ -49,17 +54,11 @@ class BaseStreamTestCase(unittest.HomeserverTestCase):
self.server = server_factory.buildProtocol(None) self.server = server_factory.buildProtocol(None)
# Make a new HomeServer object for the worker # Make a new HomeServer object for the worker
config = self.default_config()
config["worker_app"] = "synapse.app.generic_worker"
config["worker_replication_host"] = "testserv"
config["worker_replication_http_port"] = "8765"
self.reactor.lookups["testserv"] = "1.2.3.4" self.reactor.lookups["testserv"] = "1.2.3.4"
self.worker_hs = self.setup_test_homeserver( self.worker_hs = self.setup_test_homeserver(
http_client=None, http_client=None,
homeserverToUse=GenericWorkerServer, homeserverToUse=GenericWorkerServer,
config=config, config=self._get_worker_hs_config(),
reactor=self.reactor, reactor=self.reactor,
) )
@ -78,6 +77,13 @@ class BaseStreamTestCase(unittest.HomeserverTestCase):
self._client_transport = None self._client_transport = None
self._server_transport = None self._server_transport = None
def _get_worker_hs_config(self) -> dict:
config = self.default_config()
config["worker_app"] = "synapse.app.generic_worker"
config["worker_replication_host"] = "testserv"
config["worker_replication_http_port"] = "8765"
return config
def _build_replication_data_handler(self): def _build_replication_data_handler(self):
return TestReplicationDataHandler(self.worker_hs) return TestReplicationDataHandler(self.worker_hs)

View File

@ -0,0 +1,81 @@
# -*- coding: utf-8 -*-
# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.federation.send_queue import EduRow
from synapse.replication.tcp.streams.federation import FederationStream
from tests.replication.tcp.streams._base import BaseStreamTestCase
class FederationStreamTestCase(BaseStreamTestCase):
def _get_worker_hs_config(self) -> dict:
# enable federation sending on the worker
config = super()._get_worker_hs_config()
# TODO: make it so we don't need both of these
config["send_federation"] = True
config["worker_app"] = "synapse.app.federation_sender"
return config
def test_catchup(self):
"""Basic test of catchup on reconnect
Makes sure that updates sent while we are offline are received later.
"""
fed_sender = self.hs.get_federation_sender()
received_rows = self.test_handler.received_rdata_rows
fed_sender.build_and_send_edu("testdest", "m.test_edu", {"a": "b"})
self.reconnect()
self.reactor.advance(0)
# check we're testing what we think we are: no rows should yet have been
# received
self.assertEqual(received_rows, [])
# We should now see an attempt to connect to the master
request = self.handle_http_replication_attempt()
self.assert_request_is_get_repl_stream_updates(request, "federation")
# we should have received an update row
stream_name, token, row = received_rows.pop()
self.assertEqual(stream_name, "federation")
self.assertIsInstance(row, FederationStream.FederationStreamRow)
self.assertEqual(row.type, EduRow.TypeId)
edurow = EduRow.from_data(row.data)
self.assertEqual(edurow.edu.edu_type, "m.test_edu")
self.assertEqual(edurow.edu.origin, self.hs.hostname)
self.assertEqual(edurow.edu.destination, "testdest")
self.assertEqual(edurow.edu.content, {"a": "b"})
self.assertEqual(received_rows, [])
# additional updates should be transferred without an HTTP hit
fed_sender.build_and_send_edu("testdest", "m.test1", {"c": "d"})
self.reactor.advance(0)
# there should be no http hit
self.assertEqual(len(self.reactor.tcpClients), 0)
# ... but we should have a row
self.assertEqual(len(received_rows), 1)
stream_name, token, row = received_rows.pop()
self.assertEqual(stream_name, "federation")
self.assertIsInstance(row, FederationStream.FederationStreamRow)
self.assertEqual(row.type, EduRow.TypeId)
edurow = EduRow.from_data(row.data)
self.assertEqual(edurow.edu.edu_type, "m.test1")
self.assertEqual(edurow.edu.origin, self.hs.hostname)
self.assertEqual(edurow.edu.destination, "testdest")
self.assertEqual(edurow.edu.content, {"c": "d"})

View File

@ -15,7 +15,6 @@
from mock import Mock from mock import Mock
from synapse.handlers.typing import RoomMember from synapse.handlers.typing import RoomMember
from synapse.replication.http import streams
from synapse.replication.tcp.streams import TypingStream from synapse.replication.tcp.streams import TypingStream
from tests.replication.tcp.streams._base import BaseStreamTestCase from tests.replication.tcp.streams._base import BaseStreamTestCase
@ -24,10 +23,6 @@ USER_ID = "@feeling:blue"
class TypingStreamTestCase(BaseStreamTestCase): class TypingStreamTestCase(BaseStreamTestCase):
servlets = [
streams.register_servlets,
]
def _build_replication_data_handler(self): def _build_replication_data_handler(self):
return Mock(wraps=super()._build_replication_data_handler()) return Mock(wraps=super()._build_replication_data_handler())