Add support for batch sending new events

This commit is contained in:
Tulir Asokan 2023-02-12 15:01:51 +02:00
parent bad102a762
commit de58f07338
6 changed files with 86 additions and 21 deletions

View File

@ -1345,6 +1345,7 @@ class EventCreationHandler:
ratelimit: bool = True,
extra_users: Optional[List[UserID]] = None,
ignore_shadow_ban: bool = False,
dont_notify: bool = False,
) -> EventBase:
"""Processes new events. Please note that if batch persisting events, an error in
handling any one of these events will result in all of the events being dropped.
@ -1364,6 +1365,8 @@ class EventCreationHandler:
ignore_shadow_ban: True if shadow-banned users should be allowed to
send this event.
dont_notify
Return:
If the event was deduplicated, the previous, duplicate, event. Otherwise,
`event`.
@ -1441,6 +1444,7 @@ class EventCreationHandler:
events_and_context=events_and_context,
ratelimit=ratelimit,
extra_users=extra_users,
dont_notify=dont_notify,
),
run_in_background(
self.cache_joined_hosts_for_events, events_and_context
@ -1458,6 +1462,7 @@ class EventCreationHandler:
events_and_context: List[Tuple[EventBase, EventContext]],
ratelimit: bool = True,
extra_users: Optional[List[UserID]] = None,
dont_notify: bool = False,
) -> EventBase:
"""Actually persists new events. Should only be called by
`handle_new_client_event`, and see its docstring for documentation of
@ -1487,6 +1492,7 @@ class EventCreationHandler:
requester=requester,
ratelimit=ratelimit,
extra_users=extra_users,
dont_notify=dont_notify,
)
except SynapseError as e:
if e.code == HTTPStatus.CONFLICT:
@ -1516,6 +1522,7 @@ class EventCreationHandler:
events_and_context,
ratelimit=ratelimit,
extra_users=extra_users,
dont_notify=dont_notify,
)
return event
@ -1637,6 +1644,7 @@ class EventCreationHandler:
events_and_context: List[Tuple[EventBase, EventContext]],
ratelimit: bool = True,
extra_users: Optional[List[UserID]] = None,
dont_notify: bool = False,
) -> EventBase:
"""Called when we have fully built the events, have already
calculated the push actions for the events, and checked auth.
@ -1939,7 +1947,7 @@ class EventCreationHandler:
pos = PersistedEventPosition(self._instance_name, stream_ordering)
events_and_pos.append((event, pos))
if event.type == EventTypes.Message:
if not dont_notify and event.type == EventTypes.Message:
# We don't want to block sending messages on any presence code. This
# matters as sometimes presence code can take a while.
run_as_background_process(
@ -1954,7 +1962,10 @@ class EventCreationHandler:
except Exception:
logger.exception("Error notifying about new room events")
run_in_background(_notify)
if not dont_notify:
# Skip notifying clients, this is used for Beeper's custom
# batch sending of non-historical messages.
run_in_background(_notify)
return persisted_events[-1]

View File

@ -274,6 +274,8 @@ class RoomBatchHandler:
inherited_depth: int,
initial_state_event_ids: List[str],
app_service_requester: Requester,
beeper_new_messages: bool,
beeper_initial_prev_event_ids: List[str] = None,
) -> List[str]:
"""Create and persists all events provided sequentially. Handles the
complexity of creating events in chronological order so they can
@ -293,21 +295,24 @@ class RoomBatchHandler:
the start of the historical batch since it's floating with no
prev_events to derive state from automatically.
app_service_requester: The requester of an application service.
beeper_new_messages: Is this a batch of new events rather than history?
beeper_initial_prev_event_ids: prev_event_ids for the first event to send.
Returns:
List of persisted event IDs
"""
assert app_service_requester.app_service
# We expect the first event in a historical batch to be an insertion event
assert events_to_create[0]["type"] == EventTypes.MSC2716_INSERTION
# We expect the last event in a historical batch to be an batch event
assert events_to_create[-1]["type"] == EventTypes.MSC2716_BATCH
if not beeper_new_messages:
# We expect the first event in a historical batch to be an insertion event
assert events_to_create[0]["type"] == EventTypes.MSC2716_INSERTION
# We expect the last event in a historical batch to be an batch event
assert events_to_create[-1]["type"] == EventTypes.MSC2716_BATCH
# Make the historical event chain float off on its own by specifying no
# prev_events for the first event in the chain which causes the HS to
# ask for the state at the start of the batch later.
prev_event_ids: List[str] = []
prev_event_ids: List[str] = beeper_initial_prev_event_ids or []
event_ids = []
events_to_persist = []
@ -338,14 +343,14 @@ class RoomBatchHandler:
# Only the first event (which is the insertion event) in the
# chain should be floating. The rest should hang off each other
# in a chain.
allow_no_prev_events=index == 0,
allow_no_prev_events=index == 0 and not beeper_new_messages,
prev_event_ids=event_dict.get("prev_events"),
# Since the first event (which is the insertion event) in the
# chain is floating with no `prev_events`, it can't derive state
# from anywhere automatically. So we need to set some state
# explicitly.
state_event_ids=initial_state_event_ids if index == 0 else None,
historical=True,
historical=not beeper_new_messages,
depth=inherited_depth,
)
@ -373,6 +378,18 @@ class RoomBatchHandler:
event_ids.append(event_id)
prev_event_ids = [event_id]
if beeper_new_messages:
for index, (event, context) in enumerate(events_to_persist):
await self.event_creation_handler.handle_new_client_event(
await self.create_requester_for_user_id_from_app_service(
event.sender, app_service_requester.app_service
),
event=event,
context=context,
dont_notify=index < len(events_to_persist) - 1,
)
return event_ids
# Persist events in reverse-chronological order so they have the
# correct stream_ordering as they are backfilled (which decrements).
# Events are sorted by (topological_ordering, stream_ordering)
@ -397,6 +414,8 @@ class RoomBatchHandler:
inherited_depth: int,
initial_state_event_ids: List[str],
app_service_requester: Requester,
beeper_new_messages: bool,
beeper_initial_prev_event_ids: List[str] = None,
) -> Tuple[List[str], str]:
"""
Handles creating and persisting all of the historical events as well as
@ -418,6 +437,8 @@ class RoomBatchHandler:
`/batch_send?prev_event_id=$abc` plus the outcome of
`persist_state_events_at_start`
app_service_requester: The requester of an application service.
beeper_new_messages: Is this a batch of new events rather than history?
beeper_initial_prev_event_ids: prev_event_ids for the first event to send.
Returns:
Tuple containing a list of created events and the next_batch_id
@ -438,8 +459,9 @@ class RoomBatchHandler:
# the last event we're inserting
"origin_server_ts": last_event_in_batch["origin_server_ts"],
}
# Add the batch event to the end of the batch (newest-in-time)
events_to_create.append(batch_event)
if not beeper_new_messages:
# Add the batch event to the end of the batch (newest-in-time)
events_to_create.append(batch_event)
# Add an "insertion" event to the start of each batch (next to the oldest-in-time
# event in the batch) so the next batch can be connected to this one.
@ -454,8 +476,9 @@ class RoomBatchHandler:
next_batch_id = insertion_event["content"][
EventContentFields.MSC2716_NEXT_BATCH_ID
]
# Prepend the insertion event to the start of the batch (oldest-in-time)
events_to_create = [insertion_event] + events_to_create
if not beeper_new_messages:
# Prepend the insertion event to the start of the batch (oldest-in-time)
events_to_create = [insertion_event] + events_to_create
# Create and persist all of the historical events
event_ids = await self.persist_historical_events(
@ -464,6 +487,8 @@ class RoomBatchHandler:
inherited_depth=inherited_depth,
initial_state_event_ids=initial_state_event_ids,
app_service_requester=app_service_requester,
beeper_new_messages=beeper_new_messages,
beeper_initial_prev_event_ids=beeper_initial_prev_event_ids,
)
return event_ids, next_batch_id

View File

@ -321,10 +321,10 @@ class BulkPushRuleEvaluator:
context: EventContext,
event_id_to_event: Mapping[str, EventBase],
) -> None:
if (
not event.internal_metadata.is_notifiable()
or event.internal_metadata.is_historical()
or event.content.get(EventContentFields.MSC2716_HISTORICAL)
):
# Push rules for events that aren't notifiable can't be processed by this and
# we want to skip push notification actions for historical messages

View File

@ -84,6 +84,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
requester: Requester,
ratelimit: bool,
extra_users: List[UserID],
dont_notify: bool,
) -> JsonDict:
"""
Args:
@ -94,6 +95,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
context
ratelimit
extra_users: Any extra users to notify about event
dont_notify
"""
serialized_context = await context.serialize(event, store)
@ -108,6 +110,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
"requester": requester.serialize(),
"ratelimit": ratelimit,
"extra_users": [u.to_string() for u in extra_users],
"dont_notify": dont_notify,
}
return payload
@ -133,13 +136,18 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
ratelimit = content["ratelimit"]
extra_users = [UserID.from_string(u) for u in content["extra_users"]]
dont_notify = content["dont_notify"]
logger.info(
"Got event to send with ID: %s into room: %s", event.event_id, event.room_id
)
event = await self.event_creation_handler.persist_and_notify_client_events(
requester, [(event, context)], ratelimit=ratelimit, extra_users=extra_users
requester,
[(event, context)],
ratelimit=ratelimit,
extra_users=extra_users,
dont_notify=dont_notify,
)
return (

View File

@ -82,6 +82,7 @@ class ReplicationSendEventsRestServlet(ReplicationEndpoint):
requester: Requester,
ratelimit: bool,
extra_users: List[UserID],
dont_notify: bool,
) -> JsonDict:
"""
Args:
@ -108,7 +109,7 @@ class ReplicationSendEventsRestServlet(ReplicationEndpoint):
}
serialized_events.append(serialized_event)
payload = {"events": serialized_events}
payload = {"events": serialized_events, "dont_notify": dont_notify}
return payload
@ -118,6 +119,7 @@ class ReplicationSendEventsRestServlet(ReplicationEndpoint):
with Measure(self.clock, "repl_send_events_parse"):
events_and_context = []
events = payload["events"]
dont_notify = payload["dont_notify"]
for event_payload in events:
event_dict = event_payload["event"]
@ -152,7 +154,11 @@ class ReplicationSendEventsRestServlet(ReplicationEndpoint):
last_event = (
await self.event_creation_handler.persist_and_notify_client_events(
requester, events_and_context, ratelimit, extra_users
requester,
events_and_context,
ratelimit,
extra_users,
dont_notify=dont_notify,
)
)

View File

@ -28,6 +28,7 @@ from synapse.http.servlet import (
parse_json_object_from_request,
parse_string,
parse_strings_from_args,
parse_boolean_from_args,
)
from synapse.http.site import SynapseRequest
from synapse.rest.client.transactions import HttpTransactionCache
@ -100,6 +101,9 @@ class RoomBatchSendEventRestServlet(RestServlet):
request.args, "prev_event_id"
)
batch_id_from_query = parse_string(request, "batch_id")
beeper_new_messages = parse_boolean_from_args(
request.args, "com.beeper.new_messages"
)
if prev_event_ids_from_query is None:
raise SynapseError(
@ -155,7 +159,7 @@ class RoomBatchSendEventRestServlet(RestServlet):
# Create and persist all of the state events that float off on their own
# before the batch. These will most likely be all of the invite/member
# state events used to auth the upcoming historical messages.
if body["state_events_at_start"]:
if body["state_events_at_start"] and not beeper_new_messages:
state_event_ids_at_start = (
await self.room_batch_handler.persist_state_events_at_start(
state_events_at_start=body["state_events_at_start"],
@ -181,6 +185,8 @@ class RoomBatchSendEventRestServlet(RestServlet):
base_insertion_event = None
if batch_id_from_query:
batch_id_to_connect_to = batch_id_from_query
elif beeper_new_messages:
batch_id_to_connect_to = None
# Otherwise, create an insertion event to act as a starting point.
#
# We don't always have an insertion event to start hanging more history
@ -231,11 +237,20 @@ class RoomBatchSendEventRestServlet(RestServlet):
inherited_depth=inherited_depth,
initial_state_event_ids=state_event_ids,
app_service_requester=requester,
beeper_new_messages=beeper_new_messages,
beeper_initial_prev_event_ids=prev_event_ids_from_query
if beeper_new_messages
else None,
)
insertion_event_id = event_ids[0]
batch_event_id = event_ids[-1]
historical_event_ids = event_ids[1:-1]
if beeper_new_messages:
insertion_event_id = batch_event_id = None
historical_event_ids = event_ids
next_batch_id = None
else:
insertion_event_id = event_ids[0]
batch_event_id = event_ids[-1]
historical_event_ids = event_ids[1:-1]
response_dict = {
"state_event_ids": state_event_ids_at_start,