mirror of
https://git.anonymousland.org/anonymousland/synapse-product.git
synced 2024-12-17 14:34:21 -05:00
Prepatory work for batching events to send (#13487)
This PR begins work on batching up events during the creation of a room. The PR splits out the creation and sending/persisting of the events. The first three events in the creation of the room-creating the room, joining the creator to the room, and the power levels event are sent sequentially, while the subsequent events are created and collected to be sent at the end of the function. This is currently done by appending them to a list and then iterating over the list to send, the next step (after this PR) would be to send and persist the collected events as a batch.
This commit is contained in:
parent
29269d9d3f
commit
a2cf66a94d
1
changelog.d/13487.misc
Normal file
1
changelog.d/13487.misc
Normal file
@ -0,0 +1 @@
|
||||
Speed up creation of DM rooms.
|
@ -63,6 +63,7 @@ from synapse.types import (
|
||||
MutableStateMap,
|
||||
Requester,
|
||||
RoomAlias,
|
||||
StateMap,
|
||||
StreamToken,
|
||||
UserID,
|
||||
create_requester,
|
||||
@ -567,9 +568,17 @@ class EventCreationHandler:
|
||||
outlier: bool = False,
|
||||
historical: bool = False,
|
||||
depth: Optional[int] = None,
|
||||
state_map: Optional[StateMap[str]] = None,
|
||||
for_batch: bool = False,
|
||||
current_state_group: Optional[int] = None,
|
||||
) -> Tuple[EventBase, EventContext]:
|
||||
"""
|
||||
Given a dict from a client, create a new event.
|
||||
Given a dict from a client, create a new event. If bool for_batch is true, will
|
||||
create an event using the prev_event_ids, and will create an event context for
|
||||
the event using the parameters state_map and current_state_group, thus these parameters
|
||||
must be provided in this case if for_batch is True. The subsequently created event
|
||||
and context are suitable for being batched up and bulk persisted to the database
|
||||
with other similarly created events.
|
||||
|
||||
Creates an FrozenEvent object, filling out auth_events, prev_events,
|
||||
etc.
|
||||
@ -612,16 +621,27 @@ class EventCreationHandler:
|
||||
outlier: Indicates whether the event is an `outlier`, i.e. if
|
||||
it's from an arbitrary point and floating in the DAG as
|
||||
opposed to being inline with the current DAG.
|
||||
|
||||
historical: Indicates whether the message is being inserted
|
||||
back in time around some existing events. This is used to skip
|
||||
a few checks and mark the event as backfilled.
|
||||
|
||||
depth: Override the depth used to order the event in the DAG.
|
||||
Should normally be set to None, which will cause the depth to be calculated
|
||||
based on the prev_events.
|
||||
|
||||
state_map: A state map of previously created events, used only when creating events
|
||||
for batch persisting
|
||||
|
||||
for_batch: whether the event is being created for batch persisting to the db
|
||||
|
||||
current_state_group: the current state group, used only for creating events for
|
||||
batch persisting
|
||||
|
||||
Raises:
|
||||
ResourceLimitError if server is blocked to some resource being
|
||||
exceeded
|
||||
|
||||
Returns:
|
||||
Tuple of created event, Context
|
||||
"""
|
||||
@ -693,6 +713,9 @@ class EventCreationHandler:
|
||||
auth_event_ids=auth_event_ids,
|
||||
state_event_ids=state_event_ids,
|
||||
depth=depth,
|
||||
state_map=state_map,
|
||||
for_batch=for_batch,
|
||||
current_state_group=current_state_group,
|
||||
)
|
||||
|
||||
# In an ideal world we wouldn't need the second part of this condition. However,
|
||||
@ -707,6 +730,10 @@ class EventCreationHandler:
|
||||
# federation as well as those created locally. As of room v3, aliases events
|
||||
# can be created by users that are not in the room, therefore we have to
|
||||
# tolerate them in event_auth.check().
|
||||
if for_batch:
|
||||
assert state_map is not None
|
||||
prev_event_id = state_map.get((EventTypes.Member, event.sender))
|
||||
else:
|
||||
prev_state_ids = await context.get_prev_state_ids(
|
||||
StateFilter.from_types([(EventTypes.Member, None)])
|
||||
)
|
||||
@ -1009,8 +1036,16 @@ class EventCreationHandler:
|
||||
auth_event_ids: Optional[List[str]] = None,
|
||||
state_event_ids: Optional[List[str]] = None,
|
||||
depth: Optional[int] = None,
|
||||
state_map: Optional[StateMap[str]] = None,
|
||||
for_batch: bool = False,
|
||||
current_state_group: Optional[int] = None,
|
||||
) -> Tuple[EventBase, EventContext]:
|
||||
"""Create a new event for a local client
|
||||
"""Create a new event for a local client. If bool for_batch is true, will
|
||||
create an event using the prev_event_ids, and will create an event context for
|
||||
the event using the parameters state_map and current_state_group, thus these parameters
|
||||
must be provided in this case if for_batch is True. The subsequently created event
|
||||
and context are suitable for being batched up and bulk persisted to the database
|
||||
with other similarly created events.
|
||||
|
||||
Args:
|
||||
builder:
|
||||
@ -1043,6 +1078,14 @@ class EventCreationHandler:
|
||||
Should normally be set to None, which will cause the depth to be calculated
|
||||
based on the prev_events.
|
||||
|
||||
state_map: A state map of previously created events, used only when creating events
|
||||
for batch persisting
|
||||
|
||||
for_batch: whether the event is being created for batch persisting to the db
|
||||
|
||||
current_state_group: the current state group, used only for creating events for
|
||||
batch persisting
|
||||
|
||||
Returns:
|
||||
Tuple of created event, context
|
||||
"""
|
||||
@ -1095,6 +1138,18 @@ class EventCreationHandler:
|
||||
builder.type == EventTypes.Create or prev_event_ids
|
||||
), "Attempting to create a non-m.room.create event with no prev_events"
|
||||
|
||||
if for_batch:
|
||||
assert prev_event_ids is not None
|
||||
assert state_map is not None
|
||||
assert current_state_group is not None
|
||||
auth_ids = self._event_auth_handler.compute_auth_events(builder, state_map)
|
||||
event = await builder.build(
|
||||
prev_event_ids=prev_event_ids, auth_event_ids=auth_ids, depth=depth
|
||||
)
|
||||
context = await self.state.compute_event_context_for_batched(
|
||||
event, state_map, current_state_group
|
||||
)
|
||||
else:
|
||||
event = await builder.build(
|
||||
prev_event_ids=prev_event_ids,
|
||||
auth_event_ids=auth_event_ids,
|
||||
|
@ -716,7 +716,7 @@ class RoomCreationHandler:
|
||||
|
||||
if (
|
||||
self._server_notices_mxid is not None
|
||||
and requester.user.to_string() == self._server_notices_mxid
|
||||
and user_id == self._server_notices_mxid
|
||||
):
|
||||
# allow the server notices mxid to create rooms
|
||||
is_requester_admin = True
|
||||
@ -1042,7 +1042,9 @@ class RoomCreationHandler:
|
||||
creator_join_profile: Optional[JsonDict] = None,
|
||||
ratelimit: bool = True,
|
||||
) -> Tuple[int, str, int]:
|
||||
"""Sends the initial events into a new room.
|
||||
"""Sends the initial events into a new room. Sends the room creation, membership,
|
||||
and power level events into the room sequentially, then creates and batches up the
|
||||
rest of the events to persist as a batch to the DB.
|
||||
|
||||
`power_level_content_override` doesn't apply when initial state has
|
||||
power level state event content.
|
||||
@ -1053,13 +1055,21 @@ class RoomCreationHandler:
|
||||
"""
|
||||
|
||||
creator_id = creator.user.to_string()
|
||||
|
||||
event_keys = {"room_id": room_id, "sender": creator_id, "state_key": ""}
|
||||
|
||||
depth = 1
|
||||
# the last event sent/persisted to the db
|
||||
last_sent_event_id: Optional[str] = None
|
||||
# the most recently created event
|
||||
prev_event: List[str] = []
|
||||
# a map of event types, state keys -> event_ids. We collect these mappings this as events are
|
||||
# created (but not persisted to the db) to determine state for future created events
|
||||
# (as this info can't be pulled from the db)
|
||||
state_map: MutableStateMap[str] = {}
|
||||
# current_state_group of last event created. Used for computing event context of
|
||||
# events to be batched
|
||||
current_state_group = None
|
||||
|
||||
def create(etype: str, content: JsonDict, **kwargs: Any) -> JsonDict:
|
||||
def create_event_dict(etype: str, content: JsonDict, **kwargs: Any) -> JsonDict:
|
||||
e = {"type": etype, "content": content}
|
||||
|
||||
e.update(event_keys)
|
||||
@ -1067,32 +1077,52 @@ class RoomCreationHandler:
|
||||
|
||||
return e
|
||||
|
||||
async def send(etype: str, content: JsonDict, **kwargs: Any) -> int:
|
||||
nonlocal last_sent_event_id
|
||||
async def create_event(
|
||||
etype: str,
|
||||
content: JsonDict,
|
||||
for_batch: bool,
|
||||
**kwargs: Any,
|
||||
) -> Tuple[EventBase, synapse.events.snapshot.EventContext]:
|
||||
nonlocal depth
|
||||
nonlocal prev_event
|
||||
|
||||
event = create(etype, content, **kwargs)
|
||||
logger.debug("Sending %s in new room", etype)
|
||||
# Allow these events to be sent even if the user is shadow-banned to
|
||||
# allow the room creation to complete.
|
||||
(
|
||||
sent_event,
|
||||
last_stream_id,
|
||||
) = await self.event_creation_handler.create_and_send_nonmember_event(
|
||||
event_dict = create_event_dict(etype, content, **kwargs)
|
||||
|
||||
new_event, new_context = await self.event_creation_handler.create_event(
|
||||
creator,
|
||||
event,
|
||||
event_dict,
|
||||
prev_event_ids=prev_event,
|
||||
depth=depth,
|
||||
state_map=state_map,
|
||||
for_batch=for_batch,
|
||||
current_state_group=current_state_group,
|
||||
)
|
||||
depth += 1
|
||||
prev_event = [new_event.event_id]
|
||||
state_map[(new_event.type, new_event.state_key)] = new_event.event_id
|
||||
|
||||
return new_event, new_context
|
||||
|
||||
async def send(
|
||||
event: EventBase,
|
||||
context: synapse.events.snapshot.EventContext,
|
||||
creator: Requester,
|
||||
) -> int:
|
||||
nonlocal last_sent_event_id
|
||||
|
||||
ev = await self.event_creation_handler.handle_new_client_event(
|
||||
requester=creator,
|
||||
event=event,
|
||||
context=context,
|
||||
ratelimit=False,
|
||||
ignore_shadow_ban=True,
|
||||
# Note: we don't pass state_event_ids here because this triggers
|
||||
# an additional query per event to look them up from the events table.
|
||||
prev_event_ids=[last_sent_event_id] if last_sent_event_id else [],
|
||||
depth=depth,
|
||||
)
|
||||
|
||||
last_sent_event_id = sent_event.event_id
|
||||
depth += 1
|
||||
last_sent_event_id = ev.event_id
|
||||
|
||||
return last_stream_id
|
||||
# we know it was persisted, so must have a stream ordering
|
||||
assert ev.internal_metadata.stream_ordering
|
||||
return ev.internal_metadata.stream_ordering
|
||||
|
||||
try:
|
||||
config = self._presets_dict[preset_config]
|
||||
@ -1102,9 +1132,13 @@ class RoomCreationHandler:
|
||||
)
|
||||
|
||||
creation_content.update({"creator": creator_id})
|
||||
await send(etype=EventTypes.Create, content=creation_content)
|
||||
creation_event, creation_context = await create_event(
|
||||
EventTypes.Create, creation_content, False
|
||||
)
|
||||
|
||||
logger.debug("Sending %s in new room", EventTypes.Member)
|
||||
await send(creation_event, creation_context, creator)
|
||||
|
||||
# Room create event must exist at this point
|
||||
assert last_sent_event_id is not None
|
||||
member_event_id, _ = await self.room_member_handler.update_membership(
|
||||
@ -1119,14 +1153,22 @@ class RoomCreationHandler:
|
||||
depth=depth,
|
||||
)
|
||||
last_sent_event_id = member_event_id
|
||||
prev_event = [member_event_id]
|
||||
|
||||
# update the depth and state map here as the membership event has been created
|
||||
# through a different code path
|
||||
depth += 1
|
||||
state_map[(EventTypes.Member, creator.user.to_string())] = member_event_id
|
||||
|
||||
# We treat the power levels override specially as this needs to be one
|
||||
# of the first events that get sent into a room.
|
||||
pl_content = initial_state.pop((EventTypes.PowerLevels, ""), None)
|
||||
if pl_content is not None:
|
||||
last_sent_stream_id = await send(
|
||||
etype=EventTypes.PowerLevels, content=pl_content
|
||||
power_event, power_context = await create_event(
|
||||
EventTypes.PowerLevels, pl_content, False
|
||||
)
|
||||
current_state_group = power_context._state_group
|
||||
last_sent_stream_id = await send(power_event, power_context, creator)
|
||||
else:
|
||||
power_level_content: JsonDict = {
|
||||
"users": {creator_id: 100},
|
||||
@ -1169,47 +1211,68 @@ class RoomCreationHandler:
|
||||
# apply those.
|
||||
if power_level_content_override:
|
||||
power_level_content.update(power_level_content_override)
|
||||
|
||||
last_sent_stream_id = await send(
|
||||
etype=EventTypes.PowerLevels, content=power_level_content
|
||||
pl_event, pl_context = await create_event(
|
||||
EventTypes.PowerLevels,
|
||||
power_level_content,
|
||||
False,
|
||||
)
|
||||
current_state_group = pl_context._state_group
|
||||
last_sent_stream_id = await send(pl_event, pl_context, creator)
|
||||
|
||||
events_to_send = []
|
||||
if room_alias and (EventTypes.CanonicalAlias, "") not in initial_state:
|
||||
last_sent_stream_id = await send(
|
||||
etype=EventTypes.CanonicalAlias,
|
||||
content={"alias": room_alias.to_string()},
|
||||
room_alias_event, room_alias_context = await create_event(
|
||||
EventTypes.CanonicalAlias, {"alias": room_alias.to_string()}, True
|
||||
)
|
||||
current_state_group = room_alias_context._state_group
|
||||
events_to_send.append((room_alias_event, room_alias_context))
|
||||
|
||||
if (EventTypes.JoinRules, "") not in initial_state:
|
||||
last_sent_stream_id = await send(
|
||||
etype=EventTypes.JoinRules, content={"join_rule": config["join_rules"]}
|
||||
join_rules_event, join_rules_context = await create_event(
|
||||
EventTypes.JoinRules,
|
||||
{"join_rule": config["join_rules"]},
|
||||
True,
|
||||
)
|
||||
current_state_group = join_rules_context._state_group
|
||||
events_to_send.append((join_rules_event, join_rules_context))
|
||||
|
||||
if (EventTypes.RoomHistoryVisibility, "") not in initial_state:
|
||||
last_sent_stream_id = await send(
|
||||
etype=EventTypes.RoomHistoryVisibility,
|
||||
content={"history_visibility": config["history_visibility"]},
|
||||
visibility_event, visibility_context = await create_event(
|
||||
EventTypes.RoomHistoryVisibility,
|
||||
{"history_visibility": config["history_visibility"]},
|
||||
True,
|
||||
)
|
||||
current_state_group = visibility_context._state_group
|
||||
events_to_send.append((visibility_event, visibility_context))
|
||||
|
||||
if config["guest_can_join"]:
|
||||
if (EventTypes.GuestAccess, "") not in initial_state:
|
||||
last_sent_stream_id = await send(
|
||||
etype=EventTypes.GuestAccess,
|
||||
content={EventContentFields.GUEST_ACCESS: GuestAccess.CAN_JOIN},
|
||||
guest_access_event, guest_access_context = await create_event(
|
||||
EventTypes.GuestAccess,
|
||||
{EventContentFields.GUEST_ACCESS: GuestAccess.CAN_JOIN},
|
||||
True,
|
||||
)
|
||||
current_state_group = guest_access_context._state_group
|
||||
events_to_send.append((guest_access_event, guest_access_context))
|
||||
|
||||
for (etype, state_key), content in initial_state.items():
|
||||
last_sent_stream_id = await send(
|
||||
etype=etype, state_key=state_key, content=content
|
||||
event, context = await create_event(
|
||||
etype, content, True, state_key=state_key
|
||||
)
|
||||
current_state_group = context._state_group
|
||||
events_to_send.append((event, context))
|
||||
|
||||
if config["encrypted"]:
|
||||
last_sent_stream_id = await send(
|
||||
etype=EventTypes.RoomEncryption,
|
||||
encryption_event, encryption_context = await create_event(
|
||||
EventTypes.RoomEncryption,
|
||||
{"algorithm": RoomEncryptionAlgorithms.DEFAULT},
|
||||
True,
|
||||
state_key="",
|
||||
content={"algorithm": RoomEncryptionAlgorithms.DEFAULT},
|
||||
)
|
||||
events_to_send.append((encryption_event, encryption_context))
|
||||
|
||||
for event, context in events_to_send:
|
||||
last_sent_stream_id = await send(event, context, creator)
|
||||
return last_sent_stream_id, last_sent_event_id, depth
|
||||
|
||||
def _generate_room_id(self) -> str:
|
||||
|
@ -420,6 +420,69 @@ class StateHandler:
|
||||
partial_state=partial_state,
|
||||
)
|
||||
|
||||
async def compute_event_context_for_batched(
|
||||
self,
|
||||
event: EventBase,
|
||||
state_ids_before_event: StateMap[str],
|
||||
current_state_group: int,
|
||||
) -> EventContext:
|
||||
"""
|
||||
Generate an event context for an event that has not yet been persisted to the
|
||||
database. Intended for use with events that are created to be persisted in a batch.
|
||||
Args:
|
||||
event: the event the context is being computed for
|
||||
state_ids_before_event: a state map consisting of the state ids of the events
|
||||
created prior to this event.
|
||||
current_state_group: the current state group before the event.
|
||||
"""
|
||||
state_group_before_event_prev_group = None
|
||||
deltas_to_state_group_before_event = None
|
||||
|
||||
state_group_before_event = current_state_group
|
||||
|
||||
# if the event is not state, we are set
|
||||
if not event.is_state():
|
||||
return EventContext.with_state(
|
||||
storage=self._storage_controllers,
|
||||
state_group_before_event=state_group_before_event,
|
||||
state_group=state_group_before_event,
|
||||
state_delta_due_to_event={},
|
||||
prev_group=state_group_before_event_prev_group,
|
||||
delta_ids=deltas_to_state_group_before_event,
|
||||
partial_state=False,
|
||||
)
|
||||
|
||||
# otherwise, we'll need to create a new state group for after the event
|
||||
key = (event.type, event.state_key)
|
||||
|
||||
if state_ids_before_event is not None:
|
||||
replaces = state_ids_before_event.get(key)
|
||||
|
||||
if replaces and replaces != event.event_id:
|
||||
event.unsigned["replaces_state"] = replaces
|
||||
|
||||
delta_ids = {key: event.event_id}
|
||||
|
||||
state_group_after_event = (
|
||||
await self._state_storage_controller.store_state_group(
|
||||
event.event_id,
|
||||
event.room_id,
|
||||
prev_group=state_group_before_event,
|
||||
delta_ids=delta_ids,
|
||||
current_state_ids=None,
|
||||
)
|
||||
)
|
||||
|
||||
return EventContext.with_state(
|
||||
storage=self._storage_controllers,
|
||||
state_group=state_group_after_event,
|
||||
state_group_before_event=state_group_before_event,
|
||||
state_delta_due_to_event=delta_ids,
|
||||
prev_group=state_group_before_event,
|
||||
delta_ids=delta_ids,
|
||||
partial_state=False,
|
||||
)
|
||||
|
||||
@measure_func()
|
||||
async def resolve_state_groups_for_events(
|
||||
self, room_id: str, event_ids: Collection[str], await_full_state: bool = True
|
||||
|
@ -710,7 +710,7 @@ class RoomsCreateTestCase(RoomBase):
|
||||
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
|
||||
self.assertTrue("room_id" in channel.json_body)
|
||||
assert channel.resource_usage is not None
|
||||
self.assertEqual(44, channel.resource_usage.db_txn_count)
|
||||
self.assertEqual(35, channel.resource_usage.db_txn_count)
|
||||
|
||||
def test_post_room_initial_state(self) -> None:
|
||||
# POST with initial_state config key, expect new room id
|
||||
@ -723,7 +723,7 @@ class RoomsCreateTestCase(RoomBase):
|
||||
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
|
||||
self.assertTrue("room_id" in channel.json_body)
|
||||
assert channel.resource_usage is not None
|
||||
self.assertEqual(50, channel.resource_usage.db_txn_count)
|
||||
self.assertEqual(38, channel.resource_usage.db_txn_count)
|
||||
|
||||
def test_post_room_visibility_key(self) -> None:
|
||||
# POST with visibility config key, expect new room id
|
||||
|
Loading…
Reference in New Issue
Block a user