From 61407986b40e28b590961d364f5618bbe7d44e94 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 30 Mar 2016 16:18:46 +0100 Subject: [PATCH 1/3] Add an entry to current_state_resets table when the current state is reset --- synapse/storage/events.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index a4b899549..bd4d503b6 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -205,6 +205,15 @@ class EventsStore(SQLBaseStore): txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,)) txn.call_after(self.get_room_name_and_aliases, event.room_id) + # Add an entry to the current_state_resets table to record the point + # where we clobbered the current state + stream_order = event.internal_metadata.stream_ordering + self._simple_insert_txn( + txn, + table="current_state_resets", + values={"event_stream_ordering": stream_order} + ) + self._simple_delete_txn( txn, table="current_state_events", From 1fbb094c6fbaab33ef8e17802e37057e83718e7e Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 30 Mar 2016 17:19:56 +0100 Subject: [PATCH 2/3] Add replication streams for ex outliers and current state resets --- synapse/replication/resource.py | 17 +++++- synapse/storage/events.py | 60 ++++++++++++++++++- .../storage/schema/delta/30/state_stream.sql | 38 ++++++++++++ 3 files changed, 113 insertions(+), 2 deletions(-) create mode 100644 synapse/storage/schema/delta/30/state_stream.sql diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index 096a79a7a..7afa1242d 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -204,7 +204,11 @@ class ReplicationResource(Resource): request_events = current_token.events if request_backfill is None: request_backfill = current_token.backfill - events_rows, backfill_rows = yield self.store.get_all_new_events( + ( + events_rows, backfill_rows, + forward_ex_outliers, 
backward_ex_outliers, + state_resets + ) = yield self.store.get_all_new_events( request_backfill, request_events, current_token.backfill, current_token.events, limit @@ -215,6 +219,17 @@ class ReplicationResource(Resource): writer.write_header_and_rows("backfill", backfill_rows, ( "position", "internal", "json", "state_group" )) + writer.write_header_and_rows( + "forward_ex_outliers", forward_ex_outliers, + ("position", "event_id", "state_group") + ) + writer.write_header_and_rows( + "backward_ex_outliers", backward_ex_outliers, + ("position", "event_id", "state_group") + ) + writer.write_header_and_rows( + "state_resets", state_resets, ("position",) + ) @defer.inlineCallbacks def presence(self, writer, current_token): diff --git a/synapse/storage/events.py b/synapse/storage/events.py index bd4d503b6..9725a3fed 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -323,6 +323,18 @@ class EventsStore(SQLBaseStore): (metadata_json, event.event_id,) ) + stream_order = event.internal_metadata.stream_ordering + state_group_id = context.state_group or context.new_state_group_id + self._simple_insert_txn( + txn, + table="ex_outlier_stream", + values={ + "event_stream_ordering": stream_order, + "event_id": event.event_id, + "state_group": state_group_id, + } + ) + sql = ( "UPDATE events SET outlier = ?" " WHERE event_id = ?" @@ -1119,8 +1131,34 @@ class EventsStore(SQLBaseStore): if last_forward_id != current_forward_id: txn.execute(sql, (last_forward_id, current_forward_id, limit)) new_forward_events = txn.fetchall() + + if len(new_forward_events) == limit: + upper_bound = new_forward_events[-1][0] + else: + upper_bound = current_forward_id + + sql = ( + "SELECT -event_stream_ordering FROM current_state_resets" + " WHERE ? < event_stream_ordering" + " AND event_stream_ordering <= ?" 
+ " ORDER BY event_stream_ordering ASC" + ) + txn.execute(sql, (last_forward_id, upper_bound)) + state_resets = txn.fetchall() + + sql = ( + "SELECT -event_stream_ordering, event_id, state_group" + " FROM ex_outlier_stream" + " WHERE ? > event_stream_ordering" + " AND event_stream_ordering >= ?" + " ORDER BY event_stream_ordering DESC" + ) + txn.execute(sql, (last_forward_id, upper_bound)) + forward_ex_outliers = txn.fetchall() else: new_forward_events = [] + state_resets = [] + forward_ex_outliers = [] sql = ( "SELECT -e.stream_ordering, ej.internal_metadata, ej.json" @@ -1136,8 +1174,28 @@ class EventsStore(SQLBaseStore): if last_backfill_id != current_backfill_id: txn.execute(sql, (-last_backfill_id, -current_backfill_id, limit)) new_backfill_events = txn.fetchall() + + if len(new_backfill_events) == limit: + upper_bound = new_backfill_events[-1][0] + else: + upper_bound = current_backfill_id + + sql = ( + "SELECT -event_stream_ordering, event_id, state_group" + " FROM ex_outlier_stream" + " WHERE ? > event_stream_ordering" + " AND event_stream_ordering >= ?" + " ORDER BY event_stream_ordering DESC" + ) + txn.execute(sql, (-last_backfill_id, -upper_bound)) + backward_ex_outliers = txn.fetchall() else: new_backfill_events = [] + backward_ex_outliers = [] - return (new_forward_events, new_backfill_events) + return ( + new_forward_events, new_backfill_events, + forward_ex_outliers, backward_ex_outliers, + state_resets, + ) return self.runInteraction("get_all_new_events", get_all_new_events_txn) diff --git a/synapse/storage/schema/delta/30/state_stream.sql b/synapse/storage/schema/delta/30/state_stream.sql new file mode 100644 index 000000000..706fe1dcf --- /dev/null +++ b/synapse/storage/schema/delta/30/state_stream.sql @@ -0,0 +1,38 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +/** + * The positions in the event stream_ordering when the current_state was + * replaced by the state at the event. + */ + +CREATE TABLE IF NOT EXISTS current_state_resets( + event_stream_ordering BIGINT PRIMARY KEY NOT NULL +); + +/* The outlier events that have acquired a state group typically through + * backfill. This is tracked separately to the events table, as assigning a + * state group change the position of the existing event in the stream + * ordering. + * However since a stream_ordering is assigned in persist_event for the + * (event, state) pair, we can use that stream_ordering to identify when + * the new state was assigned for the event. 
+ */ +CREATE TABLE IF NOT EXISTS ex_outlier_stream( + event_stream_ordering BIGINT PRIMARY KEY NOT NULL, + event_id TEXT NOT NULL, + state_group BIGINT NOT NULL +); From 2ec54260350b46c937527bd566b713cf3544f1d2 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 31 Mar 2016 10:33:02 +0100 Subject: [PATCH 3/3] Use a namedtuple rather than tuple unpacking --- synapse/replication/resource.py | 16 ++++++---------- synapse/storage/events.py | 11 +++++++++-- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index 7afa1242d..69afcb03d 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -204,31 +204,27 @@ class ReplicationResource(Resource): request_events = current_token.events if request_backfill is None: request_backfill = current_token.backfill - ( - events_rows, backfill_rows, - forward_ex_outliers, backward_ex_outliers, - state_resets - ) = yield self.store.get_all_new_events( + res = yield self.store.get_all_new_events( request_backfill, request_events, current_token.backfill, current_token.events, limit ) - writer.write_header_and_rows("events", events_rows, ( + writer.write_header_and_rows("events", res.new_forward_events, ( "position", "internal", "json", "state_group" )) - writer.write_header_and_rows("backfill", backfill_rows, ( + writer.write_header_and_rows("backfill", res.new_backfill_events, ( "position", "internal", "json", "state_group" )) writer.write_header_and_rows( - "forward_ex_outliers", forward_ex_outliers, + "forward_ex_outliers", res.forward_ex_outliers, ("position", "event_id", "state_group") ) writer.write_header_and_rows( - "backward_ex_outliers", backward_ex_outliers, + "backward_ex_outliers", res.backward_ex_outliers, ("position", "event_id", "state_group") ) writer.write_header_and_rows( - "state_resets", state_resets, ("position",) + "state_resets", res.state_resets, ("position",) ) @defer.inlineCallbacks diff --git 
a/synapse/storage/events.py b/synapse/storage/events.py index 9725a3fed..b7ad045e4 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -25,7 +25,7 @@ from synapse.api.constants import EventTypes from canonicaljson import encode_canonical_json from contextlib import contextmanager - +from collections import namedtuple import logging import math @@ -1193,9 +1193,16 @@ class EventsStore(SQLBaseStore): new_backfill_events = [] backward_ex_outliers = [] - return ( + return AllNewEventsResult( new_forward_events, new_backfill_events, forward_ex_outliers, backward_ex_outliers, state_resets, ) return self.runInteraction("get_all_new_events", get_all_new_events_txn) + + +AllNewEventsResult = namedtuple("AllNewEventsResult", [ + "new_forward_events", "new_backfill_events", + "forward_ex_outliers", "backward_ex_outliers", + "state_resets" +])