mirror of
https://mau.dev/maunium/synapse.git
synced 2024-10-01 01:36:05 -04:00
Faster partial join to room with complex auth graph (#7)
Instead of persisting outliers in a bunch of batches, let's just do them all at once. This is fine because all `_auth_and_persist_outliers_inner` is doing is checking the auth rules for each event, which requires the events to be topologically sorted by the auth graph.
This commit is contained in:
parent
a0f0fdf4d4
commit
c3f2f0f063
1
changelog.d/7.misc
Normal file
1
changelog.d/7.misc
Normal file
@ -0,0 +1 @@
|
|||||||
|
Faster partial join to room with complex auth graph.
|
@ -94,7 +94,7 @@ from synapse.types import (
|
|||||||
)
|
)
|
||||||
from synapse.types.state import StateFilter
|
from synapse.types.state import StateFilter
|
||||||
from synapse.util.async_helpers import Linearizer, concurrently_execute
|
from synapse.util.async_helpers import Linearizer, concurrently_execute
|
||||||
from synapse.util.iterutils import batch_iter, partition, sorted_topologically_batched
|
from synapse.util.iterutils import batch_iter, partition, sorted_topologically
|
||||||
from synapse.util.retryutils import NotRetryingDestination
|
from synapse.util.retryutils import NotRetryingDestination
|
||||||
from synapse.util.stringutils import shortstr
|
from synapse.util.stringutils import shortstr
|
||||||
|
|
||||||
@ -1678,57 +1678,36 @@ class FederationEventHandler:
|
|||||||
|
|
||||||
# We need to persist an event's auth events before the event.
|
# We need to persist an event's auth events before the event.
|
||||||
auth_graph = {
|
auth_graph = {
|
||||||
ev: [event_map[e_id] for e_id in ev.auth_event_ids() if e_id in event_map]
|
ev.event_id: [e_id for e_id in ev.auth_event_ids() if e_id in event_map]
|
||||||
for ev in event_map.values()
|
for ev in event_map.values()
|
||||||
}
|
}
|
||||||
for roots in sorted_topologically_batched(event_map.values(), auth_graph):
|
sorted_auth_event_ids = sorted_topologically(event_map.keys(), auth_graph)
|
||||||
if not roots:
|
sorted_auth_events = [event_map[e_id] for e_id in sorted_auth_event_ids]
|
||||||
# if *none* of the remaining events are ready, that means
|
logger.info(
|
||||||
# we have a loop. This either means a bug in our logic, or that
|
"Persisting %i remaining outliers: %s",
|
||||||
# somebody has managed to create a loop (which requires finding a
|
len(sorted_auth_events),
|
||||||
# hash collision in room v2 and later).
|
shortstr(e.event_id for e in sorted_auth_events),
|
||||||
logger.warning(
|
)
|
||||||
"Loop found in auth events while fetching missing state/auth "
|
|
||||||
"events: %s",
|
|
||||||
shortstr(event_map.keys()),
|
|
||||||
)
|
|
||||||
return
|
|
||||||
|
|
||||||
logger.info(
|
|
||||||
"Persisting %i of %i remaining outliers: %s",
|
|
||||||
len(roots),
|
|
||||||
len(event_map),
|
|
||||||
shortstr(e.event_id for e in roots),
|
|
||||||
)
|
|
||||||
|
|
||||||
await self._auth_and_persist_outliers_inner(room_id, roots)
|
|
||||||
|
|
||||||
async def _auth_and_persist_outliers_inner(
|
|
||||||
self, room_id: str, fetched_events: Collection[EventBase]
|
|
||||||
) -> None:
|
|
||||||
"""Helper for _auth_and_persist_outliers
|
|
||||||
|
|
||||||
Persists a batch of events where we have (theoretically) already persisted all
|
|
||||||
of their auth events.
|
|
||||||
|
|
||||||
Marks the events as outliers, auths them, persists them to the database, and,
|
|
||||||
where appropriate (eg, an invite), awakes the notifier.
|
|
||||||
|
|
||||||
Params:
|
|
||||||
origin: where the events came from
|
|
||||||
room_id: the room that the events are meant to be in (though this has
|
|
||||||
not yet been checked)
|
|
||||||
fetched_events: the events to persist
|
|
||||||
"""
|
|
||||||
# get all the auth events for all the events in this batch. By now, they should
|
# get all the auth events for all the events in this batch. By now, they should
|
||||||
# have been persisted.
|
# have been persisted.
|
||||||
auth_events = {
|
auth_event_ids = {
|
||||||
aid for event in fetched_events for aid in event.auth_event_ids()
|
aid for event in sorted_auth_events for aid in event.auth_event_ids()
|
||||||
}
|
}
|
||||||
persisted_events = await self._store.get_events(
|
auth_map = {
|
||||||
auth_events,
|
ev.event_id: ev
|
||||||
allow_rejected=True,
|
for ev in sorted_auth_events
|
||||||
)
|
if ev.event_id in auth_event_ids
|
||||||
|
}
|
||||||
|
|
||||||
|
missing_events = auth_event_ids.difference(auth_map)
|
||||||
|
if missing_events:
|
||||||
|
persisted_events = await self._store.get_events(
|
||||||
|
missing_events,
|
||||||
|
allow_rejected=True,
|
||||||
|
redact_behaviour=EventRedactBehaviour.as_is,
|
||||||
|
)
|
||||||
|
auth_map.update(persisted_events)
|
||||||
|
|
||||||
events_and_contexts_to_persist: List[Tuple[EventBase, EventContext]] = []
|
events_and_contexts_to_persist: List[Tuple[EventBase, EventContext]] = []
|
||||||
|
|
||||||
@ -1736,7 +1715,7 @@ class FederationEventHandler:
|
|||||||
with nested_logging_context(suffix=event.event_id):
|
with nested_logging_context(suffix=event.event_id):
|
||||||
auth = []
|
auth = []
|
||||||
for auth_event_id in event.auth_event_ids():
|
for auth_event_id in event.auth_event_ids():
|
||||||
ae = persisted_events.get(auth_event_id)
|
ae = auth_map.get(auth_event_id)
|
||||||
if not ae:
|
if not ae:
|
||||||
# the fact we can't find the auth event doesn't mean it doesn't
|
# the fact we can't find the auth event doesn't mean it doesn't
|
||||||
# exist, which means it is premature to reject `event`. Instead we
|
# exist, which means it is premature to reject `event`. Instead we
|
||||||
@ -1755,7 +1734,9 @@ class FederationEventHandler:
|
|||||||
context = EventContext.for_outlier(self._storage_controllers)
|
context = EventContext.for_outlier(self._storage_controllers)
|
||||||
try:
|
try:
|
||||||
validate_event_for_room_version(event)
|
validate_event_for_room_version(event)
|
||||||
await check_state_independent_auth_rules(self._store, event)
|
await check_state_independent_auth_rules(
|
||||||
|
self._store, event, batched_auth_events=auth_map
|
||||||
|
)
|
||||||
check_state_dependent_auth_rules(event, auth)
|
check_state_dependent_auth_rules(event, auth)
|
||||||
except AuthError as e:
|
except AuthError as e:
|
||||||
logger.warning("Rejecting %r because %s", event, e)
|
logger.warning("Rejecting %r because %s", event, e)
|
||||||
@ -1772,7 +1753,7 @@ class FederationEventHandler:
|
|||||||
|
|
||||||
events_and_contexts_to_persist.append((event, context))
|
events_and_contexts_to_persist.append((event, context))
|
||||||
|
|
||||||
for event in fetched_events:
|
for event in sorted_auth_events:
|
||||||
await prep(event)
|
await prep(event)
|
||||||
|
|
||||||
await self.persist_events_and_notify(
|
await self.persist_events_and_notify(
|
||||||
|
Loading…
Reference in New Issue
Block a user