diff --git a/synapse/storage/events.py b/synapse/storage/events.py index c38a63108..25a2be279 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -397,6 +397,12 @@ class EventsStore(SQLBaseStore): @log_function def _persist_events_txn(self, txn, events_and_contexts, backfilled): + """Insert some number of room events into the necessary database tables. + + Rejected events are only inserted into the events table, the events_json table, + and the rejections table. Things reading from those table will need to check + whether the event was rejected. + """ depth_updates = {} for event, context in events_and_contexts: # Remove the any existing cache entries for the event_ids @@ -427,15 +433,21 @@ class EventsStore(SQLBaseStore): for event_id, outlier in txn.fetchall() } + # Remove the events that we've seen before. event_map = {} to_remove = set() for event, context in events_and_contexts: + if context.rejected: + # If the event is rejected then we don't care if the event + # was an outlier or not. + if event.event_id in have_persisted: + # If we have already seen the event then ignore it. + to_remove.add(event) + continue + # Handle the case of the list including the same event multiple # times. The tricky thing here is when they differ by whether # they are an outlier. - if context.rejected: - continue - if event.event_id in event_map: other = event_map[event.event_id] @@ -457,6 +469,12 @@ class EventsStore(SQLBaseStore): outlier_persisted = have_persisted[event.event_id] if not event.internal_metadata.is_outlier() and outlier_persisted: + # We received a copy of an event that we had already stored as + # an outlier in the database. We now have some state at that + # so we need to update the state_groups table with that state. + + # insert into the state_group, state_groups_state and + # event_to_state_groups tables. self._store_mult_state_groups_txn(txn, ((event, context),)) metadata_json = encode_json( @@ -472,6 +490,8 @@ class EventsStore(SQLBaseStore): (metadata_json, event.event_id,) ) + # Add an entry to the ex_outlier_stream table to replicate the + # change in outlier status to our workers. stream_order = event.internal_metadata.stream_ordering state_group_id = context.state_group or context.new_state_group_id self._simple_insert_txn( @@ -493,6 +513,8 @@ class EventsStore(SQLBaseStore): (False, event.event_id,) ) + # Update the event_backward_extremities table now that this + # event isn't an outlier any more. self._update_extremeties(txn, [event]) events_and_contexts = [ @@ -557,24 +579,30 @@ class EventsStore(SQLBaseStore): ], ) + # Remove the rejected events from the list now that we've added them + # to the events table and the events_json table. to_remove = set() for event, context in events_and_contexts: if context.rejected: + # Insert the event_id into the rejections table self._store_rejections_txn( txn, event.event_id, context.rejected ) - to_remove.add(event.event_id) + to_remove.add(event) events_and_contexts = [ - ec for ec in events_and_contexts if ec[0].event_id not in to_remove + ec for ec in events_and_contexts if ec[0] not in to_remove ] if not events_and_contexts: + # Make sure we don't pass an empty list to functions that expect to + # be storing at least one element. return # From this point onwards the events are only ones that weren't rejected. for event, context in events_and_contexts: + # Insert all the push actions into the event_push_actions table. if context.push_actions: self._set_push_actions_for_event_and_users_txn( txn, event, context.push_actions @@ -595,12 +623,18 @@ class EventsStore(SQLBaseStore): ) if event.type == EventTypes.Redaction and event.redacts is not None: + # Remove the entries in the event_push_actions table for the + # redacted event. self._remove_push_actions_for_event_id_txn( txn, event.room_id, event.redacts ) + # Insert into the state_groups, state_groups_state, and + # event_to_state_groups tables. self._store_mult_state_groups_txn(txn, events_and_contexts) + # Update the event_forward_extremities, event_backward_extremities and + # event_edges tables. self._handle_mult_prev_events( txn, events=[event for event, _ in events_and_contexts], @@ -608,18 +642,25 @@ class EventsStore(SQLBaseStore): for event, _ in events_and_contexts: if event.type == EventTypes.Name: + # Insert into the room_names and event_search tables. self._store_room_name_txn(txn, event) elif event.type == EventTypes.Topic: + # Insert into the topics table and event_search table. self._store_room_topic_txn(txn, event) elif event.type == EventTypes.Message: + # Insert into the event_search table. self._store_room_message_txn(txn, event) elif event.type == EventTypes.Redaction: + # Insert into the redactions table. self._store_redaction(txn, event) elif event.type == EventTypes.RoomHistoryVisibility: + # Insert into the event_search table. self._store_history_visibility_txn(txn, event) elif event.type == EventTypes.GuestAccess: + # Insert into the event_search table. self._store_guest_access_txn(txn, event) + # Insert into the room_memberships table. self._store_room_members_txn( txn, [ @@ -630,6 +671,7 @@ class EventsStore(SQLBaseStore): backfilled=backfilled, ) + # Insert event_reference_hashes table. self._store_event_reference_hashes_txn( txn, [event for event, _ in events_and_contexts] ) @@ -674,6 +716,7 @@ class EventsStore(SQLBaseStore): ], ) + # Prefil the event cache self._add_to_cache(txn, events_and_contexts) if backfilled: