Add initial power level event to batch of bulk persisted events when creating a new room. (#14228)

This commit is contained in:
Shay 2022-10-21 10:46:22 -07:00 committed by GitHub
parent 1d45ad8b2a
commit b7a7ff6ee3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 82 additions and 58 deletions

1
changelog.d/14228.misc Normal file
View File

@ -0,0 +1 @@
Add initial power level event to batch of bulk persisted events when creating a new room.

View File

@ -1017,7 +1017,9 @@ class FederationHandler:
context = EventContext.for_outlier(self._storage_controllers) context = EventContext.for_outlier(self._storage_controllers)
await self._bulk_push_rule_evaluator.action_for_event_by_user(event, context) await self._bulk_push_rule_evaluator.action_for_events_by_user(
[(event, context)]
)
try: try:
await self._federation_event_handler.persist_events_and_notify( await self._federation_event_handler.persist_events_and_notify(
event.room_id, [(event, context)] event.room_id, [(event, context)]

View File

@ -2171,8 +2171,8 @@ class FederationEventHandler:
min_depth, min_depth,
) )
else: else:
await self._bulk_push_rule_evaluator.action_for_event_by_user( await self._bulk_push_rule_evaluator.action_for_events_by_user(
event, context [(event, context)]
) )
try: try:

View File

@ -1433,17 +1433,9 @@ class EventCreationHandler:
a room that has been un-partial stated. a room that has been un-partial stated.
""" """
for event, context in events_and_context: await self._bulk_push_rule_evaluator.action_for_events_by_user(
# Skip push notification actions for historical messages events_and_context
# because we don't want to notify people about old history back in time. )
# The historical messages also do not have the proper `context.current_state_ids`
# and `state_groups` because they have `prev_events` that aren't persisted yet
# (historical messages persisted in reverse-chronological order).
if not event.internal_metadata.is_historical():
with opentracing.start_active_span("calculate_push_actions"):
await self._bulk_push_rule_evaluator.action_for_event_by_user(
event, context
)
try: try:
# If we're a worker we need to hit out to the master. # If we're a worker we need to hit out to the master.

View File

@ -1055,9 +1055,6 @@ class RoomCreationHandler:
event_keys = {"room_id": room_id, "sender": creator_id, "state_key": ""} event_keys = {"room_id": room_id, "sender": creator_id, "state_key": ""}
depth = 1 depth = 1
# the last event sent/persisted to the db
last_sent_event_id: Optional[str] = None
# the most recently created event # the most recently created event
prev_event: List[str] = [] prev_event: List[str] = []
# a map of event types, state keys -> event_ids. We collect these mappings this as events are # a map of event types, state keys -> event_ids. We collect these mappings this as events are
@ -1102,26 +1099,6 @@ class RoomCreationHandler:
return new_event, new_context return new_event, new_context
async def send(
event: EventBase,
context: synapse.events.snapshot.EventContext,
creator: Requester,
) -> int:
nonlocal last_sent_event_id
ev = await self.event_creation_handler.handle_new_client_event(
requester=creator,
events_and_context=[(event, context)],
ratelimit=False,
ignore_shadow_ban=True,
)
last_sent_event_id = ev.event_id
# we know it was persisted, so must have a stream ordering
assert ev.internal_metadata.stream_ordering
return ev.internal_metadata.stream_ordering
try: try:
config = self._presets_dict[preset_config] config = self._presets_dict[preset_config]
except KeyError: except KeyError:
@ -1135,10 +1112,14 @@ class RoomCreationHandler:
) )
logger.debug("Sending %s in new room", EventTypes.Member) logger.debug("Sending %s in new room", EventTypes.Member)
await send(creation_event, creation_context, creator) ev = await self.event_creation_handler.handle_new_client_event(
requester=creator,
events_and_context=[(creation_event, creation_context)],
ratelimit=False,
ignore_shadow_ban=True,
)
last_sent_event_id = ev.event_id
# Room create event must exist at this point
assert last_sent_event_id is not None
member_event_id, _ = await self.room_member_handler.update_membership( member_event_id, _ = await self.room_member_handler.update_membership(
creator, creator,
creator.user, creator.user,
@ -1157,6 +1138,7 @@ class RoomCreationHandler:
depth += 1 depth += 1
state_map[(EventTypes.Member, creator.user.to_string())] = member_event_id state_map[(EventTypes.Member, creator.user.to_string())] = member_event_id
events_to_send = []
# We treat the power levels override specially as this needs to be one # We treat the power levels override specially as this needs to be one
# of the first events that get sent into a room. # of the first events that get sent into a room.
pl_content = initial_state.pop((EventTypes.PowerLevels, ""), None) pl_content = initial_state.pop((EventTypes.PowerLevels, ""), None)
@ -1165,7 +1147,7 @@ class RoomCreationHandler:
EventTypes.PowerLevels, pl_content, False EventTypes.PowerLevels, pl_content, False
) )
current_state_group = power_context._state_group current_state_group = power_context._state_group
await send(power_event, power_context, creator) events_to_send.append((power_event, power_context))
else: else:
power_level_content: JsonDict = { power_level_content: JsonDict = {
"users": {creator_id: 100}, "users": {creator_id: 100},
@ -1214,9 +1196,8 @@ class RoomCreationHandler:
False, False,
) )
current_state_group = pl_context._state_group current_state_group = pl_context._state_group
await send(pl_event, pl_context, creator) events_to_send.append((pl_event, pl_context))
events_to_send = []
if room_alias and (EventTypes.CanonicalAlias, "") not in initial_state: if room_alias and (EventTypes.CanonicalAlias, "") not in initial_state:
room_alias_event, room_alias_context = await create_event( room_alias_event, room_alias_context = await create_event(
EventTypes.CanonicalAlias, {"alias": room_alias.to_string()}, True EventTypes.CanonicalAlias, {"alias": room_alias.to_string()}, True

View File

@ -165,8 +165,21 @@ class BulkPushRuleEvaluator:
return rules_by_user return rules_by_user
async def _get_power_levels_and_sender_level( async def _get_power_levels_and_sender_level(
self, event: EventBase, context: EventContext self,
event: EventBase,
context: EventContext,
event_id_to_event: Mapping[str, EventBase],
) -> Tuple[dict, Optional[int]]: ) -> Tuple[dict, Optional[int]]:
"""
Given an event and an event context, get the power level event relevant to the event
and the power level of the sender of the event.
Args:
event: event to check
context: context of event to check
event_id_to_event: a mapping of event_id to event for a set of events being
batch persisted. This is needed as the sought-after power level event may
be in this batch rather than the DB
"""
# There are no power levels and sender levels possible to get from outlier # There are no power levels and sender levels possible to get from outlier
if event.internal_metadata.is_outlier(): if event.internal_metadata.is_outlier():
return {}, None return {}, None
@ -177,15 +190,26 @@ class BulkPushRuleEvaluator:
) )
pl_event_id = prev_state_ids.get(POWER_KEY) pl_event_id = prev_state_ids.get(POWER_KEY)
# fastpath: if there's a power level event, that's all we need, and
# not having a power level event is an extreme edge case
if pl_event_id: if pl_event_id:
# fastpath: if there's a power level event, that's all we need, and # Get the power level event from the batch, or fall back to the database.
# not having a power level event is an extreme edge case pl_event = event_id_to_event.get(pl_event_id)
auth_events = {POWER_KEY: await self.store.get_event(pl_event_id)} if pl_event:
auth_events = {POWER_KEY: pl_event}
else:
auth_events = {POWER_KEY: await self.store.get_event(pl_event_id)}
else: else:
auth_events_ids = self._event_auth_handler.compute_auth_events( auth_events_ids = self._event_auth_handler.compute_auth_events(
event, prev_state_ids, for_verification=False event, prev_state_ids, for_verification=False
) )
auth_events_dict = await self.store.get_events(auth_events_ids) auth_events_dict = await self.store.get_events(auth_events_ids)
# Some needed auth events might be in the batch, combine them with those
# fetched from the database.
for auth_event_id in auth_events_ids:
auth_event = event_id_to_event.get(auth_event_id)
if auth_event:
auth_events_dict[auth_event_id] = auth_event
auth_events = {(e.type, e.state_key): e for e in auth_events_dict.values()} auth_events = {(e.type, e.state_key): e for e in auth_events_dict.values()}
sender_level = get_user_power_level(event.sender, auth_events) sender_level = get_user_power_level(event.sender, auth_events)
@ -194,16 +218,38 @@ class BulkPushRuleEvaluator:
return pl_event.content if pl_event else {}, sender_level return pl_event.content if pl_event else {}, sender_level
@measure_func("action_for_event_by_user") async def action_for_events_by_user(
async def action_for_event_by_user( self, events_and_context: List[Tuple[EventBase, EventContext]]
self, event: EventBase, context: EventContext
) -> None: ) -> None:
"""Given an event and context, evaluate the push rules, check if the message """Given a list of events and their associated contexts, evaluate the push rules
should increment the unread count, and insert the results into the for each event, check if the message should increment the unread count, and
event_push_actions_staging table. insert the results into the event_push_actions_staging table.
""" """
if not event.internal_metadata.is_notifiable(): # For batched events the power level events may not have been persisted yet,
# Push rules for events that aren't notifiable can't be processed by this # so we pass in the batched events. Thus if the event cannot be found in the
# database we can check in the batch.
event_id_to_event = {e.event_id: e for e, _ in events_and_context}
for event, context in events_and_context:
await self._action_for_event_by_user(event, context, event_id_to_event)
@measure_func("action_for_event_by_user")
async def _action_for_event_by_user(
self,
event: EventBase,
context: EventContext,
event_id_to_event: Mapping[str, EventBase],
) -> None:
if (
not event.internal_metadata.is_notifiable()
or event.internal_metadata.is_historical()
):
# Push rules for events that aren't notifiable can't be processed by this and
# we want to skip push notification actions for historical messages
# because we don't want to notify people about old history back in time.
# The historical messages also do not have the proper `context.current_state_ids`
# and `state_groups` because they have `prev_events` that aren't persisted yet
# (historical messages persisted in reverse-chronological order).
return return
# Disable counting as unread unless the experimental configuration is # Disable counting as unread unless the experimental configuration is
@ -223,7 +269,9 @@ class BulkPushRuleEvaluator:
( (
power_levels, power_levels,
sender_power_level, sender_power_level,
) = await self._get_power_levels_and_sender_level(event, context) ) = await self._get_power_levels_and_sender_level(
event, context, event_id_to_event
)
# Find the event's thread ID. # Find the event's thread ID.
relation = relation_from_event(event) relation = relation_from_event(event)

View File

@ -71,4 +71,4 @@ class TestBulkPushRuleEvaluator(unittest.HomeserverTestCase):
bulk_evaluator = BulkPushRuleEvaluator(self.hs) bulk_evaluator = BulkPushRuleEvaluator(self.hs)
# should not raise # should not raise
self.get_success(bulk_evaluator.action_for_event_by_user(event, context)) self.get_success(bulk_evaluator.action_for_events_by_user([(event, context)]))

View File

@ -371,7 +371,7 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase):
config=worker_hs.config.server.listeners[0], config=worker_hs.config.server.listeners[0],
resource=resource, resource=resource,
server_version_string="1", server_version_string="1",
max_request_body_size=4096, max_request_body_size=8192,
reactor=self.reactor, reactor=self.reactor,
) )