From 84e6b4001f22b0e8c2f806053189fcdb1e85205b Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Mon, 11 May 2015 18:01:31 +0100
Subject: [PATCH] Initial hack at wiring together pagination and backfill

---
 synapse/handlers/federation.py      | 108 +++++++++++++++++++++++++++-
 synapse/handlers/message.py         |  10 ++-
 synapse/storage/event_federation.py |  28 +++++++-
 3 files changed, 141 insertions(+), 5 deletions(-)

diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 85e275722..4d39cd4b3 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -218,10 +218,11 @@ class FederationHandler(BaseHandler):
 
     @log_function
     @defer.inlineCallbacks
-    def backfill(self, dest, room_id, limit):
+    def backfill(self, dest, room_id, limit, extremities=[]):
         """ Trigger a backfill request to `dest` for the given `room_id`
         """
-        extremities = yield self.store.get_oldest_events_in_room(room_id)
+        if not extremities:
+            extremities = yield self.store.get_oldest_events_in_room(room_id)
 
         pdus = yield self.replication_layer.backfill(
             dest,
@@ -248,6 +249,109 @@ class FederationHandler(BaseHandler):
 
         defer.returnValue(events)
 
+    @defer.inlineCallbacks
+    def maybe_backfill(self, room_id, current_depth):
+        """Checks the database to see if we should backfill before paginating
+        """
+        extremities = yield self.store.get_oldest_events_with_depth_in_room(
+            room_id
+        )
+
+        logger.debug("Got extremities: %r", extremities)
+
+        if not extremities:
+            return
+
+        # Check if we reached a point where we should start backfilling.
+        sorted_extremeties_tuple = sorted(
+            extremities.items(),
+            key=lambda e: -int(e[1])
+        )
+        max_depth = sorted_extremeties_tuple[0][1]
+
+        logger.debug("max_depth: %r", max_depth)
+        if current_depth > max_depth:
+            return
+
+        # Now we need to decide which hosts to hit first.
+
+        # First we try hosts that are already in the room, that were around
+        # at the time. TODO: HEURISTIC ALERT.
+
+        curr_state = yield self.state_handler.get_current_state(room_id)
+
+        def get_domains_from_state(state):
+            joined_users = [
+                (state_key, int(event.depth))
+                for (e_type, state_key), event in state.items()
+                if e_type == EventTypes.Member
+                and event.membership == Membership.JOIN
+            ]
+
+            joined_domains = {}
+            for u, d in joined_users:
+                try:
+                    dom = UserID.from_string(u).domain
+                    old_d = joined_domains.get(dom)
+                    if old_d:
+                        joined_domains[dom] = min(d, old_d)
+                    else:
+                        joined_domains[dom] = d
+                except:
+                    pass
+
+            return sorted(joined_domains.items(), key=lambda d: d[1])
+
+        curr_domains = get_domains_from_state(curr_state)
+
+        logger.debug("curr_domains: %r", curr_domains)
+
+        likely_domains = [
+            domain for domain, depth in curr_domains
+        ]
+
+        @defer.inlineCallbacks
+        def try_backfill(domains):
+            # TODO: Should we try multiple of these at a time?
+            for dom in domains:
+                events = yield self.backfill(
+                    dom, room_id,
+                    limit=100,
+                    extremities=[e for e in extremities.keys()]
+                )
+
+                if events:
+                    defer.returnValue(True)
+            defer.returnValue(False)
+
+        success = yield try_backfill(likely_domains)
+        if success:
+            defer.returnValue(True)
+
+        # Huh, well *those* domains didn't work out. Let's try some domains
+        # from the time.
+
+        tried_domains = set(likely_domains)
+
+        states = yield defer.gatherResults({
+            e: self.state_handler.resolve_state_groups([e])[1]
+            for e in extremities.keys()
+        })
+
+        for e_id, _ in sorted_extremeties_tuple:
+            likely_domains = get_domains_from_state(states[e_id])[0]
+
+            success = yield try_backfill([
+                dom for dom in likely_domains
+                if dom not in tried_domains
+            ])
+            if success:
+                defer.returnValue(True)
+
+            tried_domains.update(likely_domains)
+
+        defer.returnValue(False)
+
     @defer.inlineCallbacks
     def send_invite(self, target_host, event):
         """ Sends the invite to the remote server for signing.
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 22e19af17..38e375f86 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -21,7 +21,7 @@ from synapse.streams.config import PaginationConfig
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
 from synapse.util.logcontext import PreserveLoggingContext
-from synapse.types import UserID
+from synapse.types import UserID, RoomStreamToken
 
 from ._base import BaseHandler
 
@@ -92,6 +92,14 @@ class MessageHandler(BaseHandler):
             yield self.hs.get_event_sources().get_current_token()
         )
 
+        room_token = RoomStreamToken.parse(pagin_config.from_token.room_key)
+        if room_token.topological is None:
+            raise SynapseError(400, "Invalid token")
+
+        yield self.hs.get_handlers().federation_handler.maybe_backfill(
+            room_id, room_token.topological
+        )
+
         user = UserID.from_string(user_id)
 
         events, next_key = yield data_source.get_pagination_rows(
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 74b4e2359..2b5424ced 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -79,6 +79,28 @@ class EventFederationStore(SQLBaseStore):
             room_id,
         )
 
+    def get_oldest_events_with_depth_in_room(self, room_id):
+        return self.runInteraction(
+            "get_oldest_events_with_depth_in_room",
+            self.get_oldest_events_with_depth_in_room_txn,
+            room_id,
+        )
+
+    def get_oldest_events_with_depth_in_room_txn(self, txn, room_id):
+        sql = (
+            "SELECT b.event_id, MAX(e.depth) FROM events as e"
+            " INNER JOIN event_edges as g"
+            " ON g.event_id = e.event_id AND g.room_id = e.room_id"
+            " INNER JOIN event_backward_extremities as b"
+            " ON g.prev_event_id = b.event_id AND g.room_id = b.room_id"
+            " WHERE b.room_id = ? AND g.is_state is ?"
+            " GROUP BY b.event_id"
+        )
+
+        txn.execute(sql, (room_id, False,))
+
+        return dict(txn.fetchall())
+
     def _get_oldest_events_in_room_txn(self, txn, room_id):
         return self._simple_select_onecol_txn(
             txn,
@@ -247,11 +269,13 @@ class EventFederationStore(SQLBaseStore):
         do_insert = depth < min_depth if min_depth else True
 
         if do_insert:
-            self._simple_insert_txn(
+            self._simple_upsert_txn(
                 txn,
                 table="room_depth",
-                values={
+                keyvalues={
                     "room_id": room_id,
+                },
+                values={
                     "min_depth": depth,
                 },
             )
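What the federation.py hunk is doing: it wires /messages pagination into
federation backfill. When a client paginates back to the deepest backward
extremity we know about, maybe_backfill picks remote servers to fetch the
missing history from, preferring servers whose users joined the room
earliest, since they are the most likely to still hold the old events.
The sketch below restates that heuristic as standalone Python; the names
should_backfill and rank_domains are illustrative, not part of Synapse:

    def should_backfill(current_depth, extremities):
        # `extremities` maps backward-extremity event_id -> depth, as
        # returned by get_oldest_events_with_depth_in_room above. Backfill
        # only once the client's pagination point has reached the deepest
        # gap in our local copy of the event DAG.
        if not extremities:
            return False
        return current_depth <= max(extremities.values())

    def rank_domains(joined_members):
        # `joined_members` is a list of (user_id, join_depth) pairs taken
        # from m.room.member join events. Keep each server's earliest join
        # depth and sort ascending, so the longest-resident servers are
        # tried first.
        earliest = {}
        for user_id, depth in joined_members:
            domain = user_id.split(":", 1)[1]  # "@user:example.com" -> "example.com"
            earliest[domain] = min(depth, earliest.get(domain, depth))
        return [dom for dom, _ in sorted(earliest.items(), key=lambda kv: kv[1])]

One likely follow-up in the fallback path: defer.gatherResults expects a
list of Deferreds, but it is passed a dict comprehension here and the
result is then indexed by event id, so the "domains from the time of the
extremity" branch probably needs reworking before it runs.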
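On the storage side, room_depth moves from a plain insert to an upsert so
that repeated backfills can lower a room's min_depth without tripping over
the existing row for that room. A rough sketch of the update-then-insert
pattern such a helper follows (table and column names are taken from the
patch; the body is an illustration, not Synapse's actual _simple_upsert_txn):

    def upsert_room_depth_txn(txn, room_id, depth):
        # Try to update an existing row first ...
        txn.execute(
            "UPDATE room_depth SET min_depth = ? WHERE room_id = ?",
            (depth, room_id),
        )
        if txn.rowcount == 0:
            # ... and insert only when no row matched the key.
            txn.execute(
                "INSERT INTO room_depth (room_id, min_depth) VALUES (?, ?)",
                (room_id, depth),
            )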