Merge remote-tracking branch 'upstream/release-v1.70'

Tulir Asokan 2022-10-19 17:24:11 +03:00
commit c3b3895da4
142 changed files with 4896 additions and 2015 deletions


@@ -1367,8 +1367,16 @@ class EventCreationHandler:
             else:
                 try:
                     validate_event_for_room_version(event)
+                    # If we are persisting a batch of events the event(s) needed to auth the
+                    # current event may be part of the batch and will not be in the DB yet
+                    event_id_to_event = {e.event_id: e for e, _ in events_and_context}
+                    batched_auth_events = {}
+                    for event_id in event.auth_event_ids():
+                        auth_event = event_id_to_event.get(event_id)
+                        if auth_event:
+                            batched_auth_events[event_id] = auth_event
                     await self._event_auth_handler.check_auth_rules_from_context(
-                        event, context
+                        event, batched_auth_events
                     )
                 except AuthError as err:
                     logger.warning("Denying new event %r because %s", event, err)
@@ -1398,7 +1406,7 @@ class EventCreationHandler:
                             dont_notify=dont_notify,
                         ),
                         run_in_background(
-                            self.cache_joined_hosts_for_event, event, context
+                            self.cache_joined_hosts_for_events, events_and_context
                         ).addErrback(
                             log_failure, "cache_joined_hosts_for_event failed"
                         ),
@@ -1502,62 +1510,65 @@ class EventCreationHandler:
                 await self.store.remove_push_actions_from_staging(event.event_id)
                 raise
 
-    async def cache_joined_hosts_for_event(
-        self, event: EventBase, context: EventContext
+    async def cache_joined_hosts_for_events(
+        self, events_and_context: List[Tuple[EventBase, EventContext]]
     ) -> None:
-        """Precalculate the joined hosts at the event, when using Redis, so that
+        """Precalculate the joined hosts at each of the given events, when using Redis, so that
         external federation senders don't have to recalculate it themselves.
         """
 
-        if not self._external_cache.is_enabled():
-            return
-
-        # If external cache is enabled we should always have this.
-        assert self._external_cache_joined_hosts_updates is not None
-
-        # We actually store two mappings, event ID -> prev state group,
-        # state group -> joined hosts, which is much more space efficient
-        # than event ID -> joined hosts.
-        #
-        # Note: We have to cache event ID -> prev state group, as we don't
-        # store that in the DB.
-        #
-        # Note: We set the state group -> joined hosts cache if it hasn't been
-        # set for a while, so that the expiry time is reset.
-
-        state_entry = await self.state.resolve_state_groups_for_events(
-            event.room_id, event_ids=event.prev_event_ids()
-        )
-
-        if state_entry.state_group:
-            await self._external_cache.set(
-                "event_to_prev_state_group",
-                event.event_id,
-                state_entry.state_group,
-                expiry_ms=60 * 60 * 1000,
-            )
-
-            if state_entry.state_group in self._external_cache_joined_hosts_updates:
-                return
-
-            state = await state_entry.get_state(
-                self._storage_controllers.state, StateFilter.all()
-            )
-            with opentracing.start_active_span("get_joined_hosts"):
-                joined_hosts = await self.store.get_joined_hosts(
-                    event.room_id, state, state_entry
-                )
-
-            # Note that the expiry times must be larger than the expiry time in
-            # _external_cache_joined_hosts_updates.
-            await self._external_cache.set(
-                "get_joined_hosts",
-                str(state_entry.state_group),
-                list(joined_hosts),
-                expiry_ms=60 * 60 * 1000,
-            )
-
-            self._external_cache_joined_hosts_updates[state_entry.state_group] = None
+        for event, _ in events_and_context:
+            if not self._external_cache.is_enabled():
+                return
+
+            # If external cache is enabled we should always have this.
+            assert self._external_cache_joined_hosts_updates is not None
+
+            # We actually store two mappings, event ID -> prev state group,
+            # state group -> joined hosts, which is much more space efficient
+            # than event ID -> joined hosts.
+            #
+            # Note: We have to cache event ID -> prev state group, as we don't
+            # store that in the DB.
+            #
+            # Note: We set the state group -> joined hosts cache if it hasn't been
+            # set for a while, so that the expiry time is reset.
+
+            state_entry = await self.state.resolve_state_groups_for_events(
+                event.room_id, event_ids=event.prev_event_ids()
+            )
+
+            if state_entry.state_group:
+                await self._external_cache.set(
+                    "event_to_prev_state_group",
+                    event.event_id,
+                    state_entry.state_group,
+                    expiry_ms=60 * 60 * 1000,
+                )
+
+                if state_entry.state_group in self._external_cache_joined_hosts_updates:
+                    return
+
+                state = await state_entry.get_state(
+                    self._storage_controllers.state, StateFilter.all()
+                )
+                with opentracing.start_active_span("get_joined_hosts"):
+                    joined_hosts = await self.store.get_joined_hosts(
+                        event.room_id, state, state_entry
+                    )
+
+                # Note that the expiry times must be larger than the expiry time in
+                # _external_cache_joined_hosts_updates.
+                await self._external_cache.set(
+                    "get_joined_hosts",
+                    str(state_entry.state_group),
+                    list(joined_hosts),
+                    expiry_ms=60 * 60 * 1000,
+                )
+
+                self._external_cache_joined_hosts_updates[
+                    state_entry.state_group
+                ] = None
 
     async def _validate_canonical_alias(
         self,
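
The rewritten cache_joined_hosts_for_events above keeps the existing two-level layout: event ID -> prev state group, then state group -> joined hosts. A rough sketch of why that is more space efficient, using plain dicts in place of the Redis-backed external cache (all names here are illustrative):

# Illustrative sketch of the two-level cache layout; plain dicts stand in for Redis.
from typing import Dict, List

event_to_prev_state_group: Dict[str, int] = {}          # event ID -> prev state group
state_group_to_joined_hosts: Dict[int, List[str]] = {}  # state group -> joined hosts

def cache_joined_hosts(event_id: str, state_group: int, joined_hosts: List[str]) -> None:
    # Many events in a room share the same state group, so the (potentially large)
    # host list is stored once per group; the per-event entry is just a pointer.
    event_to_prev_state_group[event_id] = state_group
    state_group_to_joined_hosts.setdefault(state_group, joined_hosts)

def lookup_joined_hosts(event_id: str) -> List[str]:
    return state_group_to_joined_hosts[event_to_prev_state_group[event_id]]

# Example: two events with the same prev state group share one stored host list.
cache_joined_hosts("$a", 42, ["matrix.example.org", "other.example.net"])
cache_joined_hosts("$b", 42, ["matrix.example.org", "other.example.net"])
assert lookup_joined_hosts("$a") == lookup_joined_hosts("$b")
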
@@ -1885,6 +1896,7 @@ class EventCreationHandler:
             events_and_context, backfilled=backfilled
         )
 
+        events_and_pos = []
         for event in persisted_events:
             if self._ephemeral_events_enabled:
                 # If there's an expiry timestamp on the event, schedule its expiry.
@@ -1893,27 +1905,25 @@ class EventCreationHandler:
             stream_ordering = event.internal_metadata.stream_ordering
             assert stream_ordering is not None
             pos = PersistedEventPosition(self._instance_name, stream_ordering)
-
-            async def _notify() -> None:
-                try:
-                    await self.notifier.on_new_room_event(
-                        event, pos, max_stream_token, extra_users=extra_users
-                    )
-                except Exception:
-                    logger.exception(
-                        "Error notifying about new room event %s",
-                        event.event_id,
-                    )
-
-            if not dont_notify and event.type == EventTypes.Message:
-                # We don't want to block sending messages on any presence code. This
-                # matters as sometimes presence code can take a while.
-                run_in_background(self._bump_active_time, requester.user)
+            events_and_pos.append((event, pos))
+
+        async def _notify() -> None:
+            try:
+                await self.notifier.on_new_room_events(
+                    events_and_pos, max_stream_token, extra_users=extra_users
+                )
+            except Exception:
+                logger.exception("Error notifying about new room events")
+
+        if event.type == EventTypes.Message:
+            # We don't want to block sending messages on any presence code. This
+            # matters as sometimes presence code can take a while.
+            run_in_background(self._bump_active_time, requester.user)
 
         if not dont_notify:
             # Skip notifying clients, this is used for Beeper's custom
            # batch sending of non-historical messages.
             run_in_background(_notify)
 
         return persisted_events[-1]
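
The last two hunks switch notification from one notifier call per event (on_new_room_event) to a single batched call (on_new_room_events) over the accumulated (event, position) pairs. A rough, self-contained sketch of that shape follows; `FakeNotifier`, its signature, and `persist_and_notify` are illustrative assumptions, not Synapse's real API.

# Illustrative sketch only; these classes and signatures are not Synapse's API.
import asyncio
import logging
from typing import List, Tuple

logger = logging.getLogger(__name__)

class FakeNotifier:
    async def on_new_room_events(
        self, events_and_pos: List[Tuple[str, int]], max_token: int
    ) -> None:
        for event_id, pos in events_and_pos:
            print(f"notify {event_id} at position {pos} (max token {max_token})")

async def persist_and_notify(event_ids: List[str]) -> None:
    notifier = FakeNotifier()
    events_and_pos: List[Tuple[str, int]] = []
    for stream_ordering, event_id in enumerate(event_ids, start=1):
        # ... per-event persistence work would happen here ...
        events_and_pos.append((event_id, stream_ordering))

    try:
        # One notifier call for the whole batch instead of one call per event.
        await notifier.on_new_room_events(events_and_pos, max_token=len(event_ids))
    except Exception:
        # A notification failure should not fail the send path itself.
        logger.exception("Error notifying about new room events")

if __name__ == "__main__":
    asyncio.run(persist_and_notify(["$a", "$b", "$c"]))
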