Add support for batch sending new events

This commit is contained in:
Tulir Asokan 2023-02-12 15:01:51 +02:00
parent a7bdc4a1ed
commit 4eddcf6653
6 changed files with 86 additions and 20 deletions

View File

@ -1360,6 +1360,7 @@ class EventCreationHandler:
ratelimit: bool = True, ratelimit: bool = True,
extra_users: Optional[List[UserID]] = None, extra_users: Optional[List[UserID]] = None,
ignore_shadow_ban: bool = False, ignore_shadow_ban: bool = False,
dont_notify: bool = False,
) -> EventBase: ) -> EventBase:
"""Processes new events. Please note that if batch persisting events, an error in """Processes new events. Please note that if batch persisting events, an error in
handling any one of these events will result in all of the events being dropped. handling any one of these events will result in all of the events being dropped.
@ -1379,6 +1380,8 @@ class EventCreationHandler:
ignore_shadow_ban: True if shadow-banned users should be allowed to ignore_shadow_ban: True if shadow-banned users should be allowed to
send this event. send this event.
dont_notify
Return: Return:
If the event was deduplicated, the previous, duplicate, event. Otherwise, If the event was deduplicated, the previous, duplicate, event. Otherwise,
`event`. `event`.
@ -1456,6 +1459,7 @@ class EventCreationHandler:
events_and_context=events_and_context, events_and_context=events_and_context,
ratelimit=ratelimit, ratelimit=ratelimit,
extra_users=extra_users, extra_users=extra_users,
dont_notify=dont_notify,
), ),
run_in_background( run_in_background(
self.cache_joined_hosts_for_events, events_and_context self.cache_joined_hosts_for_events, events_and_context
@ -1473,6 +1477,7 @@ class EventCreationHandler:
events_and_context: List[Tuple[EventBase, EventContext]], events_and_context: List[Tuple[EventBase, EventContext]],
ratelimit: bool = True, ratelimit: bool = True,
extra_users: Optional[List[UserID]] = None, extra_users: Optional[List[UserID]] = None,
dont_notify: bool = False,
) -> EventBase: ) -> EventBase:
"""Actually persists new events. Should only be called by """Actually persists new events. Should only be called by
`handle_new_client_event`, and see its docstring for documentation of `handle_new_client_event`, and see its docstring for documentation of
@ -1502,6 +1507,7 @@ class EventCreationHandler:
requester=requester, requester=requester,
ratelimit=ratelimit, ratelimit=ratelimit,
extra_users=extra_users, extra_users=extra_users,
dont_notify=dont_notify,
) )
except SynapseError as e: except SynapseError as e:
if e.code == HTTPStatus.CONFLICT: if e.code == HTTPStatus.CONFLICT:
@ -1531,6 +1537,7 @@ class EventCreationHandler:
events_and_context, events_and_context,
ratelimit=ratelimit, ratelimit=ratelimit,
extra_users=extra_users, extra_users=extra_users,
dont_notify=dont_notify,
) )
return event return event
@ -1652,6 +1659,7 @@ class EventCreationHandler:
events_and_context: List[Tuple[EventBase, EventContext]], events_and_context: List[Tuple[EventBase, EventContext]],
ratelimit: bool = True, ratelimit: bool = True,
extra_users: Optional[List[UserID]] = None, extra_users: Optional[List[UserID]] = None,
dont_notify: bool = False,
) -> EventBase: ) -> EventBase:
"""Called when we have fully built the events, have already """Called when we have fully built the events, have already
calculated the push actions for the events, and checked auth. calculated the push actions for the events, and checked auth.
@ -1954,7 +1962,7 @@ class EventCreationHandler:
pos = PersistedEventPosition(self._instance_name, stream_ordering) pos = PersistedEventPosition(self._instance_name, stream_ordering)
events_and_pos.append((event, pos)) events_and_pos.append((event, pos))
if event.type == EventTypes.Message: if not dont_notify and event.type == EventTypes.Message:
# We don't want to block sending messages on any presence code. This # We don't want to block sending messages on any presence code. This
# matters as sometimes presence code can take a while. # matters as sometimes presence code can take a while.
run_as_background_process( run_as_background_process(
@ -1969,7 +1977,10 @@ class EventCreationHandler:
except Exception: except Exception:
logger.exception("Error notifying about new room events") logger.exception("Error notifying about new room events")
run_in_background(_notify) if not dont_notify:
# Skip notifying clients, this is used for Beeper's custom
# batch sending of non-historical messages.
run_in_background(_notify)
return persisted_events[-1] return persisted_events[-1]

View File

@ -274,6 +274,8 @@ class RoomBatchHandler:
inherited_depth: int, inherited_depth: int,
initial_state_event_ids: List[str], initial_state_event_ids: List[str],
app_service_requester: Requester, app_service_requester: Requester,
beeper_new_messages: bool,
beeper_initial_prev_event_ids: List[str] = None,
) -> List[str]: ) -> List[str]:
"""Create and persists all events provided sequentially. Handles the """Create and persists all events provided sequentially. Handles the
complexity of creating events in chronological order so they can complexity of creating events in chronological order so they can
@ -293,21 +295,24 @@ class RoomBatchHandler:
the start of the historical batch since it's floating with no the start of the historical batch since it's floating with no
prev_events to derive state from automatically. prev_events to derive state from automatically.
app_service_requester: The requester of an application service. app_service_requester: The requester of an application service.
beeper_new_messages: Is this a batch of new events rather than history?
beeper_initial_prev_event_ids: prev_event_ids for the first event to send.
Returns: Returns:
List of persisted event IDs List of persisted event IDs
""" """
assert app_service_requester.app_service assert app_service_requester.app_service
# We expect the first event in a historical batch to be an insertion event if not beeper_new_messages:
assert events_to_create[0]["type"] == EventTypes.MSC2716_INSERTION # We expect the first event in a historical batch to be an insertion event
# We expect the last event in a historical batch to be an batch event assert events_to_create[0]["type"] == EventTypes.MSC2716_INSERTION
assert events_to_create[-1]["type"] == EventTypes.MSC2716_BATCH # We expect the last event in a historical batch to be an batch event
assert events_to_create[-1]["type"] == EventTypes.MSC2716_BATCH
# Make the historical event chain float off on its own by specifying no # Make the historical event chain float off on its own by specifying no
# prev_events for the first event in the chain which causes the HS to # prev_events for the first event in the chain which causes the HS to
# ask for the state at the start of the batch later. # ask for the state at the start of the batch later.
prev_event_ids: List[str] = [] prev_event_ids: List[str] = beeper_initial_prev_event_ids or []
event_ids = [] event_ids = []
events_to_persist = [] events_to_persist = []
@ -338,14 +343,14 @@ class RoomBatchHandler:
# Only the first event (which is the insertion event) in the # Only the first event (which is the insertion event) in the
# chain should be floating. The rest should hang off each other # chain should be floating. The rest should hang off each other
# in a chain. # in a chain.
allow_no_prev_events=index == 0, allow_no_prev_events=index == 0 and not beeper_new_messages,
prev_event_ids=event_dict.get("prev_events"), prev_event_ids=event_dict.get("prev_events"),
# Since the first event (which is the insertion event) in the # Since the first event (which is the insertion event) in the
# chain is floating with no `prev_events`, it can't derive state # chain is floating with no `prev_events`, it can't derive state
# from anywhere automatically. So we need to set some state # from anywhere automatically. So we need to set some state
# explicitly. # explicitly.
state_event_ids=initial_state_event_ids if index == 0 else None, state_event_ids=initial_state_event_ids if index == 0 else None,
historical=True, historical=not beeper_new_messages,
depth=inherited_depth, depth=inherited_depth,
) )
context = await unpersisted_context.persist(event) context = await unpersisted_context.persist(event)
@ -373,6 +378,18 @@ class RoomBatchHandler:
event_ids.append(event_id) event_ids.append(event_id)
prev_event_ids = [event_id] prev_event_ids = [event_id]
if beeper_new_messages:
for index, (event, context) in enumerate(events_to_persist):
await self.event_creation_handler.handle_new_client_event(
await self.create_requester_for_user_id_from_app_service(
event.sender, app_service_requester.app_service
),
event=event,
context=context,
dont_notify=index < len(events_to_persist) - 1,
)
return event_ids
# Persist events in reverse-chronological order so they have the # Persist events in reverse-chronological order so they have the
# correct stream_ordering as they are backfilled (which decrements). # correct stream_ordering as they are backfilled (which decrements).
# Events are sorted by (topological_ordering, stream_ordering) # Events are sorted by (topological_ordering, stream_ordering)
@ -397,6 +414,8 @@ class RoomBatchHandler:
inherited_depth: int, inherited_depth: int,
initial_state_event_ids: List[str], initial_state_event_ids: List[str],
app_service_requester: Requester, app_service_requester: Requester,
beeper_new_messages: bool,
beeper_initial_prev_event_ids: List[str] = None,
) -> Tuple[List[str], str]: ) -> Tuple[List[str], str]:
""" """
Handles creating and persisting all of the historical events as well as Handles creating and persisting all of the historical events as well as
@ -418,6 +437,8 @@ class RoomBatchHandler:
`/batch_send?prev_event_id=$abc` plus the outcome of `/batch_send?prev_event_id=$abc` plus the outcome of
`persist_state_events_at_start` `persist_state_events_at_start`
app_service_requester: The requester of an application service. app_service_requester: The requester of an application service.
beeper_new_messages: Is this a batch of new events rather than history?
beeper_initial_prev_event_ids: prev_event_ids for the first event to send.
Returns: Returns:
Tuple containing a list of created events and the next_batch_id Tuple containing a list of created events and the next_batch_id
@ -438,8 +459,9 @@ class RoomBatchHandler:
# the last event we're inserting # the last event we're inserting
"origin_server_ts": last_event_in_batch["origin_server_ts"], "origin_server_ts": last_event_in_batch["origin_server_ts"],
} }
# Add the batch event to the end of the batch (newest-in-time) if not beeper_new_messages:
events_to_create.append(batch_event) # Add the batch event to the end of the batch (newest-in-time)
events_to_create.append(batch_event)
# Add an "insertion" event to the start of each batch (next to the oldest-in-time # Add an "insertion" event to the start of each batch (next to the oldest-in-time
# event in the batch) so the next batch can be connected to this one. # event in the batch) so the next batch can be connected to this one.
@ -454,8 +476,9 @@ class RoomBatchHandler:
next_batch_id = insertion_event["content"][ next_batch_id = insertion_event["content"][
EventContentFields.MSC2716_NEXT_BATCH_ID EventContentFields.MSC2716_NEXT_BATCH_ID
] ]
# Prepend the insertion event to the start of the batch (oldest-in-time) if not beeper_new_messages:
events_to_create = [insertion_event] + events_to_create # Prepend the insertion event to the start of the batch (oldest-in-time)
events_to_create = [insertion_event] + events_to_create
# Create and persist all of the historical events # Create and persist all of the historical events
event_ids = await self.persist_historical_events( event_ids = await self.persist_historical_events(
@ -464,6 +487,8 @@ class RoomBatchHandler:
inherited_depth=inherited_depth, inherited_depth=inherited_depth,
initial_state_event_ids=initial_state_event_ids, initial_state_event_ids=initial_state_event_ids,
app_service_requester=app_service_requester, app_service_requester=app_service_requester,
beeper_new_messages=beeper_new_messages,
beeper_initial_prev_event_ids=beeper_initial_prev_event_ids,
) )
return event_ids, next_batch_id return event_ids, next_batch_id

View File

@ -332,6 +332,7 @@ class BulkPushRuleEvaluator:
if ( if (
not event.internal_metadata.is_notifiable() not event.internal_metadata.is_notifiable()
or event.internal_metadata.is_historical() or event.internal_metadata.is_historical()
or event.content.get(EventContentFields.MSC2716_HISTORICAL)
): ):
# Push rules for events that aren't notifiable can't be processed by this and # Push rules for events that aren't notifiable can't be processed by this and
# we want to skip push notification actions for historical messages # we want to skip push notification actions for historical messages

View File

@ -84,6 +84,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
requester: Requester, requester: Requester,
ratelimit: bool, ratelimit: bool,
extra_users: List[UserID], extra_users: List[UserID],
dont_notify: bool,
) -> JsonDict: ) -> JsonDict:
""" """
Args: Args:
@ -94,6 +95,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
context context
ratelimit ratelimit
extra_users: Any extra users to notify about event extra_users: Any extra users to notify about event
dont_notify
""" """
serialized_context = await context.serialize(event, store) serialized_context = await context.serialize(event, store)
@ -108,6 +110,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
"requester": requester.serialize(), "requester": requester.serialize(),
"ratelimit": ratelimit, "ratelimit": ratelimit,
"extra_users": [u.to_string() for u in extra_users], "extra_users": [u.to_string() for u in extra_users],
"dont_notify": dont_notify,
} }
return payload return payload
@ -133,13 +136,18 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
ratelimit = content["ratelimit"] ratelimit = content["ratelimit"]
extra_users = [UserID.from_string(u) for u in content["extra_users"]] extra_users = [UserID.from_string(u) for u in content["extra_users"]]
dont_notify = content["dont_notify"]
logger.info( logger.info(
"Got event to send with ID: %s into room: %s", event.event_id, event.room_id "Got event to send with ID: %s into room: %s", event.event_id, event.room_id
) )
event = await self.event_creation_handler.persist_and_notify_client_events( event = await self.event_creation_handler.persist_and_notify_client_events(
requester, [(event, context)], ratelimit=ratelimit, extra_users=extra_users requester,
[(event, context)],
ratelimit=ratelimit,
extra_users=extra_users,
dont_notify=dont_notify,
) )
return ( return (

View File

@ -82,6 +82,7 @@ class ReplicationSendEventsRestServlet(ReplicationEndpoint):
requester: Requester, requester: Requester,
ratelimit: bool, ratelimit: bool,
extra_users: List[UserID], extra_users: List[UserID],
dont_notify: bool,
) -> JsonDict: ) -> JsonDict:
""" """
Args: Args:
@ -108,7 +109,7 @@ class ReplicationSendEventsRestServlet(ReplicationEndpoint):
} }
serialized_events.append(serialized_event) serialized_events.append(serialized_event)
payload = {"events": serialized_events} payload = {"events": serialized_events, "dont_notify": dont_notify}
return payload return payload
@ -118,6 +119,7 @@ class ReplicationSendEventsRestServlet(ReplicationEndpoint):
with Measure(self.clock, "repl_send_events_parse"): with Measure(self.clock, "repl_send_events_parse"):
events_and_context = [] events_and_context = []
events = payload["events"] events = payload["events"]
dont_notify = payload["dont_notify"]
for event_payload in events: for event_payload in events:
event_dict = event_payload["event"] event_dict = event_payload["event"]
@ -152,7 +154,11 @@ class ReplicationSendEventsRestServlet(ReplicationEndpoint):
last_event = ( last_event = (
await self.event_creation_handler.persist_and_notify_client_events( await self.event_creation_handler.persist_and_notify_client_events(
requester, events_and_context, ratelimit, extra_users requester,
events_and_context,
ratelimit,
extra_users,
dont_notify=dont_notify,
) )
) )

View File

@ -26,6 +26,7 @@ from synapse.http.servlet import (
parse_json_object_from_request, parse_json_object_from_request,
parse_string, parse_string,
parse_strings_from_args, parse_strings_from_args,
parse_boolean_from_args,
) )
from synapse.http.site import SynapseRequest from synapse.http.site import SynapseRequest
from synapse.types import JsonDict from synapse.types import JsonDict
@ -96,6 +97,9 @@ class RoomBatchSendEventRestServlet(RestServlet):
request.args, "prev_event_id" request.args, "prev_event_id"
) )
batch_id_from_query = parse_string(request, "batch_id") batch_id_from_query = parse_string(request, "batch_id")
beeper_new_messages = parse_boolean_from_args(
request.args, "com.beeper.new_messages"
)
if prev_event_ids_from_query is None: if prev_event_ids_from_query is None:
raise SynapseError( raise SynapseError(
@ -151,7 +155,7 @@ class RoomBatchSendEventRestServlet(RestServlet):
# Create and persist all of the state events that float off on their own # Create and persist all of the state events that float off on their own
# before the batch. These will most likely be all of the invite/member # before the batch. These will most likely be all of the invite/member
# state events used to auth the upcoming historical messages. # state events used to auth the upcoming historical messages.
if body["state_events_at_start"]: if body["state_events_at_start"] and not beeper_new_messages:
state_event_ids_at_start = ( state_event_ids_at_start = (
await self.room_batch_handler.persist_state_events_at_start( await self.room_batch_handler.persist_state_events_at_start(
state_events_at_start=body["state_events_at_start"], state_events_at_start=body["state_events_at_start"],
@ -177,6 +181,8 @@ class RoomBatchSendEventRestServlet(RestServlet):
base_insertion_event = None base_insertion_event = None
if batch_id_from_query: if batch_id_from_query:
batch_id_to_connect_to = batch_id_from_query batch_id_to_connect_to = batch_id_from_query
elif beeper_new_messages:
batch_id_to_connect_to = None
# Otherwise, create an insertion event to act as a starting point. # Otherwise, create an insertion event to act as a starting point.
# #
# We don't always have an insertion event to start hanging more history # We don't always have an insertion event to start hanging more history
@ -227,11 +233,20 @@ class RoomBatchSendEventRestServlet(RestServlet):
inherited_depth=inherited_depth, inherited_depth=inherited_depth,
initial_state_event_ids=state_event_ids, initial_state_event_ids=state_event_ids,
app_service_requester=requester, app_service_requester=requester,
beeper_new_messages=beeper_new_messages,
beeper_initial_prev_event_ids=prev_event_ids_from_query
if beeper_new_messages
else None,
) )
insertion_event_id = event_ids[0] if beeper_new_messages:
batch_event_id = event_ids[-1] insertion_event_id = batch_event_id = None
historical_event_ids = event_ids[1:-1] historical_event_ids = event_ids
next_batch_id = None
else:
insertion_event_id = event_ids[0]
batch_event_id = event_ids[-1]
historical_event_ids = event_ids[1:-1]
response_dict = { response_dict = {
"state_event_ids": state_event_ids_at_start, "state_event_ids": state_event_ids_at_start,