Merge pull request #4955 from matrix-org/rav/merge_state_into_events

Combine the CurrentStateDeltaStream into the EventStream
Commit 902cdc63b6 by Richard van der Hoff, 2019-03-28 18:32:13 +00:00 (committed by GitHub).
9 changed files with 141 additions and 49 deletions

changelog.d/4955.bugfix (new file)

@@ -0,0 +1 @@
+Fix sync bug which made accepting invites unreliable in worker-mode synapses.

synapse/app/synchrotron.py

@@ -48,6 +48,7 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.replication.tcp.streams.events import EventsStreamEventRow
 from synapse.rest.client.v1 import events
 from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
 from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
@@ -369,7 +370,9 @@ class SyncReplicationHandler(ReplicationClientHandler):
         # We shouldn't get multiple rows per token for events stream, so
         # we don't need to optimise this for multiple rows.
         for row in rows:
-            event = yield self.store.get_event(row.event_id)
+            if row.type != EventsStreamEventRow.TypeId:
+                continue
+            event = yield self.store.get_event(row.data.event_id)
             extra_users = ()
             if event.type == EventTypes.Member:
                 extra_users = (event.state_key,)
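Note: the behavioural fix here is that rows on the events stream are now typed wrappers, so handlers must check row.type and read the payload via row.data. A minimal standalone sketch of the pattern, using hypothetical namedtuple stand-ins for the attrs-based row classes this PR introduces::

    from collections import namedtuple

    # Hypothetical stand-ins for EventsStreamRow and its payload types.
    EventsStreamRow = namedtuple("EventsStreamRow", ("type", "data"))
    EventRowData = namedtuple("EventRowData", ("event_id", "room_id"))
    StateRowData = namedtuple("StateRowData", ("room_id", "event_id"))

    rows = [
        EventsStreamRow("state", StateRowData("!room:example.org", "$s1:example.org")),
        EventsStreamRow("ev", EventRowData("$e1:example.org", "!room:example.org")),
    ]

    for row in rows:
        if row.type != "ev":   # skip current-state rows, as above
            continue
        print("fetching", row.data.event_id)  # payload lives under row.data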

synapse/app/user_dir.py

@@ -36,6 +36,10 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.replication.tcp.streams.events import (
+    EventsStream,
+    EventsStreamCurrentStateRow,
+)
 from synapse.rest.client.v2_alpha import user_directory
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
@@ -73,19 +77,18 @@ class UserDirectorySlaveStore(
             prefilled_cache=curr_state_delta_prefill,
         )
-        self._current_state_delta_pos = events_max

     def stream_positions(self):
         result = super(UserDirectorySlaveStore, self).stream_positions()
-        result["current_state_deltas"] = self._current_state_delta_pos
         return result

     def process_replication_rows(self, stream_name, token, rows):
-        if stream_name == "current_state_deltas":
-            self._current_state_delta_pos = token
+        if stream_name == EventsStream.NAME:
+            self._stream_id_gen.advance(token)
             for row in rows:
+                if row.type != EventsStreamCurrentStateRow.TypeId:
+                    continue
                 self._curr_state_delta_stream_cache.entity_has_changed(
-                    row.room_id, token
+                    row.data.room_id, token
                 )
         return super(UserDirectorySlaveStore, self).process_replication_rows(
             stream_name, token, rows
         )
@@ -170,7 +173,7 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler):
         yield super(UserDirectoryReplicationHandler, self).on_rdata(
             stream_name, token, rows
         )
-        if stream_name == "current_state_deltas":
+        if stream_name == EventsStream.NAME:
             run_in_background(self._notify_directory)

     @defer.inlineCallbacks
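Note: the user-directory worker no longer keeps its own "current_state_deltas" position; it advances the shared events-stream token and pokes the room-change cache only for "state" rows. A toy model of that cache bookkeeping, as a loose hypothetical stand-in for Synapse's StreamChangeCache::

    class ToyStreamChangeCache:
        """Tracks the last stream token at which each entity changed."""

        def __init__(self):
            self._last_change = {}  # entity -> token

        def entity_has_changed(self, entity, token):
            self._last_change[entity] = token

        def has_entity_changed(self, entity, since_token):
            # Unknown entities are conservatively assumed to have changed.
            return self._last_change.get(entity, float("inf")) > since_token

    cache = ToyStreamChangeCache()
    cache.entity_has_changed("!room:example.org", token=14)
    assert cache.has_entity_changed("!room:example.org", since_token=10)
    assert not cache.has_entity_changed("!room:example.org", since_token=14)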

synapse/replication/slave/storage/events.py

@@ -16,6 +16,7 @@
 import logging

 from synapse.api.constants import EventTypes
+from synapse.replication.tcp.streams.events import EventsStreamEventRow
 from synapse.storage.event_federation import EventFederationWorkerStore
 from synapse.storage.event_push_actions import EventPushActionsWorkerStore
 from synapse.storage.events_worker import EventsWorkerStore
@@ -79,9 +80,12 @@ class SlavedEventStore(EventFederationWorkerStore,
         if stream_name == "events":
             self._stream_id_gen.advance(token)
             for row in rows:
+                if row.type != EventsStreamEventRow.TypeId:
+                    continue
+                data = row.data
                 self.invalidate_caches_for_event(
-                    token, row.event_id, row.room_id, row.type, row.state_key,
-                    row.redacts,
+                    token, data.event_id, data.room_id, data.type, data.state_key,
+                    data.redacts,
                     backfilled=False,
                 )
         elif stream_name == "backfill":
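Note: the flow above advances the stream position and then invalidates caches for each "ev" row, ignoring "state" rows entirely. A trimmed-down hypothetical mirror of that flow::

    # Hypothetical miniature of SlavedEventStore.process_replication_rows;
    # rows are modelled as plain (type, data) pairs for brevity.
    class MiniSlavedEventStore:
        def __init__(self):
            self.current_token = 0
            self.invalidated = []

        def process_replication_rows(self, stream_name, token, rows):
            if stream_name == "events":
                self.current_token = token  # advance the stream position
                for row_type, data in rows:
                    if row_type != "ev":
                        continue  # "state" rows carry no per-event cache keys
                    self.invalidated.append(data["event_id"])

    store = MiniSlavedEventStore()
    store.process_replication_rows("events", 15, [
        ("state", {"room_id": "!r:x"}),
        ("ev", {"event_id": "$e:x"}),
    ])
    assert store.current_token == 15
    assert store.invalidated == ["$e:x"]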

docs/tcp_replication.rst

@@ -42,8 +42,8 @@ indicate which side is sending, these are *not* included on the wire::
     > POSITION backfill 1
     > POSITION caches 1
     > RDATA caches 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513]
-    > RDATA events 14 ["$149019767112vOHxz:localhost:8823",
-      "!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]
+    > RDATA events 14 ["ev", ["$149019767112vOHxz:localhost:8823",
+      "!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]]
     < PING 1490197675618
     > ERROR server stopping
     * connection closed by server *
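Note: the payload of an events-stream RDATA is now a JSON [type, data] pair rather than a bare list. A quick sketch of decoding the line shown above — purely illustrative, the real parsing lives in Synapse's replication protocol code::

    import json

    line = ('> RDATA events 14 ["ev", ["$149019767112vOHxz:localhost:8823",'
            '"!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]]')

    _, command, stream_name, token, payload = line.split(" ", 4)
    typ, data = json.loads(payload)
    event_id, room_id, event_type, state_key, redacts = data
    assert command == "RDATA" and typ == "ev" and redacts is None
    print(token, event_id, room_id, event_type)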

synapse/replication/tcp/streams/__init__.py

@@ -44,7 +44,6 @@ STREAMS_MAP = {
         federation.FederationStream,
         _base.TagAccountDataStream,
         _base.AccountDataStream,
-        _base.CurrentStateDeltaStream,
         _base.GroupServerStream,
     )
 }
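Note: STREAMS_MAP keys each stream class by its NAME attribute, so dropping CurrentStateDeltaStream from this tuple is all it takes to stop advertising the stream. A simplified sketch of the shape, with stand-in classes rather than the real ones::

    class EventsStream:
        NAME = "events"

    class BackfillStream:
        NAME = "backfill"

    STREAMS_MAP = {
        stream.NAME: stream
        for stream in (EventsStream, BackfillStream)
    }

    assert "current_state_deltas" not in STREAMS_MAP  # the stream is retired
    assert STREAMS_MAP["events"] is EventsStream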

synapse/replication/tcp/streams/_base.py

@@ -91,12 +91,6 @@ AccountDataStreamRow = namedtuple("AccountDataStream", (
     "data_type",  # str
     "data",  # dict
 ))
-CurrentStateDeltaStreamRow = namedtuple("CurrentStateDeltaStream", (
-    "room_id",  # str
-    "type",  # str
-    "state_key",  # str
-    "event_id",  # str, optional
-))
 GroupsStreamRow = namedtuple("GroupsStreamRow", (
     "group_id",  # str
     "user_id",  # str
@@ -428,21 +422,6 @@ class AccountDataStream(Stream):
         defer.returnValue(results)

-class CurrentStateDeltaStream(Stream):
-    """Current state for a room was changed
-    """
-    NAME = "current_state_deltas"
-    ROW_TYPE = CurrentStateDeltaStreamRow
-
-    def __init__(self, hs):
-        store = hs.get_datastore()
-
-        self.current_token = store.get_max_current_state_delta_stream_id
-        self.update_function = store.get_all_updated_current_state_deltas
-
-        super(CurrentStateDeltaStream, self).__init__(hs)
-
 class GroupServerStream(Stream):
     NAME = "groups"
     ROW_TYPE = GroupsStreamRow

synapse/replication/tcp/streams/events.py

@@ -13,28 +13,134 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from collections import namedtuple
+import heapq
+
+import attr
+
+from twisted.internet import defer

 from ._base import Stream

-EventStreamRow = namedtuple("EventStreamRow", (
-    "event_id",  # str
-    "room_id",  # str
-    "type",  # str
-    "state_key",  # str, optional
-    "redacts",  # str, optional
-))
+
+"""Handling of the 'events' replication stream
+
+This stream contains rows of various types. Each row therefore contains a 'type'
+identifier before the real data. For example::
+
+    RDATA events batch ["state", ["!room:id", "m.type", "", "$event:id"]]
+    RDATA events 12345 ["ev", ["$event:id", "!room:id", "m.type", null, null]]
+
+An "ev" row is sent for each new event. The fields in the data part are:
+
+ * The new event id
+ * The room id for the event
+ * The type of the new event
+ * The state key of the event, for state events
+ * The event id of an event which is redacted by this event.
+
+A "state" row is sent whenever the "current state" in a room changes. The fields in the
+data part are:
+
+ * The room id for the state change
+ * The event type of the state which has changed
+ * The state_key of the state which has changed
+ * The event id of the new state
+"""
+
+
+@attr.s(slots=True, frozen=True)
+class EventsStreamRow(object):
+    """A parsed row from the events replication stream"""
+    type = attr.ib()  # str: the TypeId of one of the *EventsStreamRows
+    data = attr.ib()  # BaseEventsStreamRow
+
+
+class BaseEventsStreamRow(object):
+    """Base class for rows to be sent in the events stream.
+
+    Specifies how to identify, serialize and deserialize the different types.
+    """
+
+    TypeId = None  # Unique string that ids the type. Must be overridden in subclasses.
+
+    @classmethod
+    def from_data(cls, data):
+        """Parse the data from the replication stream into a row.
+
+        By default we just call the constructor with the data list as arguments.
+
+        Args:
+            data: The value of the data object from the replication stream
+        """
+        return cls(*data)
+
+
+@attr.s(slots=True, frozen=True)
+class EventsStreamEventRow(BaseEventsStreamRow):
+    TypeId = "ev"
+
+    event_id = attr.ib()   # str
+    room_id = attr.ib()    # str
+    type = attr.ib()       # str
+    state_key = attr.ib()  # str, optional
+    redacts = attr.ib()    # str, optional
+
+
+@attr.s(slots=True, frozen=True)
+class EventsStreamCurrentStateRow(BaseEventsStreamRow):
+    TypeId = "state"
+
+    room_id = attr.ib()    # str
+    type = attr.ib()       # str
+    state_key = attr.ib()  # str
+    event_id = attr.ib()   # str, optional
+
+
+TypeToRow = {
+    Row.TypeId: Row
+    for Row in (
+        EventsStreamEventRow,
+        EventsStreamCurrentStateRow,
+    )
+}


 class EventsStream(Stream):
     """We received a new event, or an event went from being an outlier to not
     """
     NAME = "events"
-    ROW_TYPE = EventStreamRow

     def __init__(self, hs):
-        store = hs.get_datastore()
-        self.current_token = store.get_current_events_token
-        self.update_function = store.get_all_new_forward_event_rows
+        self._store = hs.get_datastore()
+        self.current_token = self._store.get_current_events_token

         super(EventsStream, self).__init__(hs)

+    @defer.inlineCallbacks
+    def update_function(self, from_token, current_token, limit=None):
+        event_rows = yield self._store.get_all_new_forward_event_rows(
+            from_token, current_token, limit,
+        )
+        event_updates = (
+            (row[0], EventsStreamEventRow.TypeId, row[1:])
+            for row in event_rows
+        )
+
+        state_rows = yield self._store.get_all_updated_current_state_deltas(
+            from_token, current_token, limit
+        )
+        state_updates = (
+            (row[0], EventsStreamCurrentStateRow.TypeId, row[1:])
+            for row in state_rows
+        )
+
+        all_updates = heapq.merge(event_updates, state_updates)
+
+        defer.returnValue(all_updates)
+
+    @classmethod
+    def parse_row(cls, row):
+        (typ, data) = row
+        data = TypeToRow[typ].from_data(data)
+        return EventsStreamRow(typ, data)
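Note: update_function leans on heapq.merge, which lazily interleaves already-sorted iterables; since both storage queries return rows ordered by stream token (the first tuple element), the merged stream stays in token order. A standalone illustration::

    import heapq

    # Each update is (token, type_id, data), mirroring the tuples built above.
    event_updates = [(3, "ev", ["$a"]), (5, "ev", ["$b"])]
    state_updates = [(4, "state", ["!r"]), (5, "state", ["!r"])]

    for update in heapq.merge(event_updates, state_updates):
        print(update)
    # (3, 'ev', ['$a'])
    # (4, 'state', ['!r'])
    # (5, 'ev', ['$b'])
    # (5, 'state', ['!r'])

A tie on the token falls back to comparing the type strings ("ev" < "state"), so the unorderable data lists are never compared.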

synapse/storage/events.py

@@ -2235,9 +2235,6 @@ class EventsStore(
             (int(res["topological_ordering"]), int(res["stream_ordering"]))
         )

-    def get_max_current_state_delta_stream_id(self):
-        return self._stream_id_gen.get_current_token()
-
     def get_all_updated_current_state_deltas(self, from_token, to_token, limit):
         def get_all_updated_current_state_deltas_txn(txn):
             sql = """