Allow profile changes to happen on workers

This commit is contained in:
Erik Johnston 2018-08-06 14:46:17 +01:00
parent 051a99c400
commit 495cb100d1
5 changed files with 106 additions and 8 deletions

View File

@ -45,6 +45,11 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import TransactionStore from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1.profile import (
ProfileAvatarURLRestServlet,
ProfileDisplaynameRestServlet,
ProfileRestServlet,
)
from synapse.rest.client.v1.room import ( from synapse.rest.client.v1.room import (
JoinRoomAliasServlet, JoinRoomAliasServlet,
RoomMembershipRestServlet, RoomMembershipRestServlet,
@ -101,6 +106,9 @@ class EventCreatorServer(HomeServer):
RoomMembershipRestServlet(self).register(resource) RoomMembershipRestServlet(self).register(resource)
RoomStateEventRestServlet(self).register(resource) RoomStateEventRestServlet(self).register(resource)
JoinRoomAliasServlet(self).register(resource) JoinRoomAliasServlet(self).register(resource)
ProfileAvatarURLRestServlet(self).register(resource)
ProfileDisplaynameRestServlet(self).register(resource)
ProfileRestServlet(self).register(resource)
resources.update({ resources.update({
"/_matrix/client/r0": resource, "/_matrix/client/r0": resource,
"/_matrix/client/unstable": resource, "/_matrix/client/unstable": resource,

View File

@ -19,6 +19,7 @@ from twisted.internet import defer
from synapse.api.errors import AuthError, CodeMessageException, SynapseError from synapse.api.errors import AuthError, CodeMessageException, SynapseError
from synapse.metrics.background_process_metrics import run_as_background_process from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.profile import ReplicationHandleProfileChangeRestServlet
from synapse.types import UserID, get_domain_from_id from synapse.types import UserID, get_domain_from_id
from ._base import BaseHandler from ._base import BaseHandler
@ -45,6 +46,10 @@ class ProfileHandler(BaseHandler):
self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS, self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS,
) )
self._notify_master_profile_change = (
ReplicationHandleProfileChangeRestServlet.make_client(hs)
)
@defer.inlineCallbacks @defer.inlineCallbacks
def get_profile(self, user_id): def get_profile(self, user_id):
target_user = UserID.from_string(user_id) target_user = UserID.from_string(user_id)
@ -147,10 +152,16 @@ class ProfileHandler(BaseHandler):
) )
if self.hs.config.user_directory_search_all_users: if self.hs.config.user_directory_search_all_users:
if self.hs.config.worker_app is None:
profile = yield self.store.get_profileinfo(target_user.localpart) profile = yield self.store.get_profileinfo(target_user.localpart)
yield self.user_directory_handler.handle_local_profile_change( yield self.user_directory_handler.handle_local_profile_change(
target_user.to_string(), profile target_user.to_string(), profile
) )
else:
yield self._notify_master_profile_change(
requester=requester,
user_id=target_user.to_string(),
)
yield self._update_join_states(requester, target_user) yield self._update_join_states(requester, target_user)
@ -196,11 +207,16 @@ class ProfileHandler(BaseHandler):
target_user.localpart, new_avatar_url target_user.localpart, new_avatar_url
) )
if self.hs.config.user_directory_search_all_users: if self.hs.config.worker_app is None:
profile = yield self.store.get_profileinfo(target_user.localpart) profile = yield self.store.get_profileinfo(target_user.localpart)
yield self.user_directory_handler.handle_local_profile_change( yield self.user_directory_handler.handle_local_profile_change(
target_user.to_string(), profile target_user.to_string(), profile
) )
else:
yield self._notify_master_profile_change(
requester=requester,
user_id=target_user.to_string(),
)
yield self._update_join_states(requester, target_user) yield self._update_join_states(requester, target_user)

View File

@ -14,7 +14,7 @@
# limitations under the License. # limitations under the License.
from synapse.http.server import JsonResource from synapse.http.server import JsonResource
from synapse.replication.http import membership, send_event from synapse.replication.http import membership, profile, send_event
REPLICATION_PREFIX = "/_synapse/replication" REPLICATION_PREFIX = "/_synapse/replication"
@ -27,3 +27,4 @@ class ReplicationRestResource(JsonResource):
def register_servlets(self, hs): def register_servlets(self, hs):
send_event.register_servlets(hs, self) send_event.register_servlets(hs, self)
membership.register_servlets(hs, self) membership.register_servlets(hs, self)
profile.register_servlets(hs, self)

View File

@ -0,0 +1,71 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from twisted.internet import defer
from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint
from synapse.types import Requester, UserID
logger = logging.getLogger(__name__)
class ReplicationHandleProfileChangeRestServlet(ReplicationEndpoint):
NAME = "profile_changed"
PATH_ARGS = ("user_id",)
POST = True
def __init__(self, hs):
super(ReplicationHandleProfileChangeRestServlet, self).__init__(hs)
self.user_directory_handler = hs.get_user_directory_handler()
self.store = hs.get_datastore()
self.clock = hs.get_clock()
@staticmethod
def _serialize_payload(requester, user_id):
"""
Args:
requester (Requester)
user_id (str)
"""
return {
"requester": requester.serialize(),
}
@defer.inlineCallbacks
def _handle_request(self, request, user_id):
content = parse_json_object_from_request(request)
requester = Requester.deserialize(self.store, content["requester"])
if requester.user:
request.authenticated_entity = requester.user.to_string()
target_user = UserID.from_string(user_id)
profile = yield self.store.get_profileinfo(target_user.localpart)
yield self.user_directory_handler.handle_local_profile_change(
user_id, profile
)
defer.returnValue((200, {}))
def register_servlets(hs, http_server):
ReplicationHandleProfileChangeRestServlet(hs).register(http_server)

View File

@ -71,8 +71,6 @@ class ProfileWorkerStore(SQLBaseStore):
desc="get_from_remote_profile_cache", desc="get_from_remote_profile_cache",
) )
class ProfileStore(ProfileWorkerStore):
def create_profile(self, user_localpart): def create_profile(self, user_localpart):
return self._simple_insert( return self._simple_insert(
table="profiles", table="profiles",
@ -182,3 +180,7 @@ class ProfileStore(ProfileWorkerStore):
if res: if res:
defer.returnValue(True) defer.returnValue(True)
class ProfileStore(ProfileWorkerStore):
pass