From 84d583f267ba47726b285750ee9f3c0147df4cff Mon Sep 17 00:00:00 2001
From: Tulir Asokan
Date: Thu, 19 May 2022 11:19:02 +0300
Subject: [PATCH] Hack together a query param for batch sending non-historical
 events

---
 synapse/handlers/message.py            | 16 +++++++--
 synapse/handlers/room_batch.py         | 47 ++++++++++++++++++++------
 synapse/replication/http/send_event.py |  6 +++-
 synapse/rest/client/room_batch.py      | 19 ++++++++---
 4 files changed, 70 insertions(+), 18 deletions(-)

diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 329b671cb..c0670d87b 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1115,6 +1115,7 @@ class EventCreationHandler:
         ratelimit: bool = True,
         extra_users: Optional[List[UserID]] = None,
         ignore_shadow_ban: bool = False,
+        dont_notify: bool = False,
     ) -> EventBase:
         """Processes a new event.
 
@@ -1134,6 +1135,8 @@ class EventCreationHandler:
             ignore_shadow_ban: True if shadow-banned users should be allowed to
                 send this event.
 
+            dont_notify: True to skip notifying clients about the new event.
+
         Return:
             If the event was deduplicated, the previous, duplicate, event. Otherwise,
             `event`.
@@ -1217,6 +1220,7 @@ class EventCreationHandler:
                             context=context,
                             ratelimit=ratelimit,
                             extra_users=extra_users,
+                            dont_notify=dont_notify,
                         ),
                         run_in_background(
                             self.cache_joined_hosts_for_event, event, context
@@ -1235,6 +1239,7 @@ class EventCreationHandler:
         context: EventContext,
         ratelimit: bool = True,
         extra_users: Optional[List[UserID]] = None,
+        dont_notify: bool = False,
     ) -> EventBase:
         """Actually persists the event. Should only be called by
         `handle_new_client_event`, and see its docstring for documentation of
@@ -1246,7 +1251,7 @@ class EventCreationHandler:
         # The historical messages also do not have the proper `context.current_state_ids`
         # and `state_groups` because they have `prev_events` that aren't persisted yet
         # (historical messages persisted in reverse-chronological order).
-        if not event.internal_metadata.is_historical():
+        if not event.internal_metadata.is_historical() and not event.content.get(EventContentFields.MSC2716_HISTORICAL):
             await self.action_generator.handle_push_actions_for_event(event, context)
 
         try:
@@ -1262,6 +1267,7 @@ class EventCreationHandler:
                     context=context,
                     ratelimit=ratelimit,
                     extra_users=extra_users,
+                    dont_notify=dont_notify,
                 )
                 stream_id = result["stream_id"]
                 event_id = result["event_id"]
@@ -1278,7 +1284,7 @@ class EventCreationHandler:
                 return event
 
         event = await self.persist_and_notify_client_event(
-            requester, event, context, ratelimit=ratelimit, extra_users=extra_users
+            requester, event, context, ratelimit=ratelimit, extra_users=extra_users, dont_notify=dont_notify,
         )
 
         return event
@@ -1380,6 +1386,7 @@ class EventCreationHandler:
         context: EventContext,
         ratelimit: bool = True,
         extra_users: Optional[List[UserID]] = None,
+        dont_notify: bool = False,
     ) -> EventBase:
         """Called when we have fully built the event, have already calculated the
         push actions for the event, and checked auth.
@@ -1634,6 +1641,11 @@ class EventCreationHandler:
         # If there's an expiry timestamp on the event, schedule its expiry.
         self._message_handler.maybe_schedule_expiry(event)
 
+        if dont_notify:
+            # Skip notifying clients; this is used for Beeper's custom
+            # batch sending of non-historical messages.
+            return event
+
         async def _notify() -> None:
             try:
                 await self.notifier.on_new_room_event(
diff --git a/synapse/handlers/room_batch.py b/synapse/handlers/room_batch.py
index 1c4eca5a4..2b361377f 100644
--- a/synapse/handlers/room_batch.py
+++ b/synapse/handlers/room_batch.py
@@ -271,6 +271,8 @@ class RoomBatchHandler:
         inherited_depth: int,
         initial_state_event_ids: List[str],
         app_service_requester: Requester,
+        beeper_new_messages: bool,
+        beeper_initial_prev_event_ids: Optional[List[str]] = None,
     ) -> List[str]:
         """Create and persists all events provided sequentially. Handles the
         complexity of creating events in chronological order so they can
@@ -290,21 +292,24 @@ class RoomBatchHandler:
                 the start of the historical batch since it's floating with no
                 prev_events to derive state from automatically.
             app_service_requester: The requester of an application service.
+            beeper_new_messages: Is this a batch of new events rather than history?
+            beeper_initial_prev_event_ids: prev_event_ids for the first event to send.
 
         Returns:
             List of persisted event IDs
         """
         assert app_service_requester.app_service
 
-        # We expect the first event in a historical batch to be an insertion event
-        assert events_to_create[0]["type"] == EventTypes.MSC2716_INSERTION
-        # We expect the last event in a historical batch to be an batch event
-        assert events_to_create[-1]["type"] == EventTypes.MSC2716_BATCH
+        if not beeper_new_messages:
+            # We expect the first event in a historical batch to be an insertion event
+            assert events_to_create[0]["type"] == EventTypes.MSC2716_INSERTION
+            # We expect the last event in a historical batch to be a batch event
+            assert events_to_create[-1]["type"] == EventTypes.MSC2716_BATCH
 
         # Make the historical event chain float off on its own by specifying no
         # prev_events for the first event in the chain which causes the HS to
         # ask for the state at the start of the batch later.
-        prev_event_ids: List[str] = []
+        prev_event_ids: List[str] = beeper_initial_prev_event_ids or []
 
         event_ids = []
         events_to_persist = []
@@ -335,14 +340,14 @@ class RoomBatchHandler:
                 # Only the first event (which is the insertion event) in the
                 # chain should be floating. The rest should hang off each other
                 # in a chain.
-                allow_no_prev_events=index == 0,
+                allow_no_prev_events=index == 0 and not beeper_new_messages,
                 prev_event_ids=event_dict.get("prev_events"),
                 # Since the first event (which is the insertion event) in the
                 # chain is floating with no `prev_events`, it can't derive state
                 # from anywhere automatically. So we need to set some state
                 # explicitly.
                 state_event_ids=initial_state_event_ids if index == 0 else None,
-                historical=True,
+                historical=not beeper_new_messages,
                 depth=inherited_depth,
             )
 
@@ -370,6 +375,18 @@ class RoomBatchHandler:
             event_ids.append(event_id)
             prev_event_ids = [event_id]
 
+        if beeper_new_messages:
+            for index, (event, context) in enumerate(events_to_persist):
+                await self.event_creation_handler.handle_new_client_event(
+                    await self.create_requester_for_user_id_from_app_service(
+                        event.sender, app_service_requester.app_service
+                    ),
+                    event=event,
+                    context=context,
+                    dont_notify=index < len(events_to_persist) - 1
+                )
+            return event_ids
+
         # Persist events in reverse-chronological order so they have the
         # correct stream_ordering as they are backfilled (which decrements).
         # Events are sorted by (topological_ordering, stream_ordering)
@@ -393,6 +410,8 @@ class RoomBatchHandler:
         inherited_depth: int,
         initial_state_event_ids: List[str],
         app_service_requester: Requester,
+        beeper_new_messages: bool,
+        beeper_initial_prev_event_ids: Optional[List[str]] = None,
     ) -> Tuple[List[str], str]:
         """
         Handles creating and persisting all of the historical events as well as
@@ -414,6 +433,8 @@ class RoomBatchHandler:
                 `/batch_send?prev_event_id=$abc` plus the outcome of
                 `persist_state_events_at_start`
             app_service_requester: The requester of an application service.
+            beeper_new_messages: Is this a batch of new events rather than history?
+            beeper_initial_prev_event_ids: prev_event_ids for the first event to send.
 
         Returns:
             Tuple containing a list of created events and the next_batch_id
@@ -434,8 +455,9 @@ class RoomBatchHandler:
             # the last event we're inserting
             "origin_server_ts": last_event_in_batch["origin_server_ts"],
         }
-        # Add the batch event to the end of the batch (newest-in-time)
-        events_to_create.append(batch_event)
+        if not beeper_new_messages:
+            # Add the batch event to the end of the batch (newest-in-time)
+            events_to_create.append(batch_event)
 
         # Add an "insertion" event to the start of each batch (next to the oldest-in-time
         # event in the batch) so the next batch can be connected to this one.
@@ -450,8 +472,9 @@ class RoomBatchHandler:
         next_batch_id = insertion_event["content"][
             EventContentFields.MSC2716_NEXT_BATCH_ID
         ]
-        # Prepend the insertion event to the start of the batch (oldest-in-time)
-        events_to_create = [insertion_event] + events_to_create
+        if not beeper_new_messages:
+            # Prepend the insertion event to the start of the batch (oldest-in-time)
+            events_to_create = [insertion_event] + events_to_create
 
         # Create and persist all of the historical events
         event_ids = await self.persist_historical_events(
@@ -460,6 +483,8 @@ class RoomBatchHandler:
             inherited_depth=inherited_depth,
             initial_state_event_ids=initial_state_event_ids,
             app_service_requester=app_service_requester,
+            beeper_new_messages=beeper_new_messages,
+            beeper_initial_prev_event_ids=beeper_initial_prev_event_ids,
         )
 
         return event_ids, next_batch_id
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index ce7817683..bf6a3385c 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -82,6 +82,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
         requester: Requester,
         ratelimit: bool,
         extra_users: List[UserID],
+        dont_notify: bool,
     ) -> JsonDict:
         """
         Args:
@@ -92,6 +93,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
             context
             ratelimit
             extra_users: Any extra users to notify about event
+            dont_notify: True to skip notifying clients about the new event.
         """
         serialized_context = await context.serialize(event, store)
 
@@ -106,6 +108,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
             "requester": requester.serialize(),
             "ratelimit": ratelimit,
             "extra_users": [u.to_string() for u in extra_users],
+            "dont_notify": dont_notify,
         }
 
         return payload
@@ -131,13 +134,14 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
 
             ratelimit = content["ratelimit"]
             extra_users = [UserID.from_string(u) for u in content["extra_users"]]
+            dont_notify = content["dont_notify"]
 
         logger.info(
             "Got event to send with ID: %s into room: %s", event.event_id, event.room_id
         )
 
         event = await self.event_creation_handler.persist_and_notify_client_event(
-            requester, event, context, ratelimit=ratelimit, extra_users=extra_users
+            requester, event, context, ratelimit=ratelimit, extra_users=extra_users, dont_notify=dont_notify,
         )
 
         return (
diff --git a/synapse/rest/client/room_batch.py b/synapse/rest/client/room_batch.py
index dd91dabed..57471cc5b 100644
--- a/synapse/rest/client/room_batch.py
+++ b/synapse/rest/client/room_batch.py
@@ -28,6 +28,7 @@ from synapse.http.servlet import (
     parse_json_object_from_request,
     parse_string,
     parse_strings_from_args,
+    parse_boolean_from_args,
 )
 from synapse.http.site import SynapseRequest
 from synapse.rest.client.transactions import HttpTransactionCache
@@ -100,6 +101,7 @@ class RoomBatchSendEventRestServlet(RestServlet):
             request.args, "prev_event_id"
         )
         batch_id_from_query = parse_string(request, "batch_id")
+        beeper_new_messages = parse_boolean_from_args(request.args, "com.beeper.new_messages")
 
         if prev_event_ids_from_query is None:
             raise SynapseError(
@@ -148,7 +150,7 @@ class RoomBatchSendEventRestServlet(RestServlet):
         # Create and persist all of the state events that float off on their own
         # before the batch. These will most likely be all of the invite/member
         # state events used to auth the upcoming historical messages.
-        if body["state_events_at_start"]:
+        if body["state_events_at_start"] and not beeper_new_messages:
             state_event_ids_at_start = (
                 await self.room_batch_handler.persist_state_events_at_start(
                     state_events_at_start=body["state_events_at_start"],
@@ -174,6 +176,8 @@ class RoomBatchSendEventRestServlet(RestServlet):
         base_insertion_event = None
         if batch_id_from_query:
             batch_id_to_connect_to = batch_id_from_query
+        elif beeper_new_messages:
+            batch_id_to_connect_to = None
         # Otherwise, create an insertion event to act as a starting point.
         #
         # We don't always have an insertion event to start hanging more history
@@ -224,11 +228,18 @@ class RoomBatchSendEventRestServlet(RestServlet):
             inherited_depth=inherited_depth,
             initial_state_event_ids=state_event_ids,
             app_service_requester=requester,
+            beeper_new_messages=beeper_new_messages,
+            beeper_initial_prev_event_ids=prev_event_ids_from_query if beeper_new_messages else None,
         )
 
-        insertion_event_id = event_ids[0]
-        batch_event_id = event_ids[-1]
-        historical_event_ids = event_ids[1:-1]
+        if beeper_new_messages:
+            insertion_event_id = batch_event_id = None
+            historical_event_ids = event_ids
+            next_batch_id = None
+        else:
+            insertion_event_id = event_ids[0]
+            batch_event_id = event_ids[-1]
+            historical_event_ids = event_ids[1:-1]
 
         response_dict = {
             "state_event_ids": state_event_ids_at_start,
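
Illustrative usage sketch (not part of the patch): a rough idea of how an application service might exercise the new com.beeper.new_messages query parameter against the MSC2716 batch_send endpoint that the servlet above serves. The homeserver URL, access token, room ID, sender and event contents below are placeholders, and the request shape simply follows the handler as patched (prev_event_id supplies the prev_events for the first event; state_events_at_start is ignored when the flag is set).

# Sketch only; assumes a registered application service that can act in the room.
import requests
from urllib.parse import quote

HOMESERVER = "https://synapse.example.com"   # placeholder homeserver URL
AS_TOKEN = "as_token_placeholder"            # placeholder appservice access token
ROOM_ID = "!room:example.com"                # placeholder room ID
PREV_EVENT_ID = "$latest_event_in_room"      # event the new batch should follow

# A small batch of ordinary (non-historical) messages to append to the room.
events = [
    {
        "type": "m.room.message",
        "sender": "@bridged_user:example.com",
        "origin_server_ts": 1652950000000 + i,
        "content": {"msgtype": "m.text", "body": f"bridged message {i}"},
    }
    for i in range(3)
]

resp = requests.post(
    f"{HOMESERVER}/_matrix/client/unstable/org.matrix.msc2716"
    f"/rooms/{quote(ROOM_ID)}/batch_send",
    params={
        # New flag from this patch: treat the batch as live messages, not history.
        "com.beeper.new_messages": "true",
        # With the flag set, this becomes the prev_event for the first event sent.
        "prev_event_id": PREV_EVENT_ID,
    },
    headers={"Authorization": f"Bearer {AS_TOKEN}"},
    # state_events_at_start is still required by the endpoint but is skipped
    # by the handler when com.beeper.new_messages is set.
    json={"state_events_at_start": [], "events": events},
    timeout=30,
)
resp.raise_for_status()
# With the flag set, the patched servlet returns no insertion/batch event IDs;
# the persisted message event IDs come back in the response body.
print(resp.json())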