diff --git a/synapse/federation/handler.py b/synapse/federation/handler.py index 580e591ac..68243d31d 100644 --- a/synapse/federation/handler.py +++ b/synapse/federation/handler.py @@ -63,7 +63,7 @@ class FederationEventHandler(object): Deferred: Resolved when it has successfully been queued for processing. """ - yield self._fill_out_prev_events(event) + yield self.fill_out_prev_events(event) pdu = self.pdu_codec.pdu_from_event(event) @@ -129,7 +129,7 @@ class FederationEventHandler(object): yield self.event_handler.on_receive(new_state_event) @defer.inlineCallbacks - def _fill_out_prev_events(self, event): + def fill_out_prev_events(self, event): if hasattr(event, "prev_events"): return diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 841ad8f13..9f78f3f9b 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -43,9 +43,10 @@ class DataStore(RoomMemberStore, RoomStore, def __init__(self, hs): super(DataStore, self).__init__(hs) self.event_factory = hs.get_event_factory() + self.hs = hs @defer.inlineCallbacks - def persist_event(self, event): + def persist_event(self, event, backfilled=False): if event.type == RoomMemberEvent.TYPE: yield self._store_room_member(event) elif event.type == FeedbackEvent.TYPE: @@ -57,7 +58,7 @@ class DataStore(RoomMemberStore, RoomStore, elif event.type == RoomTopicEvent.TYPE: yield self._store_room_topic(event) - ret = yield self._store_event(event) + ret = yield self._store_event(event, backfilled) defer.returnValue(ret) @defer.inlineCallbacks @@ -79,14 +80,23 @@ class DataStore(RoomMemberStore, RoomStore, defer.returnValue(event) @defer.inlineCallbacks - def _store_event(self, event): + def _store_event(self, event, backfilled): + # FIXME (erikj): This should be removed when we start amalgamating + # event and pdu storage. + yield self.hs.get_federation().fill_out_prev_events(event) + vals = { + "topological_ordering": event.depth, "event_id": event.event_id, "type": event.type, "room_id": event.room_id, "content": json.dumps(event.content), + "processed": True, } + if backfilled: + vals["token_ordering"] = "-1" + unrec = { k: v for k, v in event.get_full_dict().items() @@ -96,7 +106,7 @@ class DataStore(RoomMemberStore, RoomStore, yield self._simple_insert("events", vals) - if hasattr(event, "state_key"): + if not backfilled and hasattr(event, "state_key"): vals = { "event_id": event.event_id, "room_id": event.room_id, diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql index 2452890ea..b0240e39a 100644 --- a/synapse/storage/schema/im.sql +++ b/synapse/storage/schema/im.sql @@ -14,12 +14,15 @@ */ CREATE TABLE IF NOT EXISTS events( - ordering INTEGER PRIMARY KEY AUTOINCREMENT, + token_ordering INTEGER AUTOINCREMENT, + topological_ordering INTEGER NOT NULL, event_id TEXT NOT NULL, type TEXT NOT NULL, - room_id TEXT, - content TEXT, - unrecognized_keys TEXT + room_id TEXT NOT NULL, + content TEXT NOT NULL, + unrecognized_keys TEXT, + processed BOOL NOT NULL, + CONSTRAINT ev_uniq UNIQUE (event_id) ); CREATE TABLE IF NOT EXISTS state_events( @@ -35,7 +38,7 @@ CREATE TABLE IF NOT EXISTS current_state_events( room_id TEXT NOT NULL, type TEXT NOT NULL, state_key TEXT NOT NULL, - CONSTRAINT uniq UNIQUE (room_id, type, state_key) ON CONFLICT REPLACE + CONSTRAINT curr_uniq UNIQUE (room_id, type, state_key) ON CONFLICT REPLACE ); CREATE TABLE IF NOT EXISTS room_memberships( diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index cf4b1682b..f7968f576 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -73,13 +73,14 @@ class StreamStore(SQLBaseStore): # Constraints and ordering depend on direction. if from_key < to_key: sql += ( - "AND e.ordering > ? AND e.ordering < ? " - "ORDER BY ordering ASC LIMIT %(limit)d " + "AND e.token_ordering > ? AND e.token_ordering < ? " + "ORDER BY token_ordering, rowid ASC LIMIT %(limit)d " ) % {"limit": limit} else: sql += ( - "AND e.ordering < ? AND e.ordering > ? " - "ORDER BY ordering DESC LIMIT %(limit)d " + "AND e.token_ordering < ? " + "AND e.token_ordering > ? " + "ORDER BY e.token_ordering, rowid DESC LIMIT %(limit)d " ) % {"limit": int(limit)} rows = yield self._execute_and_decode( @@ -91,9 +92,9 @@ class StreamStore(SQLBaseStore): if rows: if from_key < to_key: - key = max([r["ordering"] for r in rows]) + key = max([r["token_ordering"] for r in rows]) else: - key = min([r["ordering"] for r in rows]) + key = min([r["token_ordering"] for r in rows]) else: key = to_key @@ -105,7 +106,7 @@ class StreamStore(SQLBaseStore): sql = ( "SELECT * FROM events WHERE room_id = ? " - "ORDER BY ordering DESC LIMIT ? " + "ORDER BY token_ordering, rowid DESC LIMIT ? " ) rows = yield self._execute_and_decode( @@ -120,7 +121,7 @@ class StreamStore(SQLBaseStore): @defer.inlineCallbacks def get_room_events_max_id(self): res = yield self._execute_and_decode( - "SELECT MAX(ordering) as m FROM events" + "SELECT MAX(token_ordering) as m FROM events" ) if not res: