# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Handling of the 'events' replication stream

This stream contains rows of various types. Each row therefore contains a 'type'
identifier before the real data. For example::

    RDATA events batch ["state", ["!room:id", "m.type", "", "$event:id"]]
    RDATA events 12345 ["ev", ["$event:id", "!room:id", "m.type", null, null, null]]

An "ev" row is sent for each new event. The fields in the data part are:

 * The new event id
 * The room id for the event
 * The type of the new event
 * The state key of the event, for state events
 * The event id of an event which is redacted by this event
 * The event id of an event which this event relates to

A "state" row is sent whenever the "current state" in a room changes. The fields
in the data part are:

 * The room id for the state change
 * The event type of the state which has changed
 * The state_key of the state which has changed
 * The event id of the new state
"""

import heapq
from typing import Tuple, Type

import attr

from ._base import Stream, db_query_to_update_function


@attr.s(slots=True, frozen=True)
class EventsStreamRow(object):
    """A parsed row from the events replication stream"""

    type = attr.ib()  # str: the TypeId of one of the *EventsStreamRows
    data = attr.ib()  # BaseEventsStreamRow


class BaseEventsStreamRow(object):
    """Base class for rows to be sent in the events stream.

    Specifies how to identify, serialize and deserialize the different types.
    """

    # Unique string that ids the type. Must be overridden in subclasses.
    TypeId = None  # type: str

    @classmethod
    def from_data(cls, data):
        """Parse the data from the replication stream into a row.

        By default we just call the constructor with the data list as arguments.

        Args:
            data: The value of the data object from the replication stream
        """
        return cls(*data)
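
# For example (an illustration, not in the upstream file), the "state" row from
# the module docstring decodes via the splat above:
#
#     EventsStreamCurrentStateRow.from_data(["!room:id", "m.type", "", "$event:id"])
#     # == EventsStreamCurrentStateRow("!room:id", "m.type", "", "$event:id")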


@attr.s(slots=True, frozen=True)
class EventsStreamEventRow(BaseEventsStreamRow):
    TypeId = "ev"

    event_id = attr.ib()  # str
    room_id = attr.ib()  # str
    type = attr.ib()  # str
    state_key = attr.ib()  # str, optional
    redacts = attr.ib()  # str, optional
    relates_to = attr.ib()  # str, optional


@attr.s(slots=True, frozen=True)
class EventsStreamCurrentStateRow(BaseEventsStreamRow):
    TypeId = "state"

    room_id = attr.ib()  # str
    type = attr.ib()  # str
    state_key = attr.ib()  # str
    event_id = attr.ib()  # str, optional


_EventRows = (
    EventsStreamEventRow,
    EventsStreamCurrentStateRow,
)  # type: Tuple[Type[BaseEventsStreamRow], ...]

TypeToRow = {Row.TypeId: Row for Row in _EventRows}
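# i.e. TypeToRow == {"ev": EventsStreamEventRow,
#                    "state": EventsStreamCurrentStateRow}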


class EventsStream(Stream):
    """We received a new event, or an event went from being an outlier to not"""

    NAME = "events"

    def __init__(self, hs):
        self._store = hs.get_datastore()

        # These assignments shadow definitions on the Stream base class and
        # must happen before its __init__ runs; the "type: ignore" comments
        # appease mypy about the redefinition.
        self.current_token = self._store.get_current_events_token  # type: ignore
        self.update_function = db_query_to_update_function(self._update_function)  # type: ignore

        super(EventsStream, self).__init__(hs)

    async def _update_function(self, from_token, current_token, limit=None):
        event_rows = await self._store.get_all_new_forward_event_rows(
            from_token, current_token, limit
        )
        event_updates = (
            (row[0], EventsStreamEventRow.TypeId, row[1:]) for row in event_rows
        )

        state_rows = await self._store.get_all_updated_current_state_deltas(
            from_token, current_token, limit
        )
        state_updates = (
            (row[0], EventsStreamCurrentStateRow.TypeId, row[1:]) for row in state_rows
        )

        # Both generators yield (stream_id, type_id, data) tuples ordered by
        # stream id (the first element of each database row), so heapq.merge
        # interleaves them into a single iterator that is likewise ordered.
        all_updates = heapq.merge(event_updates, state_updates)

        return all_updates

    @classmethod
    def parse_row(cls, row):
        (typ, data) = row
        data = TypeToRow[typ].from_data(data)
        return EventsStreamRow(typ, data)
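
# A minimal usage sketch (illustrative, not part of the upstream module):
#
#     row = EventsStream.parse_row(("state", ["!room:id", "m.type", "", "$event:id"]))
#     assert row.type == EventsStreamCurrentStateRow.TypeId
#     assert row.data.room_id == "!room:id"
#     assert row.data.event_id == "$event:id"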