mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2024-10-01 11:49:51 -04:00
Hack together a query param for batch sending non-historical events
This commit is contained in:
parent
4d95e65860
commit
84d583f267
@ -1115,6 +1115,7 @@ class EventCreationHandler:
|
||||
ratelimit: bool = True,
|
||||
extra_users: Optional[List[UserID]] = None,
|
||||
ignore_shadow_ban: bool = False,
|
||||
dont_notify: bool = False,
|
||||
) -> EventBase:
|
||||
"""Processes a new event.
|
||||
|
||||
@ -1134,6 +1135,8 @@ class EventCreationHandler:
|
||||
ignore_shadow_ban: True if shadow-banned users should be allowed to
|
||||
send this event.
|
||||
|
||||
dont_notify
|
||||
|
||||
Return:
|
||||
If the event was deduplicated, the previous, duplicate, event. Otherwise,
|
||||
`event`.
|
||||
@ -1217,6 +1220,7 @@ class EventCreationHandler:
|
||||
context=context,
|
||||
ratelimit=ratelimit,
|
||||
extra_users=extra_users,
|
||||
dont_notify=dont_notify,
|
||||
),
|
||||
run_in_background(
|
||||
self.cache_joined_hosts_for_event, event, context
|
||||
@ -1235,6 +1239,7 @@ class EventCreationHandler:
|
||||
context: EventContext,
|
||||
ratelimit: bool = True,
|
||||
extra_users: Optional[List[UserID]] = None,
|
||||
dont_notify: bool = False,
|
||||
) -> EventBase:
|
||||
"""Actually persists the event. Should only be called by
|
||||
`handle_new_client_event`, and see its docstring for documentation of
|
||||
@ -1246,7 +1251,7 @@ class EventCreationHandler:
|
||||
# The historical messages also do not have the proper `context.current_state_ids`
|
||||
# and `state_groups` because they have `prev_events` that aren't persisted yet
|
||||
# (historical messages persisted in reverse-chronological order).
|
||||
if not event.internal_metadata.is_historical():
|
||||
if not event.internal_metadata.is_historical() and not event.content.get(EventContentFields.MSC2716_HISTORICAL):
|
||||
await self.action_generator.handle_push_actions_for_event(event, context)
|
||||
|
||||
try:
|
||||
@ -1262,6 +1267,7 @@ class EventCreationHandler:
|
||||
context=context,
|
||||
ratelimit=ratelimit,
|
||||
extra_users=extra_users,
|
||||
dont_notify=dont_notify,
|
||||
)
|
||||
stream_id = result["stream_id"]
|
||||
event_id = result["event_id"]
|
||||
@ -1278,7 +1284,7 @@ class EventCreationHandler:
|
||||
return event
|
||||
|
||||
event = await self.persist_and_notify_client_event(
|
||||
requester, event, context, ratelimit=ratelimit, extra_users=extra_users
|
||||
requester, event, context, ratelimit=ratelimit, extra_users=extra_users, dont_notify=dont_notify,
|
||||
)
|
||||
|
||||
return event
|
||||
@ -1380,6 +1386,7 @@ class EventCreationHandler:
|
||||
context: EventContext,
|
||||
ratelimit: bool = True,
|
||||
extra_users: Optional[List[UserID]] = None,
|
||||
dont_notify: bool = False,
|
||||
) -> EventBase:
|
||||
"""Called when we have fully built the event, have already
|
||||
calculated the push actions for the event, and checked auth.
|
||||
@ -1634,6 +1641,11 @@ class EventCreationHandler:
|
||||
# If there's an expiry timestamp on the event, schedule its expiry.
|
||||
self._message_handler.maybe_schedule_expiry(event)
|
||||
|
||||
if dont_notify:
|
||||
# Skip notifying clients, this is used for Beeper's custom
|
||||
# batch sending of non-historical messages.
|
||||
return event
|
||||
|
||||
async def _notify() -> None:
|
||||
try:
|
||||
await self.notifier.on_new_room_event(
|
||||
|
@ -271,6 +271,8 @@ class RoomBatchHandler:
|
||||
inherited_depth: int,
|
||||
initial_state_event_ids: List[str],
|
||||
app_service_requester: Requester,
|
||||
beeper_new_messages: bool,
|
||||
beeper_initial_prev_event_ids: List[str] = None,
|
||||
) -> List[str]:
|
||||
"""Create and persists all events provided sequentially. Handles the
|
||||
complexity of creating events in chronological order so they can
|
||||
@ -290,21 +292,24 @@ class RoomBatchHandler:
|
||||
the start of the historical batch since it's floating with no
|
||||
prev_events to derive state from automatically.
|
||||
app_service_requester: The requester of an application service.
|
||||
beeper_new_messages: Is this a batch of new events rather than history?
|
||||
beeper_initial_prev_event_ids: prev_event_ids for the first event to send.
|
||||
|
||||
Returns:
|
||||
List of persisted event IDs
|
||||
"""
|
||||
assert app_service_requester.app_service
|
||||
|
||||
# We expect the first event in a historical batch to be an insertion event
|
||||
assert events_to_create[0]["type"] == EventTypes.MSC2716_INSERTION
|
||||
# We expect the last event in a historical batch to be an batch event
|
||||
assert events_to_create[-1]["type"] == EventTypes.MSC2716_BATCH
|
||||
if not beeper_new_messages:
|
||||
# We expect the first event in a historical batch to be an insertion event
|
||||
assert events_to_create[0]["type"] == EventTypes.MSC2716_INSERTION
|
||||
# We expect the last event in a historical batch to be an batch event
|
||||
assert events_to_create[-1]["type"] == EventTypes.MSC2716_BATCH
|
||||
|
||||
# Make the historical event chain float off on its own by specifying no
|
||||
# prev_events for the first event in the chain which causes the HS to
|
||||
# ask for the state at the start of the batch later.
|
||||
prev_event_ids: List[str] = []
|
||||
prev_event_ids: List[str] = beeper_initial_prev_event_ids or []
|
||||
|
||||
event_ids = []
|
||||
events_to_persist = []
|
||||
@ -335,14 +340,14 @@ class RoomBatchHandler:
|
||||
# Only the first event (which is the insertion event) in the
|
||||
# chain should be floating. The rest should hang off each other
|
||||
# in a chain.
|
||||
allow_no_prev_events=index == 0,
|
||||
allow_no_prev_events=index == 0 and not beeper_new_messages,
|
||||
prev_event_ids=event_dict.get("prev_events"),
|
||||
# Since the first event (which is the insertion event) in the
|
||||
# chain is floating with no `prev_events`, it can't derive state
|
||||
# from anywhere automatically. So we need to set some state
|
||||
# explicitly.
|
||||
state_event_ids=initial_state_event_ids if index == 0 else None,
|
||||
historical=True,
|
||||
historical=not beeper_new_messages,
|
||||
depth=inherited_depth,
|
||||
)
|
||||
|
||||
@ -370,6 +375,18 @@ class RoomBatchHandler:
|
||||
event_ids.append(event_id)
|
||||
prev_event_ids = [event_id]
|
||||
|
||||
if beeper_new_messages:
|
||||
for index, (event, context) in enumerate(events_to_persist):
|
||||
await self.event_creation_handler.handle_new_client_event(
|
||||
await self.create_requester_for_user_id_from_app_service(
|
||||
event.sender, app_service_requester.app_service
|
||||
),
|
||||
event=event,
|
||||
context=context,
|
||||
dont_notify=index < len(events_to_persist) - 1
|
||||
)
|
||||
return event_ids
|
||||
|
||||
# Persist events in reverse-chronological order so they have the
|
||||
# correct stream_ordering as they are backfilled (which decrements).
|
||||
# Events are sorted by (topological_ordering, stream_ordering)
|
||||
@ -393,6 +410,8 @@ class RoomBatchHandler:
|
||||
inherited_depth: int,
|
||||
initial_state_event_ids: List[str],
|
||||
app_service_requester: Requester,
|
||||
beeper_new_messages: bool,
|
||||
beeper_initial_prev_event_ids: List[str] = None,
|
||||
) -> Tuple[List[str], str]:
|
||||
"""
|
||||
Handles creating and persisting all of the historical events as well as
|
||||
@ -414,6 +433,8 @@ class RoomBatchHandler:
|
||||
`/batch_send?prev_event_id=$abc` plus the outcome of
|
||||
`persist_state_events_at_start`
|
||||
app_service_requester: The requester of an application service.
|
||||
beeper_new_messages: Is this a batch of new events rather than history?
|
||||
beeper_initial_prev_event_ids: prev_event_ids for the first event to send.
|
||||
|
||||
Returns:
|
||||
Tuple containing a list of created events and the next_batch_id
|
||||
@ -434,8 +455,9 @@ class RoomBatchHandler:
|
||||
# the last event we're inserting
|
||||
"origin_server_ts": last_event_in_batch["origin_server_ts"],
|
||||
}
|
||||
# Add the batch event to the end of the batch (newest-in-time)
|
||||
events_to_create.append(batch_event)
|
||||
if not beeper_new_messages:
|
||||
# Add the batch event to the end of the batch (newest-in-time)
|
||||
events_to_create.append(batch_event)
|
||||
|
||||
# Add an "insertion" event to the start of each batch (next to the oldest-in-time
|
||||
# event in the batch) so the next batch can be connected to this one.
|
||||
@ -450,8 +472,9 @@ class RoomBatchHandler:
|
||||
next_batch_id = insertion_event["content"][
|
||||
EventContentFields.MSC2716_NEXT_BATCH_ID
|
||||
]
|
||||
# Prepend the insertion event to the start of the batch (oldest-in-time)
|
||||
events_to_create = [insertion_event] + events_to_create
|
||||
if not beeper_new_messages:
|
||||
# Prepend the insertion event to the start of the batch (oldest-in-time)
|
||||
events_to_create = [insertion_event] + events_to_create
|
||||
|
||||
# Create and persist all of the historical events
|
||||
event_ids = await self.persist_historical_events(
|
||||
@ -460,6 +483,8 @@ class RoomBatchHandler:
|
||||
inherited_depth=inherited_depth,
|
||||
initial_state_event_ids=initial_state_event_ids,
|
||||
app_service_requester=app_service_requester,
|
||||
beeper_new_messages=beeper_new_messages,
|
||||
beeper_initial_prev_event_ids=beeper_initial_prev_event_ids,
|
||||
)
|
||||
|
||||
return event_ids, next_batch_id
|
||||
|
@ -82,6 +82,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
|
||||
requester: Requester,
|
||||
ratelimit: bool,
|
||||
extra_users: List[UserID],
|
||||
dont_notify: bool,
|
||||
) -> JsonDict:
|
||||
"""
|
||||
Args:
|
||||
@ -92,6 +93,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
|
||||
context
|
||||
ratelimit
|
||||
extra_users: Any extra users to notify about event
|
||||
dont_notify
|
||||
"""
|
||||
serialized_context = await context.serialize(event, store)
|
||||
|
||||
@ -106,6 +108,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
|
||||
"requester": requester.serialize(),
|
||||
"ratelimit": ratelimit,
|
||||
"extra_users": [u.to_string() for u in extra_users],
|
||||
"dont_notify": dont_notify,
|
||||
}
|
||||
|
||||
return payload
|
||||
@ -131,13 +134,14 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
|
||||
|
||||
ratelimit = content["ratelimit"]
|
||||
extra_users = [UserID.from_string(u) for u in content["extra_users"]]
|
||||
dont_notify = content["dont_notify"]
|
||||
|
||||
logger.info(
|
||||
"Got event to send with ID: %s into room: %s", event.event_id, event.room_id
|
||||
)
|
||||
|
||||
event = await self.event_creation_handler.persist_and_notify_client_event(
|
||||
requester, event, context, ratelimit=ratelimit, extra_users=extra_users
|
||||
requester, event, context, ratelimit=ratelimit, extra_users=extra_users, dont_notify=dont_notify,
|
||||
)
|
||||
|
||||
return (
|
||||
|
@ -28,6 +28,7 @@ from synapse.http.servlet import (
|
||||
parse_json_object_from_request,
|
||||
parse_string,
|
||||
parse_strings_from_args,
|
||||
parse_boolean_from_args,
|
||||
)
|
||||
from synapse.http.site import SynapseRequest
|
||||
from synapse.rest.client.transactions import HttpTransactionCache
|
||||
@ -100,6 +101,7 @@ class RoomBatchSendEventRestServlet(RestServlet):
|
||||
request.args, "prev_event_id"
|
||||
)
|
||||
batch_id_from_query = parse_string(request, "batch_id")
|
||||
beeper_new_messages = parse_boolean_from_args(request.args, "com.beeper.new_messages")
|
||||
|
||||
if prev_event_ids_from_query is None:
|
||||
raise SynapseError(
|
||||
@ -148,7 +150,7 @@ class RoomBatchSendEventRestServlet(RestServlet):
|
||||
# Create and persist all of the state events that float off on their own
|
||||
# before the batch. These will most likely be all of the invite/member
|
||||
# state events used to auth the upcoming historical messages.
|
||||
if body["state_events_at_start"]:
|
||||
if body["state_events_at_start"] and not beeper_new_messages:
|
||||
state_event_ids_at_start = (
|
||||
await self.room_batch_handler.persist_state_events_at_start(
|
||||
state_events_at_start=body["state_events_at_start"],
|
||||
@ -174,6 +176,8 @@ class RoomBatchSendEventRestServlet(RestServlet):
|
||||
base_insertion_event = None
|
||||
if batch_id_from_query:
|
||||
batch_id_to_connect_to = batch_id_from_query
|
||||
elif beeper_new_messages:
|
||||
batch_id_to_connect_to = None
|
||||
# Otherwise, create an insertion event to act as a starting point.
|
||||
#
|
||||
# We don't always have an insertion event to start hanging more history
|
||||
@ -224,11 +228,18 @@ class RoomBatchSendEventRestServlet(RestServlet):
|
||||
inherited_depth=inherited_depth,
|
||||
initial_state_event_ids=state_event_ids,
|
||||
app_service_requester=requester,
|
||||
beeper_new_messages=beeper_new_messages,
|
||||
beeper_initial_prev_event_ids=prev_event_ids_from_query if beeper_new_messages else None,
|
||||
)
|
||||
|
||||
insertion_event_id = event_ids[0]
|
||||
batch_event_id = event_ids[-1]
|
||||
historical_event_ids = event_ids[1:-1]
|
||||
if beeper_new_messages:
|
||||
insertion_event_id = batch_event_id = None
|
||||
historical_event_ids = event_ids
|
||||
next_batch_id = None
|
||||
else:
|
||||
insertion_event_id = event_ids[0]
|
||||
batch_event_id = event_ids[-1]
|
||||
historical_event_ids = event_ids[1:-1]
|
||||
|
||||
response_dict = {
|
||||
"state_event_ids": state_event_ids_at_start,
|
||||
|
Loading…
Reference in New Issue
Block a user