mirror of
https://mau.dev/maunium/synapse.git
synced 2024-10-01 01:36:05 -04:00
Faster room joins: Resume state re-syncing after a Synapse restart (#12813)
Signed-off-by: Sean Quah <seanq@matrix.org>
This commit is contained in:
parent
2fba1076c5
commit
641908f72f
1
changelog.d/12813.misc
Normal file
1
changelog.d/12813.misc
Normal file
@ -0,0 +1 @@
|
|||||||
|
Resume state re-syncing for rooms with partial state after a Synapse restart.
|
@ -169,6 +169,14 @@ class FederationHandler:
|
|||||||
|
|
||||||
self.third_party_event_rules = hs.get_third_party_event_rules()
|
self.third_party_event_rules = hs.get_third_party_event_rules()
|
||||||
|
|
||||||
|
# if this is the main process, fire off a background process to resume
|
||||||
|
# any partial-state-resync operations which were in flight when we
|
||||||
|
# were shut down.
|
||||||
|
if not hs.config.worker.worker_app:
|
||||||
|
run_as_background_process(
|
||||||
|
"resume_sync_partial_state_room", self._resume_sync_partial_state_room
|
||||||
|
)
|
||||||
|
|
||||||
async def maybe_backfill(
|
async def maybe_backfill(
|
||||||
self, room_id: str, current_depth: int, limit: int
|
self, room_id: str, current_depth: int, limit: int
|
||||||
) -> bool:
|
) -> bool:
|
||||||
@ -470,6 +478,8 @@ class FederationHandler:
|
|||||||
"""
|
"""
|
||||||
# TODO: We should be able to call this on workers, but the upgrading of
|
# TODO: We should be able to call this on workers, but the upgrading of
|
||||||
# room stuff after join currently doesn't work on workers.
|
# room stuff after join currently doesn't work on workers.
|
||||||
|
# TODO: Before we relax this condition, we need to allow re-syncing of
|
||||||
|
# partial room state to happen on workers.
|
||||||
assert self.config.worker.worker_app is None
|
assert self.config.worker.worker_app is None
|
||||||
|
|
||||||
logger.debug("Joining %s to %s", joinee, room_id)
|
logger.debug("Joining %s to %s", joinee, room_id)
|
||||||
@ -550,8 +560,6 @@ class FederationHandler:
|
|||||||
if ret.partial_state:
|
if ret.partial_state:
|
||||||
# Kick off the process of asynchronously fetching the state for this
|
# Kick off the process of asynchronously fetching the state for this
|
||||||
# room.
|
# room.
|
||||||
#
|
|
||||||
# TODO(faster_joins): pick this up again on restart
|
|
||||||
run_as_background_process(
|
run_as_background_process(
|
||||||
desc="sync_partial_state_room",
|
desc="sync_partial_state_room",
|
||||||
func=self._sync_partial_state_room,
|
func=self._sync_partial_state_room,
|
||||||
@ -1463,6 +1471,20 @@ class FederationHandler:
|
|||||||
# well.
|
# well.
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
async def _resume_sync_partial_state_room(self) -> None:
|
||||||
|
"""Resumes resyncing of all partial-state rooms after a restart."""
|
||||||
|
assert not self.config.worker.worker_app
|
||||||
|
|
||||||
|
partial_state_rooms = await self.store.get_partial_state_rooms_and_servers()
|
||||||
|
for room_id, servers_in_room in partial_state_rooms.items():
|
||||||
|
run_as_background_process(
|
||||||
|
desc="sync_partial_state_room",
|
||||||
|
func=self._sync_partial_state_room,
|
||||||
|
initial_destination=None,
|
||||||
|
other_destinations=servers_in_room,
|
||||||
|
room_id=room_id,
|
||||||
|
)
|
||||||
|
|
||||||
async def _sync_partial_state_room(
|
async def _sync_partial_state_room(
|
||||||
self,
|
self,
|
||||||
initial_destination: Optional[str],
|
initial_destination: Optional[str],
|
||||||
@ -1477,6 +1499,7 @@ class FederationHandler:
|
|||||||
`initial_destination` is unavailable
|
`initial_destination` is unavailable
|
||||||
room_id: room to be resynced
|
room_id: room to be resynced
|
||||||
"""
|
"""
|
||||||
|
assert not self.config.worker.worker_app
|
||||||
|
|
||||||
# TODO(faster_joins): do we need to lock to avoid races? What happens if other
|
# TODO(faster_joins): do we need to lock to avoid races? What happens if other
|
||||||
# worker processes kick off a resync in parallel? Perhaps we should just elect
|
# worker processes kick off a resync in parallel? Perhaps we should just elect
|
||||||
|
@ -23,6 +23,7 @@ from typing import (
|
|||||||
Collection,
|
Collection,
|
||||||
Dict,
|
Dict,
|
||||||
List,
|
List,
|
||||||
|
Mapping,
|
||||||
Optional,
|
Optional,
|
||||||
Tuple,
|
Tuple,
|
||||||
Union,
|
Union,
|
||||||
@ -1081,6 +1082,32 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
|
|||||||
get_rooms_for_retention_period_in_range_txn,
|
get_rooms_for_retention_period_in_range_txn,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
async def get_partial_state_rooms_and_servers(
|
||||||
|
self,
|
||||||
|
) -> Mapping[str, Collection[str]]:
|
||||||
|
"""Get all rooms containing events with partial state, and the servers known
|
||||||
|
to be in the room.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A dictionary of rooms with partial state, with room IDs as keys and
|
||||||
|
lists of servers in rooms as values.
|
||||||
|
"""
|
||||||
|
room_servers: Dict[str, List[str]] = {}
|
||||||
|
|
||||||
|
rows = await self.db_pool.simple_select_list(
|
||||||
|
"partial_state_rooms_servers",
|
||||||
|
keyvalues=None,
|
||||||
|
retcols=("room_id", "server_name"),
|
||||||
|
desc="get_partial_state_rooms",
|
||||||
|
)
|
||||||
|
|
||||||
|
for row in rows:
|
||||||
|
room_id = row["room_id"]
|
||||||
|
server_name = row["server_name"]
|
||||||
|
room_servers.setdefault(room_id, []).append(server_name)
|
||||||
|
|
||||||
|
return room_servers
|
||||||
|
|
||||||
async def clear_partial_state_room(self, room_id: str) -> bool:
|
async def clear_partial_state_room(self, room_id: str) -> bool:
|
||||||
# this can race with incoming events, so we watch out for FK errors.
|
# this can race with incoming events, so we watch out for FK errors.
|
||||||
# TODO(faster_joins): this still doesn't completely fix the race, since the persist process
|
# TODO(faster_joins): this still doesn't completely fix the race, since the persist process
|
||||||
|
Loading…
Reference in New Issue
Block a user