Merge remote-tracking branch 'origin/develop' into dbkr/e2e_backup_versions_are_numbers
This commit is contained in: commit bca3b91c2d
140 changed files with 2471 additions and 754 deletions
@@ -27,4 +27,4 @@ try:
 except ImportError:
     pass
 
-__version__ = "0.33.7"
+__version__ = "0.33.8"
@@ -51,6 +51,7 @@ class LoginType(object):
     EMAIL_IDENTITY = u"m.login.email.identity"
     MSISDN = u"m.login.msisdn"
     RECAPTCHA = u"m.login.recaptcha"
+    TERMS = u"m.login.terms"
     DUMMY = u"m.login.dummy"
 
     # Only for C/S API v1

@@ -102,6 +103,7 @@ class ThirdPartyEntityKind(object):
 class RoomVersions(object):
     V1 = "1"
     VDH_TEST = "vdh-test-version"
+    STATE_V2_TEST = "state-v2-test"
 
 
 # the version we will give rooms which are created on this server

@@ -109,7 +111,11 @@ DEFAULT_ROOM_VERSION = RoomVersions.V1
 
 # vdh-test-version is a placeholder to get room versioning support working and tested
 # until we have a working v2.
-KNOWN_ROOM_VERSIONS = {RoomVersions.V1, RoomVersions.VDH_TEST}
+KNOWN_ROOM_VERSIONS = {
+    RoomVersions.V1,
+    RoomVersions.VDH_TEST,
+    RoomVersions.STATE_V2_TEST,
+}
 
 ServerNoticeMsgType = "m.server_notice"
 ServerNoticeLimitReached = "m.server_notice.usage_limit_reached"
@@ -28,7 +28,6 @@ FEDERATION_PREFIX = "/_matrix/federation/v1"
 STATIC_PREFIX = "/_matrix/static"
 WEB_CLIENT_PREFIX = "/_matrix/client"
 CONTENT_REPO_PREFIX = "/_matrix/content"
-SERVER_KEY_PREFIX = "/_matrix/key/v1"
 SERVER_KEY_V2_PREFIX = "/_matrix/key/v2"
 MEDIA_PREFIX = "/_matrix/media/r0"
 LEGACY_MEDIA_PREFIX = "/_matrix/media/v1"
@@ -37,7 +37,6 @@ from synapse.api.urls import (
     FEDERATION_PREFIX,
     LEGACY_MEDIA_PREFIX,
     MEDIA_PREFIX,
-    SERVER_KEY_PREFIX,
     SERVER_KEY_V2_PREFIX,
     STATIC_PREFIX,
     WEB_CLIENT_PREFIX,

@@ -59,7 +58,6 @@ from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, check_requirem
 from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
 from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
 from synapse.rest import ClientRestResource
-from synapse.rest.key.v1.server_key_resource import LocalKey
 from synapse.rest.key.v2 import KeyApiV2Resource
 from synapse.rest.media.v0.content_repository import ContentRepoResource
 from synapse.server import HomeServer

@@ -236,10 +234,7 @@ class SynapseHomeServer(HomeServer):
             )
 
         if name in ["keys", "federation"]:
-            resources.update({
-                SERVER_KEY_PREFIX: LocalKey(self),
-                SERVER_KEY_V2_PREFIX: KeyApiV2Resource(self),
-            })
+            resources[SERVER_KEY_V2_PREFIX] = KeyApiV2Resource(self)
 
         if name == "webclient":
             resources[WEB_CLIENT_PREFIX] = build_resource_for_web_client(self)
@@ -226,7 +226,15 @@ class SynchrotronPresence(object):
 class SynchrotronTyping(object):
     def __init__(self, hs):
         self._latest_room_serial = 0
-        self._room_serials = {}
-        self._room_typing = {}
+        self._reset()
+
+    def _reset(self):
+        """
+        Reset the typing handler's data caches.
+        """
+        # map room IDs to serial numbers
+        self._room_serials = {}
+        # map room IDs to sets of users currently typing
+        self._room_typing = {}
 
     def stream_positions(self):

@@ -236,6 +244,12 @@ class SynchrotronTyping(object):
         return {"typing": self._latest_room_serial}
 
     def process_replication_rows(self, token, rows):
+        if self._latest_room_serial > token:
+            # The master has gone backwards. To prevent inconsistent data, just
+            # clear everything.
+            self._reset()
+
+        # Set the latest serial token to whatever the server gave us.
         self._latest_room_serial = token
 
         for row in rows:
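A standalone sketch of the behaviour these two hunks add (the class and the row fields are stand-ins, not Synapse's real types): if the master's typing serial goes backwards, e.g. after a restart, the worker clears its caches instead of serving a mix of old and new serials.

    class TypingCache(object):
        def __init__(self):
            self._latest_room_serial = 0
            self._reset()

        def _reset(self):
            self._room_serials = {}   # room ID -> serial number
            self._room_typing = {}    # room ID -> set of typing user IDs

        def process_replication_rows(self, token, rows):
            if self._latest_room_serial > token:
                # Master went backwards; drop everything rather than mix
                # stale and fresh serials.
                self._reset()
            self._latest_room_serial = token
            for row in rows:
                # row.room_id / row.user_ids are hypothetical field names
                self._room_serials[row.room_id] = token
                self._room_typing[row.room_id] = set(row.user_ids)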
@@ -42,6 +42,14 @@ DEFAULT_CONFIG = """\
 # until the user consents to the privacy policy. The value of the setting is
 # used as the text of the error.
 #
+# 'require_at_registration', if enabled, will add a step to the registration
+# process, similar to how captcha works. Users will be required to accept the
+# policy before their account is created.
+#
+# 'policy_name' is the display name of the policy users will see when registering
+# for an account. Has no effect unless `require_at_registration` is enabled.
+# Defaults to "Privacy Policy".
+#
 # user_consent:
 #   template_dir: res/templates/privacy
 #   version: 1.0

@@ -54,6 +62,8 @@ DEFAULT_CONFIG = """\
 #   block_events_error: >-
 #     To continue using this homeserver you must review and agree to the
 #     terms and conditions at %(consent_uri)s
+#   require_at_registration: False
+#   policy_name: Privacy Policy
 #
 """
 

@@ -67,6 +77,8 @@ class ConsentConfig(Config):
         self.user_consent_server_notice_content = None
         self.user_consent_server_notice_to_guests = False
         self.block_events_without_consent_error = None
+        self.user_consent_at_registration = False
+        self.user_consent_policy_name = "Privacy Policy"
 
     def read_config(self, config):
         consent_config = config.get("user_consent")

@@ -83,6 +95,12 @@ class ConsentConfig(Config):
         self.user_consent_server_notice_to_guests = bool(consent_config.get(
            "send_server_notice_to_guests", False,
        ))
+        self.user_consent_at_registration = bool(consent_config.get(
+            "require_at_registration", False,
+        ))
+        self.user_consent_policy_name = consent_config.get(
+            "policy_name", "Privacy Policy",
+        )
 
     def default_config(self, **kwargs):
         return DEFAULT_CONFIG
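A hedged illustration of how the two new options are parsed: read_config above receives the parsed `user_consent` section of the homeserver YAML as a dict, and falls back to consent-at-registration being off and a display name of "Privacy Policy".

    # Illustration only; mirrors the .get() defaults in read_config above.
    consent_config = {
        "version": "1.0",
        "require_at_registration": True,    # new option
        "policy_name": "Terms of Service",  # new option
    }
    user_consent_at_registration = bool(
        consent_config.get("require_at_registration", False)
    )
    user_consent_policy_name = consent_config.get("policy_name", "Privacy Policy")
    assert user_consent_at_registration is True
    assert user_consent_policy_name == "Terms of Service"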
@@ -50,6 +50,7 @@ handlers:
         maxBytes: 104857600
         backupCount: 10
         filters: [context]
+        encoding: utf8
     console:
         class: logging.StreamHandler
         formatter: precise
@@ -15,6 +15,8 @@
 
 import logging
 
+from six.moves import urllib
+
 from canonicaljson import json
 
 from twisted.internet import defer, reactor

@@ -28,15 +30,15 @@ from synapse.util import logcontext
 
 logger = logging.getLogger(__name__)
 
-KEY_API_V1 = b"/_matrix/key/v1/"
+KEY_API_V2 = "/_matrix/key/v2/server/%s"
 
 
 @defer.inlineCallbacks
-def fetch_server_key(server_name, tls_client_options_factory, path=KEY_API_V1):
+def fetch_server_key(server_name, tls_client_options_factory, key_id):
     """Fetch the keys for a remote server."""
 
     factory = SynapseKeyClientFactory()
-    factory.path = path
+    factory.path = KEY_API_V2 % (urllib.parse.quote(key_id), )
     factory.host = server_name
     endpoint = matrix_federation_endpoint(
         reactor, server_name, tls_client_options_factory, timeout=30
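The old signature took a pre-built path; the new one derives it from the key ID, with the URL quoting handled in one place. A quick sketch of what the new path construction yields (the key ID value is made up):

    from six.moves import urllib

    KEY_API_V2 = "/_matrix/key/v2/server/%s"
    path = KEY_API_V2 % (urllib.parse.quote("ed25519:a_fbQj"), )
    assert path == "/_matrix/key/v2/server/ed25519%3Aa_fbQj"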
@@ -1,6 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
-# Copyright 2017 New Vector Ltd.
+# Copyright 2017, 2018 New Vector Ltd.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.

@@ -18,8 +18,6 @@ import hashlib
 import logging
 from collections import namedtuple
 
-from six.moves import urllib
-
 from signedjson.key import (
     decode_verify_key_bytes,
     encode_verify_key_base64,

@@ -395,32 +393,13 @@ class Keyring(object):
 
     @defer.inlineCallbacks
     def get_keys_from_server(self, server_name_and_key_ids):
-        @defer.inlineCallbacks
-        def get_key(server_name, key_ids):
-            keys = None
-            try:
-                keys = yield self.get_server_verify_key_v2_direct(
-                    server_name, key_ids
-                )
-            except Exception as e:
-                logger.info(
-                    "Unable to get key %r for %r directly: %s %s",
-                    key_ids, server_name,
-                    type(e).__name__, str(e),
-                )
-
-            if not keys:
-                keys = yield self.get_server_verify_key_v1_direct(
-                    server_name, key_ids
-                )
-
-                keys = {server_name: keys}
-
-            defer.returnValue(keys)
-
         results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
-                run_in_background(get_key, server_name, key_ids)
+                run_in_background(
+                    self.get_server_verify_key_v2_direct,
+                    server_name,
+                    key_ids,
+                )
                 for server_name, key_ids in server_name_and_key_ids
             ],
             consumeErrors=True,

@@ -525,10 +504,7 @@ class Keyring(object):
                 continue
 
             (response, tls_certificate) = yield fetch_server_key(
-                server_name, self.hs.tls_client_options_factory,
-                path=("/_matrix/key/v2/server/%s" % (
-                    urllib.parse.quote(requested_key_id),
-                )).encode("ascii"),
+                server_name, self.hs.tls_client_options_factory, requested_key_id
             )
 
             if (u"signatures" not in response

@@ -657,78 +633,6 @@ class Keyring(object):
 
         defer.returnValue(results)
 
-    @defer.inlineCallbacks
-    def get_server_verify_key_v1_direct(self, server_name, key_ids):
-        """Finds a verification key for the server with one of the key ids.
-        Args:
-            server_name (str): The name of the server to fetch a key for.
-            keys_ids (list of str): The key_ids to check for.
-        """
-
-        # Try to fetch the key from the remote server.
-
-        (response, tls_certificate) = yield fetch_server_key(
-            server_name, self.hs.tls_client_options_factory
-        )
-
-        # Check the response.
-
-        x509_certificate_bytes = crypto.dump_certificate(
-            crypto.FILETYPE_ASN1, tls_certificate
-        )
-
-        if ("signatures" not in response
-                or server_name not in response["signatures"]):
-            raise KeyLookupError("Key response not signed by remote server")
-
-        if "tls_certificate" not in response:
-            raise KeyLookupError("Key response missing TLS certificate")
-
-        tls_certificate_b64 = response["tls_certificate"]
-
-        if encode_base64(x509_certificate_bytes) != tls_certificate_b64:
-            raise KeyLookupError("TLS certificate doesn't match")
-
-        # Cache the result in the datastore.
-
-        time_now_ms = self.clock.time_msec()
-
-        verify_keys = {}
-        for key_id, key_base64 in response["verify_keys"].items():
-            if is_signing_algorithm_supported(key_id):
-                key_bytes = decode_base64(key_base64)
-                verify_key = decode_verify_key_bytes(key_id, key_bytes)
-                verify_key.time_added = time_now_ms
-                verify_keys[key_id] = verify_key
-
-        for key_id in response["signatures"][server_name]:
-            if key_id not in response["verify_keys"]:
-                raise KeyLookupError(
-                    "Key response must include verification keys for all"
-                    " signatures"
-                )
-            if key_id in verify_keys:
-                verify_signed_json(
-                    response,
-                    server_name,
-                    verify_keys[key_id]
-                )
-
-        yield self.store.store_server_certificate(
-            server_name,
-            server_name,
-            time_now_ms,
-            tls_certificate,
-        )
-
-        yield self.store_keys(
-            server_name=server_name,
-            from_server=server_name,
-            verify_keys=verify_keys,
-        )
-
-        defer.returnValue(verify_keys)
-
     def store_keys(self, server_name, from_server, verify_keys):
         """Store a collection of verify keys for a given server
         Args:
@@ -200,11 +200,11 @@ def _is_membership_change_allowed(event, auth_events):
     membership = event.content["membership"]
 
     # Check if this is the room creator joining:
-    if len(event.prev_events) == 1 and Membership.JOIN == membership:
+    if len(event.prev_event_ids()) == 1 and Membership.JOIN == membership:
         # Get room creation event:
         key = (EventTypes.Create, "", )
         create = auth_events.get(key)
-        if create and event.prev_events[0][0] == create.event_id:
+        if create and event.prev_event_ids()[0] == create.event_id:
             if create.content["creator"] == event.state_key:
                 return
@@ -159,6 +159,24 @@ class EventBase(object):
     def keys(self):
         return six.iterkeys(self._event_dict)
 
+    def prev_event_ids(self):
+        """Returns the list of prev event IDs. The order matches the order
+        specified in the event, though there is no meaning to it.
+
+        Returns:
+            list[str]: The list of event IDs of this event's prev_events
+        """
+        return [e for e, _ in self.prev_events]
+
+    def auth_event_ids(self):
+        """Returns the list of auth event IDs. The order matches the order
+        specified in the event, though there is no meaning to it.
+
+        Returns:
+            list[str]: The list of event IDs of this event's auth_events
+        """
+        return [e for e, _ in self.auth_events]
+
 
 class FrozenEvent(EventBase):
     def __init__(self, event_dict, internal_metadata_dict={}, rejected_reason=None):
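Most of the later hunks in this diff are mechanical consequences of these two helpers: `prev_events` and `auth_events` hold `(event_id, hashes)` pairs, which every caller previously unpacked by hand. A minimal stand-in showing the equivalence:

    class FakeEvent(object):
        # (event_id, hashes) pairs, shaped like EventBase.prev_events
        prev_events = [("$a:hs", {"sha256": "x"}), ("$b:hs", {"sha256": "y"})]
        auth_events = [("$c:hs", {"sha256": "z"})]

        def prev_event_ids(self):
            return [e for e, _ in self.prev_events]

        def auth_event_ids(self):
            return [e for e, _ in self.auth_events]

    ev = FakeEvent()
    assert ev.prev_event_ids() == ["$a:hs", "$b:hs"]
    assert ev.auth_event_ids() == ["$c:hs"]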
@@ -323,11 +323,6 @@ class FederationServer(FederationBase):
         else:
             defer.returnValue((404, ""))
 
-    @defer.inlineCallbacks
-    @log_function
-    def on_pull_request(self, origin, versions):
-        raise NotImplementedError("Pull transactions not implemented")
-
     @defer.inlineCallbacks
     def on_query_request(self, query_type, args):
         received_queries_counter.labels(query_type).inc()
@@ -183,9 +183,7 @@ class TransactionQueue(object):
                 # banned then it won't receive the event because it won't
                 # be in the room after the ban.
                 destinations = yield self.state.get_current_hosts_in_room(
-                    event.room_id, latest_event_ids=[
-                        prev_id for prev_id, _ in event.prev_events
-                    ],
+                    event.room_id, latest_event_ids=event.prev_event_ids(),
                 )
             except Exception:
                 logger.exception(
@@ -362,14 +362,6 @@ class FederationSendServlet(BaseFederationServlet):
         defer.returnValue((code, response))
 
 
-class FederationPullServlet(BaseFederationServlet):
-    PATH = "/pull/"
-
-    # This is for when someone asks us for everything since version X
-    def on_GET(self, origin, content, query):
-        return self.handler.on_pull_request(query["origin"][0], query["v"])
-
-
 class FederationEventServlet(BaseFederationServlet):
     PATH = "/event/(?P<event_id>[^/]*)/"
 

@@ -1261,7 +1253,6 @@ class FederationGroupsSettingJoinPolicyServlet(BaseFederationServlet):
 
 FEDERATION_SERVLET_CLASSES = (
     FederationSendServlet,
-    FederationPullServlet,
     FederationEventServlet,
     FederationStateServlet,
     FederationStateIdsServlet,
@@ -117,9 +117,6 @@ class Transaction(JsonEncodedObject):
                 "Require 'transaction_id' to construct a Transaction"
             )
 
-        for p in pdus:
-            p.transaction_id = kwargs["transaction_id"]
-
         kwargs["pdus"] = [p.get_pdu_json() for p in pdus]
 
         return Transaction(**kwargs)
@@ -59,6 +59,7 @@ class AuthHandler(BaseHandler):
             LoginType.EMAIL_IDENTITY: self._check_email_identity,
             LoginType.MSISDN: self._check_msisdn,
             LoginType.DUMMY: self._check_dummy_auth,
+            LoginType.TERMS: self._check_terms_auth,
         }
         self.bcrypt_rounds = hs.config.bcrypt_rounds
 

@@ -431,6 +432,9 @@ class AuthHandler(BaseHandler):
     def _check_dummy_auth(self, authdict, _):
         return defer.succeed(True)
 
+    def _check_terms_auth(self, authdict, _):
+        return defer.succeed(True)
+
     @defer.inlineCallbacks
     def _check_threepid(self, medium, authdict):
         if 'threepid_creds' not in authdict:

@@ -462,6 +466,22 @@ class AuthHandler(BaseHandler):
     def _get_params_recaptcha(self):
         return {"public_key": self.hs.config.recaptcha_public_key}
 
+    def _get_params_terms(self):
+        return {
+            "policies": {
+                "privacy_policy": {
+                    "version": self.hs.config.user_consent_version,
+                    "en": {
+                        "name": self.hs.config.user_consent_policy_name,
+                        "url": "%s/_matrix/consent?v=%s" % (
+                            self.hs.config.public_baseurl,
+                            self.hs.config.user_consent_version,
+                        ),
+                    },
+                },
+            },
+        }
+
     def _auth_dict_for_flows(self, flows, session):
         public_flows = []
         for f in flows:

@@ -469,6 +489,7 @@ class AuthHandler(BaseHandler):
 
         get_params = {
             LoginType.RECAPTCHA: self._get_params_recaptcha,
+            LoginType.TERMS: self._get_params_terms,
         }
 
         params = {}
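For context, `_get_params_terms` feeds the `params` of the user-interactive-auth response for the `m.login.terms` stage; with made-up config values the client would see something like:

    # Hypothetical config values; the structure matches _get_params_terms above.
    params = {
        "policies": {
            "privacy_policy": {
                "version": "1.0",
                "en": {
                    "name": "Privacy Policy",
                    "url": "https://example.com/_matrix/consent?v=1.0",
                },
            },
        },
    }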
@@ -138,9 +138,30 @@ class DirectoryHandler(BaseHandler):
         )
 
     @defer.inlineCallbacks
-    def delete_association(self, requester, room_alias):
-        # association deletion for human users
+    def delete_association(self, requester, room_alias, send_event=True):
+        """Remove an alias from the directory
+
+        (this is only meant for human users; AS users should call
+        delete_appservice_association)
+
+        Args:
+            requester (Requester):
+            room_alias (RoomAlias):
+            send_event (bool): Whether to send an updated m.room.aliases event.
+                Note that, if we delete the canonical alias, we will always attempt
+                to send an m.room.canonical_alias event
+
+        Returns:
+            Deferred[unicode]: room id that the alias used to point to
+
+        Raises:
+            NotFoundError: if the alias doesn't exist
+
+            AuthError: if the user doesn't have perms to delete the alias (ie, the user
+                is neither the creator of the alias, nor a server admin.
+
+            SynapseError: if the alias belongs to an AS
+        """
         user_id = requester.user.to_string()
 
         try:

@@ -168,10 +189,11 @@ class DirectoryHandler(BaseHandler):
         room_id = yield self._delete_association(room_alias)
 
         try:
-            yield self.send_room_alias_update_event(
-                requester,
-                room_id
-            )
+            if send_event:
+                yield self.send_room_alias_update_event(
+                    requester,
+                    room_id
+                )
 
             yield self._update_canonical_alias(
                 requester,
@@ -19,7 +19,7 @@ from six import iteritems
 
 from twisted.internet import defer
 
-from synapse.api.errors import RoomKeysVersionError, StoreError, SynapseError
+from synapse.api.errors import NotFoundError, RoomKeysVersionError, StoreError
 from synapse.util.async_helpers import Linearizer
 
 logger = logging.getLogger(__name__)

@@ -55,6 +55,8 @@ class E2eRoomKeysHandler(object):
             room_id(string): room ID to get keys for, for None to get keys for all rooms
             session_id(string): session ID to get keys for, for None to get keys for all
                 sessions
+        Raises:
+            NotFoundError: if the backup version does not exist
         Returns:
             A deferred list of dicts giving the session_data and message metadata for
             these room keys.

@@ -63,13 +65,19 @@ class E2eRoomKeysHandler(object):
         # we deliberately take the lock to get keys so that changing the version
         # works atomically
         with (yield self._upload_linearizer.queue(user_id)):
+            # make sure the backup version exists
+            try:
+                yield self.store.get_e2e_room_keys_version_info(user_id, version)
+            except StoreError as e:
+                if e.code == 404:
+                    raise NotFoundError("Unknown backup version")
+                else:
+                    raise
+
             results = yield self.store.get_e2e_room_keys(
                 user_id, version, room_id, session_id
             )
 
-            if results['rooms'] == {}:
-                raise SynapseError(404, "No room_keys found")
-
             defer.returnValue(results)
 
     @defer.inlineCallbacks

@@ -120,7 +128,7 @@ class E2eRoomKeysHandler(object):
         }
 
         Raises:
-            SynapseError: with code 404 if there are no versions defined
+            NotFoundError: if there are no versions defined
             RoomKeysVersionError: if the uploaded version is not the current version
         """
 

@@ -134,7 +142,7 @@ class E2eRoomKeysHandler(object):
             version_info = yield self.store.get_e2e_room_keys_version_info(user_id)
         except StoreError as e:
             if e.code == 404:
-                raise SynapseError(404, "Version '%s' not found" % (version,))
+                raise NotFoundError("Version '%s' not found" % (version,))
             else:
                 raise
 

@@ -148,7 +156,7 @@ class E2eRoomKeysHandler(object):
                 raise RoomKeysVersionError(current_version=version_info['version'])
         except StoreError as e:
             if e.code == 404:
-                raise SynapseError(404, "Version '%s' not found" % (version,))
+                raise NotFoundError("Version '%s' not found" % (version,))
             else:
                 raise
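The `SynapseError(404, ...)` to `NotFoundError(...)` changes keep the HTTP status but give callers a distinct exception type. A self-contained sketch of the translation pattern used in the hunks above (both exception classes here are simplified stand-ins):

    class StoreError(Exception):
        def __init__(self, code, msg):
            super(StoreError, self).__init__(msg)
            self.code = code

    class NotFoundError(Exception):
        """Dedicated not-found type, as in synapse.api.errors."""

    def load_version(user_id, version):
        # Stand-in for store.get_e2e_room_keys_version_info
        raise StoreError(404, "no row")

    def get_version_info(user_id, version):
        try:
            return load_version(user_id, version)
        except StoreError as e:
            if e.code == 404:
                raise NotFoundError("Version '%s' not found" % (version,))
            else:
                raise

    try:
        get_version_info("@u:hs", "1")
    except NotFoundError as e:
        print("caught: %s" % e)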
@@ -239,7 +239,7 @@ class FederationHandler(BaseHandler):
             room_id, event_id, min_depth,
         )
 
-        prevs = {e_id for e_id, _ in pdu.prev_events}
+        prevs = set(pdu.prev_event_ids())
         seen = yield self.store.have_seen_events(prevs)
 
         if min_depth and pdu.depth < min_depth:

@@ -607,7 +607,7 @@ class FederationHandler(BaseHandler):
             if e.event_id in seen_ids:
                 continue
             e.internal_metadata.outlier = True
-            auth_ids = [e_id for e_id, _ in e.auth_events]
+            auth_ids = e.auth_event_ids()
             auth = {
                 (e.type, e.state_key): e for e in auth_chain
                 if e.event_id in auth_ids or e.type == EventTypes.Create

@@ -726,7 +726,7 @@ class FederationHandler(BaseHandler):
         edges = [
             ev.event_id
             for ev in events
-            if set(e_id for e_id, _ in ev.prev_events) - event_ids
+            if set(ev.prev_event_ids()) - event_ids
         ]
 
         logger.info(

@@ -753,7 +753,7 @@ class FederationHandler(BaseHandler):
         required_auth = set(
             a_id
             for event in events + list(state_events.values()) + list(auth_events.values())
-            for a_id, _ in event.auth_events
+            for a_id in event.auth_event_ids()
         )
         auth_events.update({
             e_id: event_map[e_id] for e_id in required_auth if e_id in event_map

@@ -769,7 +769,7 @@ class FederationHandler(BaseHandler):
             auth_events.update(ret_events)
 
             required_auth.update(
-                a_id for event in ret_events.values() for a_id, _ in event.auth_events
+                a_id for event in ret_events.values() for a_id in event.auth_event_ids()
             )
             missing_auth = required_auth - set(auth_events)
 

@@ -796,7 +796,7 @@ class FederationHandler(BaseHandler):
                 required_auth.update(
                     a_id
                     for event in results if event
-                    for a_id, _ in event.auth_events
+                    for a_id in event.auth_event_ids()
                 )
                 missing_auth = required_auth - set(auth_events)
 

@@ -816,7 +816,7 @@ class FederationHandler(BaseHandler):
                     "auth_events": {
                         (auth_events[a_id].type, auth_events[a_id].state_key):
                         auth_events[a_id]
-                        for a_id, _ in a.auth_events
+                        for a_id in a.auth_event_ids()
                        if a_id in auth_events
                     }
                 })

@@ -828,7 +828,7 @@ class FederationHandler(BaseHandler):
                     "auth_events": {
                         (auth_events[a_id].type, auth_events[a_id].state_key):
                         auth_events[a_id]
-                        for a_id, _ in event_map[e_id].auth_events
+                        for a_id in event_map[e_id].auth_event_ids()
                         if a_id in auth_events
                     }
                 })

@@ -1041,17 +1041,17 @@ class FederationHandler(BaseHandler):
         Raises:
             SynapseError if the event does not pass muster
         """
-        if len(ev.prev_events) > 20:
+        if len(ev.prev_event_ids()) > 20:
             logger.warn("Rejecting event %s which has %i prev_events",
-                        ev.event_id, len(ev.prev_events))
+                        ev.event_id, len(ev.prev_event_ids()))
             raise SynapseError(
                 http_client.BAD_REQUEST,
                 "Too many prev_events",
             )
 
-        if len(ev.auth_events) > 10:
+        if len(ev.auth_event_ids()) > 10:
             logger.warn("Rejecting event %s which has %i auth_events",
-                        ev.event_id, len(ev.auth_events))
+                        ev.event_id, len(ev.auth_event_ids()))
             raise SynapseError(
                 http_client.BAD_REQUEST,
                 "Too many auth_events",

@@ -1076,7 +1076,7 @@ class FederationHandler(BaseHandler):
     def on_event_auth(self, event_id):
         event = yield self.store.get_event(event_id)
         auth = yield self.store.get_auth_chain(
-            [auth_id for auth_id, _ in event.auth_events],
+            [auth_id for auth_id in event.auth_event_ids()],
             include_given=True
         )
         defer.returnValue([e for e in auth])

@@ -1698,7 +1698,7 @@ class FederationHandler(BaseHandler):
 
         missing_auth_events = set()
         for e in itertools.chain(auth_events, state, [event]):
-            for e_id, _ in e.auth_events:
+            for e_id in e.auth_event_ids():
                 if e_id not in event_map:
                     missing_auth_events.add(e_id)
 

@@ -1717,7 +1717,7 @@ class FederationHandler(BaseHandler):
         for e in itertools.chain(auth_events, state, [event]):
             auth_for_e = {
                 (event_map[e_id].type, event_map[e_id].state_key): event_map[e_id]
-                for e_id, _ in e.auth_events
+                for e_id in e.auth_event_ids()
                 if e_id in event_map
             }
             if create_event:

@@ -1785,10 +1785,10 @@ class FederationHandler(BaseHandler):
 
         # This is a hack to fix some old rooms where the initial join event
        # didn't reference the create event in its auth events.
-        if event.type == EventTypes.Member and not event.auth_events:
-            if len(event.prev_events) == 1 and event.depth < 5:
+        if event.type == EventTypes.Member and not event.auth_event_ids():
+            if len(event.prev_event_ids()) == 1 and event.depth < 5:
                 c = yield self.store.get_event(
-                    event.prev_events[0][0],
+                    event.prev_event_ids()[0],
                     allow_none=True,
                 )
                 if c and c.type == EventTypes.Create:

@@ -1835,7 +1835,7 @@ class FederationHandler(BaseHandler):
 
         # Now get the current auth_chain for the event.
         local_auth_chain = yield self.store.get_auth_chain(
-            [auth_id for auth_id, _ in event.auth_events],
+            [auth_id for auth_id in event.auth_event_ids()],
             include_given=True
         )
 

@@ -1891,7 +1891,7 @@ class FederationHandler(BaseHandler):
         """
         # Check if we have all the auth events.
         current_state = set(e.event_id for e in auth_events.values())
-        event_auth_events = set(e_id for e_id, _ in event.auth_events)
+        event_auth_events = set(event.auth_event_ids())
 
         if event.is_state():
             event_key = (event.type, event.state_key)

@@ -1935,7 +1935,7 @@ class FederationHandler(BaseHandler):
                     continue
 
                 try:
-                    auth_ids = [e_id for e_id, _ in e.auth_events]
+                    auth_ids = e.auth_event_ids()
                     auth = {
                         (e.type, e.state_key): e for e in remote_auth_chain
                         if e.event_id in auth_ids or e.type == EventTypes.Create

@@ -1956,7 +1956,7 @@ class FederationHandler(BaseHandler):
                     pass
 
             have_events = yield self.store.get_seen_events_with_rejections(
-                [e_id for e_id, _ in event.auth_events]
+                event.auth_event_ids()
             )
             seen_events = set(have_events.keys())
         except Exception:

@@ -2058,7 +2058,7 @@ class FederationHandler(BaseHandler):
                         continue
 
                     try:
-                        auth_ids = [e_id for e_id, _ in ev.auth_events]
+                        auth_ids = ev.auth_event_ids()
                         auth = {
                             (e.type, e.state_key): e
                             for e in result["auth_chain"]

@@ -2250,7 +2250,7 @@ class FederationHandler(BaseHandler):
         missing_remote_ids = [e.event_id for e in missing_remotes]
         base_remote_rejected = list(missing_remotes)
         for e in missing_remotes:
-            for e_id, _ in e.auth_events:
+            for e_id in e.auth_event_ids():
                 if e_id in missing_remote_ids:
                     try:
                         base_remote_rejected.remove(e)
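Every federation-handler hunk above is the same rewrite; given the helpers added to EventBase earlier in this diff, the old tuple-unpacking comprehensions and the new calls produce identical values:

    class Pdu(object):
        # Stand-in event; (event_id, hashes) pairs as in EventBase
        prev_events = [("$x:hs", {}), ("$y:hs", {})]

        def prev_event_ids(self):
            return [e for e, _ in self.prev_events]

    pdu = Pdu()
    # before == after
    assert {e_id for e_id, _ in pdu.prev_events} == set(pdu.prev_event_ids())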
@@ -427,6 +427,9 @@ class EventCreationHandler(object):
 
         if event.is_state():
             prev_state = yield self.deduplicate_state_event(event, context)
+            logger.info(
+                "Not bothering to persist duplicate state event %s", event.event_id,
+            )
             if prev_state is not None:
                 defer.returnValue(prev_state)
@@ -104,6 +104,8 @@ class RoomCreationHandler(BaseHandler):
             creator_id=user_id, is_public=r["is_public"],
         )
 
+        logger.info("Creating new room %s to replace %s", new_room_id, old_room_id)
+
         # we create and auth the tombstone event before properly creating the new
         # room, to check our user has perms in the old room.
         tombstone_event, tombstone_context = (

@@ -136,10 +138,15 @@ class RoomCreationHandler(BaseHandler):
             requester, tombstone_event, tombstone_context,
         )
 
-        # and finally, shut down the PLs in the old room, and update them in the new
-        # room.
         old_room_state = yield tombstone_context.get_current_state_ids(self.store)
+
+        # update any aliases
+        yield self._move_aliases_to_new_room(
+            requester, old_room_id, new_room_id, old_room_state,
+        )
+
+        # and finally, shut down the PLs in the old room, and update them in the new
+        # room.
         yield self._update_upgraded_room_pls(
             requester, old_room_id, new_room_id, old_room_state,
         )

@@ -245,11 +252,6 @@ class RoomCreationHandler(BaseHandler):
         if not self.spam_checker.user_may_create_room(user_id):
             raise SynapseError(403, "You are not permitted to create rooms")
 
-        # XXX check alias is free
-        # canonical_alias = None
-
-        # XXX create association in directory handler
-
         creation_content = {
             "room_version": new_room_version,
             "predecessor": {

@@ -295,7 +297,111 @@ class RoomCreationHandler(BaseHandler):
 
         # XXX invites/joins
         # XXX 3pid invites
-        # XXX directory_handler.send_room_alias_update_event
+
+    @defer.inlineCallbacks
+    def _move_aliases_to_new_room(
+            self, requester, old_room_id, new_room_id, old_room_state,
+    ):
+        directory_handler = self.hs.get_handlers().directory_handler
+
+        aliases = yield self.store.get_aliases_for_room(old_room_id)
+
+        # check to see if we have a canonical alias.
+        canonical_alias = None
+        canonical_alias_event_id = old_room_state.get((EventTypes.CanonicalAlias, ""))
+        if canonical_alias_event_id:
+            canonical_alias_event = yield self.store.get_event(canonical_alias_event_id)
+            if canonical_alias_event:
+                canonical_alias = canonical_alias_event.content.get("alias", "")
+
+        # first we try to remove the aliases from the old room (we suppress sending
+        # the room_aliases event until the end).
+        #
+        # Note that we'll only be able to remove aliases that (a) aren't owned by an AS,
+        # and (b) unless the user is a server admin, which the user created.
+        #
+        # This is probably correct - given we don't allow such aliases to be deleted
+        # normally, it would be odd to allow it in the case of doing a room upgrade -
+        # but it makes the upgrade less effective, and you have to wonder why a room
+        # admin can't remove aliases that point to that room anyway.
+        # (cf https://github.com/matrix-org/synapse/issues/2360)
+        #
+        removed_aliases = []
+        for alias_str in aliases:
+            alias = RoomAlias.from_string(alias_str)
+            try:
+                yield directory_handler.delete_association(
+                    requester, alias, send_event=False,
+                )
+                removed_aliases.append(alias_str)
+            except SynapseError as e:
+                logger.warning(
+                    "Unable to remove alias %s from old room: %s",
+                    alias, e,
+                )
+
+        # if we didn't find any aliases, or couldn't remove anyway, we can skip the rest
+        # of this.
+        if not removed_aliases:
+            return
+
+        try:
+            # this can fail if, for some reason, our user doesn't have perms to send
+            # m.room.aliases events in the old room (note that we've already checked that
+            # they have perms to send a tombstone event, so that's not terribly likely).
+            #
+            # If that happens, it's regrettable, but we should carry on: it's the same
+            # as when you remove an alias from the directory normally - it just means that
+            # the aliases event gets out of sync with the directory
+            # (cf https://github.com/vector-im/riot-web/issues/2369)
+            yield directory_handler.send_room_alias_update_event(
+                requester, old_room_id,
+            )
+        except AuthError as e:
+            logger.warning(
+                "Failed to send updated alias event on old room: %s", e,
+            )
+
+        # we can now add any aliases we successfully removed to the new room.
+        for alias in removed_aliases:
+            try:
+                yield directory_handler.create_association(
+                    requester, RoomAlias.from_string(alias),
+                    new_room_id, servers=(self.hs.hostname, ),
+                    send_event=False,
+                )
+                logger.info("Moved alias %s to new room", alias)
+            except SynapseError as e:
+                # I'm not really expecting this to happen, but it could if the spam
+                # checking module decides it shouldn't, or similar.
+                logger.error(
+                    "Error adding alias %s to new room: %s",
+                    alias, e,
+                )
+
+        try:
+            if canonical_alias and (canonical_alias in removed_aliases):
+                yield self.event_creation_handler.create_and_send_nonmember_event(
+                    requester,
+                    {
+                        "type": EventTypes.CanonicalAlias,
+                        "state_key": "",
+                        "room_id": new_room_id,
+                        "sender": requester.user.to_string(),
+                        "content": {"alias": canonical_alias, },
+                    },
+                    ratelimit=False
+                )
+
+            yield directory_handler.send_room_alias_update_event(
+                requester, new_room_id,
+            )
+        except SynapseError as e:
+            # again I'm not really expecting this to fail, but if it does, I'd rather
+            # we returned the new room to the client at this point.
+            logger.error(
+                "Unable to send updated alias events in new room: %s", e,
+            )
 
     @defer.inlineCallbacks
     def create_room(self, requester, config, ratelimit=True,

@@ -522,6 +628,7 @@ class RoomCreationHandler(BaseHandler):
         @defer.inlineCallbacks
         def send(etype, content, **kwargs):
             event = create(etype, content, **kwargs)
+            logger.info("Sending %s in new room", etype)
             yield self.event_creation_handler.create_and_send_nonmember_event(
                 creator,
                 event,

@@ -544,6 +651,7 @@ class RoomCreationHandler(BaseHandler):
             content=creation_content,
         )
 
+        logger.info("Sending %s in new room", EventTypes.Member)
         yield self.room_member_handler.update_membership(
             creator,
             creator.user,
@@ -24,6 +24,7 @@ from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import SynapseError
 from synapse.api.filtering import Filter
 from synapse.events.utils import serialize_event
+from synapse.storage.state import StateFilter
 from synapse.visibility import filter_events_for_client
 
 from ._base import BaseHandler

@@ -324,9 +325,12 @@ class SearchHandler(BaseHandler):
             else:
                 last_event_id = event.event_id
 
+            state_filter = StateFilter.from_types(
+                [(EventTypes.Member, sender) for sender in senders]
+            )
+
             state = yield self.store.get_state_for_event(
-                last_event_id,
-                types=[(EventTypes.Member, sender) for sender in senders]
+                last_event_id, state_filter
             )
 
             res["profile_info"] = {
@@ -63,11 +63,8 @@ class TypingHandler(object):
         self._member_typing_until = {}  # clock time we expect to stop
         self._member_last_federation_poke = {}
 
-        # map room IDs to serial numbers
-        self._room_serials = {}
         self._latest_room_serial = 0
-        # map room IDs to sets of users currently typing
-        self._room_typing = {}
+        self._reset()
 
         # caches which room_ids changed at which serials
         self._typing_stream_change_cache = StreamChangeCache(

@@ -79,6 +76,15 @@ class TypingHandler(object):
             5000,
         )
 
+    def _reset(self):
+        """
+        Reset the typing handler's data caches.
+        """
+        # map room IDs to serial numbers
+        self._room_serials = {}
+        # map room IDs to sets of users currently typing
+        self._room_typing = {}
+
     def _handle_timeouts(self):
         logger.info("Checking for typing timeouts")
@@ -468,13 +468,13 @@ def set_cors_headers(request):
     Args:
         request (twisted.web.http.Request): The http request to add CORs to.
     """
-    request.setHeader("Access-Control-Allow-Origin", "*")
+    request.setHeader(b"Access-Control-Allow-Origin", b"*")
     request.setHeader(
-        "Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS"
+        b"Access-Control-Allow-Methods", b"GET, POST, PUT, DELETE, OPTIONS"
     )
     request.setHeader(
-        "Access-Control-Allow-Headers",
-        "Origin, X-Requested-With, Content-Type, Accept, Authorization"
+        b"Access-Control-Allow-Headers",
+        b"Origin, X-Requested-With, Content-Type, Accept, Authorization"
     )
@@ -121,16 +121,15 @@ def parse_string(request, name, default=None, required=False,
 
     Args:
         request: the twisted HTTP request.
-        name (bytes/unicode): the name of the query parameter.
-        default (bytes/unicode|None): value to use if the parameter is absent,
+        name (bytes|unicode): the name of the query parameter.
+        default (bytes|unicode|None): value to use if the parameter is absent,
             defaults to None. Must be bytes if encoding is None.
         required (bool): whether to raise a 400 SynapseError if the
             parameter is absent, defaults to False.
-        allowed_values (list[bytes/unicode]): List of allowed values for the
+        allowed_values (list[bytes|unicode]): List of allowed values for the
             string, or None if any value is allowed, defaults to None. Must be
             the same type as name, if given.
-        encoding: The encoding to decode the name to, and decode the string
-            content with.
+        encoding (str|None): The encoding to decode the string content with.
 
     Returns:
         bytes/unicode|None: A string value or the default. Unicode if encoding
@@ -85,7 +85,10 @@ class EmailPusher(object):
         self.timed_call = None
 
     def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
-        self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
+        if self.max_stream_ordering:
+            self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
+        else:
+            self.max_stream_ordering = max_stream_ordering
         self._start_processing()
 
     def on_new_receipts(self, min_stream_id, max_stream_id):
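`self.max_stream_ordering` starts out as None for a brand-new pusher; `max(n, None)` raises TypeError on Python 3 (and compares arbitrarily on Python 2), which is presumably what the branch above guards against. Sketch:

    def update_max(current, incoming):
        # mirrors on_new_notifications above
        if current:
            return max(incoming, current)
        return incoming

    assert update_max(None, 7) == 7   # fresh pusher: take the incoming value
    assert update_max(9, 7) == 9      # never moves backwards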
@@ -311,10 +311,10 @@ class HttpPusher(object):
                 ]
             }
         }
-        if event.type == 'm.room.member':
+        if event.type == 'm.room.member' and event.is_state():
             d['notification']['membership'] = event.content['membership']
             d['notification']['user_is_target'] = event.state_key == self.user_id
-        if self.hs.config.push_include_content and 'content' in event:
+        if self.hs.config.push_include_content and event.content:
             d['notification']['content'] = event.content
 
         # We no longer send aliases separately, instead, we send the human
@@ -26,7 +26,6 @@ import bleach
 import jinja2
 
 from twisted.internet import defer
-from twisted.mail.smtp import sendmail
 
 from synapse.api.constants import EventTypes
 from synapse.api.errors import StoreError

@@ -85,6 +84,7 @@ class Mailer(object):
         self.notif_template_html = notif_template_html
         self.notif_template_text = notif_template_text
 
+        self.sendmail = self.hs.get_sendmail()
         self.store = self.hs.get_datastore()
         self.macaroon_gen = self.hs.get_macaroon_generator()
         self.state_handler = self.hs.get_state_handler()

@@ -191,11 +191,11 @@ class Mailer(object):
         multipart_msg.attach(html_part)
 
         logger.info("Sending email push notification to %s" % email_address)
-        # logger.debug(html_text)
 
-        yield sendmail(
+        yield self.sendmail(
             self.hs.config.email_smtp_host,
-            raw_from, raw_to, multipart_msg.as_string(),
+            raw_from, raw_to, multipart_msg.as_string().encode('utf8'),
+            reactor=self.hs.get_reactor(),
             port=self.hs.config.email_smtp_port,
             requireAuthentication=self.hs.config.email_smtp_user is not None,
             username=self.hs.config.email_smtp_user,

@@ -333,7 +333,7 @@ class Mailer(object):
                                    notif_events, user_id, reason):
         if len(notifs_by_room) == 1:
             # Only one room has new stuff
-            room_id = notifs_by_room.keys()[0]
+            room_id = list(notifs_by_room.keys())[0]
 
             # If the room has some kind of name, use it, but we don't
             # want the generated-from-names one here otherwise we'll
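Fetching the mail transport via `hs.get_sendmail()` (wired up in the `synapse/server.py` hunks near the end of this diff) means tests can substitute a stub for Twisted's SMTP client. A hedged sketch of the idea, with a fake homeserver:

    sent = []

    def fake_sendmail(smtphost, from_addr, to_addr, msg_bytes, **kwargs):
        # Records mail instead of talking SMTP; loosely mimics the shape of
        # twisted.mail.smtp.sendmail's arguments.
        sent.append((from_addr, to_addr, msg_bytes))

    class FakeHomeServer(object):
        def get_sendmail(self):
            return fake_sendmail

    mailer_sendmail = FakeHomeServer().get_sendmail()
    mailer_sendmail("localhost", "a@x.org", "b@y.org", b"hello", port=25)
    assert sent == [("a@x.org", "b@y.org", b"hello")]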
@@ -124,7 +124,7 @@ class PushRuleEvaluatorForEvent(object):
 
         # XXX: optimisation: cache our pattern regexps
         if condition['key'] == 'content.body':
-            body = self._event["content"].get("body", None)
+            body = self._event.content.get("body", None)
             if not body:
                 return False
 

@@ -140,7 +140,7 @@ class PushRuleEvaluatorForEvent(object):
         if not display_name:
             return False
 
-        body = self._event["content"].get("body", None)
+        body = self._event.content.get("body", None)
         if not body:
             return False
@@ -68,6 +68,29 @@ function captchaDone() {
 </html>
 """
 
+TERMS_TEMPLATE = """
+<html>
+<head>
+<title>Authentication</title>
+<meta name='viewport' content='width=device-width, initial-scale=1,
+    user-scalable=no, minimum-scale=1.0, maximum-scale=1.0'>
+<link rel="stylesheet" href="/_matrix/static/client/register/style.css">
+</head>
+<body>
+<form id="registrationForm" method="post" action="%(myurl)s">
+    <div>
+        <p>
+            Please click the button below if you agree to the
+            <a href="%(terms_url)s">privacy policy of this homeserver.</a>
+        </p>
+        <input type="hidden" name="session" value="%(session)s" />
+        <input type="submit" value="Agree" />
+    </div>
+</form>
+</body>
+</html>
+"""
+
 SUCCESS_TEMPLATE = """
 <html>
 <head>

@@ -130,6 +153,27 @@ class AuthRestServlet(RestServlet):
             request.setHeader(b"Content-Type", b"text/html; charset=utf-8")
             request.setHeader(b"Content-Length", b"%d" % (len(html_bytes),))
 
             request.write(html_bytes)
             finish_request(request)
             defer.returnValue(None)
+        elif stagetype == LoginType.TERMS:
+            session = request.args['session'][0]
+
+            html = TERMS_TEMPLATE % {
+                'session': session,
+                'terms_url': "%s/_matrix/consent?v=%s" % (
+                    self.hs.config.public_baseurl,
+                    self.hs.config.user_consent_version,
+                ),
+                'myurl': "%s/auth/%s/fallback/web" % (
+                    CLIENT_V2_ALPHA_PREFIX, LoginType.TERMS
+                ),
+            }
+            html_bytes = html.encode("utf8")
+            request.setResponseCode(200)
+            request.setHeader(b"Content-Type", b"text/html; charset=utf-8")
+            request.setHeader(b"Content-Length", b"%d" % (len(html_bytes),))
+
+            request.write(html_bytes)
+            finish_request(request)
+            defer.returnValue(None)

@@ -139,7 +183,7 @@ class AuthRestServlet(RestServlet):
     @defer.inlineCallbacks
     def on_POST(self, request, stagetype):
         yield
-        if stagetype == "m.login.recaptcha":
+        if stagetype == LoginType.RECAPTCHA:
             if ('g-recaptcha-response' not in request.args or
                     len(request.args['g-recaptcha-response'])) == 0:
                 raise SynapseError(400, "No captcha response supplied")

@@ -178,6 +222,41 @@ class AuthRestServlet(RestServlet):
             request.write(html_bytes)
             finish_request(request)
 
             defer.returnValue(None)
+        elif stagetype == LoginType.TERMS:
+            if ('session' not in request.args or
+                    len(request.args['session'])) == 0:
+                raise SynapseError(400, "No session supplied")
+
+            session = request.args['session'][0]
+            authdict = {'session': session}
+
+            success = yield self.auth_handler.add_oob_auth(
+                LoginType.TERMS,
+                authdict,
+                self.hs.get_ip_from_request(request)
+            )
+
+            if success:
+                html = SUCCESS_TEMPLATE
+            else:
+                html = TERMS_TEMPLATE % {
+                    'session': session,
+                    'terms_url': "%s/_matrix/consent?v=%s" % (
+                        self.hs.config.public_baseurl,
+                        self.hs.config.user_consent_version,
+                    ),
+                    'myurl': "%s/auth/%s/fallback/web" % (
+                        CLIENT_V2_ALPHA_PREFIX, LoginType.TERMS
+                    ),
+                }
+            html_bytes = html.encode("utf8")
+            request.setResponseCode(200)
+            request.setHeader(b"Content-Type", b"text/html; charset=utf-8")
+            request.setHeader(b"Content-Length", b"%d" % (len(html_bytes),))
+
+            request.write(html_bytes)
+            finish_request(request)
+            defer.returnValue(None)
         else:
             raise SynapseError(404, "Unknown auth stage type")
@@ -359,6 +359,13 @@ class RegisterRestServlet(RestServlet):
             [LoginType.MSISDN, LoginType.EMAIL_IDENTITY]
         ])
 
+        # Append m.login.terms to all flows if we're requiring consent
+        if self.hs.config.user_consent_at_registration:
+            new_flows = []
+            for flow in flows:
+                flow.append(LoginType.TERMS)
+            flows.extend(new_flows)
+
         auth_result, params, session_id = yield self.auth_handler.check_auth(
             flows, body, self.hs.get_ip_from_request(request)
         )

@@ -445,6 +452,12 @@ class RegisterRestServlet(RestServlet):
                 params.get("bind_msisdn")
             )
 
+        if auth_result and LoginType.TERMS in auth_result:
+            logger.info("%s has consented to the privacy policy" % registered_user_id)
+            yield self.store.user_set_consent_version(
+                registered_user_id, self.hs.config.user_consent_version,
+            )
+
         defer.returnValue((200, return_dict))
 
     def on_OPTIONS(self, _):
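Net effect of the first hunk above: when consent-at-registration is enabled, every user-interactive-auth flow gains a trailing `m.login.terms` stage (`new_flows` stays empty, so the `extend` is a no-op carried over from the source). For example:

    flows = [
        ["m.login.dummy"],
        ["m.login.recaptcha"],
    ]
    for flow in flows:
        flow.append("m.login.terms")

    assert flows == [
        ["m.login.dummy", "m.login.terms"],
        ["m.login.recaptcha", "m.login.terms"],
    ]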
@@ -17,7 +17,7 @@ import logging
 
 from twisted.internet import defer
 
-from synapse.api.errors import Codes, SynapseError
+from synapse.api.errors import Codes, NotFoundError, SynapseError
 from synapse.http.servlet import (
     RestServlet,
     parse_json_object_from_request,

@@ -208,10 +208,25 @@ class RoomKeysServlet(RestServlet):
             user_id, version, room_id, session_id
         )
 
+        # Convert room_keys to the right format to return.
         if session_id:
-            room_keys = room_keys['rooms'][room_id]['sessions'][session_id]
+            # If the client requests a specific session, but that session was
+            # not backed up, then return an M_NOT_FOUND.
+            if room_keys['rooms'] == {}:
+                raise NotFoundError("No room_keys found")
+            else:
+                room_keys = room_keys['rooms'][room_id]['sessions'][session_id]
         elif room_id:
-            room_keys = room_keys['rooms'][room_id]
+            # If the client requests all sessions from a room, but no sessions
+            # are found, then return an empty result rather than an error, so
+            # that clients don't have to handle an error condition, and an
+            # empty result is valid. (Similarly if the client requests all
+            # sessions from the backup, but in that case, room_keys is already
+            # in the right format, so we don't need to do anything about it.)
+            if room_keys['rooms'] == {}:
+                room_keys = {'sessions': {}}
+            else:
+                room_keys = room_keys['rooms'][room_id]
 
         defer.returnValue((200, room_keys))
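For reference, the shapes involved (illustrative data only): the handler always returns the full nested structure, and the servlet now narrows it per request while mapping an empty backup to a 404 for a session request or an empty-but-valid result for a room request.

    room_keys = {
        "rooms": {
            "!room:hs": {
                "sessions": {
                    "session1": {"first_message_index": 1, "session_data": "..."},
                },
            },
        },
    }

    # session-level request -> the bare session dict (404 if backup is empty)
    one_session = room_keys["rooms"]["!room:hs"]["sessions"]["session1"]

    # room-level request on an empty backup -> {'sessions': {}} instead of 404
    empty = {"rooms": {}}
    one_room = (
        {"sessions": {}} if empty["rooms"] == {}
        else empty["rooms"]["!room:hs"]
    )
    assert one_room == {"sessions": {}}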
@@ -137,27 +137,33 @@ class ConsentResource(Resource):
             request (twisted.web.http.Request):
         """
 
-        version = parse_string(request, "v",
-                               default=self._default_consent_version)
-        username = parse_string(request, "u", required=True)
-        userhmac = parse_string(request, "h", required=True, encoding=None)
-
-        self._check_hash(username, userhmac)
+        version = parse_string(request, "v", default=self._default_consent_version)
+        username = parse_string(request, "u", required=False, default="")
+        userhmac = None
+        has_consented = False
+        public_version = username == ""
+        if not public_version or not self.hs.config.user_consent_at_registration:
+            userhmac = parse_string(request, "h", required=True, encoding=None)
 
-        if username.startswith('@'):
-            qualified_user_id = username
-        else:
-            qualified_user_id = UserID(username, self.hs.hostname).to_string()
+            self._check_hash(username, userhmac)
 
-        u = yield self.store.get_user_by_id(qualified_user_id)
-        if u is None:
-            raise NotFoundError("Unknown user")
+            if username.startswith('@'):
+                qualified_user_id = username
+            else:
+                qualified_user_id = UserID(username, self.hs.hostname).to_string()
+
+            u = yield self.store.get_user_by_id(qualified_user_id)
+            if u is None:
+                raise NotFoundError("Unknown user")
+            has_consented = u["consent_version"] == version
 
         try:
             self._render_template(
                 request, "%s.html" % (version,),
-                user=username, userhmac=userhmac, version=version,
-                has_consented=(u["consent_version"] == version),
+                user=username,
+                userhmac=userhmac.decode('ascii'),
+                version=version,
+                has_consented=has_consented, public_version=public_version,
             )
         except TemplateNotFound:
             raise NotFoundError("Unknown policy version")

@@ -223,7 +229,7 @@ class ConsentResource(Resource):
             key=self._hmac_secret,
             msg=userid.encode('utf-8'),
             digestmod=sha256,
-        ).hexdigest()
+        ).hexdigest().encode('ascii')
 
         if not compare_digest(want_mac, userhmac):
             raise SynapseError(http_client.FORBIDDEN, "HMAC incorrect")
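The userhmac handed around above is the hex HMAC-SHA256 of the user ID under the form secret; the `.encode('ascii')` keeps the comparison bytes-vs-bytes now that `parse_string(..., encoding=None)` returns bytes. A self-contained check (the secret value is made up):

    import hmac
    from hashlib import sha256

    form_secret = b"form-secret-from-config"   # hypothetical
    userid = "@user:example.com"

    want_mac = hmac.new(
        key=form_secret,
        msg=userid.encode('utf-8'),
        digestmod=sha256,
    ).hexdigest().encode('ascii')

    userhmac = want_mac                        # as supplied in the ?h= parameter
    assert hmac.compare_digest(want_mac, userhmac)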
@@ -1,14 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2015, 2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
@@ -1,92 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2014-2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-import logging
-
-from canonicaljson import encode_canonical_json
-from signedjson.sign import sign_json
-from unpaddedbase64 import encode_base64
-
-from OpenSSL import crypto
-from twisted.web.resource import Resource
-
-from synapse.http.server import respond_with_json_bytes
-
-logger = logging.getLogger(__name__)
-
-
-class LocalKey(Resource):
-    """HTTP resource containing encoding the TLS X.509 certificate and NACL
-    signature verification keys for this server::
-
-        GET /key HTTP/1.1
-
-        HTTP/1.1 200 OK
-        Content-Type: application/json
-        {
-            "server_name": "this.server.example.com"
-            "verify_keys": {
-                "algorithm:version": # base64 encoded NACL verification key.
-            },
-            "tls_certificate": # base64 ASN.1 DER encoded X.509 tls cert.
-            "signatures": {
-                "this.server.example.com": {
-                   "algorithm:version": # NACL signature for this server.
-                }
-            }
-        }
-    """
-
-    def __init__(self, hs):
-        self.response_body = encode_canonical_json(
-            self.response_json_object(hs.config)
-        )
-        Resource.__init__(self)
-
-    @staticmethod
-    def response_json_object(server_config):
-        verify_keys = {}
-        for key in server_config.signing_key:
-            verify_key_bytes = key.verify_key.encode()
-            key_id = "%s:%s" % (key.alg, key.version)
-            verify_keys[key_id] = encode_base64(verify_key_bytes)
-
-        x509_certificate_bytes = crypto.dump_certificate(
-            crypto.FILETYPE_ASN1,
-            server_config.tls_certificate
-        )
-        json_object = {
-            u"server_name": server_config.server_name,
-            u"verify_keys": verify_keys,
-            u"tls_certificate": encode_base64(x509_certificate_bytes)
-        }
-        for key in server_config.signing_key:
-            json_object = sign_json(
-                json_object,
-                server_config.server_name,
-                key,
-            )
-
-        return json_object
-
-    def render_GET(self, request):
-        return respond_with_json_bytes(
-            request, 200, self.response_body,
-        )
-
-    def getChild(self, name, request):
-        if name == b'':
-            return self
@ -12,6 +12,7 @@
|
|||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import cgi
|
||||
import datetime
|
||||
import errno
|
||||
|
|
@ -24,6 +25,7 @@ import shutil
|
|||
import sys
|
||||
import traceback
|
||||
|
||||
import six
|
||||
from six import string_types
|
||||
from six.moves import urllib_parse as urlparse
|
||||
|
||||
|
|
@ -98,7 +100,7 @@ class PreviewUrlResource(Resource):
|
|||
# XXX: if get_user_by_req fails, what should we do in an async render?
|
||||
requester = yield self.auth.get_user_by_req(request)
|
||||
url = parse_string(request, "url")
|
||||
if "ts" in request.args:
|
||||
if b"ts" in request.args:
|
||||
ts = parse_integer(request, "ts")
|
||||
else:
|
||||
ts = self.clock.time_msec()
|
||||
|
|
@ -180,7 +182,12 @@ class PreviewUrlResource(Resource):
            cache_result["expires_ts"] > ts and
            cache_result["response_code"] / 100 == 2
        ):
            defer.returnValue(cache_result["og"])
            # It may be stored as text in the database (as with PostgreSQL),
            # not as bytes. If so, encode it back before handing it on.
            og = cache_result["og"]
            if isinstance(og, six.text_type):
                og = og.encode('utf8')
            defer.returnValue(og)
            return

        media_info = yield self._download_url(url, user)
@ -213,14 +220,17 @@ class PreviewUrlResource(Resource):
        elif _is_html(media_info['media_type']):
            # TODO: somehow stop a big HTML tree from exploding synapse's RAM

            file = open(media_info['filename'])
            body = file.read()
            file.close()
            with open(media_info['filename'], 'rb') as file:
                body = file.read()

            # clobber the encoding from the content-type, or default to utf-8
            # XXX: this overrides any <meta/> or XML charset headers in the body
            # which may pose problems, but so far seems to work okay.
            match = re.match(r'.*; *charset=(.*?)(;|$)', media_info['media_type'], re.I)
            match = re.match(
                r'.*; *charset="?(.*?)"?(;|$)',
                media_info['media_type'],
                re.I
            )
            encoding = match.group(1) if match else "utf-8"

            og = decode_and_calc_og(body, media_info['uri'], encoding)
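The quoted-charset fix above can be sanity-checked in isolation. A small illustration (assumed inputs, not from the commit) showing that the new pattern accepts both quoted and unquoted charset parameters:

import re

pattern = re.compile(r'.*; *charset="?(.*?)"?(;|$)', re.I)
# The old pattern would capture the quotes too; the new one strips them.
assert pattern.match('text/html; charset=utf-8').group(1) == "utf-8"
assert pattern.match('text/html; charset="utf-8"').group(1) == "utf-8"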
@ -23,6 +23,7 @@ import abc
import logging

from twisted.enterprise import adbapi
from twisted.mail.smtp import sendmail
from twisted.web.client import BrowserLikePolicyForHTTPS

from synapse.api.auth import Auth
@ -174,6 +175,7 @@ class HomeServer(object):
        'message_handler',
        'pagination_handler',
        'room_context_handler',
        'sendmail',
    ]

    # This is overridden in derived application classes

@ -269,6 +271,9 @@ class HomeServer(object):
    def build_room_creation_handler(self):
        return RoomCreationHandler(self)

    def build_sendmail(self):
        return sendmail

    def build_state_handler(self):
        return StateHandler(self)
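The new 'sendmail' entry follows the HomeServer dependency pattern: a name in DEPENDENCIES gets a build_<name> method, which is invoked lazily and cached on first use. A simplified sketch of that pattern (illustrative only, not the actual HomeServer machinery):

from twisted.mail.smtp import sendmail


class ToyHomeServer(object):
    """Minimal stand-in for the HomeServer build/get dependency pattern."""

    def __init__(self):
        self._deps = {}

    def _get(self, name):
        # Build on first use, then cache, mirroring get_sendmail() etc.
        if name not in self._deps:
            self._deps[name] = getattr(self, "build_%s" % (name,))()
        return self._deps[name]

    def build_sendmail(self):
        return sendmail

    def get_sendmail(self):
        return self._get("sendmail")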
@ -261,7 +261,7 @@ class StateHandler(object):
        logger.debug("calling resolve_state_groups from compute_event_context")

        entry = yield self.resolve_state_groups_for_events(
            event.room_id, [e for e, _ in event.prev_events],
            event.room_id, event.prev_event_ids(),
        )

        prev_state_ids = entry.state
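This commit repeatedly swaps direct iteration over event.prev_events / event.auth_events (lists of (event_id, hashes) pairs) for prev_event_ids() / auth_event_ids() accessors, as here and in the hunks below. A sketch of what such accessors amount to (assumed shape, matching the call sites being replaced rather than Synapse's actual event classes):

class EventBase(object):
    def __init__(self, prev_events, auth_events):
        # Both lists hold (event_id, hash_dict) pairs, as the old
        # `for e, _ in event.prev_events` call sites assumed.
        self.prev_events = prev_events
        self.auth_events = auth_events

    def prev_event_ids(self):
        """IDs of the prev_events, hiding the pair representation."""
        return [e_id for e_id, _ in self.prev_events]

    def auth_event_ids(self):
        """IDs of the auth_events."""
        return [e_id for e_id, _ in self.auth_events]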
@ -607,7 +607,7 @@ def resolve_events_with_store(room_version, state_sets, event_map, state_res_sto
        return v1.resolve_events_with_store(
            state_sets, event_map, state_res_store.get_events,
        )
    elif room_version == RoomVersions.VDH_TEST:
    elif room_version in (RoomVersions.VDH_TEST, RoomVersions.STATE_V2_TEST):
        return v2.resolve_events_with_store(
            state_sets, event_map, state_res_store,
        )
@ -53,6 +53,10 @@ def resolve_events_with_store(state_sets, event_map, state_res_store):

    logger.debug("Computing conflicted state")

    # We use event_map as a cache, so if it's None we need to initialize it
    if event_map is None:
        event_map = {}

    # First split up the un/conflicted state
    unconflicted_state, conflicted_state = _seperate(state_sets)
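For the split itself, a simplified model of what `_seperate` (spelled that way in the source) produces from the per-branch state maps; illustrative only, not the real implementation:

def separate(state_sets):
    """Split (event_type, state_key) -> event_id maps into entries every
    state set agrees on (unconflicted) and entries with more than one
    candidate (conflicted)."""
    unconflicted, conflicted = {}, {}
    all_keys = set()
    for state_set in state_sets:
        all_keys |= set(state_set)
    for key in all_keys:
        candidates = {s[key] for s in state_sets if key in s}
        if len(candidates) == 1:
            unconflicted[key] = candidates.pop()
        else:
            conflicted[key] = candidates
    return unconflicted, conflicted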
@ -155,7 +159,7 @@ def _get_power_level_for_sender(event_id, event_map, state_res_store):
    event = yield _get_event(event_id, event_map, state_res_store)

    pl = None
    for aid, _ in event.auth_events:
    for aid in event.auth_event_ids():
        aev = yield _get_event(aid, event_map, state_res_store)
        if (aev.type, aev.state_key) == (EventTypes.PowerLevels, ""):
            pl = aev

@ -163,7 +167,7 @@ def _get_power_level_for_sender(event_id, event_map, state_res_store):

    if pl is None:
        # Couldn't find power level. Check if they're the creator of the room
        for aid, _ in event.auth_events:
        for aid in event.auth_event_ids():
            aev = yield _get_event(aid, event_map, state_res_store)
            if (aev.type, aev.state_key) == (EventTypes.Create, ""):
                if aev.content.get("creator") == event.sender:

@ -295,7 +299,7 @@ def _add_event_and_auth_chain_to_graph(graph, event_id, event_map,
        graph.setdefault(eid, set())

        event = yield _get_event(eid, event_map, state_res_store)
        for aid, _ in event.auth_events:
        for aid in event.auth_event_ids():
            if aid in auth_diff:
                if aid not in graph:
                    state.append(aid)

@ -365,7 +369,7 @@ def _iterative_auth_checks(event_ids, base_state, event_map, state_res_store):
        event = event_map[event_id]

        auth_events = {}
        for aid, _ in event.auth_events:
        for aid in event.auth_event_ids():
            ev = yield _get_event(aid, event_map, state_res_store)

            if ev.rejected_reason is None:
@ -413,9 +417,9 @@ def _mainline_sort(event_ids, resolved_power_event_id, event_map,
    while pl:
        mainline.append(pl)
        pl_ev = yield _get_event(pl, event_map, state_res_store)
        auth_events = pl_ev.auth_events
        auth_events = pl_ev.auth_event_ids()
        pl = None
        for aid, _ in auth_events:
        for aid in auth_events:
            ev = yield _get_event(aid, event_map, state_res_store)
            if (ev.type, ev.state_key) == (EventTypes.PowerLevels, ""):
                pl = aid

@ -460,10 +464,10 @@ def _get_mainline_depth_for_event(event, mainline_map, event_map, state_res_stor
        if depth is not None:
            defer.returnValue(depth)

        auth_events = event.auth_events
        auth_events = event.auth_event_ids()
        event = None

        for aid, _ in auth_events:
        for aid in auth_events:
            aev = yield _get_event(aid, event_map, state_res_store)
            if (aev.type, aev.state_key) == (EventTypes.PowerLevels, ""):
                event = aev
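Both hunks above walk an event's auth chain looking for the m.room.power_levels event in order to build the "mainline". A synchronous toy version of that loop (assuming an `events_by_id` dict of event objects exposing type, state_key and auth_event_ids(); the real code resolves each event through a deferred `_get_event`):

def build_mainline(resolved_pl_id, events_by_id):
    """Follow power_levels events back through their auth chains."""
    mainline = []
    pl = resolved_pl_id
    while pl:
        mainline.append(pl)
        auth_events = events_by_id[pl].auth_event_ids()
        pl = None
        for aid in auth_events:
            ev = events_by_id[aid]
            if (ev.type, ev.state_key) == ("m.room.power_levels", ""):
                pl = aid
    return mainline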
@ -22,14 +22,19 @@ from twisted.internet import defer

from synapse.api.errors import StoreError
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList

from ._base import Cache, SQLBaseStore, db_to_json
from ._base import Cache, db_to_json

logger = logging.getLogger(__name__)

DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES = (
    "drop_device_list_streams_non_unique_indexes"
)


class DeviceStore(SQLBaseStore):
class DeviceStore(BackgroundUpdateStore):
    def __init__(self, db_conn, hs):
        super(DeviceStore, self).__init__(db_conn, hs)
@ -52,6 +57,30 @@ class DeviceStore(SQLBaseStore):
            columns=["user_id", "device_id"],
        )

        # create a unique index on device_lists_remote_cache
        self.register_background_index_update(
            "device_lists_remote_cache_unique_idx",
            index_name="device_lists_remote_cache_unique_id",
            table="device_lists_remote_cache",
            columns=["user_id", "device_id"],
            unique=True,
        )

        # And one on device_lists_remote_extremeties
        self.register_background_index_update(
            "device_lists_remote_extremeties_unique_idx",
            index_name="device_lists_remote_extremeties_unique_idx",
            table="device_lists_remote_extremeties",
            columns=["user_id"],
            unique=True,
        )

        # once they complete, we can remove the old non-unique indexes.
        self.register_background_update_handler(
            DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES,
            self._drop_device_list_streams_non_unique_indexes,
        )

    @defer.inlineCallbacks
    def store_device(self, user_id, device_id,
                     initial_device_display_name):
@ -239,7 +268,19 @@ class DeviceStore(SQLBaseStore):

    def update_remote_device_list_cache_entry(self, user_id, device_id, content,
                                              stream_id):
        """Updates a single user's device in the cache.
        """Updates a single device in the cache of a remote user's device list.

        Note: assumes that we are the only thread that can be updating this user's
        device list.

        Args:
            user_id (str): User to update device list for
            device_id (str): ID of the device being updated
            content (dict): new data on this device
            stream_id (int): the version of the device list

        Returns:
            Deferred[None]
        """
        return self.runInteraction(
            "update_remote_device_list_cache_entry",
@ -272,7 +313,11 @@ class DeviceStore(SQLBaseStore):
            },
            values={
                "content": json.dumps(content),
            }
            },

            # we don't need to lock, because we assume we are the only thread
            # updating this user's devices.
            lock=False,
        )

        txn.call_after(self._get_cached_user_device.invalidate, (user_id, device_id,))
@ -289,11 +334,26 @@ class DeviceStore(SQLBaseStore):
            },
            values={
                "stream_id": stream_id,
            }
            },

            # again, we can assume we are the only thread updating this user's
            # extremity.
            lock=False,
        )

    def update_remote_device_list_cache(self, user_id, devices, stream_id):
        """Replace the cache of the remote user's devices.
        """Replace the entire cache of the remote user's devices.

        Note: assumes that we are the only thread that can be updating this user's
        device list.

        Args:
            user_id (str): User to update device list for
            devices (list[dict]): list of device objects supplied over federation
            stream_id (int): the version of the device list

        Returns:
            Deferred[None]
        """
        return self.runInteraction(
            "update_remote_device_list_cache",
@ -338,7 +398,11 @@ class DeviceStore(SQLBaseStore):
            },
            values={
                "stream_id": stream_id,
            }
            },

            # we don't need to lock, because we can assume we are the only thread
            # updating this user's extremity.
            lock=False,
        )

    def get_devices_by_remote(self, destination, from_stream_id):
@ -722,3 +786,19 @@ class DeviceStore(SQLBaseStore):
            "_prune_old_outbound_device_pokes",
            _prune_txn,
        )

    @defer.inlineCallbacks
    def _drop_device_list_streams_non_unique_indexes(self, progress, batch_size):
        def f(conn):
            txn = conn.cursor()
            txn.execute(
                "DROP INDEX IF EXISTS device_lists_remote_cache_id"
            )
            txn.execute(
                "DROP INDEX IF EXISTS device_lists_remote_extremeties_id"
            )
            txn.close()

        yield self.runWithConnection(f)
        yield self._end_background_update(DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES)
        defer.returnValue(1)
@ -40,7 +40,10 @@ class EndToEndKeyStore(SQLBaseStore):
            allow_none=True,
        )

        new_key_json = encode_canonical_json(device_keys)
        # In py3 we need old_key_json to match new_key_json type. The DB
        # returns unicode while encode_canonical_json returns bytes.
        new_key_json = encode_canonical_json(device_keys).decode("utf-8")

        if old_key_json == new_key_json:
            return False
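The py3 comparison problem above is easy to reproduce. A small illustration (not from the commit):

from canonicaljson import encode_canonical_json

# Under Python 3 the canonical encoding is bytes, and bytes never compare
# equal to a text value read back from the database unless decoded first.
new_key_json = encode_canonical_json({"key": "value"})
assert isinstance(new_key_json, bytes)
assert new_key_json.decode("utf-8") == u'{"key":"value"}'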
@ -477,7 +477,7 @@ class EventFederationStore(EventFederationWorkerStore):
                    "is_state": False,
                }
                for ev in events
                for e_id, _ in ev.prev_events
                for e_id in ev.prev_event_ids()
            ],
        )

@ -510,7 +510,7 @@ class EventFederationStore(EventFederationWorkerStore):

        txn.executemany(query, [
            (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
            for ev in events for e_id, _ in ev.prev_events
            for ev in events for e_id in ev.prev_event_ids()
            if not ev.internal_metadata.is_outlier()
        ])
@ -38,6 +38,7 @@ from synapse.state import StateResolutionStore
from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.storage.event_federation import EventFederationStore
from synapse.storage.events_worker import EventsWorkerStore
from synapse.storage.state import StateGroupWorkerStore
from synapse.types import RoomStreamToken, get_domain_from_id
from synapse.util import batch_iter
from synapse.util.async_helpers import ObservableDeferred
@ -205,7 +206,8 @@ def _retry_on_integrity_error(func):

# inherits from EventFederationStore so that we can call _update_backward_extremities
# and _handle_mult_prev_events (though arguably those could both be moved in here)
class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore):
class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore,
                  BackgroundUpdateStore):
    EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
    EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
@ -414,7 +416,7 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
                )
                if len_1:
                    all_single_prev_not_state = all(
                        len(event.prev_events) == 1
                        len(event.prev_event_ids()) == 1
                        and not event.is_state()
                        for event, ctx in ev_ctx_rm
                    )

@ -438,7 +440,7 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
                    # guess this by looking at the prev_events and checking
                    # if they match the current forward extremities.
                    for ev, _ in ev_ctx_rm:
                        prev_event_ids = set(e for e, _ in ev.prev_events)
                        prev_event_ids = set(ev.prev_event_ids())
                        if latest_event_ids == prev_event_ids:
                            state_delta_reuse_delta_counter.inc()
                            break

@ -549,7 +551,7 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
            result.difference_update(
                e_id
                for event in new_events
                for e_id, _ in event.prev_events
                for e_id in event.prev_event_ids()
            )

        # Finally, remove any events which are prev_events of any existing events.

@ -867,7 +869,7 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
                    "auth_id": auth_id,
                }
                for event, _ in events_and_contexts
                for auth_id, _ in event.auth_events
                for auth_id in event.auth_event_ids()
                if event.is_state()
            ],
        )
@ -2034,55 +2036,37 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore

        logger.info("[purge] finding redundant state groups")

        # Get all state groups that are only referenced by events that are
        # to be deleted.
        # This works by first getting state groups that we may want to delete,
        # joining against event_to_state_groups to get events that use that
        # state group, then left joining against events_to_purge again. Any
        # state group where the left join produce *no nulls* are referenced
        # only by events that are going to be purged.
        # Get all state groups that are referenced by events that are to be
        # deleted. We then go and check if they are referenced by other events
        # or state groups, and if not we delete them.
        txn.execute("""
            SELECT state_group FROM
            (
                SELECT DISTINCT state_group FROM events_to_purge
                INNER JOIN event_to_state_groups USING (event_id)
            ) AS sp
            INNER JOIN event_to_state_groups USING (state_group)
            LEFT JOIN events_to_purge AS ep USING (event_id)
            GROUP BY state_group
            HAVING SUM(CASE WHEN ep.event_id IS NULL THEN 1 ELSE 0 END) = 0
            SELECT DISTINCT state_group FROM events_to_purge
            INNER JOIN event_to_state_groups USING (event_id)
        """)

        state_rows = txn.fetchall()
        logger.info("[purge] found %i redundant state groups", len(state_rows))
        referenced_state_groups = set(sg for sg, in txn)
        logger.info(
            "[purge] found %i referenced state groups",
            len(referenced_state_groups),
        )

        # make a set of the redundant state groups, so that we can look them up
        # efficiently
        state_groups_to_delete = set([sg for sg, in state_rows])
        logger.info("[purge] finding state groups that can be deleted")

        # Now we get all the state groups that rely on these state groups
        logger.info("[purge] finding state groups which depend on redundant"
                    " state groups")
        remaining_state_groups = []
        for i in range(0, len(state_rows), 100):
            chunk = [sg for sg, in state_rows[i:i + 100]]
            # look for state groups whose prev_state_group is one we are about
            # to delete
            rows = self._simple_select_many_txn(
                txn,
                table="state_group_edges",
                column="prev_state_group",
                iterable=chunk,
                retcols=["state_group"],
                keyvalues={},
        state_groups_to_delete, remaining_state_groups = (
            self._find_unreferenced_groups_during_purge(
                txn, referenced_state_groups,
            )
            remaining_state_groups.extend(
                row["state_group"] for row in rows
            )

                # exclude state groups we are about to delete: no point in
                # updating them
                if row["state_group"] not in state_groups_to_delete
            )
        )
        logger.info(
            "[purge] found %i state groups to delete",
            len(state_groups_to_delete),
        )

        logger.info(
            "[purge] de-delta-ing %i remaining state groups",
            len(remaining_state_groups),
        )

        # Now we turn the state groups that reference to-be-deleted state
        # groups to non delta versions.
@ -2127,11 +2111,11 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
        logger.info("[purge] removing redundant state groups")
        txn.executemany(
            "DELETE FROM state_groups_state WHERE state_group = ?",
            state_rows
            ((sg,) for sg in state_groups_to_delete),
        )
        txn.executemany(
            "DELETE FROM state_groups WHERE id = ?",
            state_rows
            ((sg,) for sg in state_groups_to_delete),
        )

        logger.info("[purge] removing events from event_to_state_groups")
@ -2227,6 +2211,85 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore

        logger.info("[purge] done")

    def _find_unreferenced_groups_during_purge(self, txn, state_groups):
        """Used when purging history to figure out which state groups can be
        deleted and which need to be de-delta'ed (due to one of its prev groups
        being scheduled for deletion).

        Args:
            txn
            state_groups (set[int]): Set of state groups referenced by events
                that are going to be deleted.

        Returns:
            tuple[set[int], set[int]]: The set of state groups that can be
            deleted and the set of state groups that need to be de-delta'ed
        """
        # Graph of state group -> previous group
        graph = {}

        # Set of state groups that we have found to be referenced by
        # un-purged events
        referenced_groups = set()

        # Set of state groups we've already seen
        state_groups_seen = set(state_groups)

        # Set of state groups to handle next.
        next_to_search = set(state_groups)
        while next_to_search:
            # We bound size of groups we're looking up at once, to stop the
            # SQL query getting too big
            if len(next_to_search) < 100:
                current_search = next_to_search
                next_to_search = set()
            else:
                current_search = set(itertools.islice(next_to_search, 100))
                next_to_search -= current_search

            # Check if state groups are referenced
            sql = """
                SELECT DISTINCT state_group FROM event_to_state_groups
                LEFT JOIN events_to_purge AS ep USING (event_id)
                WHERE state_group IN (%s) AND ep.event_id IS NULL
            """ % (",".join("?" for _ in current_search),)
            txn.execute(sql, list(current_search))

            referenced = set(sg for sg, in txn)
            referenced_groups |= referenced

            # We don't continue iterating up the state group graphs for state
            # groups that are referenced.
            current_search -= referenced

            rows = self._simple_select_many_txn(
                txn,
                table="state_group_edges",
                column="prev_state_group",
                iterable=current_search,
                keyvalues={},
                retcols=("prev_state_group", "state_group",),
            )

            prevs = set(row["state_group"] for row in rows)
            # We don't bother re-handling groups we've already seen
            prevs -= state_groups_seen
            next_to_search |= prevs
            state_groups_seen |= prevs

            for row in rows:
                # Note: Each state group can have at most one prev group
                graph[row["state_group"]] = row["prev_state_group"]

        to_delete = state_groups_seen - referenced_groups

        to_dedelta = set()
        for sg in referenced_groups:
            prev_sg = graph.get(sg)
            if prev_sg and prev_sg in to_delete:
                to_dedelta.add(sg)

        return to_delete, to_dedelta
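To make the traversal concrete, a toy run (hypothetical group numbers) over a chain 1 <- 2 <- 3 where only group 3 is still referenced by a surviving event, mirroring the last few lines of the function above:

# state_group -> prev_state_group edges discovered during the walk
graph = {2: 1, 3: 2}
state_groups_seen = {1, 2, 3}
referenced_groups = {3}

to_delete = state_groups_seen - referenced_groups  # {1, 2}
to_dedelta = set(
    sg for sg in referenced_groups
    if graph.get(sg) in to_delete  # 3's prev group (2) is being deleted
)
assert to_delete == {1, 2}
assert to_dedelta == {3}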
    @defer.inlineCallbacks
    def is_event_after(self, event_id1, event_id2):
        """Returns True if event_id1 is after event_id2 in the stream
@ -20,9 +20,6 @@ CREATE TABLE device_lists_remote_cache (
    content TEXT NOT NULL
);

CREATE INDEX device_lists_remote_cache_id ON device_lists_remote_cache(user_id, device_id);


-- The last update we got for a user. Empty if we're not receiving updates for
-- that user.
CREATE TABLE device_lists_remote_extremeties (

@ -30,7 +27,11 @@ CREATE TABLE device_lists_remote_extremeties (
    stream_id TEXT NOT NULL
);

CREATE INDEX device_lists_remote_extremeties_id ON device_lists_remote_extremeties(user_id, stream_id);
-- we used to create non-unique indexes on these tables, but as of update 52 we create
-- unique indexes concurrently:
--
-- CREATE INDEX device_lists_remote_cache_id ON device_lists_remote_cache(user_id, device_id);
-- CREATE INDEX device_lists_remote_extremeties_id ON device_lists_remote_extremeties(user_id, stream_id);


-- Stream of device lists updates. Includes both local and remotes
@ -0,0 +1,19 @@
/* Copyright 2018 New Vector Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

-- This is needed to efficiently check for unreferenced state groups during
-- purge. Adds an event_to_state_groups(state_group) index
INSERT into background_updates (update_name, progress_json)
    VALUES ('event_to_state_groups_sg_index', '{}');
@ -0,0 +1,36 @@
/* Copyright 2018 New Vector Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

-- register a background update which will create a unique index on
-- device_lists_remote_cache
INSERT into background_updates (update_name, progress_json)
    VALUES ('device_lists_remote_cache_unique_idx', '{}');

-- and one on device_lists_remote_extremeties
INSERT into background_updates (update_name, progress_json, depends_on)
    VALUES (
        'device_lists_remote_extremeties_unique_idx', '{}',

        -- doesn't really depend on this, but we need to make sure both happen
        -- before we drop the old indexes.
        'device_lists_remote_cache_unique_idx'
    );

-- once they complete, we can drop the old indexes.
INSERT into background_updates (update_name, progress_json, depends_on)
    VALUES (
        'drop_device_list_streams_non_unique_indexes', '{}',
        'device_lists_remote_extremeties_unique_idx'
    );
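The depends_on column is what serialises these three updates: the drop can only run once both unique indexes exist. A toy scheduler (illustrative only, not Synapse's background-update runner) showing why the chain above yields that order:

updates = {
    "device_lists_remote_cache_unique_idx": None,
    "device_lists_remote_extremeties_unique_idx":
        "device_lists_remote_cache_unique_idx",
    "drop_device_list_streams_non_unique_indexes":
        "device_lists_remote_extremeties_unique_idx",
}

done = []
while len(done) < len(updates):
    for name, dep in updates.items():
        if name not in done and (dep is None or dep in done):
            done.append(name)  # a real runner would execute the update here
            break

assert done == [
    "device_lists_remote_cache_unique_idx",
    "device_lists_remote_extremeties_unique_idx",
    "drop_device_list_streams_non_unique_indexes",
]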
@ -1257,6 +1257,7 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
    STATE_GROUP_DEDUPLICATION_UPDATE_NAME = "state_group_state_deduplication"
    STATE_GROUP_INDEX_UPDATE_NAME = "state_group_state_type_index"
    CURRENT_STATE_INDEX_UPDATE_NAME = "current_state_members_idx"
    EVENT_STATE_GROUP_INDEX_UPDATE_NAME = "event_to_state_groups_sg_index"

    def __init__(self, db_conn, hs):
        super(StateStore, self).__init__(db_conn, hs)

@ -1275,6 +1276,12 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
            columns=["state_key"],
            where_clause="type='m.room.member'",
        )
        self.register_background_index_update(
            self.EVENT_STATE_GROUP_INDEX_UPDATE_NAME,
            index_name="event_to_state_groups_sg_index",
            table="event_to_state_groups",
            columns=["state_group"],
        )

    def _store_event_state_mappings_txn(self, txn, events_and_contexts):
        state_groups = {}