Refactor backfilled into specific behavior function arguments (_persist_events_and_state_updates) (#11417)

Part of https://github.com/matrix-org/synapse/issues/11300

Call stack:

 - `_persist_events_and_state_updates` (added `use_negative_stream_ordering`)
    - `_persist_events_txn`
       - `_update_room_depths_txn` (added `update_room_forward_stream_ordering`)
       - `_update_metadata_tables_txn`
          - `_store_room_members_txn` (added `inhibit_local_membership_updates`)

Using keyword-only arguments (`*`) to reduce the mistakes from `backfilled` being left as a positional argument somewhere and being interpreted wrong by our new arguments.
This commit is contained in:
Eric Eastwood 2021-11-29 16:01:54 -06:00 committed by GitHub
parent a4521ce0a8
commit fb58611d21
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 57 additions and 21 deletions

1
changelog.d/11417.misc Normal file
View File

@ -0,0 +1 @@
Refactor `backfilled` into specific behavior function arguments (`_persist_events_and_state_updates` and downstream calls).

View File

@ -124,10 +124,12 @@ class PersistEventsStore:
async def _persist_events_and_state_updates( async def _persist_events_and_state_updates(
self, self,
events_and_contexts: List[Tuple[EventBase, EventContext]], events_and_contexts: List[Tuple[EventBase, EventContext]],
*,
current_state_for_room: Dict[str, StateMap[str]], current_state_for_room: Dict[str, StateMap[str]],
state_delta_for_room: Dict[str, DeltaState], state_delta_for_room: Dict[str, DeltaState],
new_forward_extremeties: Dict[str, List[str]], new_forward_extremeties: Dict[str, List[str]],
backfilled: bool = False, use_negative_stream_ordering: bool = False,
inhibit_local_membership_updates: bool = False,
) -> None: ) -> None:
"""Persist a set of events alongside updates to the current state and """Persist a set of events alongside updates to the current state and
forward extremities tables. forward extremities tables.
@ -140,7 +142,14 @@ class PersistEventsStore:
room state room state
new_forward_extremities: Map from room_id to list of event IDs new_forward_extremities: Map from room_id to list of event IDs
that are the new forward extremities of the room. that are the new forward extremities of the room.
backfilled use_negative_stream_ordering: Whether to start stream_ordering on
the negative side and decrement. This should be set as True
for backfilled events because backfilled events get a negative
stream ordering so they don't come down incremental `/sync`.
inhibit_local_membership_updates: Stop the local_current_membership
from being updated by these events. This should be set to True
for backfilled events because backfilled events in the past do
not affect the current local state.
Returns: Returns:
Resolves when the events have been persisted Resolves when the events have been persisted
@ -162,7 +171,7 @@ class PersistEventsStore:
# #
# Note: Multiple instances of this function cannot be in flight at # Note: Multiple instances of this function cannot be in flight at
# the same time for the same room. # the same time for the same room.
if backfilled: if use_negative_stream_ordering:
stream_ordering_manager = self._backfill_id_gen.get_next_mult( stream_ordering_manager = self._backfill_id_gen.get_next_mult(
len(events_and_contexts) len(events_and_contexts)
) )
@ -179,13 +188,13 @@ class PersistEventsStore:
"persist_events", "persist_events",
self._persist_events_txn, self._persist_events_txn,
events_and_contexts=events_and_contexts, events_and_contexts=events_and_contexts,
backfilled=backfilled, inhibit_local_membership_updates=inhibit_local_membership_updates,
state_delta_for_room=state_delta_for_room, state_delta_for_room=state_delta_for_room,
new_forward_extremeties=new_forward_extremeties, new_forward_extremeties=new_forward_extremeties,
) )
persist_event_counter.inc(len(events_and_contexts)) persist_event_counter.inc(len(events_and_contexts))
if not backfilled: if stream < 0:
# backfilled events have negative stream orderings, so we don't # backfilled events have negative stream orderings, so we don't
# want to set the event_persisted_position to that. # want to set the event_persisted_position to that.
synapse.metrics.event_persisted_position.set( synapse.metrics.event_persisted_position.set(
@ -319,8 +328,9 @@ class PersistEventsStore:
def _persist_events_txn( def _persist_events_txn(
self, self,
txn: LoggingTransaction, txn: LoggingTransaction,
*,
events_and_contexts: List[Tuple[EventBase, EventContext]], events_and_contexts: List[Tuple[EventBase, EventContext]],
backfilled: bool, inhibit_local_membership_updates: bool = False,
state_delta_for_room: Optional[Dict[str, DeltaState]] = None, state_delta_for_room: Optional[Dict[str, DeltaState]] = None,
new_forward_extremeties: Optional[Dict[str, List[str]]] = None, new_forward_extremeties: Optional[Dict[str, List[str]]] = None,
): ):
@ -333,7 +343,10 @@ class PersistEventsStore:
Args: Args:
txn txn
events_and_contexts: events to persist events_and_contexts: events to persist
backfilled: True if the events were backfilled inhibit_local_membership_updates: Stop the local_current_membership
from being updated by these events. This should be set to True
for backfilled events because backfilled events in the past do
not affect the current local state.
delete_existing True to purge existing table rows for the events delete_existing True to purge existing table rows for the events
from the database. This is useful when retrying due to from the database. This is useful when retrying due to
IntegrityError. IntegrityError.
@ -366,9 +379,7 @@ class PersistEventsStore:
events_and_contexts events_and_contexts
) )
self._update_room_depths_txn( self._update_room_depths_txn(txn, events_and_contexts=events_and_contexts)
txn, events_and_contexts=events_and_contexts, backfilled=backfilled
)
# _update_outliers_txn filters out any events which have already been # _update_outliers_txn filters out any events which have already been
# persisted, and returns the filtered list. # persisted, and returns the filtered list.
@ -401,7 +412,7 @@ class PersistEventsStore:
txn, txn,
events_and_contexts=events_and_contexts, events_and_contexts=events_and_contexts,
all_events_and_contexts=all_events_and_contexts, all_events_and_contexts=all_events_and_contexts,
backfilled=backfilled, inhibit_local_membership_updates=inhibit_local_membership_updates,
) )
# We call this last as it assumes we've inserted the events into # We call this last as it assumes we've inserted the events into
@ -1203,7 +1214,6 @@ class PersistEventsStore:
self, self,
txn, txn,
events_and_contexts: List[Tuple[EventBase, EventContext]], events_and_contexts: List[Tuple[EventBase, EventContext]],
backfilled: bool,
): ):
"""Update min_depth for each room """Update min_depth for each room
@ -1211,13 +1221,18 @@ class PersistEventsStore:
txn (twisted.enterprise.adbapi.Connection): db connection txn (twisted.enterprise.adbapi.Connection): db connection
events_and_contexts (list[(EventBase, EventContext)]): events events_and_contexts (list[(EventBase, EventContext)]): events
we are persisting we are persisting
backfilled (bool): True if the events were backfilled
""" """
depth_updates: Dict[str, int] = {} depth_updates: Dict[str, int] = {}
for event, context in events_and_contexts: for event, context in events_and_contexts:
# Remove the any existing cache entries for the event_ids # Remove the any existing cache entries for the event_ids
txn.call_after(self.store._invalidate_get_event_cache, event.event_id) txn.call_after(self.store._invalidate_get_event_cache, event.event_id)
if not backfilled: # Then update the `stream_ordering` position to mark the latest
# event as the front of the room. This should not be done for
# backfilled events because backfilled events have negative
# stream_ordering and happened in the past so we know that we don't
# need to update the stream_ordering tip/front for the room.
assert event.internal_metadata.stream_ordering is not None
if event.internal_metadata.stream_ordering >= 0:
txn.call_after( txn.call_after(
self.store._events_stream_cache.entity_has_changed, self.store._events_stream_cache.entity_has_changed,
event.room_id, event.room_id,
@ -1430,7 +1445,12 @@ class PersistEventsStore:
return [ec for ec in events_and_contexts if ec[0] not in to_remove] return [ec for ec in events_and_contexts if ec[0] not in to_remove]
def _update_metadata_tables_txn( def _update_metadata_tables_txn(
self, txn, events_and_contexts, all_events_and_contexts, backfilled self,
txn,
*,
events_and_contexts,
all_events_and_contexts,
inhibit_local_membership_updates: bool = False,
): ):
"""Update all the miscellaneous tables for new events """Update all the miscellaneous tables for new events
@ -1442,7 +1462,10 @@ class PersistEventsStore:
events that we were going to persist. This includes events events that we were going to persist. This includes events
we've already persisted, etc, that wouldn't appear in we've already persisted, etc, that wouldn't appear in
events_and_context. events_and_context.
backfilled (bool): True if the events were backfilled inhibit_local_membership_updates: Stop the local_current_membership
from being updated by these events. This should be set to True
for backfilled events because backfilled events in the past do
not affect the current local state.
""" """
# Insert all the push actions into the event_push_actions table. # Insert all the push actions into the event_push_actions table.
@ -1516,7 +1539,7 @@ class PersistEventsStore:
for event, _ in events_and_contexts for event, _ in events_and_contexts
if event.type == EventTypes.Member if event.type == EventTypes.Member
], ],
backfilled=backfilled, inhibit_local_membership_updates=inhibit_local_membership_updates,
) )
# Insert event_reference_hashes table. # Insert event_reference_hashes table.
@ -1643,8 +1666,19 @@ class PersistEventsStore:
txn, table="event_reference_hashes", values=vals txn, table="event_reference_hashes", values=vals
) )
def _store_room_members_txn(self, txn, events, backfilled): def _store_room_members_txn(
"""Store a room member in the database.""" self, txn, events, *, inhibit_local_membership_updates: bool = False
):
"""
Store a room member in the database.
Args:
txn: The transaction to use.
events: List of events to store.
inhibit_local_membership_updates: Stop the local_current_membership
from being updated by these events. This should be set to True
for backfilled events because backfilled events in the past do
not affect the current local state.
"""
def non_null_str_or_none(val: Any) -> Optional[str]: def non_null_str_or_none(val: Any) -> Optional[str]:
return val if isinstance(val, str) and "\u0000" not in val else None return val if isinstance(val, str) and "\u0000" not in val else None
@ -1687,7 +1721,7 @@ class PersistEventsStore:
# band membership", like a remote invite or a rejection of a remote invite. # band membership", like a remote invite or a rejection of a remote invite.
if ( if (
self.is_mine_id(event.state_key) self.is_mine_id(event.state_key)
and not backfilled and not inhibit_local_membership_updates
and event.internal_metadata.is_outlier() and event.internal_metadata.is_outlier()
and event.internal_metadata.is_out_of_band_membership() and event.internal_metadata.is_out_of_band_membership()
): ):

View File

@ -583,7 +583,8 @@ class EventsPersistenceStorage:
current_state_for_room=current_state_for_room, current_state_for_room=current_state_for_room,
state_delta_for_room=state_delta_for_room, state_delta_for_room=state_delta_for_room,
new_forward_extremeties=new_forward_extremeties, new_forward_extremeties=new_forward_extremeties,
backfilled=backfilled, use_negative_stream_ordering=backfilled,
inhibit_local_membership_updates=backfilled,
) )
await self._handle_potentially_left_users(potentially_left_users) await self._handle_potentially_left_users(potentially_left_users)