diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 53feaa196..f0aa2193f 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -235,80 +235,21 @@ class EventFederationStore(SQLBaseStore): ], ) - self._update_extremeties(txn, events) + self._update_backward_extremeties(txn, events) - def _update_extremeties(self, txn, events): - """Updates the event_*_extremities tables based on the new/updated + def _update_backward_extremeties(self, txn, events): + """Updates the event_backward_extremities tables based on the new/updated events being persisted. This is called for new events *and* for events that were outliers, but - are are now being persisted as non-outliers. + are now being persisted as non-outliers. + + Forward extremities are handled when we first start persisting the events. """ events_by_room = {} for ev in events: events_by_room.setdefault(ev.room_id, []).append(ev) - for room_id, room_events in events_by_room.items(): - prevs = [ - e_id for ev in room_events for e_id, _ in ev.prev_events - if not ev.internal_metadata.is_outlier() - ] - if prevs: - txn.execute( - "DELETE FROM event_forward_extremities" - " WHERE room_id = ?" - " AND event_id in (%s)" % ( - ",".join(["?"] * len(prevs)), - ), - [room_id] + prevs, - ) - - query = ( - "INSERT INTO event_forward_extremities (event_id, room_id)" - " SELECT ?, ? WHERE NOT EXISTS (" - " SELECT 1 FROM event_edges WHERE prev_event_id = ?" - " )" - ) - - txn.executemany( - query, - [ - (ev.event_id, ev.room_id, ev.event_id) for ev in events - if not ev.internal_metadata.is_outlier() - ] - ) - - # We now insert into stream_ordering_to_exterm a mapping from room_id, - # new stream_ordering to new forward extremeties in the room. - # This allows us to later efficiently look up the forward extremeties - # for a room before a given stream_ordering - max_stream_ord = max( - ev.internal_metadata.stream_ordering for ev in events - ) - new_extrem = {} - for room_id in events_by_room: - event_ids = self._simple_select_onecol_txn( - txn, - table="event_forward_extremities", - keyvalues={"room_id": room_id}, - retcol="event_id", - ) - new_extrem[room_id] = event_ids - - self._simple_insert_many_txn( - txn, - table="stream_ordering_to_exterm", - values=[ - { - "room_id": room_id, - "event_id": event_id, - "stream_ordering": max_stream_ord, - } - for room_id, extrem_evs in new_extrem.items() - for event_id in extrem_evs - ] - ) - query = ( "INSERT INTO event_backward_extremities (event_id, room_id)" " SELECT ?, ? WHERE NOT EXISTS (" @@ -339,11 +280,6 @@ class EventFederationStore(SQLBaseStore): ] ) - for room_id in events_by_room: - txn.call_after( - self.get_latest_event_ids_in_room.invalidate, (room_id,) - ) - def get_forward_extremeties_for_room(self, room_id, stream_ordering): # We want to make the cache more effective, so we clamp to the last # change before the given ordering. diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 0d6519f30..295f2522b 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -279,6 +279,7 @@ class EventsStore(SQLBaseStore): # We can't easily parallelize these since different chunks # might contain the same event. :( + new_forward_extremeties = {} current_state_for_room = {} if not backfilled: # Work out the new "current state" for each room. @@ -296,20 +297,16 @@ class EventsStore(SQLBaseStore): latest_event_ids = yield self.get_latest_event_ids_in_room( room_id ) - new_latest_event_ids = set(latest_event_ids) - for event, ctx in ev_ctx_rm: - if event.internal_metadata.is_outlier(): - continue - - new_latest_event_ids.difference_update( - e_id for e_id, _ in event.prev_events - ) - new_latest_event_ids.add(event.event_id) + new_latest_event_ids = yield self._calculate_new_extremeties( + room_id, [ev for ev, _ in ev_ctx_rm] + ) if new_latest_event_ids == set(latest_event_ids): # No change in extremities, so no change in state continue + new_forward_extremeties[room_id] = new_latest_event_ids + # Now we need to work out the different state sets for # each state extremities state_sets = [] @@ -358,9 +355,45 @@ class EventsStore(SQLBaseStore): backfilled=backfilled, delete_existing=delete_existing, current_state_for_room=current_state_for_room, + new_forward_extremeties=new_forward_extremeties, ) persist_event_counter.inc_by(len(chunk)) + @defer.inlineCallbacks + def _calculate_new_extremeties(self, room_id, events): + latest_event_ids = yield self.get_latest_event_ids_in_room( + room_id + ) + new_latest_event_ids = set(latest_event_ids) + new_latest_event_ids.update( + event.event_id for event in events + if not event.internal_metadata.is_outlier() + ) + new_latest_event_ids.difference_update( + e_id + for event in events + for e_id, _ in event.prev_events + if not event.internal_metadata.is_outlier() + ) + + rows = yield self._simple_select_many_batch( + table="event_edges", + column="prev_event_id", + iterable=list(new_latest_event_ids), + retcols=["prev_event_id"], + keyvalues={ + "room_id": room_id, + "is_state": False, + }, + desc="_calculate_new_extremeties", + ) + + new_latest_event_ids.difference_update( + row["prev_event_id"] for row in rows + ) + + defer.returnValue(new_latest_event_ids) + @defer.inlineCallbacks def get_event(self, event_id, check_redacted=True, get_prev_content=False, allow_rejected=False, @@ -417,53 +450,10 @@ class EventsStore(SQLBaseStore): defer.returnValue({e.event_id: e for e in events}) - @log_function - def _persist_event_txn(self, txn, event, context, current_state, backfilled=False, - delete_existing=False): - # We purposefully do this first since if we include a `current_state` - # key, we *want* to update the `current_state_events` table - if current_state: - txn.call_after(self._get_current_state_for_key.invalidate_all) - txn.call_after(self.get_rooms_for_user.invalidate_all) - txn.call_after(self.get_users_in_room.invalidate, (event.room_id,)) - - # Add an entry to the current_state_resets table to record the point - # where we clobbered the current state - stream_order = event.internal_metadata.stream_ordering - self._simple_insert_txn( - txn, - table="current_state_resets", - values={"event_stream_ordering": stream_order} - ) - - self._simple_delete_txn( - txn, - table="current_state_events", - keyvalues={"room_id": event.room_id}, - ) - - for s in current_state: - self._simple_insert_txn( - txn, - "current_state_events", - { - "event_id": s.event_id, - "room_id": s.room_id, - "type": s.type, - "state_key": s.state_key, - } - ) - - return self._persist_events_txn( - txn, - [(event, context)], - backfilled=backfilled, - delete_existing=delete_existing, - ) - @log_function def _persist_events_txn(self, txn, events_and_contexts, backfilled, - delete_existing=False, current_state_for_room={}): + delete_existing=False, current_state_for_room={}, + new_forward_extremeties={}): """Insert some number of room events into the necessary database tables. Rejected events are only inserted into the events table, the events_json table, @@ -473,6 +463,7 @@ class EventsStore(SQLBaseStore): If delete_existing is True then existing events will be purged from the database before insertion. This is useful when retrying due to IntegrityError. """ + max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering for room_id, current_state in current_state_for_room.iteritems(): txn.call_after(self._get_current_state_for_key.invalidate_all) txn.call_after(self.get_rooms_for_user.invalidate_all) @@ -480,11 +471,10 @@ class EventsStore(SQLBaseStore): # Add an entry to the current_state_resets table to record the point # where we clobbered the current state - stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering self._simple_insert_txn( txn, table="current_state_resets", - values={"event_stream_ordering": stream_order} + values={"event_stream_ordering": max_stream_order} ) self._simple_delete_txn( @@ -507,6 +497,46 @@ class EventsStore(SQLBaseStore): ], ) + for room_id, new_extrem in new_forward_extremeties.items(): + self._simple_delete_txn( + txn, + table="event_forward_extremities", + keyvalues={"room_id": room_id}, + ) + txn.call_after( + self.get_latest_event_ids_in_room.invalidate, (room_id,) + ) + + self._simple_insert_many_txn( + txn, + table="event_forward_extremities", + values=[ + { + "event_id": ev_id, + "room_id": room_id, + } + for room_id, new_extrem in new_forward_extremeties.items() + for ev_id in new_extrem + ], + ) + # We now insert into stream_ordering_to_exterm a mapping from room_id, + # new stream_ordering to new forward extremeties in the room. + # This allows us to later efficiently look up the forward extremeties + # for a room before a given stream_ordering + self._simple_insert_many_txn( + txn, + table="stream_ordering_to_exterm", + values=[ + { + "room_id": room_id, + "event_id": event_id, + "stream_ordering": max_stream_order, + } + for room_id, new_extrem in new_forward_extremeties.items() + for event_id in new_extrem + ] + ) + # Ensure that we don't have the same event twice. # Pick the earliest non-outlier if there is one, else the earliest one. new_events_and_contexts = OrderedDict()