Ensure that remote users' device list resyncing always happens on master (#9043)

Currently `DeviceMessageHandler` only ever exists on master, but that is about to change.
This commit is contained in:
Erik Johnston 2021-01-07 18:06:52 +00:00 committed by GitHub
parent 63593134a1
commit e34df813ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 14 additions and 4 deletions

1
changelog.d/9043.feature Normal file
View File

@ -0,0 +1 @@
Add experimental support for handling and persistence of to-device messages to happen on worker processes.

View File

@ -24,6 +24,7 @@ from synapse.logging.opentracing import (
set_tag, set_tag,
start_active_span, start_active_span,
) )
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
from synapse.types import JsonDict, UserID, get_domain_from_id from synapse.types import JsonDict, UserID, get_domain_from_id
from synapse.util import json_encoder from synapse.util import json_encoder
from synapse.util.stringutils import random_string from synapse.util.stringutils import random_string
@ -50,7 +51,17 @@ class DeviceMessageHandler:
"m.direct_to_device", self.on_direct_to_device_edu "m.direct_to_device", self.on_direct_to_device_edu
) )
self._device_list_updater = hs.get_device_handler().device_list_updater # The handler to call when we think a user's device list might be out of
# sync. We do all device list resyncing on the master instance, so if
# we're on a worker we hit the device resync replication API.
if hs.config.worker.worker_app is None:
self._user_device_resync = (
hs.get_device_handler().device_list_updater.user_device_resync
)
else:
self._user_device_resync = ReplicationUserDevicesResyncRestServlet.make_client(
hs
)
async def on_direct_to_device_edu(self, origin: str, content: JsonDict) -> None: async def on_direct_to_device_edu(self, origin: str, content: JsonDict) -> None:
local_messages = {} local_messages = {}
@ -138,9 +149,7 @@ class DeviceMessageHandler:
await self.store.mark_remote_user_device_cache_as_stale(sender_user_id) await self.store.mark_remote_user_device_cache_as_stale(sender_user_id)
# Immediately attempt a resync in the background # Immediately attempt a resync in the background
run_in_background( run_in_background(self._user_device_resync, sender_user_id)
self._device_list_updater.user_device_resync, sender_user_id
)
async def send_device_message( async def send_device_message(
self, self,