synapse-product/synapse/federation/send_queue.py

546 lines
17 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
2016-11-21 11:55:23 -05:00
"""A federation sender that forwards things to be sent across replication to
a worker process.
It assumes there is a single worker process feeding off of it.
Each row in the replication stream consists of a type and some json, where the
types indicate whether they are presence, or edus, etc.
Ephemeral or non-event data are queued up in-memory. When the worker requests
updates since a particular point, all in-memory data since before that point is
dropped. We also expire things in the queue after 5 minutes, to ensure that a
dead worker doesn't cause the queues to grow limitlessly.
Events are replicated via a separate events stream.
"""
from .units import Edu
from synapse.storage.presence import UserPresenceState
2016-11-21 12:34:43 -05:00
from synapse.util.metrics import Measure
2018-05-21 20:47:37 -04:00
from synapse.metrics import LaterGauge
2016-11-21 12:34:43 -05:00
from blist import sorteddict
from collections import namedtuple
2017-04-07 06:51:28 -04:00
import logging
from six import itervalues, iteritems
2017-04-07 06:51:28 -04:00
logger = logging.getLogger(__name__)
class FederationRemoteSendQueue(object):
2016-11-21 11:55:23 -05:00
"""A drop in replacement for TransactionQueue"""
def __init__(self, hs):
self.server_name = hs.hostname
self.clock = hs.get_clock()
2017-03-27 09:07:47 -04:00
self.notifier = hs.get_notifier()
self.is_mine_id = hs.is_mine_id
2017-04-11 10:28:24 -04:00
self.presence_map = {} # Pending presence map user_id -> UserPresenceState
self.presence_changed = sorteddict() # Stream position -> user_id
2017-04-11 10:28:24 -04:00
self.keyed_edu = {} # (destination, key) -> EDU
self.keyed_edu_changed = sorteddict() # stream position -> (destination, key)
self.edus = sorteddict() # stream position -> Edu
self.failures = sorteddict() # stream position -> (destination, Failure)
self.device_messages = sorteddict() # stream position -> destination
2016-11-21 12:34:43 -05:00
self.pos = 1
self.pos_time = sorteddict()
2016-11-21 12:34:43 -05:00
# EVERYTHING IS SAD. In particular, python only makes new scopes when
# we make a new function, so we need to make a new function so the inner
2016-11-21 12:59:39 -05:00
# lambda binds to the queue rather than to the name of the queue which
# changes. ARGH.
2016-11-21 12:34:43 -05:00
def register(name, queue):
2018-05-22 18:32:57 -04:00
LaterGauge("synapse_federation_send_queue_%s_size" % (queue_name,),
"", lambda: len(queue))
2016-11-21 12:34:43 -05:00
for queue_name in [
"presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed",
"edus", "failures", "device_messages", "pos_time",
]:
register(queue_name, getattr(self, queue_name))
self.clock.looping_call(self._clear_queue, 30 * 1000)
def _next_pos(self):
pos = self.pos
self.pos += 1
self.pos_time[self.clock.time_msec()] = pos
return pos
def _clear_queue(self):
2016-11-21 11:55:23 -05:00
"""Clear the queues for anything older than N minutes"""
FIVE_MINUTES_AGO = 5 * 60 * 1000
now = self.clock.time_msec()
keys = self.pos_time.keys()
time = keys.bisect_left(now - FIVE_MINUTES_AGO)
if not keys[:time]:
return
position_to_delete = max(keys[:time])
for key in keys[:time]:
del self.pos_time[key]
self._clear_queue_before_pos(position_to_delete)
def _clear_queue_before_pos(self, position_to_delete):
2016-11-21 11:55:23 -05:00
"""Clear all the queues from before a given position"""
2016-11-21 12:34:43 -05:00
with Measure(self.clock, "send_queue._clear"):
# Delete things out of presence maps
keys = self.presence_changed.keys()
i = keys.bisect_left(position_to_delete)
for key in keys[:i]:
del self.presence_changed[key]
user_ids = set(
user_id
for uids in itervalues(self.presence_changed)
for user_id in uids
2016-11-21 12:34:43 -05:00
)
to_del = [
user_id for user_id in self.presence_map if user_id not in user_ids
]
for user_id in to_del:
del self.presence_map[user_id]
# Delete things out of keyed edus
keys = self.keyed_edu_changed.keys()
i = keys.bisect_left(position_to_delete)
for key in keys[:i]:
del self.keyed_edu_changed[key]
live_keys = set()
for edu_key in self.keyed_edu_changed.values():
live_keys.add(edu_key)
to_del = [edu_key for edu_key in self.keyed_edu if edu_key not in live_keys]
for edu_key in to_del:
del self.keyed_edu[edu_key]
# Delete things out of edu map
keys = self.edus.keys()
i = keys.bisect_left(position_to_delete)
for key in keys[:i]:
del self.edus[key]
# Delete things out of failure map
keys = self.failures.keys()
i = keys.bisect_left(position_to_delete)
for key in keys[:i]:
del self.failures[key]
# Delete things out of device map
keys = self.device_messages.keys()
i = keys.bisect_left(position_to_delete)
for key in keys[:i]:
del self.device_messages[key]
def notify_new_events(self, current_id):
2016-11-21 11:55:23 -05:00
"""As per TransactionQueue"""
# We don't need to replicate this as it gets sent down a different
# stream.
pass
def send_edu(self, destination, edu_type, content, key=None):
2016-11-21 11:55:23 -05:00
"""As per TransactionQueue"""
pos = self._next_pos()
edu = Edu(
origin=self.server_name,
destination=destination,
edu_type=edu_type,
content=content,
)
if key:
assert isinstance(key, tuple)
self.keyed_edu[(destination, key)] = edu
self.keyed_edu_changed[pos] = (destination, key)
else:
self.edus[pos] = edu
2017-03-27 09:07:47 -04:00
self.notifier.on_new_replication_data()
def send_presence(self, states):
2017-04-11 10:34:55 -04:00
"""As per TransactionQueue
Args:
states (list(UserPresenceState))
"""
pos = self._next_pos()
2017-04-11 10:34:55 -04:00
# We only want to send presence for our own users, so lets always just
# filter here just in case.
local_states = list(filter(lambda s: self.is_mine_id(s.user_id), states))
self.presence_map.update({state.user_id: state for state in local_states})
self.presence_changed[pos] = [state.user_id for state in local_states]
2017-03-27 09:07:47 -04:00
self.notifier.on_new_replication_data()
def send_failure(self, failure, destination):
2016-11-21 11:55:23 -05:00
"""As per TransactionQueue"""
pos = self._next_pos()
self.failures[pos] = (destination, str(failure))
2017-03-27 09:07:47 -04:00
self.notifier.on_new_replication_data()
def send_device_messages(self, destination):
2016-11-21 11:55:23 -05:00
"""As per TransactionQueue"""
pos = self._next_pos()
self.device_messages[pos] = destination
2017-03-27 09:07:47 -04:00
self.notifier.on_new_replication_data()
def get_current_token(self):
return self.pos - 1
def federation_ack(self, token):
self._clear_queue_before_pos(token)
def get_replication_rows(self, from_token, to_token, limit, federation_ack=None):
"""Get rows to be sent over federation between the two tokens
2016-11-23 05:40:44 -05:00
Args:
from_token (int)
to_token(int)
2016-11-23 05:40:44 -05:00
limit (int)
federation_ack (int): Optional. The position where the worker is
explicitly acknowledged it has handled. Allows us to drop
data from before that point
"""
# TODO: Handle limit.
# To handle restarts where we wrap around
if from_token > self.pos:
from_token = -1
2017-04-10 05:02:17 -04:00
# list of tuple(int, BaseFederationRow), where the first is the position
# of the federation stream.
rows = []
# There should be only one reader, so lets delete everything its
# acknowledged its seen.
2016-11-23 05:40:44 -05:00
if federation_ack:
self._clear_queue_before_pos(federation_ack)
# Fetch changed presence
keys = self.presence_changed.keys()
i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1
dest_user_ids = [
(pos, user_id)
for pos in keys[i:j]
for user_id in self.presence_changed[pos]
]
for (key, user_id) in dest_user_ids:
rows.append((key, PresenceRow(
state=self.presence_map[user_id],
)))
# Fetch changes keyed edus
keys = self.keyed_edu_changed.keys()
i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1
# We purposefully clobber based on the key here, python dict comprehensions
# always use the last value, so this will correctly point to the last
# stream position.
keyed_edus = {self.keyed_edu_changed[k]: k for k in keys[i:j]}
for ((destination, edu_key), pos) in iteritems(keyed_edus):
rows.append((pos, KeyedEduRow(
key=edu_key,
edu=self.keyed_edu[(destination, edu_key)],
)))
# Fetch changed edus
keys = self.edus.keys()
i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1
2017-04-10 11:55:56 -04:00
edus = ((k, self.edus[k]) for k in keys[i:j])
for (pos, edu) in edus:
rows.append((pos, EduRow(edu)))
# Fetch changed failures
keys = self.failures.keys()
i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1
2017-04-10 11:55:56 -04:00
failures = ((k, self.failures[k]) for k in keys[i:j])
for (pos, (destination, failure)) in failures:
rows.append((pos, FailureRow(
destination=destination,
failure=failure,
)))
# Fetch changed device messages
keys = self.device_messages.keys()
i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1
device_messages = {self.device_messages[k]: k for k in keys[i:j]}
for (destination, pos) in iteritems(device_messages):
rows.append((pos, DeviceRow(
destination=destination,
)))
# Sort rows based on pos
rows.sort()
return [(pos, row.TypeId, row.to_data()) for pos, row in rows]
class BaseFederationRow(object):
2017-04-07 06:52:57 -04:00
"""Base class for rows to be sent in the federation stream.
Specifies how to identify, serialize and deserialize the different types.
"""
TypeId = None # Unique string that ids the type. Must be overriden in sub classes.
@staticmethod
def from_data(data):
"""Parse the data from the federation stream into a row.
2017-04-07 06:52:57 -04:00
Args:
data: The value of ``data`` from FederationStreamRow.data, type
depends on the type of stream
"""
raise NotImplementedError()
def to_data(self):
2017-04-07 06:52:57 -04:00
"""Serialize this row to be sent over the federation stream.
Returns:
The value to be sent in FederationStreamRow.data. The type depends
on the type of stream.
"""
raise NotImplementedError()
def add_to_buffer(self, buff):
"""Add this row to the appropriate field in the buffer ready for this
to be sent over federation.
We use a buffer so that we can batch up events that have come in at
the same time and send them all at once.
Args:
buff (BufferedToSend)
"""
raise NotImplementedError()
class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", (
"state", # UserPresenceState
))):
TypeId = "p"
@staticmethod
def from_data(data):
return PresenceRow(
state=UserPresenceState.from_dict(data)
)
def to_data(self):
return self.state.as_dict()
def add_to_buffer(self, buff):
buff.presence.append(self.state)
class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", (
"key", # tuple(str) - the edu key passed to send_edu
"edu", # Edu
))):
2017-04-12 05:36:29 -04:00
"""Streams EDUs that have an associated key that is ued to clobber. For example,
typing EDUs clobber based on room_id.
"""
TypeId = "k"
@staticmethod
def from_data(data):
return KeyedEduRow(
key=tuple(data["key"]),
edu=Edu(**data["edu"]),
)
def to_data(self):
return {
"key": self.key,
"edu": self.edu.get_internal_dict(),
}
def add_to_buffer(self, buff):
buff.keyed_edus.setdefault(
self.edu.destination, {}
)[self.key] = self.edu
class EduRow(BaseFederationRow, namedtuple("EduRow", (
"edu", # Edu
))):
2017-04-12 05:36:29 -04:00
"""Streams EDUs that don't have keys. See KeyedEduRow
"""
TypeId = "e"
@staticmethod
def from_data(data):
return EduRow(Edu(**data))
def to_data(self):
return self.edu.get_internal_dict()
def add_to_buffer(self, buff):
buff.edus.setdefault(self.edu.destination, []).append(self.edu)
class FailureRow(BaseFederationRow, namedtuple("FailureRow", (
"destination", # str
"failure",
))):
2017-04-12 05:36:29 -04:00
"""Streams failures to a remote server. Failures are issued when there was
something wrong with a transaction the remote sent us, e.g. it included
an event that was invalid.
"""
TypeId = "f"
@staticmethod
def from_data(data):
return FailureRow(
destination=data["destination"],
failure=data["failure"],
)
def to_data(self):
return {
"destination": self.destination,
"failure": self.failure,
}
def add_to_buffer(self, buff):
buff.failures.setdefault(self.destination, []).append(self.failure)
class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", (
"destination", # str
))):
2017-04-12 05:36:29 -04:00
"""Streams the fact that either a) there is pending to device messages for
users on the remote, or b) a local users device has changed and needs to
be sent to the remote.
"""
TypeId = "d"
@staticmethod
def from_data(data):
return DeviceRow(destination=data["destination"])
def to_data(self):
return {"destination": self.destination}
def add_to_buffer(self, buff):
buff.device_destinations.add(self.destination)
TypeToRow = {
Row.TypeId: Row
for Row in (
PresenceRow,
KeyedEduRow,
EduRow,
FailureRow,
DeviceRow,
)
}
2017-04-10 05:03:07 -04:00
ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", (
"presence", # list(UserPresenceState)
"keyed_edus", # dict of destination -> { key -> Edu }
"edus", # dict of destination -> [Edu]
"failures", # dict of destination -> [failures]
"device_destinations", # set of destinations
))
2017-04-10 07:44:17 -04:00
def process_rows_for_federation(transaction_queue, rows):
2017-04-10 05:02:17 -04:00
"""Parse a list of rows from the federation stream and put them in the
transaction queue ready for sending to the relevant homeservers.
Args:
2017-04-10 07:44:17 -04:00
transaction_queue (TransactionQueue)
2017-04-10 05:02:17 -04:00
rows (list(synapse.replication.tcp.streams.FederationStreamRow))
"""
2017-04-10 05:02:17 -04:00
# The federation stream contains a bunch of different types of
# rows that need to be handled differently. We parse the rows, put
# them into the appropriate collection and then send them off.
2017-04-10 05:03:07 -04:00
buff = ParsedFederationStreamData(
presence=[],
keyed_edus={},
edus={},
failures={},
device_destinations=set(),
)
# Parse the rows in the stream and add to the buffer
for row in rows:
2017-04-07 06:51:28 -04:00
if row.type not in TypeToRow:
logger.error("Unrecognized federation row type %r", row.type)
continue
RowType = TypeToRow[row.type]
parsed_row = RowType.from_data(row.data)
parsed_row.add_to_buffer(buff)
if buff.presence:
transaction_queue.send_presence(buff.presence)
for destination, edu_map in iteritems(buff.keyed_edus):
for key, edu in edu_map.items():
2017-04-10 07:44:17 -04:00
transaction_queue.send_edu(
edu.destination, edu.edu_type, edu.content, key=key,
)
for destination, edu_list in iteritems(buff.edus):
for edu in edu_list:
2017-04-10 07:44:17 -04:00
transaction_queue.send_edu(
edu.destination, edu.edu_type, edu.content, key=None,
)
for destination, failure_list in iteritems(buff.failures):
for failure in failure_list:
2017-04-10 07:44:17 -04:00
transaction_queue.send_failure(destination, failure)
for destination in buff.device_destinations:
2017-04-10 07:44:17 -04:00
transaction_queue.send_device_messages(destination)