diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 3f3178c64..f75c12395 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1360,6 +1360,7 @@ class EventCreationHandler: ratelimit: bool = True, extra_users: Optional[List[UserID]] = None, ignore_shadow_ban: bool = False, + dont_notify: bool = False, ) -> EventBase: """Processes new events. Please note that if batch persisting events, an error in handling any one of these events will result in all of the events being dropped. @@ -1379,6 +1380,8 @@ class EventCreationHandler: ignore_shadow_ban: True if shadow-banned users should be allowed to send this event. + dont_notify + Return: If the event was deduplicated, the previous, duplicate, event. Otherwise, `event`. @@ -1456,6 +1459,7 @@ class EventCreationHandler: events_and_context=events_and_context, ratelimit=ratelimit, extra_users=extra_users, + dont_notify=dont_notify, ), run_in_background( self.cache_joined_hosts_for_events, events_and_context @@ -1473,6 +1477,7 @@ class EventCreationHandler: events_and_context: List[Tuple[EventBase, EventContext]], ratelimit: bool = True, extra_users: Optional[List[UserID]] = None, + dont_notify: bool = False, ) -> EventBase: """Actually persists new events. Should only be called by `handle_new_client_event`, and see its docstring for documentation of @@ -1502,6 +1507,7 @@ class EventCreationHandler: requester=requester, ratelimit=ratelimit, extra_users=extra_users, + dont_notify=dont_notify, ) except SynapseError as e: if e.code == HTTPStatus.CONFLICT: @@ -1531,6 +1537,7 @@ class EventCreationHandler: events_and_context, ratelimit=ratelimit, extra_users=extra_users, + dont_notify=dont_notify, ) return event @@ -1652,6 +1659,7 @@ class EventCreationHandler: events_and_context: List[Tuple[EventBase, EventContext]], ratelimit: bool = True, extra_users: Optional[List[UserID]] = None, + dont_notify: bool = False, ) -> EventBase: """Called when we have fully built the events, have already calculated the push actions for the events, and checked auth. @@ -1954,7 +1962,7 @@ class EventCreationHandler: pos = PersistedEventPosition(self._instance_name, stream_ordering) events_and_pos.append((event, pos)) - if event.type == EventTypes.Message: + if not dont_notify and event.type == EventTypes.Message: # We don't want to block sending messages on any presence code. This # matters as sometimes presence code can take a while. run_as_background_process( @@ -1969,7 +1977,10 @@ class EventCreationHandler: except Exception: logger.exception("Error notifying about new room events") - run_in_background(_notify) + if not dont_notify: + # Skip notifying clients, this is used for Beeper's custom + # batch sending of non-historical messages. + run_in_background(_notify) return persisted_events[-1] diff --git a/synapse/handlers/room_batch.py b/synapse/handlers/room_batch.py index d63786d55..c938f6431 100644 --- a/synapse/handlers/room_batch.py +++ b/synapse/handlers/room_batch.py @@ -274,6 +274,8 @@ class RoomBatchHandler: inherited_depth: int, initial_state_event_ids: List[str], app_service_requester: Requester, + beeper_new_messages: bool, + beeper_initial_prev_event_ids: List[str] = None, ) -> List[str]: """Create and persists all events provided sequentially. Handles the complexity of creating events in chronological order so they can @@ -293,21 +295,24 @@ class RoomBatchHandler: the start of the historical batch since it's floating with no prev_events to derive state from automatically. app_service_requester: The requester of an application service. + beeper_new_messages: Is this a batch of new events rather than history? + beeper_initial_prev_event_ids: prev_event_ids for the first event to send. Returns: List of persisted event IDs """ assert app_service_requester.app_service - # We expect the first event in a historical batch to be an insertion event - assert events_to_create[0]["type"] == EventTypes.MSC2716_INSERTION - # We expect the last event in a historical batch to be an batch event - assert events_to_create[-1]["type"] == EventTypes.MSC2716_BATCH + if not beeper_new_messages: + # We expect the first event in a historical batch to be an insertion event + assert events_to_create[0]["type"] == EventTypes.MSC2716_INSERTION + # We expect the last event in a historical batch to be an batch event + assert events_to_create[-1]["type"] == EventTypes.MSC2716_BATCH # Make the historical event chain float off on its own by specifying no # prev_events for the first event in the chain which causes the HS to # ask for the state at the start of the batch later. - prev_event_ids: List[str] = [] + prev_event_ids: List[str] = beeper_initial_prev_event_ids or [] event_ids = [] events_to_persist = [] @@ -338,14 +343,14 @@ class RoomBatchHandler: # Only the first event (which is the insertion event) in the # chain should be floating. The rest should hang off each other # in a chain. - allow_no_prev_events=index == 0, + allow_no_prev_events=index == 0 and not beeper_new_messages, prev_event_ids=event_dict.get("prev_events"), # Since the first event (which is the insertion event) in the # chain is floating with no `prev_events`, it can't derive state # from anywhere automatically. So we need to set some state # explicitly. state_event_ids=initial_state_event_ids if index == 0 else None, - historical=True, + historical=not beeper_new_messages, depth=inherited_depth, ) context = await unpersisted_context.persist(event) @@ -373,6 +378,18 @@ class RoomBatchHandler: event_ids.append(event_id) prev_event_ids = [event_id] + if beeper_new_messages: + for index, (event, context) in enumerate(events_to_persist): + await self.event_creation_handler.handle_new_client_event( + await self.create_requester_for_user_id_from_app_service( + event.sender, app_service_requester.app_service + ), + event=event, + context=context, + dont_notify=index < len(events_to_persist) - 1, + ) + return event_ids + # Persist events in reverse-chronological order so they have the # correct stream_ordering as they are backfilled (which decrements). # Events are sorted by (topological_ordering, stream_ordering) @@ -397,6 +414,8 @@ class RoomBatchHandler: inherited_depth: int, initial_state_event_ids: List[str], app_service_requester: Requester, + beeper_new_messages: bool, + beeper_initial_prev_event_ids: List[str] = None, ) -> Tuple[List[str], str]: """ Handles creating and persisting all of the historical events as well as @@ -418,6 +437,8 @@ class RoomBatchHandler: `/batch_send?prev_event_id=$abc` plus the outcome of `persist_state_events_at_start` app_service_requester: The requester of an application service. + beeper_new_messages: Is this a batch of new events rather than history? + beeper_initial_prev_event_ids: prev_event_ids for the first event to send. Returns: Tuple containing a list of created events and the next_batch_id @@ -438,8 +459,9 @@ class RoomBatchHandler: # the last event we're inserting "origin_server_ts": last_event_in_batch["origin_server_ts"], } - # Add the batch event to the end of the batch (newest-in-time) - events_to_create.append(batch_event) + if not beeper_new_messages: + # Add the batch event to the end of the batch (newest-in-time) + events_to_create.append(batch_event) # Add an "insertion" event to the start of each batch (next to the oldest-in-time # event in the batch) so the next batch can be connected to this one. @@ -454,8 +476,9 @@ class RoomBatchHandler: next_batch_id = insertion_event["content"][ EventContentFields.MSC2716_NEXT_BATCH_ID ] - # Prepend the insertion event to the start of the batch (oldest-in-time) - events_to_create = [insertion_event] + events_to_create + if not beeper_new_messages: + # Prepend the insertion event to the start of the batch (oldest-in-time) + events_to_create = [insertion_event] + events_to_create # Create and persist all of the historical events event_ids = await self.persist_historical_events( @@ -464,6 +487,8 @@ class RoomBatchHandler: inherited_depth=inherited_depth, initial_state_event_ids=initial_state_event_ids, app_service_requester=app_service_requester, + beeper_new_messages=beeper_new_messages, + beeper_initial_prev_event_ids=beeper_initial_prev_event_ids, ) return event_ids, next_batch_id diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index ba12b6d79..55e317e8a 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -332,6 +332,7 @@ class BulkPushRuleEvaluator: if ( not event.internal_metadata.is_notifiable() or event.internal_metadata.is_historical() + or event.content.get(EventContentFields.MSC2716_HISTORICAL) ): # Push rules for events that aren't notifiable can't be processed by this and # we want to skip push notification actions for historical messages diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index 27ad91407..bc8622333 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -84,6 +84,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint): requester: Requester, ratelimit: bool, extra_users: List[UserID], + dont_notify: bool, ) -> JsonDict: """ Args: @@ -94,6 +95,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint): context ratelimit extra_users: Any extra users to notify about event + dont_notify """ serialized_context = await context.serialize(event, store) @@ -108,6 +110,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint): "requester": requester.serialize(), "ratelimit": ratelimit, "extra_users": [u.to_string() for u in extra_users], + "dont_notify": dont_notify, } return payload @@ -133,13 +136,18 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint): ratelimit = content["ratelimit"] extra_users = [UserID.from_string(u) for u in content["extra_users"]] + dont_notify = content["dont_notify"] logger.info( "Got event to send with ID: %s into room: %s", event.event_id, event.room_id ) event = await self.event_creation_handler.persist_and_notify_client_events( - requester, [(event, context)], ratelimit=ratelimit, extra_users=extra_users + requester, + [(event, context)], + ratelimit=ratelimit, + extra_users=extra_users, + dont_notify=dont_notify, ) return ( diff --git a/synapse/replication/http/send_events.py b/synapse/replication/http/send_events.py index 4f82c9f96..a41eb1db5 100644 --- a/synapse/replication/http/send_events.py +++ b/synapse/replication/http/send_events.py @@ -82,6 +82,7 @@ class ReplicationSendEventsRestServlet(ReplicationEndpoint): requester: Requester, ratelimit: bool, extra_users: List[UserID], + dont_notify: bool, ) -> JsonDict: """ Args: @@ -108,7 +109,7 @@ class ReplicationSendEventsRestServlet(ReplicationEndpoint): } serialized_events.append(serialized_event) - payload = {"events": serialized_events} + payload = {"events": serialized_events, "dont_notify": dont_notify} return payload @@ -118,6 +119,7 @@ class ReplicationSendEventsRestServlet(ReplicationEndpoint): with Measure(self.clock, "repl_send_events_parse"): events_and_context = [] events = payload["events"] + dont_notify = payload["dont_notify"] for event_payload in events: event_dict = event_payload["event"] @@ -152,7 +154,11 @@ class ReplicationSendEventsRestServlet(ReplicationEndpoint): last_event = ( await self.event_creation_handler.persist_and_notify_client_events( - requester, events_and_context, ratelimit, extra_users + requester, + events_and_context, + ratelimit, + extra_users, + dont_notify=dont_notify, ) ) diff --git a/synapse/rest/client/room_batch.py b/synapse/rest/client/room_batch.py index ef284ecc1..79a1ad96d 100644 --- a/synapse/rest/client/room_batch.py +++ b/synapse/rest/client/room_batch.py @@ -26,6 +26,7 @@ from synapse.http.servlet import ( parse_json_object_from_request, parse_string, parse_strings_from_args, + parse_boolean_from_args, ) from synapse.http.site import SynapseRequest from synapse.types import JsonDict @@ -96,6 +97,9 @@ class RoomBatchSendEventRestServlet(RestServlet): request.args, "prev_event_id" ) batch_id_from_query = parse_string(request, "batch_id") + beeper_new_messages = parse_boolean_from_args( + request.args, "com.beeper.new_messages" + ) if prev_event_ids_from_query is None: raise SynapseError( @@ -151,7 +155,7 @@ class RoomBatchSendEventRestServlet(RestServlet): # Create and persist all of the state events that float off on their own # before the batch. These will most likely be all of the invite/member # state events used to auth the upcoming historical messages. - if body["state_events_at_start"]: + if body["state_events_at_start"] and not beeper_new_messages: state_event_ids_at_start = ( await self.room_batch_handler.persist_state_events_at_start( state_events_at_start=body["state_events_at_start"], @@ -177,6 +181,8 @@ class RoomBatchSendEventRestServlet(RestServlet): base_insertion_event = None if batch_id_from_query: batch_id_to_connect_to = batch_id_from_query + elif beeper_new_messages: + batch_id_to_connect_to = None # Otherwise, create an insertion event to act as a starting point. # # We don't always have an insertion event to start hanging more history @@ -227,11 +233,20 @@ class RoomBatchSendEventRestServlet(RestServlet): inherited_depth=inherited_depth, initial_state_event_ids=state_event_ids, app_service_requester=requester, + beeper_new_messages=beeper_new_messages, + beeper_initial_prev_event_ids=prev_event_ids_from_query + if beeper_new_messages + else None, ) - insertion_event_id = event_ids[0] - batch_event_id = event_ids[-1] - historical_event_ids = event_ids[1:-1] + if beeper_new_messages: + insertion_event_id = batch_event_id = None + historical_event_ids = event_ids + next_batch_id = None + else: + insertion_event_id = event_ids[0] + batch_event_id = event_ids[-1] + historical_event_ids = event_ids[1:-1] response_dict = { "state_event_ids": state_event_ids_at_start,