Batch up notifications after event persistence (#14033)

This commit is contained in:
Shay 2022-10-05 10:12:48 -07:00 committed by GitHub
parent 51436c8dd5
commit 7b7478e8b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 66 additions and 58 deletions

1
changelog.d/14033.misc Normal file
View File

@ -0,0 +1 @@
Don't repeatedly wake up the same users for batched events.

View File

@ -2240,8 +2240,8 @@ class FederationEventHandler:
event_pos = PersistedEventPosition( event_pos = PersistedEventPosition(
self._instance_name, event.internal_metadata.stream_ordering self._instance_name, event.internal_metadata.stream_ordering
) )
await self._notifier.on_new_room_event( await self._notifier.on_new_room_events(
event, event_pos, max_stream_token, extra_users=extra_users [(event, event_pos)], max_stream_token, extra_users=extra_users
) )
if event.type == EventTypes.Member and event.membership == Membership.JOIN: if event.type == EventTypes.Member and event.membership == Membership.JOIN:

View File

@ -1872,6 +1872,7 @@ class EventCreationHandler:
events_and_context, backfilled=backfilled events_and_context, backfilled=backfilled
) )
events_and_pos = []
for event in persisted_events: for event in persisted_events:
if self._ephemeral_events_enabled: if self._ephemeral_events_enabled:
# If there's an expiry timestamp on the event, schedule its expiry. # If there's an expiry timestamp on the event, schedule its expiry.
@ -1880,25 +1881,23 @@ class EventCreationHandler:
stream_ordering = event.internal_metadata.stream_ordering stream_ordering = event.internal_metadata.stream_ordering
assert stream_ordering is not None assert stream_ordering is not None
pos = PersistedEventPosition(self._instance_name, stream_ordering) pos = PersistedEventPosition(self._instance_name, stream_ordering)
events_and_pos.append((event, pos))
async def _notify() -> None:
try:
await self.notifier.on_new_room_event(
event, pos, max_stream_token, extra_users=extra_users
)
except Exception:
logger.exception(
"Error notifying about new room event %s",
event.event_id,
)
run_in_background(_notify)
if event.type == EventTypes.Message: if event.type == EventTypes.Message:
# We don't want to block sending messages on any presence code. This # We don't want to block sending messages on any presence code. This
# matters as sometimes presence code can take a while. # matters as sometimes presence code can take a while.
run_in_background(self._bump_active_time, requester.user) run_in_background(self._bump_active_time, requester.user)
async def _notify() -> None:
try:
await self.notifier.on_new_room_events(
events_and_pos, max_stream_token, extra_users=extra_users
)
except Exception:
logger.exception("Error notifying about new room events")
run_in_background(_notify)
return persisted_events[-1] return persisted_events[-1]
async def _maybe_kick_guest_users( async def _maybe_kick_guest_users(

View File

@ -294,35 +294,31 @@ class Notifier:
""" """
self._new_join_in_room_callbacks.append(cb) self._new_join_in_room_callbacks.append(cb)
async def on_new_room_event( async def on_new_room_events(
self, self,
event: EventBase, events_and_pos: List[Tuple[EventBase, PersistedEventPosition]],
event_pos: PersistedEventPosition,
max_room_stream_token: RoomStreamToken, max_room_stream_token: RoomStreamToken,
extra_users: Optional[Collection[UserID]] = None, extra_users: Optional[Collection[UserID]] = None,
) -> None: ) -> None:
"""Unwraps event and calls `on_new_room_event_args`.""" """Creates a _PendingRoomEventEntry for each of the listed events and calls
await self.on_new_room_event_args( notify_new_room_events with the results."""
event_pos=event_pos, event_entries = []
room_id=event.room_id, for event, pos in events_and_pos:
event_id=event.event_id, entry = self.create_pending_room_event_entry(
event_type=event.type, pos,
state_key=event.get("state_key"), extra_users,
membership=event.content.get("membership"), event.room_id,
max_room_stream_token=max_room_stream_token, event.type,
extra_users=extra_users or [], event.get("state_key"),
) event.content.get("membership"),
)
event_entries.append((entry, event.event_id))
await self.notify_new_room_events(event_entries, max_room_stream_token)
async def on_new_room_event_args( async def notify_new_room_events(
self, self,
room_id: str, event_entries: List[Tuple[_PendingRoomEventEntry, str]],
event_id: str,
event_type: str,
state_key: Optional[str],
membership: Optional[str],
event_pos: PersistedEventPosition,
max_room_stream_token: RoomStreamToken, max_room_stream_token: RoomStreamToken,
extra_users: Optional[Collection[UserID]] = None,
) -> None: ) -> None:
"""Used by handlers to inform the notifier something has happened """Used by handlers to inform the notifier something has happened
in the room, room event wise. in the room, room event wise.
@ -338,22 +334,33 @@ class Notifier:
until all previous events have been persisted before notifying until all previous events have been persisted before notifying
the client streams. the client streams.
""" """
self.pending_new_room_events.append( for event_entry, event_id in event_entries:
_PendingRoomEventEntry( self.pending_new_room_events.append(event_entry)
event_pos=event_pos, await self._third_party_rules.on_new_event(event_id)
extra_users=extra_users or [],
room_id=room_id,
type=event_type,
state_key=state_key,
membership=membership,
)
)
self._notify_pending_new_room_events(max_room_stream_token) self._notify_pending_new_room_events(max_room_stream_token)
await self._third_party_rules.on_new_event(event_id)
self.notify_replication() self.notify_replication()
def create_pending_room_event_entry(
self,
event_pos: PersistedEventPosition,
extra_users: Optional[Collection[UserID]],
room_id: str,
event_type: str,
state_key: Optional[str],
membership: Optional[str],
) -> _PendingRoomEventEntry:
"""Creates and returns a _PendingRoomEventEntry"""
return _PendingRoomEventEntry(
event_pos=event_pos,
extra_users=extra_users or [],
room_id=room_id,
type=event_type,
state_key=state_key,
membership=membership,
)
def _notify_pending_new_room_events( def _notify_pending_new_room_events(
self, max_room_stream_token: RoomStreamToken self, max_room_stream_token: RoomStreamToken
) -> None: ) -> None:

View File

@ -210,15 +210,16 @@ class ReplicationDataHandler:
max_token = self.store.get_room_max_token() max_token = self.store.get_room_max_token()
event_pos = PersistedEventPosition(instance_name, token) event_pos = PersistedEventPosition(instance_name, token)
await self.notifier.on_new_room_event_args( event_entry = self.notifier.create_pending_room_event_entry(
event_pos=event_pos, event_pos,
max_room_stream_token=max_token, extra_users,
extra_users=extra_users, row.data.room_id,
room_id=row.data.room_id, row.data.type,
event_id=row.data.event_id, row.data.state_key,
event_type=row.data.type, row.data.membership,
state_key=row.data.state_key, )
membership=row.data.membership, await self.notifier.notify_new_room_events(
[(event_entry, row.data.event_id)], max_token
) )
# If this event is a join, make a note of it so we have an accurate # If this event is a join, make a note of it so we have an accurate