From cccf86dd05a72f87aaf6fe288c700697954a2144 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 12 Apr 2016 11:19:10 +0100 Subject: [PATCH 1/6] Check if we've already backfilled events --- synapse/handlers/federation.py | 27 ++++++++++++++++++++------- synapse/storage/events.py | 16 ++++++++++++++++ 2 files changed, 36 insertions(+), 7 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 83dab32bc..a38300ec6 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -294,6 +294,15 @@ class FederationHandler(BaseHandler): extremities=extremities, ) + seen_events = yield self.store.have_events_in_timeline( + set(e.event_id for e in events) + ) + + events = [e for e in events if e.event_id not in seen_events] + + if not events: + defer.returnValue([]) + event_map = {e.event_id: e for e in events} event_ids = set(e.event_id for e in events) @@ -353,6 +362,7 @@ class FederationHandler(BaseHandler): for a in auth_events.values(): if a.event_id in seen_events: continue + a.internal_metadata.outlier = True ev_infos.append({ "event": a, "auth_events": { @@ -373,6 +383,11 @@ class FederationHandler(BaseHandler): } }) + yield self._handle_new_events( + dest, ev_infos, + backfilled=True, + ) + events.sort(key=lambda e: e.depth) for event in events: @@ -383,10 +398,9 @@ class FederationHandler(BaseHandler): "event": event, }) - yield self._handle_new_events( - dest, ev_infos, - backfilled=True, - ) + yield self._handle_new_event( + dest, event + ) defer.returnValue(events) @@ -458,11 +472,12 @@ class FederationHandler(BaseHandler): # TODO: Should we try multiple of these at a time? for dom in domains: try: - events = yield self.backfill( + yield self.backfill( dom, room_id, limit=100, extremities=[e for e in extremities.keys()] ) + defer.returnValue(True) except SynapseError as e: logger.info( "Failed to backfill from %s because %s", @@ -488,8 +503,6 @@ class FederationHandler(BaseHandler): ) continue - if events: - defer.returnValue(True) defer.returnValue(False) success = yield try_backfill(likely_domains) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 308a2c9b0..f847c161a 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -543,6 +543,22 @@ class EventsStore(SQLBaseStore): (event.event_id, event.redacts) ) + @defer.inlineCallbacks + def have_events_in_timeline(self, event_ids): + """Given a list of event ids, check if we have already processed and + stored them as non outliers. + """ + rows = yield self._simple_select_many_batch( + table="events", + retcols=("event_id",), + column="event_id", + iterable=list(event_ids), + keyvalues={"outlier": False}, + desc="have_events_in_timeline", + ) + + defer.returnValue(set(r["event_id"] for r in rows)) + def have_events(self, event_ids): """Given a list of event ids, check if we have already processed them. From 0d3da210f01bb323aa7348a16fa3d19c5b8e1f0e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 12 Apr 2016 11:54:41 +0100 Subject: [PATCH 2/6] Add comment --- synapse/handlers/federation.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index a38300ec6..654fd3829 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -294,6 +294,7 @@ class FederationHandler(BaseHandler): extremities=extremities, ) + # Don't bother processing events we already have. seen_events = yield self.store.have_events_in_timeline( set(e.event_id for e in events) ) From 762ada1e079350df46fc8e1c923b7c5beeb533c0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 12 Apr 2016 11:58:04 +0100 Subject: [PATCH 3/6] Add back backfilled parameter that was removed --- synapse/handlers/federation.py | 4 +++- synapse/storage/events.py | 7 ++++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 654fd3829..cb56733b9 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1090,7 +1090,8 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks @log_function - def _handle_new_event(self, origin, event, state=None, auth_events=None): + def _handle_new_event(self, origin, event, state=None, auth_events=None, + backfilled=False): context = yield self._prep_event( origin, event, state=state, @@ -1106,6 +1107,7 @@ class FederationHandler(BaseHandler): event_stream_id, max_stream_id = yield self.store.persist_event( event, context=context, + backfilled=backfilled, ) # this intentionally does not yield: we don't care about the result diff --git a/synapse/storage/events.py b/synapse/storage/events.py index f847c161a..21487724e 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -118,7 +118,7 @@ class EventsStore(SQLBaseStore): @defer.inlineCallbacks @log_function - def persist_event(self, event, context, current_state=None): + def persist_event(self, event, context, current_state=None, backfilled=False): try: with self._stream_id_gen.get_next() as stream_ordering: @@ -131,6 +131,7 @@ class EventsStore(SQLBaseStore): event=event, context=context, current_state=current_state, + backfilled=backfilled, ) except _RollbackButIsFineException: pass @@ -195,7 +196,7 @@ class EventsStore(SQLBaseStore): defer.returnValue({e.event_id: e for e in events}) @log_function - def _persist_event_txn(self, txn, event, context, current_state): + def _persist_event_txn(self, txn, event, context, current_state, backfilled=False): # We purposefully do this first since if we include a `current_state` # key, we *want* to update the `current_state_events` table if current_state: @@ -238,7 +239,7 @@ class EventsStore(SQLBaseStore): return self._persist_events_txn( txn, [(event, context)], - backfilled=False, + backfilled=backfilled, ) @log_function From d3d0be41674a853ee001db9e92b8a30a4fb9ecb8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 12 Apr 2016 11:59:00 +0100 Subject: [PATCH 4/6] Don't append to unused list --- synapse/handlers/federation.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index cb56733b9..fe80568ea 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -395,10 +395,6 @@ class FederationHandler(BaseHandler): if event in events_to_state: continue - ev_infos.append({ - "event": event, - }) - yield self._handle_new_event( dest, event ) From 8be1a379093162f8f5b424d0cfd8a05524ea539d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 12 Apr 2016 12:04:19 +0100 Subject: [PATCH 5/6] More comments --- synapse/handlers/federation.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index fe80568ea..0c65a4832 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -280,6 +280,10 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def backfill(self, dest, room_id, limit, extremities=[]): """ Trigger a backfill request to `dest` for the given `room_id` + + This will attempt to get more events from the remote. This may return + be successfull and still return no events if the other side has no new + events to offer. """ if dest == self.server_name: raise SynapseError(400, "Can't backfill from self.") @@ -474,6 +478,8 @@ class FederationHandler(BaseHandler): limit=100, extremities=[e for e in extremities.keys()] ) + # If this succeeded then we probably already have the + # appropriate stuff. defer.returnValue(True) except SynapseError as e: logger.info( From c48465dbaa8f154c0ac5b87db7b5b1790cc483ad Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 12 Apr 2016 12:48:30 +0100 Subject: [PATCH 6/6] More comments --- synapse/handlers/federation.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0c65a4832..5ac55e10f 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -399,6 +399,9 @@ class FederationHandler(BaseHandler): if event in events_to_state: continue + # We store these one at a time since each event depends on the + # previous to work out the state. + # TODO: We can probably do something more clever here. yield self._handle_new_event( dest, event ) @@ -480,6 +483,7 @@ class FederationHandler(BaseHandler): ) # If this succeeded then we probably already have the # appropriate stuff. + # TODO: We can probably do something more intelligent here. defer.returnValue(True) except SynapseError as e: logger.info( @@ -1122,6 +1126,11 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def _handle_new_events(self, origin, event_infos, backfilled=False): + """Creates the appropriate contexts and persists events. The events + should not depend on one another, e.g. this should be used to persist + a bunch of outliers, but not a chunk of individual events that depend + on each other for state calculations. + """ contexts = yield defer.gatherResults( [ self._prep_event(