Add local group server support

This commit is contained in:
Erik Johnston 2017-07-10 14:52:27 +01:00
parent b3de67234e
commit 2f9eafdd36
10 changed files with 1248 additions and 2 deletions

View File

@ -472,6 +472,72 @@ class TransportLayerClient(object):
defer.returnValue(content) defer.returnValue(content)
@log_function
def get_group_profile(self, destination, group_id, requester_user_id):
path = PREFIX + "/groups/%s/profile" % (group_id,)
return self.client.post_json(
destination=destination,
path=path,
data={"requester_user_id": requester_user_id},
ignore_backoff=True,
)
@log_function
def get_group_summary(self, destination, group_id, requester_user_id):
path = PREFIX + "/groups/%s/summary" % (group_id,)
return self.client.post_json(
destination=destination,
path=path,
data={"requester_user_id": requester_user_id},
ignore_backoff=True,
)
@log_function
def get_group_rooms(self, destination, group_id, requester_user_id):
path = PREFIX + "/groups/%s/rooms" % (group_id,)
return self.client.post_json(
destination=destination,
path=path,
data={"requester_user_id": requester_user_id},
ignore_backoff=True,
)
@log_function
def get_group_users(self, destination, group_id, requester_user_id):
path = PREFIX + "/groups/%s/users" % (group_id,)
return self.client.post_json(
destination=destination,
path=path,
data={"requester_user_id": requester_user_id},
ignore_backoff=True,
)
@log_function
def accept_group_invite(self, destination, group_id, user_id, content):
path = PREFIX + "/groups/%s/users/%s/accept_invite" % (group_id, user_id)
return self.client.post_json(
destination=destination,
path=path,
data=content,
ignore_backoff=True,
)
@log_function
def invite_to_group(self, destination, group_id, user_id, content):
path = PREFIX + "/groups/%s/users/%s/invite" % (group_id, user_id)
return self.client.post_json(
destination=destination,
path=path,
data=content,
ignore_backoff=True,
)
@log_function @log_function
def invite_to_group_notification(self, destination, group_id, user_id, content): def invite_to_group_notification(self, destination, group_id, user_id, content):
"""Sent by group server to inform a user's server that they have been """Sent by group server to inform a user's server that they have been
@ -487,6 +553,17 @@ class TransportLayerClient(object):
ignore_backoff=True, ignore_backoff=True,
) )
@log_function
def remove_user_from_group(self, destination, group_id, user_id, content):
path = PREFIX + "/groups/%s/users/%s/remove" % (group_id, user_id)
return self.client.post_json(
destination=destination,
path=path,
data=content,
ignore_backoff=True,
)
@log_function @log_function
def remove_user_from_group_notification(self, destination, group_id, user_id, def remove_user_from_group_notification(self, destination, group_id, user_id,
content): content):

View File

@ -715,6 +715,21 @@ class FederationGroupsInviteServlet(BaseFederationServlet):
defer.returnValue((200, new_content)) defer.returnValue((200, new_content))
class FederationGroupsLocalInviteServlet(BaseFederationServlet):
PATH = "/groups/local/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/invite$"
@defer.inlineCallbacks
def on_POST(self, origin, content, query, group_id, user_id):
if get_domain_from_id(group_id) != origin:
raise SynapseError(403, "group_id doesn't match origin")
new_content = yield self.handler.on_invite(
group_id, user_id, content,
)
defer.returnValue((200, new_content))
class FederationGroupsAcceptInviteServlet(BaseFederationServlet): class FederationGroupsAcceptInviteServlet(BaseFederationServlet):
"""Accept an invitation from the group server """Accept an invitation from the group server
""" """
@ -750,6 +765,21 @@ class FederationGroupsRemoveUserServlet(BaseFederationServlet):
defer.returnValue((200, new_content)) defer.returnValue((200, new_content))
class FederationGroupsRemoveLocalUserServlet(BaseFederationServlet):
PATH = "/groups/local/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/remove$"
@defer.inlineCallbacks
def on_POST(self, origin, content, query, group_id, user_id):
if get_domain_from_id(group_id) != origin:
raise SynapseError(403, "user_id doesn't match origin")
new_content = yield self.handler.user_removed_from_group(
group_id, user_id, content,
)
defer.returnValue((200, new_content))
class FederationGroupsRenewAttestaionServlet(BaseFederationServlet): class FederationGroupsRenewAttestaionServlet(BaseFederationServlet):
"""A group or user's server renews their attestation """A group or user's server renews their attestation
""" """
@ -1053,6 +1083,12 @@ GROUP_SERVER_SERVLET_CLASSES = (
) )
GROUP_LOCAL_SERVLET_CLASSES = (
FederationGroupsLocalInviteServlet,
FederationGroupsRemoveLocalUserServlet,
)
GROUP_ATTESTATION_SERVLET_CLASSES = ( GROUP_ATTESTATION_SERVLET_CLASSES = (
FederationGroupsRenewAttestaionServlet, FederationGroupsRenewAttestaionServlet,
) )
@ -1083,6 +1119,14 @@ def register_servlets(hs, resource, authenticator, ratelimiter):
server_name=hs.hostname, server_name=hs.hostname,
).register(resource) ).register(resource)
for servletclass in GROUP_LOCAL_SERVLET_CLASSES:
servletclass(
handler=hs.get_groups_local_handler(),
authenticator=authenticator,
ratelimiter=ratelimiter,
server_name=hs.hostname,
).register(resource)
for servletclass in GROUP_ATTESTATION_SERVLET_CLASSES: for servletclass in GROUP_ATTESTATION_SERVLET_CLASSES:
servletclass( servletclass(
handler=hs.get_groups_attestation_renewer(), handler=hs.get_groups_attestation_renewer(),

View File

@ -462,7 +462,9 @@ class GroupsServerHandler(object):
} }
if self.hs.is_mine_id(user_id): if self.hs.is_mine_id(user_id):
raise NotImplementedError() groups_local = self.hs.get_groups_local_handler()
res = yield groups_local.on_invite(group_id, user_id, content)
local_attestation = None
else: else:
local_attestation = self.attestations.create_attestation(group_id, user_id) local_attestation = self.attestations.create_attestation(group_id, user_id)
content.update({ content.update({
@ -590,7 +592,8 @@ class GroupsServerHandler(object):
if is_kick: if is_kick:
if self.hs.is_mine_id(user_id): if self.hs.is_mine_id(user_id):
raise NotImplementedError() groups_local = self.hs.get_groups_local_handler()
yield groups_local.user_removed_from_group(group_id, user_id, {})
else: else:
yield self.transport_client.remove_user_from_group_notification( yield self.transport_client.remove_user_from_group_notification(
get_domain_from_id(user_id), group_id, user_id, {} get_domain_from_id(user_id), group_id, user_id, {}

View File

@ -0,0 +1,278 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer
from synapse.api.errors import SynapseError
import logging
logger = logging.getLogger(__name__)
# TODO: Validate attestations
# TODO: Allow users to "knock" or simpkly join depending on rules
# TODO: is_priveged flag to users and is_public to users and rooms
# TODO: Roles
# TODO: Audit log for admins (profile updates, membership changes, users who tried
# to join but were rejected, etc)
# TODO: Flairs
# TODO: Add group memebership /sync
def _create_rerouter(name):
def f(self, group_id, *args, **kwargs):
if self.is_mine_id(group_id):
return getattr(self.groups_server_handler, name)(
group_id, *args, **kwargs
)
repl_layer = self.hs.get_replication_layer()
return getattr(repl_layer, name)(group_id, *args, **kwargs)
return f
class GroupsLocalHandler(object):
def __init__(self, hs):
self.hs = hs
self.store = hs.get_datastore()
self.room_list_handler = hs.get_room_list_handler()
self.groups_server_handler = hs.get_groups_server_handler()
self.auth = hs.get_auth()
self.clock = hs.get_clock()
self.keyring = hs.get_keyring()
self.is_mine_id = hs.is_mine_id
self.signing_key = hs.config.signing_key[0]
self.server_name = hs.hostname
self.attestations = hs.get_groups_attestation_signing()
# Ensure attestations get renewed
hs.get_groups_attestation_renewer()
get_group_profile = _create_rerouter("get_group_profile")
get_rooms_in_group = _create_rerouter("get_rooms_in_group")
update_group_summary_room = _create_rerouter("update_group_summary_room")
delete_group_summary_room = _create_rerouter("delete_group_summary_room")
update_group_category = _create_rerouter("update_group_category")
delete_group_category = _create_rerouter("delete_group_category")
get_group_category = _create_rerouter("get_group_category")
get_group_categories = _create_rerouter("get_group_categories")
update_group_summary_user = _create_rerouter("update_group_summary_user")
delete_group_summary_user = _create_rerouter("delete_group_summary_user")
update_group_role = _create_rerouter("update_group_role")
delete_group_role = _create_rerouter("delete_group_role")
get_group_role = _create_rerouter("get_group_role")
get_group_roles = _create_rerouter("get_group_roles")
@defer.inlineCallbacks
def get_group_summary(self, group_id, requester_user_id):
if self.is_mine_id(group_id):
res = yield self.groups_server_handler.get_group_summary(
group_id, requester_user_id
)
defer.returnValue(res)
repl_layer = self.hs.get_replication_layer()
res = yield repl_layer.get_group_summary(group_id, requester_user_id)
chunk = res["users_section"]["users"]
valid_users = []
for entry in chunk:
g_user_id = entry["user_id"]
attestation = entry.pop("attestation")
try:
yield self.attestations.verify_attestation(
attestation,
group_id=group_id,
user_id=g_user_id,
)
valid_users.append(entry)
except Exception as e:
logger.info("Failed to verify user is in group: %s", e)
res["users_section"]["users"] = valid_users
res["users_section"]["users"].sort(key=lambda e: e.get("order", 0))
res["rooms_section"]["rooms"].sort(key=lambda e: e.get("order", 0))
defer.returnValue(res)
def create_group(self, group_id, user_id, content):
logger.info("Asking to create group with ID: %r", group_id)
if self.is_mine_id(group_id):
return self.groups_server_handler.create_group(
group_id, user_id, content
)
repl_layer = self.hs.get_replication_layer()
return repl_layer.create_group(group_id, user_id, content) # TODO
def add_room(self, group_id, user_id, room_id, content):
if self.is_mine_id(group_id):
return self.groups_server_handler.add_room(
group_id, user_id, room_id, content
)
repl_layer = self.hs.get_replication_layer()
return repl_layer.add_room_to_group(group_id, user_id, room_id, content) # TODO
@defer.inlineCallbacks
def get_users_in_group(self, group_id, requester_user_id):
if self.is_mine_id(group_id):
res = yield self.groups_server_handler.get_users_in_group(
group_id, requester_user_id
)
defer.returnValue(res)
repl_layer = self.hs.get_replication_layer()
res = yield repl_layer.get_users_in_group(group_id, requester_user_id) # TODO
chunk = res["chunk"]
valid_entries = []
for entry in chunk:
g_user_id = entry["user_id"]
attestation = entry.pop("attestation")
try:
yield self.attestations.verify_attestation(
attestation,
group_id=group_id,
user_id=g_user_id,
)
valid_entries.append(entry)
except Exception as e:
logger.info("Failed to verify user is in group: %s", e)
res["chunk"] = valid_entries
defer.returnValue(res)
@defer.inlineCallbacks
def join_group(self, group_id, user_id, content):
raise NotImplementedError() # TODO
@defer.inlineCallbacks
def accept_invite(self, group_id, user_id, content):
if self.is_mine_id(group_id):
yield self.groups_server_handler.accept_invite(
group_id, user_id, content
)
local_attestation = None
remote_attestation = None
else:
local_attestation = self.attestations.create_attestation(group_id, user_id)
content["attestation"] = local_attestation
repl_layer = self.hs.get_replication_layer()
res = yield repl_layer.accept_group_invite(group_id, user_id, content)
remote_attestation = res["attestation"]
yield self.attestations.verify_attestation(
remote_attestation,
group_id=group_id,
user_id=user_id,
)
yield self.store.register_user_group_membership(
group_id, user_id,
membership="join",
is_admin=False,
local_attestation=local_attestation,
remote_attestation=remote_attestation,
)
defer.returnValue({})
@defer.inlineCallbacks
def invite(self, group_id, user_id, requester_user_id, config):
content = {
"requester_user_id": requester_user_id,
"config": config,
}
if self.is_mine_id(group_id):
res = yield self.groups_server_handler.invite_to_group(
group_id, user_id, requester_user_id, content,
)
else:
repl_layer = self.hs.get_replication_layer()
res = yield repl_layer.invite_to_group(
group_id, user_id, content,
)
defer.returnValue(res)
@defer.inlineCallbacks
def on_invite(self, group_id, user_id, content):
# TODO: Support auto join and rejection
if not self.is_mine_id(user_id):
raise SynapseError(400, "User not on this server")
local_profile = {}
if "profile" in content:
if "name" in content["profile"]:
local_profile["name"] = content["profile"]["name"]
if "avatar_url" in content["profile"]:
local_profile["avatar_url"] = content["profile"]["avatar_url"]
yield self.store.register_user_group_membership(
group_id, user_id,
membership="invite",
content={"profile": local_profile, "inviter": content["inviter"]},
)
defer.returnValue({"state": "invite"})
@defer.inlineCallbacks
def remove_user_from_group(self, group_id, user_id, requester_user_id, content):
if user_id == requester_user_id:
yield self.store.register_user_group_membership(
group_id, user_id,
membership="leave",
)
# TODO: Should probably remember that we tried to leave so that we can
# retry if the group server is currently down.
if self.is_mine_id(group_id):
res = yield self.groups_server_handler.remove_user_from_group(
group_id, user_id, requester_user_id, content,
)
else:
content["requester_user_id"] = requester_user_id
repl_layer = self.hs.get_replication_layer()
res = yield repl_layer.remove_user_from_group(
group_id, user_id, content
) # TODO
defer.returnValue(res)
@defer.inlineCallbacks
def user_removed_from_group(self, group_id, user_id, content):
# TODO: Check if user in group
yield self.store.register_user_group_membership(
group_id, user_id,
membership="leave",
)
@defer.inlineCallbacks
def get_joined_groups(self, user_id):
group_ids = yield self.store.get_joined_groups(user_id)
defer.returnValue({"groups": group_ids})

View File

@ -52,6 +52,7 @@ from synapse.rest.client.v2_alpha import (
thirdparty, thirdparty,
sendtodevice, sendtodevice,
user_directory, user_directory,
groups,
) )
from synapse.http.server import JsonResource from synapse.http.server import JsonResource
@ -102,3 +103,4 @@ class ClientRestResource(JsonResource):
thirdparty.register_servlets(hs, client_resource) thirdparty.register_servlets(hs, client_resource)
sendtodevice.register_servlets(hs, client_resource) sendtodevice.register_servlets(hs, client_resource)
user_directory.register_servlets(hs, client_resource) user_directory.register_servlets(hs, client_resource)
groups.register_servlets(hs, client_resource)

View File

@ -0,0 +1,642 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.types import GroupID
from ._base import client_v2_patterns
import logging
logger = logging.getLogger(__name__)
class GroupServlet(RestServlet):
PATTERNS = client_v2_patterns("/groups/(?P<group_id>[^/]*)/profile$")
def __init__(self, hs):
super(GroupServlet, self).__init__()
self.auth = hs.get_auth()
self.clock = hs.get_clock()
self.groups_handler = hs.get_groups_local_handler()
@defer.inlineCallbacks
def on_GET(self, request, group_id):
requester = yield self.auth.get_user_by_req(request)
user_id = requester.user.to_string()
group_description = yield self.groups_handler.get_group_profile(group_id, user_id)
defer.returnValue((200, group_description))
class GroupSummaryServlet(RestServlet):
PATTERNS = client_v2_patterns("/groups/(?P<group_id>[^/]*)/summary$")
def __init__(self, hs):
super(GroupSummaryServlet, self).__init__()
self.auth = hs.get_auth()
self.clock = hs.get_clock()
self.groups_handler = hs.get_groups_local_handler()
@defer.inlineCallbacks
def on_GET(self, request, group_id):
requester = yield self.auth.get_user_by_req(request)
user_id = requester.user.to_string()
get_group_summary = yield self.groups_handler.get_group_summary(group_id, user_id)
defer.returnValue((200, get_group_summary))
class GroupSummaryRoomsServlet(RestServlet):
PATTERNS = client_v2_patterns("/groups/(?P<group_id>[^/]*)/summary/rooms$")
def __init__(self, hs):
super(GroupSummaryServlet, self).__init__()
self.auth = hs.get_auth()
self.clock = hs.get_clock()
self.groups_handler = hs.get_groups_local_handler()
@defer.inlineCallbacks
def on_GET(self, request, group_id):
requester = yield self.auth.get_user_by_req(request)
user_id = requester.user.to_string()
get_group_summary = yield self.groups_handler.get_group_summary(group_id, user_id)
defer.returnValue((200, get_group_summary))
class GroupSummaryRoomsDefaultCatServlet(RestServlet):
PATTERNS = client_v2_patterns(
"/groups/(?P<group_id>[^/]*)/summary/rooms/(?P<room_id>[^/]*)$"
)
def __init__(self, hs):
super(GroupSummaryRoomsDefaultCatServlet, self).__init__()
self.auth = hs.get_auth()
self.clock = hs.get_clock()
self.groups_handler = hs.get_groups_local_handler()
@defer.inlineCallbacks
def on_PUT(self, request, group_id, room_id):
requester = yield self.auth.get_user_by_req(request)
user_id = requester.user.to_string()
content = parse_json_object_from_request(request)
resp = yield self.groups_handler.update_group_summary_room(
group_id, user_id,
room_id=room_id,
category_id=None,
content=content,
)
defer.returnValue((200, resp))
@defer.inlineCallbacks
def on_DELETE(self, request, group_id, room_id):
requester = yield self.auth.get_user_by_req(request)
user_id = requester.user.to_string()
resp = yield self.groups_handler.delete_group_summary_room(
group_id, user_id,
room_id=room_id,
category_id=None,
)
defer.returnValue((200, resp))
class GroupSummaryRoomsCatServlet(RestServlet):
PATTERNS = client_v2_patterns(
"/groups/(?P<group_id>[^/]*)/summary"
"/categories/(?P<category_id>[^/]+)/rooms/(?P<room_id>[^/]+)$"
)
def __init__(self, hs):
super(GroupSummaryRoomsCatServlet, self).__init__()
self.auth = hs.get_auth()
self.clock = hs.get_clock()
self.groups_handler = hs.get_groups_local_handler()
@defer.inlineCallbacks
def on_PUT(self, request, group_id, category_id, room_id):
requester = yield self.auth.get_user_by_req(request)
user_id = requester.user.to_string()
content = parse_json_object_from_request(request)
resp = yield self.groups_handler.update_group_summary_room(
group_id, user_id,
room_id=room_id,
category_id=category_id,
content=content,
)
defer.returnValue((200, resp))
@defer.inlineCallbacks
def on_DELETE(self, request, group_id, category_id, room_id):
requester = yield self.auth.get_user_by_req(request)
user_id = requester.user.to_string()
resp = yield self.groups_handler.delete_group_summary_room(
group_id, user_id,
room_id=room_id,
category_id=category_id,
)
defer.returnValue((200, resp))
class GroupCategoryServlet(RestServlet):
PATTERNS = client_v2_patterns(
"/groups/(?P<group_id>[^/]*)/categories/(?P<category_id>[^/]+)$"
)
def __init__(self, hs):
super(GroupCategoryServlet, self).__init__()
self.auth = hs.get_auth()
self.clock = hs.get_clock()
self.groups_handler = hs.get_groups_local_handler()
@defer.inlineCallbacks
def on_GET(self, request, group_id, category_id):
requester = yield self.auth.get_user_by_req(request)
user_id = requester.user.to_string()
category = yield self.groups_handler.get_group_category(
group_id, user_id,
category_id=category_id,
)
defer.returnValue((200, category))
@defer.inlineCallbacks
def on_PUT(self, request, group_id, category_id):
requester = yield self.auth.get_user_by_req(request)
user_id = requester.user.to_string()
content = parse_json_object_from_request(request)
resp = yield self.groups_handler.update_group_category(
group_id, user_id,
category_id=category_id,
content=content,
)
defer.returnValue((200, resp))
@defer.inlineCallbacks
def on_DELETE(self, request, group_id, category_id):
requester = yield self.auth.get_user_by_req(request)
user_id = requester.user.to_string()
resp = yield self.groups_handler.delete_group_category(
group_id, user_id,
category_id=category_id,
)
defer.returnValue((200, resp))
class GroupCategoriesServlet(RestServlet):
PATTERNS = client_v2_patterns(
"/groups/(?P<group_id>[^/]*)/categories/$"
)
def __init__(self, hs):
super(GroupCategoriesServlet, self).__init__()
self.auth = hs.get_auth()
self.clock = hs.get_clock()
self.groups_handler = hs.get_groups_local_handler()
@defer.inlineCallbacks
def on_GET(self, request, group_id):
requester = yield self.auth.get_user_by_req(request)
user_id = requester.user.to_string()
category = yield self.groups_handler.get_group_categories(
group_id, user_id,
)
defer.returnValue((200, category))
class GroupRoleServlet(RestServlet):
PATTERNS = client_v2_patterns(
"/groups/(?P<group_id>[^/]*)/roles/(?P<role_id>[^/]+)$"
)
def __init__(self, hs):
super(GroupRoleServlet, self).__init__()
self.auth = hs.get_auth()
self.clock = hs.get_clock()
self.groups_handler = hs.get_groups_local_handler()
@defer.inlineCallbacks
def on_GET(self, request, group_id, role_id):
requester = yield self.auth.get_user_by_req(request)
user_id = requester.user.to_string()
category = yield self.groups_handler.get_group_role(
group_id, user_id,
role_id=role_id,
)
defer.returnValue((200, category))
@defer.inlineCallbacks
def on_PUT(self, request, group_id, role_id):
requester = yield self.auth.get_user_by_req(request)
user_id = requester.user.to_string()
content = parse_json_object_from_request(request)
resp = yield self.groups_handler.update_group_role(
group_id, user_id,
role_id=role_id,
content=content,
)
defer.returnValue((200, resp))
@defer.inlineCallbacks
def on_DELETE(self, request, group_id, role_id):
requester = yield self.auth.get_user_by_req(request)
user_id = requester.user.to_string()
resp = yield self.groups_handler.delete_group_role(
group_id, user_id,
role_id=role_id,
)
defer.returnValue((200, resp))
class GroupRolesServlet(RestServlet):
PATTERNS = client_v2_patterns(
"/groups/(?P<group_id>[^/]*)/roles/$"
)
def __init__(self, hs):
super(GroupRolesServlet, self).__init__()
self.auth = hs.get_auth()
self.clock = hs.get_clock()
self.groups_handler = hs.get_groups_local_handler()
@defer.inlineCallbacks
def on_GET(self, request, group_id):
requester = yield self.auth.get_user_by_req(request)
user_id = requester.user.to_string()
category = yield self.groups_handler.get_group_roles(
group_id, user_id,
)
defer.returnValue((200, category))
class GroupSummaryUsersDefaultRoleServlet(RestServlet):
PATTERNS = client_v2_patterns(
"/groups/(?P<group_id>[^/]*)/summary/users/(?P<user_id>[^/]*)$"
)
def __init__(self, hs):
super(GroupSummaryUsersDefaultRoleServlet, self).__init__()
self.auth = hs.get_auth()
self.clock = hs.get_clock()
self.groups_handler = hs.get_groups_local_handler()
@defer.inlineCallbacks
def on_PUT(self, request, group_id, user_id):
requester = yield self.auth.get_user_by_req(request)
requester_user_id = requester.user.to_string()
content = parse_json_object_from_request(request)
resp = yield self.groups_handler.update_group_summary_user(
group_id, requester_user_id,
user_id=user_id,
role_id=None,
content=content,
)
defer.returnValue((200, resp))
@defer.inlineCallbacks
def on_DELETE(self, request, group_id, user_id):
requester = yield self.auth.get_user_by_req(request)
requester_user_id = requester.user.to_string()
resp = yield self.groups_handler.delete_group_summary_user(
group_id, requester_user_id,
user_id=user_id,
role_id=None,
)
defer.returnValue((200, resp))
class GroupSummaryUsersRoleServlet(RestServlet):
PATTERNS = client_v2_patterns(
"/groups/(?P<group_id>[^/]*)/summary"
"/roles/(?P<role_id>[^/]+)/users/(?P<user_id>[^/]+)$"
)
def __init__(self, hs):
super(GroupSummaryUsersRoleServlet, self).__init__()
self.auth = hs.get_auth()
self.clock = hs.get_clock()
self.groups_handler = hs.get_groups_local_handler()
@defer.inlineCallbacks
def on_PUT(self, request, group_id, role_id, user_id):
requester = yield self.auth.get_user_by_req(request)
requester_user_id = requester.user.to_string()
content = parse_json_object_from_request(request)
resp = yield self.groups_handler.update_group_summary_user(
group_id, requester_user_id,
user_id=user_id,
role_id=role_id,
content=content,
)
defer.returnValue((200, resp))
@defer.inlineCallbacks
def on_DELETE(self, request, group_id, role_id, user_id):
requester = yield self.auth.get_user_by_req(request)
requester_user_id = requester.user.to_string()
resp = yield self.groups_handler.delete_group_summary_user(
group_id, requester_user_id,
user_id=user_id,
role_id=role_id,
)
defer.returnValue((200, resp))
class GroupRoomServlet(RestServlet):
PATTERNS = client_v2_patterns("/groups/(?P<group_id>[^/]*)/rooms$")
def __init__(self, hs):
super(GroupRoomServlet, self).__init__()
self.auth = hs.get_auth()
self.clock = hs.get_clock()
self.groups_handler = hs.get_groups_local_handler()
@defer.inlineCallbacks
def on_GET(self, request, group_id):
requester = yield self.auth.get_user_by_req(request)
user_id = requester.user.to_string()
result = yield self.groups_handler.get_rooms_in_group(group_id, user_id)
defer.returnValue((200, result))
class GroupUsersServlet(RestServlet):
PATTERNS = client_v2_patterns("/groups/(?P<group_id>[^/]*)/users$")
def __init__(self, hs):
super(GroupUsersServlet, self).__init__()
self.auth = hs.get_auth()
self.clock = hs.get_clock()
self.groups_handler = hs.get_groups_local_handler()
@defer.inlineCallbacks
def on_GET(self, request, group_id):
requester = yield self.auth.get_user_by_req(request)
user_id = requester.user.to_string()
result = yield self.groups_handler.get_users_in_group(group_id, user_id)
defer.returnValue((200, result))
class GroupCreateServlet(RestServlet):
PATTERNS = client_v2_patterns("/create_group$")
def __init__(self, hs):
super(GroupCreateServlet, self).__init__()
self.auth = hs.get_auth()
self.clock = hs.get_clock()
self.groups_handler = hs.get_groups_local_handler()
self.server_name = hs.hostname
@defer.inlineCallbacks
def on_POST(self, request):
requester = yield self.auth.get_user_by_req(request)
user_id = requester.user.to_string()
# TODO: Create group on remote server
content = parse_json_object_from_request(request)
localpart = content.pop("localpart")
group_id = GroupID.create(localpart, self.server_name).to_string()
result = yield self.groups_handler.create_group(group_id, user_id, content)
defer.returnValue((200, result))
class GroupAdminRoomsServlet(RestServlet):
PATTERNS = client_v2_patterns(
"/groups/(?P<group_id>[^/]*)/admin/rooms/(?P<room_id>[^/]*)$"
)
def __init__(self, hs):
super(GroupAdminRoomsServlet, self).__init__()
self.auth = hs.get_auth()
self.clock = hs.get_clock()
self.groups_handler = hs.get_groups_local_handler()
@defer.inlineCallbacks
def on_PUT(self, request, group_id, room_id):
requester = yield self.auth.get_user_by_req(request)
user_id = requester.user.to_string()
content = parse_json_object_from_request(request)
result = yield self.groups_handler.add_room(group_id, user_id, room_id, content)
defer.returnValue((200, result))
class GroupAdminUsersInviteServlet(RestServlet):
PATTERNS = client_v2_patterns(
"/groups/(?P<group_id>[^/]*)/admin/users/invite/(?P<user_id>[^/]*)$"
)
def __init__(self, hs):
super(GroupAdminUsersInviteServlet, self).__init__()
self.auth = hs.get_auth()
self.clock = hs.get_clock()
self.groups_handler = hs.get_groups_local_handler()
self.store = hs.get_datastore()
self.is_mine_id = hs.is_mine_id
@defer.inlineCallbacks
def on_PUT(self, request, group_id, user_id):
requester = yield self.auth.get_user_by_req(request)
requester_user_id = requester.user.to_string()
content = parse_json_object_from_request(request)
config = content.get("config", {})
result = yield self.groups_handler.invite(
group_id, user_id, requester_user_id, config,
)
defer.returnValue((200, result))
class GroupAdminUsersKickServlet(RestServlet):
PATTERNS = client_v2_patterns(
"/groups/(?P<group_id>[^/]*)/admin/users/remove/(?P<user_id>[^/]*)$"
)
def __init__(self, hs):
super(GroupAdminUsersKickServlet, self).__init__()
self.auth = hs.get_auth()
self.clock = hs.get_clock()
self.groups_handler = hs.get_groups_local_handler()
@defer.inlineCallbacks
def on_PUT(self, request, group_id, user_id):
requester = yield self.auth.get_user_by_req(request)
requester_user_id = requester.user.to_string()
content = parse_json_object_from_request(request)
result = yield self.groups_handler.remove_user_from_group(
group_id, user_id, requester_user_id, content,
)
defer.returnValue((200, result))
class GroupSelfLeaveServlet(RestServlet):
PATTERNS = client_v2_patterns(
"/groups/(?P<group_id>[^/]*)/self/leave$"
)
def __init__(self, hs):
super(GroupSelfLeaveServlet, self).__init__()
self.auth = hs.get_auth()
self.clock = hs.get_clock()
self.groups_handler = hs.get_groups_local_handler()
@defer.inlineCallbacks
def on_PUT(self, request, group_id):
requester = yield self.auth.get_user_by_req(request)
requester_user_id = requester.user.to_string()
content = parse_json_object_from_request(request)
result = yield self.groups_handler.remove_user_from_group(
group_id, requester_user_id, requester_user_id, content,
)
defer.returnValue((200, result))
class GroupSelfJoinServlet(RestServlet):
PATTERNS = client_v2_patterns(
"/groups/(?P<group_id>[^/]*)/self/join$"
)
def __init__(self, hs):
super(GroupSelfJoinServlet, self).__init__()
self.auth = hs.get_auth()
self.clock = hs.get_clock()
self.groups_handler = hs.get_groups_local_handler()
@defer.inlineCallbacks
def on_PUT(self, request, group_id):
requester = yield self.auth.get_user_by_req(request)
requester_user_id = requester.user.to_string()
content = parse_json_object_from_request(request)
result = yield self.groups_handler.join_group(
group_id, requester_user_id, content,
)
defer.returnValue((200, result))
class GroupSelfAcceptInviteServlet(RestServlet):
PATTERNS = client_v2_patterns(
"/groups/(?P<group_id>[^/]*)/self/accept_invite$"
)
def __init__(self, hs):
super(GroupSelfAcceptInviteServlet, self).__init__()
self.auth = hs.get_auth()
self.clock = hs.get_clock()
self.groups_handler = hs.get_groups_local_handler()
@defer.inlineCallbacks
def on_PUT(self, request, group_id):
requester = yield self.auth.get_user_by_req(request)
requester_user_id = requester.user.to_string()
content = parse_json_object_from_request(request)
result = yield self.groups_handler.accept_invite(
group_id, requester_user_id, content,
)
defer.returnValue((200, result))
class GroupsForUserServlet(RestServlet):
PATTERNS = client_v2_patterns(
"/joined_groups$"
)
def __init__(self, hs):
super(GroupsForUserServlet, self).__init__()
self.auth = hs.get_auth()
self.clock = hs.get_clock()
self.groups_handler = hs.get_groups_local_handler()
@defer.inlineCallbacks
def on_GET(self, request):
requester = yield self.auth.get_user_by_req(request)
user_id = requester.user.to_string()
result = yield self.groups_handler.get_joined_groups(user_id)
defer.returnValue((200, result))
def register_servlets(hs, http_server):
GroupServlet(hs).register(http_server)
GroupSummaryServlet(hs).register(http_server)
GroupUsersServlet(hs).register(http_server)
GroupRoomServlet(hs).register(http_server)
GroupCreateServlet(hs).register(http_server)
GroupAdminRoomsServlet(hs).register(http_server)
GroupAdminUsersInviteServlet(hs).register(http_server)
GroupAdminUsersKickServlet(hs).register(http_server)
GroupSelfLeaveServlet(hs).register(http_server)
GroupSelfJoinServlet(hs).register(http_server)
GroupSelfAcceptInviteServlet(hs).register(http_server)
GroupsForUserServlet(hs).register(http_server)
GroupSummaryRoomsDefaultCatServlet(hs).register(http_server)
GroupCategoryServlet(hs).register(http_server)
GroupCategoriesServlet(hs).register(http_server)
GroupSummaryRoomsCatServlet(hs).register(http_server)
GroupRoleServlet(hs).register(http_server)
GroupRolesServlet(hs).register(http_server)
GroupSummaryUsersDefaultRoleServlet(hs).register(http_server)
GroupSummaryUsersRoleServlet(hs).register(http_server)

View File

@ -50,6 +50,7 @@ from synapse.handlers.initial_sync import InitialSyncHandler
from synapse.handlers.receipts import ReceiptsHandler from synapse.handlers.receipts import ReceiptsHandler
from synapse.handlers.read_marker import ReadMarkerHandler from synapse.handlers.read_marker import ReadMarkerHandler
from synapse.handlers.user_directory import UserDirectoyHandler from synapse.handlers.user_directory import UserDirectoyHandler
from synapse.handlers.groups_local import GroupsLocalHandler
from synapse.groups.groups_server import GroupsServerHandler from synapse.groups.groups_server import GroupsServerHandler
from synapse.groups.attestations import GroupAttestionRenewer, GroupAttestationSigning from synapse.groups.attestations import GroupAttestionRenewer, GroupAttestationSigning
from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory
@ -141,6 +142,7 @@ class HomeServer(object):
'read_marker_handler', 'read_marker_handler',
'action_generator', 'action_generator',
'user_directory_handler', 'user_directory_handler',
'groups_local_handler',
'groups_server_handler', 'groups_server_handler',
'groups_attestation_signing', 'groups_attestation_signing',
'groups_attestation_renewer', 'groups_attestation_renewer',
@ -314,6 +316,9 @@ class HomeServer(object):
def build_user_directory_handler(self): def build_user_directory_handler(self):
return UserDirectoyHandler(self) return UserDirectoyHandler(self)
def build_groups_local_handler(self):
return GroupsLocalHandler(self)
def build_groups_server_handler(self): def build_groups_server_handler(self):
return GroupsServerHandler(self) return GroupsServerHandler(self)

View File

@ -136,6 +136,9 @@ class DataStore(RoomMemberStore, RoomStore,
db_conn, "pushers", "id", db_conn, "pushers", "id",
extra_tables=[("deleted_pushers", "stream_id")], extra_tables=[("deleted_pushers", "stream_id")],
) )
self._group_updates_id_gen = StreamIdGenerator(
db_conn, "local_group_updates", "stream_id",
)
if isinstance(self.database_engine, PostgresEngine): if isinstance(self.database_engine, PostgresEngine):
self._cache_id_gen = StreamIdGenerator( self._cache_id_gen = StreamIdGenerator(
@ -236,6 +239,18 @@ class DataStore(RoomMemberStore, RoomStore,
prefilled_cache=curr_state_delta_prefill, prefilled_cache=curr_state_delta_prefill,
) )
_group_updates_prefill, min_group_updates_id = self._get_cache_dict(
db_conn, "local_group_updates",
entity_column="user_id",
stream_column="stream_id",
max_value=self._group_updates_id_gen.get_current_token(),
limit=1000,
)
self._group_updates_stream_cache = StreamChangeCache(
"_group_updates_stream_cache", min_group_updates_id,
prefilled_cache=_group_updates_prefill,
)
cur = LoggingTransaction( cur = LoggingTransaction(
db_conn.cursor(), db_conn.cursor(),
name="_find_stream_orderings_for_times_txn", name="_find_stream_orderings_for_times_txn",

View File

@ -756,6 +756,103 @@ class GroupServerStore(SQLBaseStore):
desc="add_room_to_group", desc="add_room_to_group",
) )
@defer.inlineCallbacks
def register_user_group_membership(self, group_id, user_id, membership,
is_admin=False, content={},
local_attestation=None,
remote_attestation=None,
):
def _register_user_group_membership_txn(txn, next_id):
# TODO: Upsert?
self._simple_delete_txn(
txn,
table="local_group_membership",
keyvalues={
"group_id": group_id,
"user_id": user_id,
},
)
self._simple_insert_txn(
txn,
table="local_group_membership",
values={
"group_id": group_id,
"user_id": user_id,
"is_admin": is_admin,
"membership": membership,
"content": json.dumps(content),
},
)
self._simple_delete_txn(
txn,
table="local_group_updates",
keyvalues={
"group_id": group_id,
"user_id": user_id,
"type": "membership",
},
)
self._simple_insert_txn(
txn,
table="local_group_updates",
values={
"stream_id": next_id,
"group_id": group_id,
"user_id": user_id,
"type": "membership",
"content": json.dumps({"membership": membership, "content": content}),
}
)
self._group_updates_stream_cache.entity_has_changed(user_id, next_id)
# TODO: Insert profile to ensuer it comes down stream if its a join.
if membership == "join":
if local_attestation:
self._simple_insert_txn(
txn,
table="group_attestations_renewals",
values={
"group_id": group_id,
"user_id": user_id,
"valid_until_ms": local_attestation["valid_until_ms"],
}
)
if remote_attestation:
self._simple_insert_txn(
txn,
table="group_attestations_remote",
values={
"group_id": group_id,
"user_id": user_id,
"valid_until_ms": remote_attestation["valid_until_ms"],
"attestation": json.dumps(remote_attestation),
}
)
else:
self._simple_delete_txn(
txn,
table="group_attestations_renewals",
keyvalues={
"group_id": group_id,
"user_id": user_id,
},
)
self._simple_delete_txn(
txn,
table="group_attestations_remote",
keyvalues={
"group_id": group_id,
"user_id": user_id,
},
)
with self._group_updates_id_gen.get_next() as next_id:
yield self.runInteraction(
"register_user_group_membership",
_register_user_group_membership_txn, next_id,
)
@defer.inlineCallbacks @defer.inlineCallbacks
def create_group(self, group_id, user_id, name, avatar_url, short_description, def create_group(self, group_id, user_id, name, avatar_url, short_description,
long_description,): long_description,):
@ -771,6 +868,61 @@ class GroupServerStore(SQLBaseStore):
desc="create_group", desc="create_group",
) )
def get_joined_groups(self, user_id):
return self._simple_select_onecol(
table="local_group_membership",
keyvalues={
"user_id": user_id,
"membership": "join",
},
retcol="group_id",
desc="get_joined_groups",
)
def get_all_groups_for_user(self, user_id, now_token):
def _get_all_groups_for_user_txn(txn):
sql = """
SELECT group_id, type, membership, u.content
FROM local_group_updates AS u
INNER JOIN local_group_membership USING (group_id, user_id)
WHERE user_id = ? AND membership != 'leave'
AND stream_id <= ?
"""
txn.execute(sql, (user_id, now_token,))
return self.cursor_to_dict(txn)
return self.runInteraction(
"get_all_groups_for_user", _get_all_groups_for_user_txn,
)
def get_groups_changes_for_user(self, user_id, from_token, to_token):
from_token = int(from_token)
has_changed = self._group_updates_stream_cache.has_entity_changed(
user_id, from_token,
)
if not has_changed:
return []
def _get_groups_changes_for_user_txn(txn):
sql = """
SELECT group_id, membership, type, u.content
FROM local_group_updates AS u
INNER JOIN local_group_membership USING (group_id, user_id)
WHERE user_id = ? AND ? < stream_id AND stream_id <= ?
"""
txn.execute(sql, (user_id, from_token, to_token,))
return [{
"group_id": group_id,
"membership": membership,
"type": gtype,
"content": json.loads(content_json),
} for group_id, membership, gtype, content_json in txn]
return self.runInteraction(
"get_groups_changes_for_user", _get_groups_changes_for_user_txn,
)
def get_group_stream_token(self):
return self._group_updates_id_gen.get_current_token()
def get_attestations_need_renewals(self, valid_until_ms): def get_attestations_need_renewals(self, valid_until_ms):
"""Get all attestations that need to be renewed until givent time """Get all attestations that need to be renewed until givent time
""" """

View File

@ -142,3 +142,31 @@ CREATE TABLE group_attestations_remote (
CREATE INDEX group_attestations_remote_g_idx ON group_attestations_remote(group_id, user_id); CREATE INDEX group_attestations_remote_g_idx ON group_attestations_remote(group_id, user_id);
CREATE INDEX group_attestations_remote_u_idx ON group_attestations_remote(user_id); CREATE INDEX group_attestations_remote_u_idx ON group_attestations_remote(user_id);
CREATE INDEX group_attestations_remote_v_idx ON group_attestations_remote(valid_until_ms); CREATE INDEX group_attestations_remote_v_idx ON group_attestations_remote(valid_until_ms);
CREATE TABLE local_group_membership (
group_id TEXT NOT NULL,
user_id TEXT NOT NULL,
is_admin BOOLEAN NOT NULL,
membership TEXT NOT NULL,
content TEXT NOT NULL
);
CREATE INDEX local_group_membership_u_idx ON local_group_membership(user_id, group_id);
CREATE INDEX local_group_membership_g_idx ON local_group_membership(group_id);
CREATE TABLE local_group_updates (
stream_id BIGINT NOT NULL,
group_id TEXT NOT NULL,
user_id TEXT NOT NULL,
type TEXT NOT NULL,
content TEXT NOT NULL
);
CREATE TABLE local_group_profiles (
group_id TEXT NOT NULL,
name TEXT,
avatar_url TEXT
);