Batch up storing state groups when creating new room (#14918)

This commit is contained in:
Shay 2023-02-24 13:15:29 -08:00 committed by GitHub
parent 335f52d595
commit 1c95ddd09b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 371 additions and 49 deletions

View file

@ -18,6 +18,8 @@ from typing import TYPE_CHECKING, Collection, Dict, Iterable, List, Optional, Se
import attr
from synapse.api.constants import EventTypes
from synapse.events import EventBase
from synapse.events.snapshot import UnpersistedEventContext, UnpersistedEventContextBase
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import (
DatabasePool,
@ -401,6 +403,123 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
fetched_keys=non_member_types,
)
async def store_state_deltas_for_batched(
self,
events_and_context: List[Tuple[EventBase, UnpersistedEventContextBase]],
room_id: str,
prev_group: int,
) -> List[Tuple[EventBase, UnpersistedEventContext]]:
"""Generate and store state deltas for a group of events and contexts created to be
batch persisted. Note that all the events must be in a linear chain (ie a <- b <- c).
Args:
events_and_context: the events to generate and store a state groups for
and their associated contexts
room_id: the id of the room the events were created for
prev_group: the state group of the last event persisted before the batched events
were created
"""
def insert_deltas_group_txn(
txn: LoggingTransaction,
events_and_context: List[Tuple[EventBase, UnpersistedEventContext]],
prev_group: int,
) -> List[Tuple[EventBase, UnpersistedEventContext]]:
"""Generate and store state groups for the provided events and contexts.
Requires that we have the state as a delta from the last persisted state group.
Returns:
A list of state groups
"""
is_in_db = self.db_pool.simple_select_one_onecol_txn(
txn,
table="state_groups",
keyvalues={"id": prev_group},
retcol="id",
allow_none=True,
)
if not is_in_db:
raise Exception(
"Trying to persist state with unpersisted prev_group: %r"
% (prev_group,)
)
num_state_groups = sum(
1 for event, _ in events_and_context if event.is_state()
)
state_groups = self._state_group_seq_gen.get_next_mult_txn(
txn, num_state_groups
)
sg_before = prev_group
state_group_iter = iter(state_groups)
for event, context in events_and_context:
if not event.is_state():
context.state_group_after_event = sg_before
context.state_group_before_event = sg_before
continue
sg_after = next(state_group_iter)
context.state_group_after_event = sg_after
context.state_group_before_event = sg_before
context.state_delta_due_to_event = {
(event.type, event.state_key): event.event_id
}
sg_before = sg_after
self.db_pool.simple_insert_many_txn(
txn,
table="state_groups",
keys=("id", "room_id", "event_id"),
values=[
(context.state_group_after_event, room_id, event.event_id)
for event, context in events_and_context
if event.is_state()
],
)
self.db_pool.simple_insert_many_txn(
txn,
table="state_group_edges",
keys=("state_group", "prev_state_group"),
values=[
(
context.state_group_after_event,
context.state_group_before_event,
)
for event, context in events_and_context
if event.is_state()
],
)
self.db_pool.simple_insert_many_txn(
txn,
table="state_groups_state",
keys=("state_group", "room_id", "type", "state_key", "event_id"),
values=[
(
context.state_group_after_event,
room_id,
key[0],
key[1],
state_id,
)
for event, context in events_and_context
if context.state_delta_due_to_event is not None
for key, state_id in context.state_delta_due_to_event.items()
],
)
return events_and_context
return await self.db_pool.runInteraction(
"store_state_deltas_for_batched.insert_deltas_group",
insert_deltas_group_txn,
events_and_context,
prev_group,
)
async def store_state_group(
self,
event_id: str,