Merge pull request #2466 from matrix-org/erikj/groups_merged

Initial Group Implementation
This commit is contained in:
Erik Johnston 2017-10-11 13:20:07 +01:00 committed by GitHub
commit 535cc49f27
35 changed files with 4793 additions and 66 deletions

View file

@ -20,7 +20,6 @@ from .room import (
from .room_member import RoomMemberHandler
from .message import MessageHandler
from .federation import FederationHandler
from .profile import ProfileHandler
from .directory import DirectoryHandler
from .admin import AdminHandler
from .identity import IdentityHandler
@ -52,7 +51,6 @@ class Handlers(object):
self.room_creation_handler = RoomCreationHandler(hs)
self.room_member_handler = RoomMemberHandler(hs)
self.federation_handler = FederationHandler(hs)
self.profile_handler = ProfileHandler(hs)
self.directory_handler = DirectoryHandler(hs)
self.admin_handler = AdminHandler(hs)
self.identity_handler = IdentityHandler(hs)

View file

@ -0,0 +1,402 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer
from synapse.api.errors import SynapseError
from synapse.types import get_domain_from_id
import logging
logger = logging.getLogger(__name__)
def _create_rerouter(func_name):
"""Returns a function that looks at the group id and calls the function
on federation or the local group server if the group is local
"""
def f(self, group_id, *args, **kwargs):
if self.is_mine_id(group_id):
return getattr(self.groups_server_handler, func_name)(
group_id, *args, **kwargs
)
else:
destination = get_domain_from_id(group_id)
return getattr(self.transport_client, func_name)(
destination, group_id, *args, **kwargs
)
return f
class GroupsLocalHandler(object):
def __init__(self, hs):
self.hs = hs
self.store = hs.get_datastore()
self.room_list_handler = hs.get_room_list_handler()
self.groups_server_handler = hs.get_groups_server_handler()
self.transport_client = hs.get_federation_transport_client()
self.auth = hs.get_auth()
self.clock = hs.get_clock()
self.keyring = hs.get_keyring()
self.is_mine_id = hs.is_mine_id
self.signing_key = hs.config.signing_key[0]
self.server_name = hs.hostname
self.notifier = hs.get_notifier()
self.attestations = hs.get_groups_attestation_signing()
self.profile_handler = hs.get_profile_handler()
# Ensure attestations get renewed
hs.get_groups_attestation_renewer()
# The following functions merely route the query to the local groups server
# or federation depending on if the group is local or remote
get_group_profile = _create_rerouter("get_group_profile")
update_group_profile = _create_rerouter("update_group_profile")
get_rooms_in_group = _create_rerouter("get_rooms_in_group")
add_room_to_group = _create_rerouter("add_room_to_group")
remove_room_from_group = _create_rerouter("remove_room_from_group")
update_group_summary_room = _create_rerouter("update_group_summary_room")
delete_group_summary_room = _create_rerouter("delete_group_summary_room")
update_group_category = _create_rerouter("update_group_category")
delete_group_category = _create_rerouter("delete_group_category")
get_group_category = _create_rerouter("get_group_category")
get_group_categories = _create_rerouter("get_group_categories")
update_group_summary_user = _create_rerouter("update_group_summary_user")
delete_group_summary_user = _create_rerouter("delete_group_summary_user")
update_group_role = _create_rerouter("update_group_role")
delete_group_role = _create_rerouter("delete_group_role")
get_group_role = _create_rerouter("get_group_role")
get_group_roles = _create_rerouter("get_group_roles")
@defer.inlineCallbacks
def get_group_summary(self, group_id, requester_user_id):
"""Get the group summary for a group.
If the group is remote we check that the users have valid attestations.
"""
if self.is_mine_id(group_id):
res = yield self.groups_server_handler.get_group_summary(
group_id, requester_user_id
)
else:
res = yield self.transport_client.get_group_summary(
get_domain_from_id(group_id), group_id, requester_user_id,
)
# Loop through the users and validate the attestations.
chunk = res["users_section"]["users"]
valid_users = []
for entry in chunk:
g_user_id = entry["user_id"]
attestation = entry.pop("attestation")
try:
yield self.attestations.verify_attestation(
attestation,
group_id=group_id,
user_id=g_user_id,
)
valid_users.append(entry)
except Exception as e:
logger.info("Failed to verify user is in group: %s", e)
res["users_section"]["users"] = valid_users
res["users_section"]["users"].sort(key=lambda e: e.get("order", 0))
res["rooms_section"]["rooms"].sort(key=lambda e: e.get("order", 0))
# Add `is_publicised` flag to indicate whether the user has publicised their
# membership of the group on their profile
result = yield self.store.get_publicised_groups_for_user(requester_user_id)
is_publicised = group_id in result
res.setdefault("user", {})["is_publicised"] = is_publicised
defer.returnValue(res)
@defer.inlineCallbacks
def create_group(self, group_id, user_id, content):
"""Create a group
"""
logger.info("Asking to create group with ID: %r", group_id)
if self.is_mine_id(group_id):
res = yield self.groups_server_handler.create_group(
group_id, user_id, content
)
local_attestation = None
remote_attestation = None
else:
local_attestation = self.attestations.create_attestation(group_id, user_id)
content["attestation"] = local_attestation
content["user_profile"] = yield self.profile_handler.get_profile(user_id)
res = yield self.transport_client.create_group(
get_domain_from_id(group_id), group_id, user_id, content,
)
remote_attestation = res["attestation"]
yield self.attestations.verify_attestation(
remote_attestation,
group_id=group_id,
user_id=user_id,
)
is_publicised = content.get("publicise", False)
token = yield self.store.register_user_group_membership(
group_id, user_id,
membership="join",
is_admin=True,
local_attestation=local_attestation,
remote_attestation=remote_attestation,
is_publicised=is_publicised,
)
self.notifier.on_new_event(
"groups_key", token, users=[user_id],
)
defer.returnValue(res)
@defer.inlineCallbacks
def get_users_in_group(self, group_id, requester_user_id):
"""Get users in a group
"""
if self.is_mine_id(group_id):
res = yield self.groups_server_handler.get_users_in_group(
group_id, requester_user_id
)
defer.returnValue(res)
res = yield self.transport_client.get_users_in_group(
get_domain_from_id(group_id), group_id, requester_user_id,
)
chunk = res["chunk"]
valid_entries = []
for entry in chunk:
g_user_id = entry["user_id"]
attestation = entry.pop("attestation")
try:
yield self.attestations.verify_attestation(
attestation,
group_id=group_id,
user_id=g_user_id,
)
valid_entries.append(entry)
except Exception as e:
logger.info("Failed to verify user is in group: %s", e)
res["chunk"] = valid_entries
defer.returnValue(res)
@defer.inlineCallbacks
def join_group(self, group_id, user_id, content):
"""Request to join a group
"""
raise NotImplementedError() # TODO
@defer.inlineCallbacks
def accept_invite(self, group_id, user_id, content):
"""Accept an invite to a group
"""
if self.is_mine_id(group_id):
yield self.groups_server_handler.accept_invite(
group_id, user_id, content
)
local_attestation = None
remote_attestation = None
else:
local_attestation = self.attestations.create_attestation(group_id, user_id)
content["attestation"] = local_attestation
res = yield self.transport_client.accept_group_invite(
get_domain_from_id(group_id), group_id, user_id, content,
)
remote_attestation = res["attestation"]
yield self.attestations.verify_attestation(
remote_attestation,
group_id=group_id,
user_id=user_id,
)
# TODO: Check that the group is public and we're being added publically
is_publicised = content.get("publicise", False)
token = yield self.store.register_user_group_membership(
group_id, user_id,
membership="join",
is_admin=False,
local_attestation=local_attestation,
remote_attestation=remote_attestation,
is_publicised=is_publicised,
)
self.notifier.on_new_event(
"groups_key", token, users=[user_id],
)
defer.returnValue({})
@defer.inlineCallbacks
def invite(self, group_id, user_id, requester_user_id, config):
"""Invite a user to a group
"""
content = {
"requester_user_id": requester_user_id,
"config": config,
}
if self.is_mine_id(group_id):
res = yield self.groups_server_handler.invite_to_group(
group_id, user_id, requester_user_id, content,
)
else:
res = yield self.transport_client.invite_to_group(
get_domain_from_id(group_id), group_id, user_id, requester_user_id,
content,
)
defer.returnValue(res)
@defer.inlineCallbacks
def on_invite(self, group_id, user_id, content):
"""One of our users were invited to a group
"""
# TODO: Support auto join and rejection
if not self.is_mine_id(user_id):
raise SynapseError(400, "User not on this server")
local_profile = {}
if "profile" in content:
if "name" in content["profile"]:
local_profile["name"] = content["profile"]["name"]
if "avatar_url" in content["profile"]:
local_profile["avatar_url"] = content["profile"]["avatar_url"]
token = yield self.store.register_user_group_membership(
group_id, user_id,
membership="invite",
content={"profile": local_profile, "inviter": content["inviter"]},
)
self.notifier.on_new_event(
"groups_key", token, users=[user_id],
)
user_profile = yield self.profile_handler.get_profile(user_id)
defer.returnValue({"state": "invite", "user_profile": user_profile})
@defer.inlineCallbacks
def remove_user_from_group(self, group_id, user_id, requester_user_id, content):
"""Remove a user from a group
"""
if user_id == requester_user_id:
token = yield self.store.register_user_group_membership(
group_id, user_id,
membership="leave",
)
self.notifier.on_new_event(
"groups_key", token, users=[user_id],
)
# TODO: Should probably remember that we tried to leave so that we can
# retry if the group server is currently down.
if self.is_mine_id(group_id):
res = yield self.groups_server_handler.remove_user_from_group(
group_id, user_id, requester_user_id, content,
)
else:
content["requester_user_id"] = requester_user_id
res = yield self.transport_client.remove_user_from_group(
get_domain_from_id(group_id), group_id, requester_user_id,
user_id, content,
)
defer.returnValue(res)
@defer.inlineCallbacks
def user_removed_from_group(self, group_id, user_id, content):
"""One of our users was removed/kicked from a group
"""
# TODO: Check if user in group
token = yield self.store.register_user_group_membership(
group_id, user_id,
membership="leave",
)
self.notifier.on_new_event(
"groups_key", token, users=[user_id],
)
@defer.inlineCallbacks
def get_joined_groups(self, user_id):
group_ids = yield self.store.get_joined_groups(user_id)
defer.returnValue({"groups": group_ids})
@defer.inlineCallbacks
def get_publicised_groups_for_user(self, user_id):
if self.hs.is_mine_id(user_id):
result = yield self.store.get_publicised_groups_for_user(user_id)
defer.returnValue({"groups": result})
else:
result = yield self.transport_client.get_publicised_groups_for_user(
get_domain_from_id(user_id), user_id
)
# TODO: Verify attestations
defer.returnValue(result)
@defer.inlineCallbacks
def bulk_get_publicised_groups(self, user_ids, proxy=True):
destinations = {}
local_users = set()
for user_id in user_ids:
if self.hs.is_mine_id(user_id):
local_users.add(user_id)
else:
destinations.setdefault(
get_domain_from_id(user_id), set()
).add(user_id)
if not proxy and destinations:
raise SynapseError(400, "Some user_ids are not local")
results = {}
failed_results = []
for destination, dest_user_ids in destinations.iteritems():
try:
r = yield self.transport_client.bulk_get_publicised_groups(
destination, list(dest_user_ids),
)
results.update(r["users"])
except Exception:
failed_results.extend(dest_user_ids)
for uid in local_users:
results[uid] = yield self.store.get_publicised_groups_for_user(
uid
)
defer.returnValue({"users": results})

View file

@ -47,6 +47,7 @@ class MessageHandler(BaseHandler):
self.state = hs.get_state_handler()
self.clock = hs.get_clock()
self.validator = EventValidator()
self.profile_handler = hs.get_profile_handler()
self.pagination_lock = ReadWriteLock()
@ -212,7 +213,7 @@ class MessageHandler(BaseHandler):
if membership in {Membership.JOIN, Membership.INVITE}:
# If event doesn't include a display name, add one.
profile = self.hs.get_handlers().profile_handler
profile = self.profile_handler
content = builder.content
try:

View file

@ -19,14 +19,15 @@ from twisted.internet import defer
import synapse.types
from synapse.api.errors import SynapseError, AuthError, CodeMessageException
from synapse.types import UserID
from synapse.types import UserID, get_domain_from_id
from ._base import BaseHandler
logger = logging.getLogger(__name__)
class ProfileHandler(BaseHandler):
PROFILE_UPDATE_MS = 60 * 1000
PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000
def __init__(self, hs):
super(ProfileHandler, self).__init__(hs)
@ -36,6 +37,63 @@ class ProfileHandler(BaseHandler):
"profile", self.on_profile_query
)
self.clock.looping_call(self._update_remote_profile_cache, self.PROFILE_UPDATE_MS)
@defer.inlineCallbacks
def get_profile(self, user_id):
target_user = UserID.from_string(user_id)
if self.hs.is_mine(target_user):
displayname = yield self.store.get_profile_displayname(
target_user.localpart
)
avatar_url = yield self.store.get_profile_avatar_url(
target_user.localpart
)
defer.returnValue({
"displayname": displayname,
"avatar_url": avatar_url,
})
else:
try:
result = yield self.federation.make_query(
destination=target_user.domain,
query_type="profile",
args={
"user_id": user_id,
},
ignore_backoff=True,
)
defer.returnValue(result)
except CodeMessageException as e:
if e.code != 404:
logger.exception("Failed to get displayname")
raise
@defer.inlineCallbacks
def get_profile_from_cache(self, user_id):
"""Get the profile information from our local cache. If the user is
ours then the profile information will always be corect. Otherwise,
it may be out of date/missing.
"""
target_user = UserID.from_string(user_id)
if self.hs.is_mine(target_user):
displayname = yield self.store.get_profile_displayname(
target_user.localpart
)
avatar_url = yield self.store.get_profile_avatar_url(
target_user.localpart
)
defer.returnValue({
"displayname": displayname,
"avatar_url": avatar_url,
})
else:
profile = yield self.store.get_from_remote_profile_cache(user_id)
defer.returnValue(profile or {})
@defer.inlineCallbacks
def get_displayname(self, target_user):
if self.hs.is_mine(target_user):
@ -182,3 +240,44 @@ class ProfileHandler(BaseHandler):
"Failed to update join event for room %s - %s",
room_id, str(e.message)
)
def _update_remote_profile_cache(self):
"""Called periodically to check profiles of remote users we haven't
checked in a while.
"""
entries = yield self.store.get_remote_profile_cache_entries_that_expire(
last_checked=self.clock.time_msec() - self.PROFILE_UPDATE_EVERY_MS
)
for user_id, displayname, avatar_url in entries:
is_subscribed = yield self.store.is_subscribed_remote_profile_for_user(
user_id,
)
if not is_subscribed:
yield self.store.maybe_delete_remote_profile_cache(user_id)
continue
try:
profile = yield self.federation.make_query(
destination=get_domain_from_id(user_id),
query_type="profile",
args={
"user_id": user_id,
},
ignore_backoff=True,
)
except:
logger.exception("Failed to get avatar_url")
yield self.store.update_remote_profile_cache(
user_id, displayname, avatar_url
)
continue
new_name = profile.get("displayname")
new_avatar = profile.get("avatar_url")
# We always hit update to update the last_check timestamp
yield self.store.update_remote_profile_cache(
user_id, new_name, new_avatar
)

View file

@ -36,6 +36,7 @@ class RegistrationHandler(BaseHandler):
super(RegistrationHandler, self).__init__(hs)
self.auth = hs.get_auth()
self.profile_handler = hs.get_profile_handler()
self.captcha_client = CaptchaServerHttpClient(hs)
self._next_generated_user_id = None
@ -423,8 +424,7 @@ class RegistrationHandler(BaseHandler):
if displayname is not None:
logger.info("setting user display name: %s -> %s", user_id, displayname)
profile_handler = self.hs.get_handlers().profile_handler
yield profile_handler.set_displayname(
yield self.profile_handler.set_displayname(
user, requester, displayname, by_admin=True,
)

View file

@ -276,13 +276,14 @@ class RoomListHandler(BaseHandler):
# We've already got enough, so lets just drop it.
return
result = yield self._generate_room_entry(room_id, num_joined_users)
result = yield self.generate_room_entry(room_id, num_joined_users)
if result and _matches_room_entry(result, search_filter):
chunk.append(result)
@cachedInlineCallbacks(num_args=1, cache_context=True)
def _generate_room_entry(self, room_id, num_joined_users, cache_context):
def generate_room_entry(self, room_id, num_joined_users, cache_context,
with_alias=True, allow_private=False):
"""Returns the entry for a room
"""
result = {
@ -316,14 +317,15 @@ class RoomListHandler(BaseHandler):
join_rules_event = current_state.get((EventTypes.JoinRules, ""))
if join_rules_event:
join_rule = join_rules_event.content.get("join_rule", None)
if join_rule and join_rule != JoinRules.PUBLIC:
if not allow_private and join_rule and join_rule != JoinRules.PUBLIC:
defer.returnValue(None)
aliases = yield self.store.get_aliases_for_room(
room_id, on_invalidate=cache_context.invalidate
)
if aliases:
result["aliases"] = aliases
if with_alias:
aliases = yield self.store.get_aliases_for_room(
room_id, on_invalidate=cache_context.invalidate
)
if aliases:
result["aliases"] = aliases
name_event = yield current_state.get((EventTypes.Name, ""))
if name_event:

View file

@ -45,6 +45,8 @@ class RoomMemberHandler(BaseHandler):
def __init__(self, hs):
super(RoomMemberHandler, self).__init__(hs)
self.profile_handler = hs.get_profile_handler()
self.member_linearizer = Linearizer(name="member")
self.clock = hs.get_clock()
@ -282,7 +284,7 @@ class RoomMemberHandler(BaseHandler):
content["membership"] = Membership.JOIN
profile = self.hs.get_handlers().profile_handler
profile = self.profile_handler
if not content_specified:
content["displayname"] = yield profile.get_displayname(target)
content["avatar_url"] = yield profile.get_avatar_url(target)

View file

@ -108,6 +108,17 @@ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
return True
class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [
"join",
"invite",
"leave",
])):
__slots__ = []
def __nonzero__(self):
return bool(self.join or self.invite or self.leave)
class DeviceLists(collections.namedtuple("DeviceLists", [
"changed", # list of user_ids whose devices may have changed
"left", # list of user_ids whose devices we no longer track
@ -129,6 +140,7 @@ class SyncResult(collections.namedtuple("SyncResult", [
"device_lists", # List of user_ids whose devices have chanegd
"device_one_time_keys_count", # Dict of algorithm to count for one time keys
# for this device
"groups",
])):
__slots__ = []
@ -144,7 +156,8 @@ class SyncResult(collections.namedtuple("SyncResult", [
self.archived or
self.account_data or
self.to_device or
self.device_lists
self.device_lists or
self.groups
)
@ -595,6 +608,8 @@ class SyncHandler(object):
user_id, device_id
)
yield self._generate_sync_entry_for_groups(sync_result_builder)
defer.returnValue(SyncResult(
presence=sync_result_builder.presence,
account_data=sync_result_builder.account_data,
@ -603,10 +618,57 @@ class SyncHandler(object):
archived=sync_result_builder.archived,
to_device=sync_result_builder.to_device,
device_lists=device_lists,
groups=sync_result_builder.groups,
device_one_time_keys_count=one_time_key_counts,
next_batch=sync_result_builder.now_token,
))
@measure_func("_generate_sync_entry_for_groups")
@defer.inlineCallbacks
def _generate_sync_entry_for_groups(self, sync_result_builder):
user_id = sync_result_builder.sync_config.user.to_string()
since_token = sync_result_builder.since_token
now_token = sync_result_builder.now_token
if since_token and since_token.groups_key:
results = yield self.store.get_groups_changes_for_user(
user_id, since_token.groups_key, now_token.groups_key,
)
else:
results = yield self.store.get_all_groups_for_user(
user_id, now_token.groups_key,
)
invited = {}
joined = {}
left = {}
for result in results:
membership = result["membership"]
group_id = result["group_id"]
gtype = result["type"]
content = result["content"]
if membership == "join":
if gtype == "membership":
# TODO: Add profile
content.pop("membership", None)
joined[group_id] = content["content"]
else:
joined.setdefault(group_id, {})[gtype] = content
elif membership == "invite":
if gtype == "membership":
content.pop("membership", None)
invited[group_id] = content["content"]
else:
if gtype == "membership":
left[group_id] = content["content"]
sync_result_builder.groups = GroupsSyncResult(
join=joined,
invite=invited,
leave=left,
)
@measure_func("_generate_sync_entry_for_device_list")
@defer.inlineCallbacks
def _generate_sync_entry_for_device_list(self, sync_result_builder,
@ -1368,6 +1430,7 @@ class SyncResultBuilder(object):
self.invited = []
self.archived = []
self.device = []
self.groups = None
self.to_device = []