Merge branch 'develop' into dinsic

This commit is contained in:
Andrew Morgan 2019-04-15 15:22:14 +01:00
commit 7d71975e6a
204 changed files with 5053 additions and 10013 deletions

1
changelog.d/4474.misc Normal file
View File

@ -0,0 +1 @@
Add test to verify threepid auth check added in #4435.

1
changelog.d/4555.bugfix Normal file
View File

@ -0,0 +1 @@
Avoid redundant URL encoding of redirect URL for SSO login in the fallback login page. Fixes a regression introduced in [#4220](https://github.com/matrix-org/synapse/pull/4220). Contributed by Marcel Fabian Krüger ("[zaugin](https://github.com/zauguin)").

1
changelog.d/4942.bugfix Normal file
View File

@ -0,0 +1 @@
Fix bug where presence updates were sent to all servers in a room when a new server joined, rather than to just the new server.

1
changelog.d/4947.feature Normal file
View File

@ -0,0 +1 @@
Add ability for password provider modules to bind email addresses to users upon registration.

1
changelog.d/4949.misc Normal file
View File

@ -0,0 +1 @@
Fix/improve some docstrings in the replication code.

2
changelog.d/4953.misc Normal file
View File

@ -0,0 +1,2 @@
Split synapse.replication.tcp.streams into smaller files.

1
changelog.d/4954.misc Normal file
View File

@ -0,0 +1 @@
Refactor replication row generation/parsing.

1
changelog.d/4955.bugfix Normal file
View File

@ -0,0 +1 @@
Fix sync bug which made accepting invites unreliable in worker-mode synapses.

1
changelog.d/4956.bugfix Normal file
View File

@ -0,0 +1 @@
Fix sync bug which made accepting invites unreliable in worker-mode synapses.

1
changelog.d/4959.misc Normal file
View File

@ -0,0 +1 @@
Run `black` to clean up formatting on `synapse/storage/roommember.py` and `synapse/storage/events.py`.

1
changelog.d/4965.misc Normal file
View File

@ -0,0 +1 @@
Remove log line for password via the admin API.

1
changelog.d/4968.misc Normal file
View File

@ -0,0 +1 @@
Fix typo in TLS filenames in docker/README.md. Also add the '-p' commandline option to the 'docker run' example. Contributed by Jurrie Overgoor.

2
changelog.d/4969.misc Normal file
View File

@ -0,0 +1,2 @@
Refactor room version definitions.

1
changelog.d/4974.misc Normal file
View File

@ -0,0 +1 @@
Add `config.signing_key_path` that can be read by `synapse.config` utility.

1
changelog.d/4981.bugfix Normal file
View File

@ -0,0 +1 @@
start.sh: Fix the --no-rate-limit option for messages and make it bypass rate limit on registration and login too.

1
changelog.d/4982.misc Normal file
View File

@ -0,0 +1 @@
Track which identity server is used when binding a threepid and use that for unbinding, as per MSC1915.

1
changelog.d/4985.misc Normal file
View File

@ -0,0 +1 @@
Rewrite KeyringTestCase as a HomeserverTestCase.

1
changelog.d/4987.misc Normal file
View File

@ -0,0 +1 @@
README updates: Corrected the default POSTGRES_USER. Added port forwarding hint in TLS section.

1
changelog.d/4989.feature Normal file
View File

@ -0,0 +1 @@
Remove presence list support as per MSC 1819.

1
changelog.d/4990.bugfix Normal file
View File

@ -0,0 +1 @@
Transfer related groups on room upgrade.

1
changelog.d/4991.feature Normal file
View File

@ -0,0 +1 @@
Reduce CPU usage starting pushers during start up.

1
changelog.d/4992.misc Normal file
View File

@ -0,0 +1 @@
Remove a number of unused tables from the database schema.

1
changelog.d/4996.misc Normal file
View File

@ -0,0 +1 @@
Run `black` on the remainder of `synapse/storage/`.

1
changelog.d/4998.misc Normal file
View File

@ -0,0 +1 @@
Fix grammar in get_current_users_in_room and give it a docstring.

1
changelog.d/4999.bugfix Normal file
View File

@ -0,0 +1 @@
Prevent the ability to kick users from a room they aren't in.

1
changelog.d/5001.misc Normal file
View File

@ -0,0 +1 @@
Clean up some code in the server-key Keyring.

1
changelog.d/5002.feature Normal file
View File

@ -0,0 +1 @@
Add a delete group admin API.

1
changelog.d/5003.bugfix Normal file
View File

@ -0,0 +1 @@
Fix issue #4596 so synapse_port_db script works with --curses option on Python 3. Contributed by Anders Jensen-Waud <anders@jensenwaud.com>.

1
changelog.d/5007.misc Normal file
View File

@ -0,0 +1 @@
Refactor synapse.storage._base._simple_select_list_paginate.

1
changelog.d/5020.feature Normal file
View File

@ -0,0 +1 @@
Add context to phonehome stats.

1
changelog.d/5024.misc Normal file
View File

@ -0,0 +1 @@
Store the notary server name correctly in server_keys_json.

1
changelog.d/5027.feature Normal file
View File

@ -0,0 +1 @@
Add time-based account expiration.

1
changelog.d/5028.misc Normal file
View File

@ -0,0 +1 @@
Remove a number of unused tables from the database schema.

1
changelog.d/5030.misc Normal file
View File

@ -0,0 +1 @@
Rewrite Datastore.get_server_verify_keys to reduce the number of database transactions.

1
changelog.d/5032.bugfix Normal file
View File

@ -0,0 +1 @@
Fix "cannot import name execute_batch" error with postgres.

1
changelog.d/5033.misc Normal file
View File

@ -0,0 +1 @@
Remove a number of unused tables from the database schema.

1
changelog.d/5035.bugfix Normal file
View File

@ -0,0 +1 @@
Fix disappearing exceptions in manhole.

1
changelog.d/5046.misc Normal file
View File

@ -0,0 +1 @@
Remove extraneous period from copyright headers.

File diff suppressed because one or more lines are too long

View File

@ -27,17 +27,27 @@ for port in 8080 8081 8082; do
--config-path "$DIR/etc/$port.config" \
--report-stats no
printf '\n\n# Customisation made by demo/start.sh\n' >> $DIR/etc/$port.config
echo 'enable_registration: true' >> $DIR/etc/$port.config
# Check script parameters
if [ $# -eq 1 ]; then
if [ $1 = "--no-rate-limit" ]; then
# Set high limits in config file to disable rate limiting
perl -p -i -e 's/rc_messages_per_second.*/rc_messages_per_second: 1000/g' $DIR/etc/$port.config
perl -p -i -e 's/rc_message_burst_count.*/rc_message_burst_count: 1000/g' $DIR/etc/$port.config
# messages rate limit
echo 'rc_messages_per_second: 1000' >> $DIR/etc/$port.config
echo 'rc_message_burst_count: 1000' >> $DIR/etc/$port.config
# registration rate limit
printf 'rc_registration:\n per_second: 1000\n burst_count: 1000\n' >> $DIR/etc/$port.config
# login rate limit
echo 'rc_login:' >> $DIR/etc/$port.config
printf ' address:\n per_second: 1000\n burst_count: 1000\n' >> $DIR/etc/$port.config
printf ' account:\n per_second: 1000\n burst_count: 1000\n' >> $DIR/etc/$port.config
printf ' failed_attempts:\n per_second: 1000\n burst_count: 1000\n' >> $DIR/etc/$port.config
fi
fi
perl -p -i -e 's/^enable_registration:.*/enable_registration: true/g' $DIR/etc/$port.config
if ! grep -F "full_twisted_stacktraces" -q $DIR/etc/$port.config; then
echo "full_twisted_stacktraces: true" >> $DIR/etc/$port.config
fi

View File

@ -31,6 +31,7 @@ docker run \
--mount type=volume,src=synapse-data,dst=/data \
-e SYNAPSE_SERVER_NAME=my.matrix.host \
-e SYNAPSE_REPORT_STATS=yes \
-p 8448:8448 \
matrixdotorg/synapse:latest
```
@ -57,9 +58,10 @@ configuration file there. Multiple application services are supported.
Synapse requires a valid TLS certificate. You can do one of the following:
* Provide your own certificate and key (as
`${DATA_PATH}/${SYNAPSE_SERVER_NAME}.crt` and
`${DATA_PATH}/${SYNAPSE_SERVER_NAME}.key`, or elsewhere by providing an
entire config as `${SYNAPSE_CONFIG_PATH}`).
`${DATA_PATH}/${SYNAPSE_SERVER_NAME}.tls.crt` and
`${DATA_PATH}/${SYNAPSE_SERVER_NAME}.tls.key`, or elsewhere by providing an
entire config as `${SYNAPSE_CONFIG_PATH}`). In this case, you should forward
traffic to port 8448 in the container, for example with `-p 443:8448`.
* Use a reverse proxy to terminate incoming TLS, and forward the plain http
traffic to port 8008 in the container. In this case you should set `-e
@ -137,7 +139,7 @@ Database specific values (will use SQLite if not set):
**NOTE**: You are highly encouraged to use postgresql! Please use the compose
file to make it easier to deploy.
* `POSTGRES_USER` - The user for the synapse postgres database. [default:
`matrix`]
`synapse`]
Mail server specific values (will not send emails if not set):

View File

@ -0,0 +1,14 @@
# Delete a local group
This API lets a server admin delete a local group. Doing so will kick all
users out of the group so that their clients will correctly handle the group
being deleted.
The API is:
```
POST /_matrix/client/r0/admin/delete_group/<group_id>
```
including an `access_token` of a server admin.

View File

@ -236,6 +236,9 @@ listeners:
# - medium: 'email'
# address: 'reserved_user@example.com'
# Used by phonehome stats to group together related servers.
#server_context: context
## TLS ##
@ -643,6 +646,12 @@ uploads_path: "DATADIR/uploads"
#
#enable_registration: false
# Optional account validity parameter. This allows for, e.g., accounts to
# be denied any request after a given period.
#
#account_validity:
# period: 6w
# The user must provide all of the below types of 3PID when registering.
#
#registrations_require_3pid:

View File

@ -58,15 +58,11 @@ BOOLEAN_COLUMNS = {
APPEND_ONLY_TABLES = [
"event_content_hashes",
"event_reference_hashes",
"event_signatures",
"event_edge_hashes",
"events",
"event_json",
"state_events",
"room_memberships",
"feedback",
"topics",
"room_names",
"rooms",
@ -88,7 +84,6 @@ APPEND_ONLY_TABLES = [
"event_search",
"presence_stream",
"push_rules_stream",
"current_state_resets",
"ex_outlier_stream",
"cache_invalidation_stream",
"public_room_list_stream",
@ -811,7 +806,7 @@ class CursesProgress(Progress):
middle_space = 1
items = self.tables.items()
items.sort(key=lambda i: (i[1]["perc"], i[0]))
items = sorted(items, key=lambda i: (i[1]["perc"], i[0]))
for i, (table, data) in enumerate(items):
if i + 2 >= rows:

View File

@ -64,6 +64,8 @@ class Auth(object):
self.token_cache = LruCache(CACHE_SIZE_FACTOR * 10000)
register_cache("cache", "token_cache", self.token_cache)
self._account_validity = hs.config.account_validity
@defer.inlineCallbacks
def check_from_context(self, room_version, event, context, do_sig_check=True):
prev_state_ids = yield context.get_prev_state_ids(self.store)
@ -227,6 +229,16 @@ class Auth(object):
token_id = user_info["token_id"]
is_guest = user_info["is_guest"]
# Deny the request if the user account has expired.
if self._account_validity.enabled:
expiration_ts = yield self.store.get_expiration_ts_for_user(user)
if self.clock.time_msec() >= expiration_ts:
raise AuthError(
403,
"User account has expired",
errcode=Codes.EXPIRED_ACCOUNT,
)
# device_id may not be present if get_user_by_access_token has been
# stubbed out.
device_id = user_info.get("device_id")

View File

@ -69,6 +69,7 @@ class EventTypes(object):
Redaction = "m.room.redaction"
ThirdPartyInvite = "m.room.third_party_invite"
Encryption = "m.room.encryption"
RelatedGroups = "m.room.related_groups"
RoomHistoryVisibility = "m.room.history_visibility"
CanonicalAlias = "m.room.canonical_alias"
@ -103,46 +104,6 @@ class ThirdPartyEntityKind(object):
LOCATION = "location"
class RoomVersions(object):
V1 = "1"
V2 = "2"
V3 = "3"
STATE_V2_TEST = "state-v2-test"
class RoomDisposition(object):
STABLE = "stable"
UNSTABLE = "unstable"
# the version we will give rooms which are created on this server
DEFAULT_ROOM_VERSION = RoomVersions.V1
# vdh-test-version is a placeholder to get room versioning support working and tested
# until we have a working v2.
KNOWN_ROOM_VERSIONS = {
RoomVersions.V1,
RoomVersions.V2,
RoomVersions.V3,
RoomVersions.STATE_V2_TEST,
RoomVersions.V3,
}
class EventFormatVersions(object):
"""This is an internal enum for tracking the version of the event format,
independently from the room version.
"""
V1 = 1
V2 = 2
KNOWN_EVENT_FORMAT_VERSIONS = {
EventFormatVersions.V1,
EventFormatVersions.V2,
}
ServerNoticeMsgType = "m.server_notice"
ServerNoticeLimitReached = "m.server_notice.usage_limit_reached"

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd.
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -60,6 +60,7 @@ class Codes(object):
UNSUPPORTED_ROOM_VERSION = "M_UNSUPPORTED_ROOM_VERSION"
INCOMPATIBLE_ROOM_VERSION = "M_INCOMPATIBLE_ROOM_VERSION"
WRONG_ROOM_KEYS_VERSION = "M_WRONG_ROOM_KEYS_VERSION"
EXPIRED_ACCOUNT = "ORG_MATRIX_EXPIRED_ACCOUNT"
class CodeMessageException(RuntimeError):

View File

@ -0,0 +1,91 @@
# -*- coding: utf-8 -*-
# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import attr
class EventFormatVersions(object):
"""This is an internal enum for tracking the version of the event format,
independently from the room version.
"""
V1 = 1 # $id:server format
V2 = 2 # MSC1659-style $hash format: introduced for room v3
KNOWN_EVENT_FORMAT_VERSIONS = {
EventFormatVersions.V1,
EventFormatVersions.V2,
}
class StateResolutionVersions(object):
"""Enum to identify the state resolution algorithms"""
V1 = 1 # room v1 state res
V2 = 2 # MSC1442 state res: room v2 and later
class RoomDisposition(object):
STABLE = "stable"
UNSTABLE = "unstable"
@attr.s(slots=True, frozen=True)
class RoomVersion(object):
"""An object which describes the unique attributes of a room version."""
identifier = attr.ib() # str; the identifier for this version
disposition = attr.ib() # str; one of the RoomDispositions
event_format = attr.ib() # int; one of the EventFormatVersions
state_res = attr.ib() # int; one of the StateResolutionVersions
class RoomVersions(object):
V1 = RoomVersion(
"1",
RoomDisposition.STABLE,
EventFormatVersions.V1,
StateResolutionVersions.V1,
)
STATE_V2_TEST = RoomVersion(
"state-v2-test",
RoomDisposition.UNSTABLE,
EventFormatVersions.V1,
StateResolutionVersions.V2,
)
V2 = RoomVersion(
"2",
RoomDisposition.STABLE,
EventFormatVersions.V1,
StateResolutionVersions.V2,
)
V3 = RoomVersion(
"3",
RoomDisposition.STABLE,
EventFormatVersions.V2,
StateResolutionVersions.V2,
)
# the version we will give rooms which are created on this server
DEFAULT_ROOM_VERSION = RoomVersions.V1
KNOWN_ROOM_VERSIONS = {
v.identifier: v for v in (
RoomVersions.V1,
RoomVersions.V2,
RoomVersions.V3,
RoomVersions.STATE_V2_TEST,
)
} # type: dict[str, RoomVersion]

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd.
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.

View File

@ -38,7 +38,7 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.replication.tcp.streams import ReceiptsStream
from synapse.replication.tcp.streams._base import ReceiptsStream
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.types import ReadReceipt

View File

@ -518,6 +518,7 @@ def run(hs):
uptime = 0
stats["homeserver"] = hs.config.server_name
stats["server_context"] = hs.config.server_context
stats["timestamp"] = now
stats["uptime_seconds"] = uptime
version = sys.version_info
@ -558,7 +559,6 @@ def run(hs):
stats["database_engine"] = hs.get_datastore().database_engine_name
stats["database_server_version"] = hs.get_datastore().get_server_version()
logger.info("Reporting stats to matrix.org: %s" % (stats,))
try:
yield hs.get_simple_http_client().put_json(

View File

@ -48,6 +48,7 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.replication.tcp.streams.events import EventsStreamEventRow
from synapse.rest.client.v1 import events
from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
@ -369,7 +370,9 @@ class SyncReplicationHandler(ReplicationClientHandler):
# We shouldn't get multiple rows per token for events stream, so
# we don't need to optimise this for multiple rows.
for row in rows:
event = yield self.store.get_event(row.event_id)
if row.type != EventsStreamEventRow.TypeId:
continue
event = yield self.store.get_event(row.data.event_id)
extra_users = ()
if event.type == EventTypes.Member:
extra_users = (event.state_key,)

View File

@ -36,6 +36,10 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.replication.tcp.streams.events import (
EventsStream,
EventsStreamCurrentStateRow,
)
from synapse.rest.client.v2_alpha import user_directory
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
@ -73,19 +77,18 @@ class UserDirectorySlaveStore(
prefilled_cache=curr_state_delta_prefill,
)
self._current_state_delta_pos = events_max
def stream_positions(self):
result = super(UserDirectorySlaveStore, self).stream_positions()
result["current_state_deltas"] = self._current_state_delta_pos
return result
def process_replication_rows(self, stream_name, token, rows):
if stream_name == "current_state_deltas":
self._current_state_delta_pos = token
if stream_name == EventsStream.NAME:
self._stream_id_gen.advance(token)
for row in rows:
if row.type != EventsStreamCurrentStateRow.TypeId:
continue
self._curr_state_delta_stream_cache.entity_has_changed(
row.room_id, token
row.data.room_id, token
)
return super(UserDirectorySlaveStore, self).process_replication_rows(
stream_name, token, rows
@ -170,7 +173,7 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler):
yield super(UserDirectoryReplicationHandler, self).on_rdata(
stream_name, token, rows
)
if stream_name == "current_state_deltas":
if stream_name == EventsStream.NAME:
run_in_background(self._notify_directory)
@defer.inlineCallbacks

View File

@ -42,7 +42,8 @@ class KeyConfig(Config):
if "signing_key" in config:
self.signing_key = read_signing_keys([config["signing_key"]])
else:
self.signing_key = self.read_signing_key(config["signing_key_path"])
self.signing_key_path = config["signing_key_path"]
self.signing_key = self.read_signing_key(self.signing_key_path)
self.old_signing_keys = self.read_old_signing_keys(
config.get("old_signing_keys", {})

View File

@ -20,6 +20,15 @@ from synapse.types import RoomAlias
from synapse.util.stringutils import random_string_with_symbols
class AccountValidityConfig(Config):
def __init__(self, config):
self.enabled = (len(config) > 0)
period = config.get("period", None)
if period:
self.period = self.parse_duration(period)
class RegistrationConfig(Config):
def read_config(self, config):
@ -31,6 +40,8 @@ class RegistrationConfig(Config):
strtobool(str(config["disable_registration"]))
)
self.account_validity = AccountValidityConfig(config.get("account_validity", {}))
self.registrations_require_3pid = config.get("registrations_require_3pid", [])
self.allowed_local_3pids = config.get("allowed_local_3pids", [])
self.check_is_for_allowed_local_3pids = config.get(
@ -97,6 +108,12 @@ class RegistrationConfig(Config):
#
#enable_registration: false
# Optional account validity parameter. This allows for, e.g., accounts to
# be denied any request after a given period.
#
#account_validity:
# period: 6w
# The user must provide all of the below types of 3PID when registering.
#
#registrations_require_3pid:

View File

@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd.
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.

View File

@ -37,6 +37,7 @@ class ServerConfig(Config):
def read_config(self, config):
self.server_name = config["server_name"]
self.server_context = config.get("server_context", None)
try:
parse_and_validate_server_name(self.server_name)
@ -484,6 +485,9 @@ class ServerConfig(Config):
#mau_limit_reserved_threepids:
# - medium: 'email'
# address: 'reserved_user@example.com'
# Used by phonehome stats to group together related servers.
#server_context: context
""" % locals()
def read_arguments(self, args):

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2017, 2018 New Vector Ltd.
# Copyright 2017, 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -20,6 +20,7 @@ from collections import namedtuple
from six import raise_from
from six.moves import urllib
import nacl.signing
from signedjson.key import (
decode_verify_key_bytes,
encode_verify_key_base64,
@ -274,10 +275,6 @@ class Keyring(object):
@defer.inlineCallbacks
def do_iterations():
with Measure(self.clock, "get_server_verify_keys"):
# dict[str, dict[str, VerifyKey]]: results so far.
# map server_name -> key_id -> VerifyKey
merged_results = {}
# dict[str, set(str)]: keys to fetch for each server
missing_keys = {}
for verify_request in verify_requests:
@ -287,29 +284,29 @@ class Keyring(object):
for fn in key_fetch_fns:
results = yield fn(missing_keys.items())
merged_results.update(results)
# We now need to figure out which verify requests we have keys
# for and which we don't
missing_keys = {}
requests_missing_keys = []
for verify_request in verify_requests:
server_name = verify_request.server_name
result_keys = merged_results[server_name]
if verify_request.deferred.called:
# We've already called this deferred, which probably
# means that we've already found a key for it.
continue
server_name = verify_request.server_name
# see if any of the keys we got this time are sufficient to
# complete this VerifyKeyRequest.
result_keys = results.get(server_name, {})
for key_id in verify_request.key_ids:
if key_id in result_keys:
key = result_keys.get(key_id)
if key:
with PreserveLoggingContext():
verify_request.deferred.callback((
server_name,
key_id,
result_keys[key_id],
))
verify_request.deferred.callback(
(server_name, key_id, key)
)
break
else:
# The else block is only reached if the loop above
@ -343,27 +340,24 @@ class Keyring(object):
@defer.inlineCallbacks
def get_keys_from_store(self, server_name_and_key_ids):
"""
Args:
server_name_and_key_ids (list[(str, iterable[str])]):
server_name_and_key_ids (iterable(Tuple[str, iterable[str]]):
list of (server_name, iterable[key_id]) tuples to fetch keys for
Returns:
Deferred: resolves to dict[str, dict[str, VerifyKey]]: map from
Deferred: resolves to dict[str, dict[str, VerifyKey|None]]: map from
server_name -> key_id -> VerifyKey
"""
res = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
run_in_background(
self.store.get_server_verify_keys,
server_name, key_ids,
).addCallback(lambda ks, server: (server, ks), server_name)
for server_name, key_ids in server_name_and_key_ids
],
consumeErrors=True,
).addErrback(unwrapFirstError))
defer.returnValue(dict(res))
keys_to_fetch = (
(server_name, key_id)
for server_name, key_ids in server_name_and_key_ids
for key_id in key_ids
)
res = yield self.store.get_server_verify_keys(keys_to_fetch)
keys = {}
for (server_name, key_id), key in res.items():
keys.setdefault(server_name, {})[key_id] = key
defer.returnValue(keys)
@defer.inlineCallbacks
def get_keys_from_perspectives(self, server_name_and_key_ids):
@ -494,11 +488,11 @@ class Keyring(object):
)
processed_response = yield self.process_v2_response(
perspective_name, response, only_from_server=False
perspective_name, response
)
server_name = response["server_name"]
for server_name, response_keys in processed_response.items():
keys.setdefault(server_name, {}).update(response_keys)
keys.setdefault(server_name, {}).update(processed_response)
yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
@ -517,7 +511,7 @@ class Keyring(object):
@defer.inlineCallbacks
def get_server_verify_key_v2_direct(self, server_name, key_ids):
keys = {}
keys = {} # type: dict[str, nacl.signing.VerifyKey]
for requested_key_id in key_ids:
if requested_key_id in keys:
@ -542,6 +536,11 @@ class Keyring(object):
or server_name not in response[u"signatures"]):
raise KeyLookupError("Key response not signed by remote server")
if response["server_name"] != server_name:
raise KeyLookupError("Expected a response for server %r not %r" % (
server_name, response["server_name"]
))
response_keys = yield self.process_v2_response(
from_server=server_name,
requested_ids=[requested_key_id],
@ -550,24 +549,45 @@ class Keyring(object):
keys.update(response_keys)
yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
run_in_background(
self.store_keys,
server_name=key_server_name,
from_server=server_name,
verify_keys=verify_keys,
)
for key_server_name, verify_keys in keys.items()
],
consumeErrors=True
).addErrback(unwrapFirstError))
defer.returnValue(keys)
yield self.store_keys(
server_name=server_name,
from_server=server_name,
verify_keys=keys,
)
defer.returnValue({server_name: keys})
@defer.inlineCallbacks
def process_v2_response(self, from_server, response_json,
requested_ids=[], only_from_server=True):
def process_v2_response(
self, from_server, response_json, requested_ids=[],
):
"""Parse a 'Server Keys' structure from the result of a /key request
This is used to parse either the entirety of the response from
GET /_matrix/key/v2/server, or a single entry from the list returned by
POST /_matrix/key/v2/query.
Checks that each signature in the response that claims to come from the origin
server is valid. (Does not check that there actually is such a signature, for
some reason.)
Stores the json in server_keys_json so that it can be used for future responses
to /_matrix/key/v2/query.
Args:
from_server (str): the name of the server producing this result: either
the origin server for a /_matrix/key/v2/server request, or the notary
for a /_matrix/key/v2/query.
response_json (dict): the json-decoded Server Keys response object
requested_ids (iterable[str]): a list of the key IDs that were requested.
We will store the json for these key ids as well as any that are
actually in the response
Returns:
Deferred[dict[str, nacl.signing.VerifyKey]]:
map from key_id to key object
"""
time_now_ms = self.clock.time_msec()
response_keys = {}
verify_keys = {}
@ -589,15 +609,7 @@ class Keyring(object):
verify_key.time_added = time_now_ms
old_verify_keys[key_id] = verify_key
results = {}
server_name = response_json["server_name"]
if only_from_server:
if server_name != from_server:
raise KeyLookupError(
"Expected a response for server %r not %r" % (
from_server, server_name
)
)
for key_id in response_json["signatures"].get(server_name, {}):
if key_id not in response_json["verify_keys"]:
raise KeyLookupError(
@ -633,7 +645,7 @@ class Keyring(object):
self.store.store_server_keys_json,
server_name=server_name,
key_id=key_id,
from_server=server_name,
from_server=from_server,
ts_now_ms=time_now_ms,
ts_expires_ms=ts_valid_until_ms,
key_json_bytes=signed_key_json_bytes,
@ -643,9 +655,7 @@ class Keyring(object):
consumeErrors=True,
).addErrback(unwrapFirstError))
results[server_name] = response_keys
defer.returnValue(results)
defer.returnValue(response_keys)
def store_keys(self, server_name, from_server, verify_keys):
"""Store a collection of verify keys for a given server

View File

@ -20,15 +20,9 @@ from signedjson.key import decode_verify_key_bytes
from signedjson.sign import SignatureVerifyException, verify_signed_json
from unpaddedbase64 import decode_base64
from synapse.api.constants import (
KNOWN_ROOM_VERSIONS,
EventFormatVersions,
EventTypes,
JoinRules,
Membership,
RoomVersions,
)
from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.api.errors import AuthError, EventSizeError, SynapseError
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, EventFormatVersions
from synapse.types import UserID, get_domain_from_id
logger = logging.getLogger(__name__)
@ -452,16 +446,18 @@ def check_redaction(room_version, event, auth_events):
if user_level >= redact_level:
return False
if room_version in (RoomVersions.V1, RoomVersions.V2,):
v = KNOWN_ROOM_VERSIONS.get(room_version)
if not v:
raise RuntimeError("Unrecognized room version %r" % (room_version,))
if v.event_format == EventFormatVersions.V1:
redacter_domain = get_domain_from_id(event.event_id)
redactee_domain = get_domain_from_id(event.redacts)
if redacter_domain == redactee_domain:
return True
elif room_version == RoomVersions.V3:
else:
event.internal_metadata.recheck_redaction = True
return True
else:
raise RuntimeError("Unrecognized room version %r" % (room_version,))
raise AuthError(
403,

View File

@ -21,7 +21,7 @@ import six
from unpaddedbase64 import encode_base64
from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventFormatVersions, RoomVersions
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, EventFormatVersions
from synapse.util.caches import intern_dict
from synapse.util.frozenutils import freeze
@ -351,18 +351,13 @@ def room_version_to_event_format(room_version):
Returns:
int
"""
if room_version not in KNOWN_ROOM_VERSIONS:
v = KNOWN_ROOM_VERSIONS.get(room_version)
if not v:
# We should have already checked version, so this should not happen
raise RuntimeError("Unrecognized room version %s" % (room_version,))
if room_version in (
RoomVersions.V1, RoomVersions.V2, RoomVersions.STATE_V2_TEST,
):
return EventFormatVersions.V1
elif room_version in (RoomVersions.V3,):
return EventFormatVersions.V2
else:
raise RuntimeError("Unrecognized room version %s" % (room_version,))
return v.event_format
def event_type_from_format_version(format_version):

View File

@ -17,21 +17,17 @@ import attr
from twisted.internet import defer
from synapse.api.constants import (
from synapse.api.constants import MAX_DEPTH
from synapse.api.room_versions import (
KNOWN_EVENT_FORMAT_VERSIONS,
KNOWN_ROOM_VERSIONS,
MAX_DEPTH,
EventFormatVersions,
)
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.types import EventID
from synapse.util.stringutils import random_string
from . import (
_EventInternalMetadata,
event_type_from_format_version,
room_version_to_event_format,
)
from . import _EventInternalMetadata, event_type_from_format_version
@attr.s(slots=True, cmp=False, frozen=True)
@ -170,21 +166,34 @@ class EventBuilderFactory(object):
def new(self, room_version, key_values):
"""Generate an event builder appropriate for the given room version
Deprecated: use for_room_version with a RoomVersion object instead
Args:
room_version (str): Version of the room that we're creating an
event builder for
room_version (str): Version of the room that we're creating an event builder
for
key_values (dict): Fields used as the basis of the new event
Returns:
EventBuilder
"""
# There's currently only the one event version defined
if room_version not in KNOWN_ROOM_VERSIONS:
v = KNOWN_ROOM_VERSIONS.get(room_version)
if not v:
raise Exception(
"No event format defined for version %r" % (room_version,)
)
return self.for_room_version(v, key_values)
def for_room_version(self, room_version, key_values):
"""Generate an event builder appropriate for the given room version
Args:
room_version (synapse.api.room_versions.RoomVersion):
Version of the room that we're creating an event builder for
key_values (dict): Fields used as the basis of the new event
Returns:
EventBuilder
"""
return EventBuilder(
store=self.store,
state=self.state,
@ -192,7 +201,7 @@ class EventBuilderFactory(object):
clock=self.clock,
hostname=self.hostname,
signing_key=self.signing_key,
format_version=room_version_to_event_format(room_version),
format_version=room_version.event_format,
type=key_values["type"],
state_key=key_values.get("state_key"),
room_id=key_values["room_id"],
@ -222,7 +231,6 @@ def create_local_event_from_event_dict(clock, hostname, signing_key,
FrozenEvent
"""
# There's currently only the one event version defined
if format_version not in KNOWN_EVENT_FORMAT_VERSIONS:
raise Exception(
"No event format defined for version %r" % (format_version,)

View File

@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2017 New Vector Ltd.
# Copyright 2017 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.

View File

@ -15,8 +15,9 @@
from six import string_types
from synapse.api.constants import EventFormatVersions, EventTypes, Membership
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.api.room_versions import EventFormatVersions
from synapse.types import EventID, RoomID, UserID

View File

@ -20,8 +20,9 @@ import six
from twisted.internet import defer
from twisted.internet.defer import DeferredList
from synapse.api.constants import MAX_DEPTH, EventTypes, Membership, RoomVersions
from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
from synapse.api.errors import Codes, SynapseError
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, EventFormatVersions
from synapse.crypto.event_signing import check_event_content_hash
from synapse.events import event_type_from_format_version
from synapse.events.utils import prune_event
@ -274,9 +275,12 @@ def _check_sigs_on_pdus(keyring, room_version, pdus):
# now let's look for events where the sender's domain is different to the
# event id's domain (normally only the case for joins/leaves), and add additional
# checks. Only do this if the room version has a concept of event ID domain
if room_version in (
RoomVersions.V1, RoomVersions.V2, RoomVersions.STATE_V2_TEST,
):
# (ie, the room version uses old-style non-hash event IDs).
v = KNOWN_ROOM_VERSIONS.get(room_version)
if not v:
raise RuntimeError("Unrecognized room version %s" % (room_version,))
if v.event_format == EventFormatVersions.V1:
pdus_to_check_event_id = [
p for p in pdus_to_check
if p.sender_domain != get_domain_from_id(p.pdu.event_id)
@ -289,10 +293,6 @@ def _check_sigs_on_pdus(keyring, room_version, pdus):
for p, d in zip(pdus_to_check_event_id, more_deferreds):
p.deferreds.append(d)
elif room_version in (RoomVersions.V3,):
pass # No further checks needed, as event IDs are hashes here
else:
raise RuntimeError("Unrecognized room version %s" % (room_version,))
# replace lists of deferreds with single Deferreds
return [_flatten_deferred_list(p.deferreds) for p in pdus_to_check]

View File

@ -25,12 +25,7 @@ from prometheus_client import Counter
from twisted.internet import defer
from synapse.api.constants import (
KNOWN_ROOM_VERSIONS,
EventTypes,
Membership,
RoomVersions,
)
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import (
CodeMessageException,
Codes,
@ -38,6 +33,11 @@ from synapse.api.errors import (
HttpResponseException,
SynapseError,
)
from synapse.api.room_versions import (
KNOWN_ROOM_VERSIONS,
EventFormatVersions,
RoomVersions,
)
from synapse.events import builder, room_version_to_event_format
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
from synapse.util import logcontext, unwrapFirstError
@ -570,7 +570,7 @@ class FederationClient(FederationBase):
Deferred[tuple[str, FrozenEvent, int]]: resolves to a tuple of
`(origin, event, event_format)` where origin is the remote
homeserver which generated the event, and event_format is one of
`synapse.api.constants.EventFormatVersions`.
`synapse.api.room_versions.EventFormatVersions`.
Fails with a ``SynapseError`` if the chosen remote server
returns a 300/400 code.
@ -592,7 +592,7 @@ class FederationClient(FederationBase):
# Note: If not supplied, the room version may be either v1 or v2,
# however either way the event format version will be v1.
room_version = ret.get("room_version", RoomVersions.V1)
room_version = ret.get("room_version", RoomVersions.V1.identifier)
event_format = room_version_to_event_format(room_version)
pdu_dict = ret.get("event", None)
@ -695,7 +695,9 @@ class FederationClient(FederationBase):
room_version = None
for e in state:
if (e.type, e.state_key) == (EventTypes.Create, ""):
room_version = e.content.get("room_version", RoomVersions.V1)
room_version = e.content.get(
"room_version", RoomVersions.V1.identifier
)
break
if room_version is None:
@ -802,11 +804,10 @@ class FederationClient(FederationBase):
raise err
# Otherwise, we assume that the remote server doesn't understand
# the v2 invite API.
if room_version in (RoomVersions.V1, RoomVersions.V2):
pass # We'll fall through
else:
# the v2 invite API. That's ok provided the room uses old-style event
# IDs.
v = KNOWN_ROOM_VERSIONS.get(room_version)
if v.event_format != EventFormatVersions.V1:
raise SynapseError(
400,
"User's homeserver does not support this room version",

View File

@ -25,7 +25,7 @@ from twisted.internet import defer
from twisted.internet.abstract import isIPAddress
from twisted.python import failure
from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventTypes, Membership
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import (
AuthError,
Codes,
@ -34,6 +34,7 @@ from synapse.api.errors import (
NotFoundError,
SynapseError,
)
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.crypto.event_signing import compute_event_signature
from synapse.events import room_version_to_event_format
from synapse.federation.federation_base import FederationBase, event_from_pdu_json

View File

@ -55,7 +55,12 @@ class FederationRemoteSendQueue(object):
self.is_mine_id = hs.is_mine_id
self.presence_map = {} # Pending presence map user_id -> UserPresenceState
self.presence_changed = SortedDict() # Stream position -> user_id
self.presence_changed = SortedDict() # Stream position -> list[user_id]
# Stores the destinations we need to explicitly send presence to about a
# given user.
# Stream position -> (user_id, destinations)
self.presence_destinations = SortedDict()
self.keyed_edu = {} # (destination, key) -> EDU
self.keyed_edu_changed = SortedDict() # stream position -> (destination, key)
@ -77,7 +82,7 @@ class FederationRemoteSendQueue(object):
for queue_name in [
"presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed",
"edus", "device_messages", "pos_time",
"edus", "device_messages", "pos_time", "presence_destinations",
]:
register(queue_name, getattr(self, queue_name))
@ -121,6 +126,15 @@ class FederationRemoteSendQueue(object):
for user_id in uids
)
keys = self.presence_destinations.keys()
i = self.presence_destinations.bisect_left(position_to_delete)
for key in keys[:i]:
del self.presence_destinations[key]
user_ids.update(
user_id for user_id, _ in self.presence_destinations.values()
)
to_del = [
user_id for user_id in self.presence_map if user_id not in user_ids
]
@ -209,6 +223,20 @@ class FederationRemoteSendQueue(object):
self.notifier.on_new_replication_data()
def send_presence_to_destinations(self, states, destinations):
"""As per FederationSender
Args:
states (list[UserPresenceState])
destinations (list[str])
"""
for state in states:
pos = self._next_pos()
self.presence_map.update({state.user_id: state for state in states})
self.presence_destinations[pos] = (state.user_id, destinations)
self.notifier.on_new_replication_data()
def send_device_messages(self, destination):
"""As per FederationSender"""
pos = self._next_pos()
@ -261,6 +289,16 @@ class FederationRemoteSendQueue(object):
state=self.presence_map[user_id],
)))
# Fetch presence to send to destinations
i = self.presence_destinations.bisect_right(from_token)
j = self.presence_destinations.bisect_right(to_token) + 1
for pos, (user_id, dests) in self.presence_destinations.items()[i:j]:
rows.append((pos, PresenceDestinationsRow(
state=self.presence_map[user_id],
destinations=list(dests),
)))
# Fetch changes keyed edus
i = self.keyed_edu_changed.bisect_right(from_token)
j = self.keyed_edu_changed.bisect_right(to_token) + 1
@ -357,6 +395,29 @@ class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", (
buff.presence.append(self.state)
class PresenceDestinationsRow(BaseFederationRow, namedtuple("PresenceDestinationsRow", (
"state", # UserPresenceState
"destinations", # list[str]
))):
TypeId = "pd"
@staticmethod
def from_data(data):
return PresenceDestinationsRow(
state=UserPresenceState.from_dict(data["state"]),
destinations=data["dests"],
)
def to_data(self):
return {
"state": self.state.as_dict(),
"dests": self.destinations,
}
def add_to_buffer(self, buff):
buff.presence_destinations.append((self.state, self.destinations))
class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", (
"key", # tuple(str) - the edu key passed to send_edu
"edu", # Edu
@ -428,6 +489,7 @@ TypeToRow = {
Row.TypeId: Row
for Row in (
PresenceRow,
PresenceDestinationsRow,
KeyedEduRow,
EduRow,
DeviceRow,
@ -437,6 +499,7 @@ TypeToRow = {
ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", (
"presence", # list(UserPresenceState)
"presence_destinations", # list of tuples of UserPresenceState and destinations
"keyed_edus", # dict of destination -> { key -> Edu }
"edus", # dict of destination -> [Edu]
"device_destinations", # set of destinations
@ -458,6 +521,7 @@ def process_rows_for_federation(transaction_queue, rows):
buff = ParsedFederationStreamData(
presence=[],
presence_destinations=[],
keyed_edus={},
edus={},
device_destinations=set(),
@ -476,6 +540,11 @@ def process_rows_for_federation(transaction_queue, rows):
if buff.presence:
transaction_queue.send_presence(buff.presence)
for state, destinations in buff.presence_destinations:
transaction_queue.send_presence_to_destinations(
states=[state], destinations=destinations,
)
for destination, edu_map in iteritems(buff.keyed_edus):
for key, edu in edu_map.items():
transaction_queue.send_edu(edu, key)

View File

@ -371,7 +371,7 @@ class FederationSender(object):
return
# First we queue up the new presence by user ID, so multiple presence
# updates in quick successtion are correctly handled
# updates in quick succession are correctly handled.
# We only want to send presence for our own users, so lets always just
# filter here just in case.
self.pending_presence.update({
@ -402,6 +402,23 @@ class FederationSender(object):
finally:
self._processing_pending_presence = False
def send_presence_to_destinations(self, states, destinations):
"""Send the given presence states to the given destinations.
Args:
states (list[UserPresenceState])
destinations (list[str])
"""
if not states or not self.hs.config.use_presence:
# No-op if presence is disabled.
return
for destination in destinations:
if destination == self.server_name:
continue
self._get_per_destination_queue(destination).send_presence(states)
@measure_func("txnqueue._process_presence")
@defer.inlineCallbacks
def _process_presence_inner(self, states):

View File

@ -21,8 +21,8 @@ import re
from twisted.internet import defer
import synapse
from synapse.api.constants import RoomVersions
from synapse.api.errors import Codes, FederationDeniedError, SynapseError
from synapse.api.room_versions import RoomVersions
from synapse.api.urls import FEDERATION_V1_PREFIX, FEDERATION_V2_PREFIX
from synapse.http.endpoint import parse_and_validate_server_name
from synapse.http.server import JsonResource
@ -513,7 +513,7 @@ class FederationV1InviteServlet(BaseFederationServlet):
# state resolution algorithm, and we don't use that for processing
# invites
content = yield self.handler.on_invite_request(
origin, content, room_version=RoomVersions.V1,
origin, content, room_version=RoomVersions.V1.identifier,
)
# V1 federation API is defined to return a content of `[200, {...}]`

View File

@ -22,6 +22,7 @@ from twisted.internet import defer
from synapse.api.errors import SynapseError
from synapse.types import GroupID, RoomID, UserID, get_domain_from_id
from synapse.util.async_helpers import concurrently_execute
logger = logging.getLogger(__name__)
@ -896,6 +897,78 @@ class GroupsServerHandler(object):
"group_id": group_id,
})
@defer.inlineCallbacks
def delete_group(self, group_id, requester_user_id):
"""Deletes a group, kicking out all current members.
Only group admins or server admins can call this request
Args:
group_id (str)
request_user_id (str)
Returns:
Deferred
"""
yield self.check_group_is_ours(
group_id, requester_user_id,
and_exists=True,
)
# Only server admins or group admins can delete groups.
is_admin = yield self.store.is_user_admin_in_group(
group_id, requester_user_id
)
if not is_admin:
is_admin = yield self.auth.is_server_admin(
UserID.from_string(requester_user_id),
)
if not is_admin:
raise SynapseError(403, "User is not an admin")
# Before deleting the group lets kick everyone out of it
users = yield self.store.get_users_in_group(
group_id, include_private=True,
)
@defer.inlineCallbacks
def _kick_user_from_group(user_id):
if self.hs.is_mine_id(user_id):
groups_local = self.hs.get_groups_local_handler()
yield groups_local.user_removed_from_group(group_id, user_id, {})
else:
yield self.transport_client.remove_user_from_group_notification(
get_domain_from_id(user_id), group_id, user_id, {}
)
yield self.store.maybe_delete_remote_profile_cache(user_id)
# We kick users out in the order of:
# 1. Non-admins
# 2. Other admins
# 3. The requester
#
# This is so that if the deletion fails for some reason other admins or
# the requester still has auth to retry.
non_admins = []
admins = []
for u in users:
if u["user_id"] == requester_user_id:
continue
if u["is_admin"]:
admins.append(u["user_id"])
else:
non_admins.append(u["user_id"])
yield concurrently_execute(_kick_user_from_group, non_admins, 10)
yield concurrently_execute(_kick_user_from_group, admins, 10)
yield _kick_user_from_group(requester_user_id)
yield self.store.delete_group(group_id)
def _parse_join_policy_from_contents(content):
"""Given a content for a request, return the specified join policy or None

View File

@ -912,7 +912,7 @@ class AuthHandler(BaseHandler):
)
@defer.inlineCallbacks
def delete_threepid(self, user_id, medium, address):
def delete_threepid(self, user_id, medium, address, id_server=None):
"""Attempts to unbind the 3pid on the identity servers and deletes it
from the local database.
@ -920,6 +920,10 @@ class AuthHandler(BaseHandler):
user_id (str)
medium (str)
address (str)
id_server (str|None): Use the given identity server when unbinding
any threepids. If None then will attempt to unbind using the
identity server specified when binding (if known).
Returns:
Deferred[bool]: Returns True if successfully unbound the 3pid on
@ -937,6 +941,7 @@ class AuthHandler(BaseHandler):
{
'medium': medium,
'address': address,
'id_server': id_server,
},
)

View File

@ -44,12 +44,15 @@ class DeactivateAccountHandler(BaseHandler):
hs.get_reactor().callWhenRunning(self._start_user_parting)
@defer.inlineCallbacks
def deactivate_account(self, user_id, erase_data):
def deactivate_account(self, user_id, erase_data, id_server=None):
"""Deactivate a user's account
Args:
user_id (str): ID of user to be deactivated
erase_data (bool): whether to GDPR-erase the user's data
id_server (str|None): Use the given identity server when unbinding
any threepids. If None then will attempt to unbind using the
identity server specified when binding (if known).
Returns:
Deferred[bool]: True if identity server supports removing
@ -75,6 +78,7 @@ class DeactivateAccountHandler(BaseHandler):
{
'medium': threepid['medium'],
'address': threepid['address'],
'id_server': id_server,
},
)
identity_server_supports_unbinding &= result

View File

@ -68,7 +68,7 @@ class DirectoryHandler(BaseHandler):
# TODO(erikj): Add transactions.
# TODO(erikj): Check if there is a current association.
if not servers:
users = yield self.state.get_current_user_in_room(room_id)
users = yield self.state.get_current_users_in_room(room_id)
servers = set(get_domain_from_id(u) for u in users)
if not servers:
@ -268,7 +268,7 @@ class DirectoryHandler(BaseHandler):
Codes.NOT_FOUND
)
users = yield self.state.get_current_user_in_room(room_id)
users = yield self.state.get_current_users_in_room(room_id)
extra_servers = set(get_domain_from_id(u) for u in users)
servers = set(extra_servers) | set(servers)

View File

@ -102,7 +102,7 @@ class EventStreamHandler(BaseHandler):
# Send down presence.
if event.state_key == auth_user_id:
# Send down presence for everyone in the room.
users = yield self.state.get_current_user_in_room(event.room_id)
users = yield self.state.get_current_users_in_room(event.room_id)
states = yield presence_handler.get_states(
users,
as_event=True,

View File

@ -29,13 +29,7 @@ from unpaddedbase64 import decode_base64
from twisted.internet import defer
from synapse.api.constants import (
KNOWN_ROOM_VERSIONS,
EventTypes,
Membership,
RejectedReason,
RoomVersions,
)
from synapse.api.constants import EventTypes, Membership, RejectedReason
from synapse.api.errors import (
AuthError,
CodeMessageException,
@ -44,6 +38,7 @@ from synapse.api.errors import (
StoreError,
SynapseError,
)
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions
from synapse.crypto.event_signing import compute_event_signature
from synapse.event_auth import auth_types_for_event
from synapse.events.validator import EventValidator
@ -1734,7 +1729,9 @@ class FederationHandler(BaseHandler):
# invalid, and it would fail auth checks anyway.
raise SynapseError(400, "No create event in state")
room_version = create_event.content.get("room_version", RoomVersions.V1)
room_version = create_event.content.get(
"room_version", RoomVersions.V1.identifier,
)
missing_auth_events = set()
for e in itertools.chain(auth_events, state, [event]):

View File

@ -139,6 +139,14 @@ class IdentityHandler(BaseHandler):
}
)
logger.debug("bound threepid %r to %s", creds, mxid)
# Remember where we bound the threepid
yield self.store.add_user_bound_threepid(
user_id=mxid,
medium=data["medium"],
address=data["address"],
id_server=id_server,
)
except CodeMessageException as e:
data = json.loads(e.msg) # XXX WAT?
defer.returnValue(data)
@ -147,9 +155,48 @@ class IdentityHandler(BaseHandler):
def try_unbind_threepid(self, mxid, threepid):
"""Removes a binding from an identity server
Args:
mxid (str): Matrix user ID of binding to be removed
threepid (dict): Dict with medium & address of binding to be
removed, and an optional id_server.
Raises:
SynapseError: If we failed to contact the identity server
Returns:
Deferred[bool]: True on success, otherwise False if the identity
server doesn't support unbinding (or no identity server found to
contact).
"""
if threepid.get("id_server"):
id_servers = [threepid["id_server"]]
else:
id_servers = yield self.store.get_id_servers_user_bound(
user_id=mxid,
medium=threepid["medium"],
address=threepid["address"],
)
# We don't know where to unbind, so we don't have a choice but to return
if not id_servers:
defer.returnValue(False)
changed = True
for id_server in id_servers:
changed &= yield self.try_unbind_threepid_with_id_server(
mxid, threepid, id_server,
)
defer.returnValue(changed)
@defer.inlineCallbacks
def try_unbind_threepid_with_id_server(self, mxid, threepid, id_server):
"""Removes a binding from an identity server
Args:
mxid (str): Matrix user ID of binding to be removed
threepid (dict): Dict with medium & address of binding to be removed
id_server (str): Identity server to unbind from
Raises:
SynapseError: If we failed to contact the identity server
@ -158,20 +205,13 @@ class IdentityHandler(BaseHandler):
Deferred[bool]: True on success, otherwise False if the identity
server doesn't support unbinding
"""
logger.debug("unbinding threepid %r from %s", threepid, mxid)
if not self.trusted_id_servers:
logger.warn("Can't unbind threepid: no trusted ID servers set in config")
defer.returnValue(False)
# We don't track what ID server we added 3pids on (perhaps we ought to)
# but we assume that any of the servers in the trusted list are in the
# same ID server federation, so we can pick any one of them to send the
# deletion request to.
id_server = next(iter(self.trusted_id_servers))
url = "https://%s/_matrix/identity/api/v1/3pid/unbind" % (id_server,)
content = {
"mxid": mxid,
"threepid": threepid,
"threepid": {
"medium": threepid["medium"],
"address": threepid["address"],
},
}
# we abuse the federation http client to sign the request, but we have to send it
@ -204,16 +244,24 @@ class IdentityHandler(BaseHandler):
content,
headers,
)
changed = True
except HttpResponseException as e:
changed = False
if e.code in (400, 404, 501,):
# The remote server probably doesn't support unbinding (yet)
logger.warn("Received %d response while unbinding threepid", e.code)
defer.returnValue(False)
else:
logger.error("Failed to unbind threepid on identity server: %s", e)
raise SynapseError(502, "Failed to contact identity server")
defer.returnValue(True)
yield self.store.remove_user_bound_threepid(
user_id=mxid,
medium=threepid["medium"],
address=threepid["address"],
id_server=id_server,
)
defer.returnValue(changed)
@defer.inlineCallbacks
def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs):

View File

@ -22,7 +22,7 @@ from canonicaljson import encode_canonical_json, json
from twisted.internet import defer
from twisted.internet.defer import succeed
from synapse.api.constants import EventTypes, Membership, RoomVersions
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import (
AuthError,
Codes,
@ -30,6 +30,7 @@ from synapse.api.errors import (
NotFoundError,
SynapseError,
)
from synapse.api.room_versions import RoomVersions
from synapse.api.urls import ConsentURIBuilder
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
@ -191,7 +192,7 @@ class MessageHandler(object):
"Getting joined members after leaving is not implemented"
)
users_with_profile = yield self.state.get_current_user_in_room(room_id)
users_with_profile = yield self.state.get_current_users_in_room(room_id)
# If this is an AS, double check that they are allowed to see the members.
# This can either be because the AS user is in the room or because there
@ -603,7 +604,9 @@ class EventCreationHandler(object):
"""
if event.is_state() and (event.type, event.state_key) == (EventTypes.Create, ""):
room_version = event.content.get("room_version", RoomVersions.V1)
room_version = event.content.get(
"room_version", RoomVersions.V1.identifier
)
else:
room_version = yield self.store.get_room_version(event.room_id)

View File

@ -31,9 +31,11 @@ from prometheus_client import Counter
from twisted.internet import defer
from synapse.api.constants import PresenceState
import synapse.metrics
from synapse.api.constants import EventTypes, Membership, PresenceState
from synapse.api.errors import SynapseError
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.presence import UserPresenceState
from synapse.types import UserID, get_domain_from_id
from synapse.util.async_helpers import Linearizer
@ -98,6 +100,7 @@ class PresenceHandler(object):
self.hs = hs
self.is_mine = hs.is_mine
self.is_mine_id = hs.is_mine_id
self.server_name = hs.hostname
self.clock = hs.get_clock()
self.store = hs.get_datastore()
self.wheel_timer = WheelTimer()
@ -110,30 +113,6 @@ class PresenceHandler(object):
federation_registry.register_edu_handler(
"m.presence", self.incoming_presence
)
federation_registry.register_edu_handler(
"m.presence_invite",
lambda origin, content: self.invite_presence(
observed_user=UserID.from_string(content["observed_user"]),
observer_user=UserID.from_string(content["observer_user"]),
)
)
federation_registry.register_edu_handler(
"m.presence_accept",
lambda origin, content: self.accept_presence(
observed_user=UserID.from_string(content["observed_user"]),
observer_user=UserID.from_string(content["observer_user"]),
)
)
federation_registry.register_edu_handler(
"m.presence_deny",
lambda origin, content: self.deny_presence(
observed_user=UserID.from_string(content["observed_user"]),
observer_user=UserID.from_string(content["observer_user"]),
)
)
distributor = hs.get_distributor()
distributor.observe("user_joined_room", self.user_joined_room)
active_presence = self.store.take_presence_startup_info()
@ -220,6 +199,15 @@ class PresenceHandler(object):
LaterGauge("synapse_handlers_presence_wheel_timer_size", "", [],
lambda: len(self.wheel_timer))
# Used to handle sending of presence to newly joined users/servers
if hs.config.use_presence:
self.notifier.add_replication_callback(self.notify_new_event)
# Presence is best effort and quickly heals itself, so lets just always
# stream from the current state when we restart.
self._event_pos = self.store.get_current_events_token()
self._event_processing = False
@defer.inlineCallbacks
def _on_shutdown(self):
"""Gets called when shutting down. This lets us persist any updates that
@ -750,162 +738,6 @@ class PresenceHandler(object):
yield self._update_states([prev_state.copy_and_replace(**new_fields)])
@defer.inlineCallbacks
def user_joined_room(self, user, room_id):
"""Called (via the distributor) when a user joins a room. This funciton
sends presence updates to servers, either:
1. the joining user is a local user and we send their presence to
all servers in the room.
2. the joining user is a remote user and so we send presence for all
local users in the room.
"""
# We only need to send presence to servers that don't have it yet. We
# don't need to send to local clients here, as that is done as part
# of the event stream/sync.
# TODO: Only send to servers not already in the room.
if self.is_mine(user):
state = yield self.current_state_for_user(user.to_string())
self._push_to_remotes([state])
else:
user_ids = yield self.store.get_users_in_room(room_id)
user_ids = list(filter(self.is_mine_id, user_ids))
states = yield self.current_state_for_users(user_ids)
self._push_to_remotes(list(states.values()))
@defer.inlineCallbacks
def get_presence_list(self, observer_user, accepted=None):
"""Returns the presence for all users in their presence list.
"""
if not self.is_mine(observer_user):
raise SynapseError(400, "User is not hosted on this Home Server")
presence_list = yield self.store.get_presence_list(
observer_user.localpart, accepted=accepted
)
results = yield self.get_states(
target_user_ids=[row["observed_user_id"] for row in presence_list],
as_event=False,
)
now = self.clock.time_msec()
results[:] = [format_user_presence_state(r, now) for r in results]
is_accepted = {
row["observed_user_id"]: row["accepted"] for row in presence_list
}
for result in results:
result.update({
"accepted": is_accepted,
})
defer.returnValue(results)
@defer.inlineCallbacks
def send_presence_invite(self, observer_user, observed_user):
"""Sends a presence invite.
"""
yield self.store.add_presence_list_pending(
observer_user.localpart, observed_user.to_string()
)
if self.is_mine(observed_user):
yield self.invite_presence(observed_user, observer_user)
else:
yield self.federation.build_and_send_edu(
destination=observed_user.domain,
edu_type="m.presence_invite",
content={
"observed_user": observed_user.to_string(),
"observer_user": observer_user.to_string(),
}
)
@defer.inlineCallbacks
def invite_presence(self, observed_user, observer_user):
"""Handles new presence invites.
"""
if not self.is_mine(observed_user):
raise SynapseError(400, "User is not hosted on this Home Server")
# TODO: Don't auto accept
if self.is_mine(observer_user):
yield self.accept_presence(observed_user, observer_user)
else:
self.federation.build_and_send_edu(
destination=observer_user.domain,
edu_type="m.presence_accept",
content={
"observed_user": observed_user.to_string(),
"observer_user": observer_user.to_string(),
}
)
state_dict = yield self.get_state(observed_user, as_event=False)
state_dict = format_user_presence_state(state_dict, self.clock.time_msec())
self.federation.build_and_send_edu(
destination=observer_user.domain,
edu_type="m.presence",
content={
"push": [state_dict]
}
)
@defer.inlineCallbacks
def accept_presence(self, observed_user, observer_user):
"""Handles a m.presence_accept EDU. Mark a presence invite from a
local or remote user as accepted in a local user's presence list.
Starts polling for presence updates from the local or remote user.
Args:
observed_user(UserID): The user to update in the presence list.
observer_user(UserID): The owner of the presence list to update.
"""
yield self.store.set_presence_list_accepted(
observer_user.localpart, observed_user.to_string()
)
@defer.inlineCallbacks
def deny_presence(self, observed_user, observer_user):
"""Handle a m.presence_deny EDU. Removes a local or remote user from a
local user's presence list.
Args:
observed_user(UserID): The local or remote user to remove from the
list.
observer_user(UserID): The local owner of the presence list.
Returns:
A Deferred.
"""
yield self.store.del_presence_list(
observer_user.localpart, observed_user.to_string()
)
# TODO(paul): Inform the user somehow?
@defer.inlineCallbacks
def drop(self, observed_user, observer_user):
"""Remove a local or remote user from a local user's presence list and
unsubscribe the local user from updates that user.
Args:
observed_user(UserId): The local or remote user to remove from the
list.
observer_user(UserId): The local owner of the presence list.
Returns:
A Deferred.
"""
if not self.is_mine(observer_user):
raise SynapseError(400, "User is not hosted on this Home Server")
yield self.store.del_presence_list(
observer_user.localpart, observed_user.to_string()
)
# TODO: Inform the remote that we've dropped the presence list.
@defer.inlineCallbacks
def is_visible(self, observed_user, observer_user):
"""Returns whether a user can see another user's presence.
@ -920,11 +752,7 @@ class PresenceHandler(object):
if observer_room_ids & observed_room_ids:
defer.returnValue(True)
accepted_observers = yield self.store.get_presence_list_observers_accepted(
observed_user.to_string()
)
defer.returnValue(observer_user.to_string() in accepted_observers)
defer.returnValue(False)
@defer.inlineCallbacks
def get_all_presence_updates(self, last_id, current_id):
@ -945,6 +773,140 @@ class PresenceHandler(object):
rows = yield self.store.get_all_presence_updates(last_id, current_id)
defer.returnValue(rows)
def notify_new_event(self):
"""Called when new events have happened. Handles users and servers
joining rooms and require being sent presence.
"""
if self._event_processing:
return
@defer.inlineCallbacks
def _process_presence():
assert not self._event_processing
self._event_processing = True
try:
yield self._unsafe_process()
finally:
self._event_processing = False
run_as_background_process("presence.notify_new_event", _process_presence)
@defer.inlineCallbacks
def _unsafe_process(self):
# Loop round handling deltas until we're up to date
while True:
with Measure(self.clock, "presence_delta"):
deltas = yield self.store.get_current_state_deltas(self._event_pos)
if not deltas:
return
yield self._handle_state_delta(deltas)
self._event_pos = deltas[-1]["stream_id"]
# Expose current event processing position to prometheus
synapse.metrics.event_processing_positions.labels("presence").set(
self._event_pos
)
@defer.inlineCallbacks
def _handle_state_delta(self, deltas):
"""Process current state deltas to find new joins that need to be
handled.
"""
for delta in deltas:
typ = delta["type"]
state_key = delta["state_key"]
room_id = delta["room_id"]
event_id = delta["event_id"]
prev_event_id = delta["prev_event_id"]
logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
if typ != EventTypes.Member:
continue
event = yield self.store.get_event(event_id)
if event.content.get("membership") != Membership.JOIN:
# We only care about joins
continue
if prev_event_id:
prev_event = yield self.store.get_event(prev_event_id)
if prev_event.content.get("membership") == Membership.JOIN:
# Ignore changes to join events.
continue
yield self._on_user_joined_room(room_id, state_key)
@defer.inlineCallbacks
def _on_user_joined_room(self, room_id, user_id):
"""Called when we detect a user joining the room via the current state
delta stream.
Args:
room_id (str)
user_id (str)
Returns:
Deferred
"""
if self.is_mine_id(user_id):
# If this is a local user then we need to send their presence
# out to hosts in the room (who don't already have it)
# TODO: We should be able to filter the hosts down to those that
# haven't previously seen the user
state = yield self.current_state_for_user(user_id)
hosts = yield self.state.get_current_hosts_in_room(room_id)
# Filter out ourselves.
hosts = set(host for host in hosts if host != self.server_name)
self.federation.send_presence_to_destinations(
states=[state],
destinations=hosts,
)
else:
# A remote user has joined the room, so we need to:
# 1. Check if this is a new server in the room
# 2. If so send any presence they don't already have for
# local users in the room.
# TODO: We should be able to filter the users down to those that
# the server hasn't previously seen
# TODO: Check that this is actually a new server joining the
# room.
user_ids = yield self.state.get_current_users_in_room(room_id)
user_ids = list(filter(self.is_mine_id, user_ids))
states = yield self.current_state_for_users(user_ids)
# Filter out old presence, i.e. offline presence states where
# the user hasn't been active for a week. We can change this
# depending on what we want the UX to be, but at the least we
# should filter out offline presence where the state is just the
# default state.
now = self.clock.time_msec()
states = [
state for state in states.values()
if state.state != PresenceState.OFFLINE
or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000
or state.status_msg is not None
]
if states:
self.federation.send_presence_to_destinations(
states=states,
destinations=[get_domain_from_id(user_id)],
)
def should_notify(old_state, new_state):
"""Decides if a presence state change should be sent to interested parties.
@ -1086,10 +1048,7 @@ class PresenceEventSource(object):
updates for
"""
user_id = user.to_string()
plist = yield self.store.get_presence_list_accepted(
user.localpart, on_invalidate=cache_context.invalidate,
)
users_interested_in = set(row["observed_user_id"] for row in plist)
users_interested_in = set()
users_interested_in.add(user_id) # So that we receive our own presence
users_who_share_room = yield self.store.get_users_who_share_room_with_user(
@ -1294,10 +1253,6 @@ def get_interested_parties(store, states):
for room_id in room_ids:
room_ids_to_states.setdefault(room_id, []).append(state)
plist = yield store.get_presence_list_observers_accepted(state.user_id)
for u in plist:
users_to_states.setdefault(u, []).append(state)
# Always notify self
users_to_states.setdefault(state.user_id, []).append(state)

View File

@ -154,6 +154,7 @@ class RegistrationHandler(BaseHandler):
user_type=None,
default_display_name=None,
address=None,
bind_emails=[],
):
"""Registers a new client on the server.
@ -173,6 +174,7 @@ class RegistrationHandler(BaseHandler):
default_display_name (unicode|None): if set, the new user's displayname
will be set to this. Defaults to 'localpart'.
address (str|None): the IP address used to perform the registration.
bind_emails (List[str]): list of emails to bind to this account.
Returns:
A tuple of (user_id, access_token).
Raises:
@ -272,6 +274,21 @@ class RegistrationHandler(BaseHandler):
if not self.hs.config.user_consent_at_registration:
yield self._auto_join_rooms(user_id)
# Bind any specified emails to this account
current_time = self.hs.get_clock().time_msec()
for email in bind_emails:
# generate threepid dict
threepid_dict = {
"medium": "email",
"address": email,
"validated_at": current_time,
}
# Bind email to new account
yield self._register_email_threepid(
user_id, threepid_dict, None, False,
)
defer.returnValue((user_id, token))
@defer.inlineCallbacks

View File

@ -25,14 +25,9 @@ from six import iteritems, string_types
from twisted.internet import defer
from synapse.api.constants import (
DEFAULT_ROOM_VERSION,
KNOWN_ROOM_VERSIONS,
EventTypes,
JoinRules,
RoomCreationPreset,
)
from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset
from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError
from synapse.api.room_versions import DEFAULT_ROOM_VERSION, KNOWN_ROOM_VERSIONS
from synapse.storage.state import StateFilter
from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID
from synapse.util import stringutils
@ -304,6 +299,7 @@ class RoomCreationHandler(BaseHandler):
(EventTypes.RoomAvatar, ""),
(EventTypes.Encryption, ""),
(EventTypes.ServerACL, ""),
(EventTypes.RelatedGroups, ""),
)
old_room_state_ids = yield self.store.get_filtered_current_state_ids(
@ -515,7 +511,7 @@ class RoomCreationHandler(BaseHandler):
if ratelimit:
yield self.ratelimit(requester)
room_version = config.get("room_version", DEFAULT_ROOM_VERSION)
room_version = config.get("room_version", DEFAULT_ROOM_VERSION.identifier)
if not isinstance(room_version, string_types):
raise SynapseError(
400,

View File

@ -167,7 +167,7 @@ class RoomListHandler(BaseHandler):
if not latest_event_ids:
return
joined_users = yield self.state_handler.get_current_user_in_room(
joined_users = yield self.state_handler.get_current_users_in_room(
room_id, latest_event_ids,
)

View File

@ -451,6 +451,9 @@ class RoomMemberHandler(object):
room_id, latest_event_ids=latest_event_ids,
)
# TODO: Refactor into dictionary of explicitly allowed transitions
# between old and new state, with specific error messages for some
# transitions and generic otherwise
old_state_id = current_state_ids.get((EventTypes.Member, target.to_string()))
if old_state_id:
old_state = yield self.store.get_event(old_state_id, allow_none=True)
@ -476,6 +479,9 @@ class RoomMemberHandler(object):
if same_sender and same_membership and same_content:
defer.returnValue(old_state)
if old_membership in ["ban", "leave"] and action == "kick":
raise AuthError(403, "The target user is not in the room")
# we don't allow people to reject invites to the server notice
# room, but they can leave it once they are joined.
if (
@ -489,6 +495,9 @@ class RoomMemberHandler(object):
"You cannot reject this invite",
errcode=Codes.CANNOT_LEAVE_SERVER_NOTICE_ROOM,
)
else:
if action == "kick":
raise AuthError(403, "The target user is not in the room")
is_host_in_room = yield self._is_host_in_room(current_state_ids)

View File

@ -1050,11 +1050,11 @@ class SyncHandler(object):
# TODO: Be more clever than this, i.e. remove users who we already
# share a room with?
for room_id in newly_joined_rooms:
joined_users = yield self.state.get_current_user_in_room(room_id)
joined_users = yield self.state.get_current_users_in_room(room_id)
newly_joined_or_invited_users.update(joined_users)
for room_id in newly_left_rooms:
left_users = yield self.state.get_current_user_in_room(room_id)
left_users = yield self.state.get_current_users_in_room(room_id)
newly_left_users.update(left_users)
# TODO: Check that these users are actually new, i.e. either they
@ -1215,7 +1215,7 @@ class SyncHandler(object):
extra_users_ids = set(newly_joined_or_invited_users)
for room_id in newly_joined_rooms:
users = yield self.state.get_current_user_in_room(room_id)
users = yield self.state.get_current_users_in_room(room_id)
extra_users_ids.update(users)
extra_users_ids.discard(user.to_string())
@ -1861,7 +1861,7 @@ class SyncHandler(object):
extrems = yield self.store.get_forward_extremeties_for_room(
room_id, stream_ordering,
)
users_in_room = yield self.state.get_current_user_in_room(
users_in_room = yield self.state.get_current_users_in_room(
room_id, extrems,
)
if user_id in users_in_room:

View File

@ -218,7 +218,7 @@ class TypingHandler(object):
@defer.inlineCallbacks
def _push_remote(self, member, typing):
try:
users = yield self.state.get_current_user_in_room(member.room_id)
users = yield self.state.get_current_users_in_room(member.room_id)
self._member_last_federation_poke[member] = self.clock.time_msec()
now = self.clock.time_msec()
@ -261,7 +261,7 @@ class TypingHandler(object):
)
return
users = yield self.state.get_current_user_in_room(room_id)
users = yield self.state.get_current_users_in_room(room_id)
domains = set(get_domain_from_id(u) for u in users)
if self.server_name in domains:

View File

@ -276,7 +276,7 @@ class UserDirectoryHandler(StateDeltasHandler):
# ignore the change
return
users_with_profile = yield self.state.get_current_user_in_room(room_id)
users_with_profile = yield self.state.get_current_users_in_room(room_id)
# Remove every user from the sharing tables for that room.
for user_id in iterkeys(users_with_profile):
@ -325,7 +325,7 @@ class UserDirectoryHandler(StateDeltasHandler):
room_id
)
# Now we update users who share rooms with users.
users_with_profile = yield self.state.get_current_user_in_room(room_id)
users_with_profile = yield self.state.get_current_users_in_room(room_id)
if is_public:
yield self.store.add_users_in_public_rooms(room_id, (user_id,))

View File

@ -74,14 +74,14 @@ class ModuleApi(object):
return self._auth_handler.check_user_exists(user_id)
@defer.inlineCallbacks
def register(self, localpart, displayname=None):
def register(self, localpart, displayname=None, emails=[]):
"""Registers a new user with given localpart and optional
displayname.
displayname, emails.
Args:
localpart (str): The localpart of the new user.
displayname (str|None): The displayname of the new user. If None,
the user's displayname will default to `localpart`.
displayname (str|None): The displayname of the new user.
emails (List[str]): Emails to bind to the new user.
Returns:
Deferred: a 2-tuple of (user_id, access_token)
@ -90,6 +90,7 @@ class ModuleApi(object):
reg = self.hs.get_registration_handler()
user_id, access_token = yield reg.register(
localpart=localpart, default_display_name=displayname,
bind_emails=emails,
)
defer.returnValue((user_id, access_token))

View File

@ -72,8 +72,15 @@ class EmailPusher(object):
self._is_processing = False
def on_started(self):
if self.mailer is not None:
def on_started(self, should_check_for_notifs):
"""Called when this pusher has been started.
Args:
should_check_for_notifs (bool): Whether we should immediately
check for push to send. Set to False only if it's known there
is nothing to send
"""
if should_check_for_notifs and self.mailer is not None:
self._start_processing()
def on_stop(self):

View File

@ -112,8 +112,16 @@ class HttpPusher(object):
self.data_minus_url.update(self.data)
del self.data_minus_url['url']
def on_started(self):
self._start_processing()
def on_started(self, should_check_for_notifs):
"""Called when this pusher has been started.
Args:
should_check_for_notifs (bool): Whether we should immediately
check for push to send. Set to False only if it's known there
is nothing to send
"""
if should_check_for_notifs:
self._start_processing()
def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering or 0)

View File

@ -21,6 +21,7 @@ from twisted.internet import defer
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import PusherConfigException
from synapse.push.pusher import PusherFactory
from synapse.util.async_helpers import concurrently_execute
logger = logging.getLogger(__name__)
@ -197,7 +198,7 @@ class PusherPool:
p = r
if p:
self._start_pusher(p)
yield self._start_pusher(p)
@defer.inlineCallbacks
def _start_pushers(self):
@ -208,10 +209,14 @@ class PusherPool:
"""
pushers = yield self.store.get_all_pushers()
logger.info("Starting %d pushers", len(pushers))
for pusherdict in pushers:
self._start_pusher(pusherdict)
# Stagger starting up the pushers so we don't completely drown the
# process on start up.
yield concurrently_execute(self._start_pusher, pushers, 10)
logger.info("Started pushers")
@defer.inlineCallbacks
def _start_pusher(self, pusherdict):
"""Start the given pusher
@ -248,7 +253,22 @@ class PusherPool:
if appid_pushkey in byuser:
byuser[appid_pushkey].on_stop()
byuser[appid_pushkey] = p
p.on_started()
# Check if there *may* be push to process. We do this as this check is a
# lot cheaper to do than actually fetching the exact rows we need to
# push.
user_id = pusherdict["user_name"]
last_stream_ordering = pusherdict["last_stream_ordering"]
if last_stream_ordering:
have_notifs = yield self.store.get_if_maybe_push_in_range_for_user(
user_id, last_stream_ordering,
)
else:
# We always want to default to starting up the pusher rather than
# risk missing push.
have_notifs = True
p.on_started(have_notifs)
@defer.inlineCallbacks
def remove_pusher(self, app_id, pushkey, user_id):

View File

@ -74,7 +74,9 @@ REQUIREMENTS = [
CONDITIONAL_REQUIREMENTS = {
"email.enable_notifs": ["Jinja2>=2.9", "bleach>=1.4.2"],
"matrix-synapse-ldap3": ["matrix-synapse-ldap3>=0.1"],
"postgres": ["psycopg2>=2.6"],
# we use execute_batch, which arrived in psycopg 2.7.
"postgres": ["psycopg2>=2.7"],
# ConsentResource uses select_autoescape, which arrived in jinja 2.9
"resources.consent": ["Jinja2>=2.9"],

View File

@ -16,6 +16,10 @@
import logging
from synapse.api.constants import EventTypes
from synapse.replication.tcp.streams.events import (
EventsStreamCurrentStateRow,
EventsStreamEventRow,
)
from synapse.storage.event_federation import EventFederationWorkerStore
from synapse.storage.event_push_actions import EventPushActionsWorkerStore
from synapse.storage.events_worker import EventsWorkerStore
@ -79,11 +83,7 @@ class SlavedEventStore(EventFederationWorkerStore,
if stream_name == "events":
self._stream_id_gen.advance(token)
for row in rows:
self.invalidate_caches_for_event(
token, row.event_id, row.room_id, row.type, row.state_key,
row.redacts,
backfilled=False,
)
self._process_event_stream_row(token, row)
elif stream_name == "backfill":
self._backfill_id_gen.advance(-token)
for row in rows:
@ -96,6 +96,23 @@ class SlavedEventStore(EventFederationWorkerStore,
stream_name, token, rows
)
def _process_event_stream_row(self, token, row):
data = row.data
if row.type == EventsStreamEventRow.TypeId:
self.invalidate_caches_for_event(
token, data.event_id, data.room_id, data.type, data.state_key,
data.redacts,
backfilled=False,
)
elif row.type == EventsStreamCurrentStateRow.TypeId:
if data.type == EventTypes.Member:
self.get_rooms_for_user_with_stream_ordering.invalidate(
(data.state_key, ),
)
else:
raise Exception("Unknown events stream row type %s" % (row.type, ))
def invalidate_caches_for_event(self, stream_ordering, event_id, room_id,
etype, state_key, redacts, backfilled):
self._invalidate_get_event_cache(event_id)

View File

@ -13,22 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.storage import DataStore
from synapse.storage.keys import KeyStore
from synapse.storage import KeyStore
from ._base import BaseSlavedStore, __func__
# KeyStore isn't really safe to use from a worker, but for now we do so and hope that
# the races it creates aren't too bad.
class SlavedKeyStore(BaseSlavedStore):
_get_server_verify_key = KeyStore.__dict__[
"_get_server_verify_key"
]
get_server_verify_keys = __func__(DataStore.get_server_verify_keys)
store_server_verify_key = __func__(DataStore.store_server_verify_key)
get_server_certificate = __func__(DataStore.get_server_certificate)
store_server_certificate = __func__(DataStore.store_server_certificate)
get_server_keys_json = __func__(DataStore.get_server_keys_json)
store_server_keys_json = __func__(DataStore.store_server_keys_json)
SlavedKeyStore = KeyStore

View File

@ -39,16 +39,6 @@ class SlavedPresenceStore(BaseSlavedStore):
_get_presence_for_user = PresenceStore.__dict__["_get_presence_for_user"]
get_presence_for_users = PresenceStore.__dict__["get_presence_for_users"]
# XXX: This is a bit broken because we don't persist the accepted list in a
# way that can be replicated. This means that we don't have a way to
# invalidate the cache correctly.
get_presence_list_accepted = PresenceStore.__dict__[
"get_presence_list_accepted"
]
get_presence_list_observers_accepted = PresenceStore.__dict__[
"get_presence_list_observers_accepted"
]
def get_current_presence_token(self):
return self._presence_id_gen.get_current_token()

View File

@ -103,10 +103,19 @@ class ReplicationClientHandler(object):
hs.get_reactor().connectTCP(host, port, self.factory)
def on_rdata(self, stream_name, token, rows):
"""Called when we get new replication data. By default this just pokes
the slave store.
"""Called to handle a batch of replication data with a given stream token.
Can be overriden in subclasses to handle more.
By default this just pokes the slave store. Can be overridden in subclasses to
handle more.
Args:
stream_name (str): name of the replication stream for this batch of rows
token (int): stream token for this batch of rows
rows (list): a list of Stream.ROW_TYPE objects as returned by
Stream.parse_row.
Returns:
Deferred|None
"""
logger.debug("Received rdata %s -> %s", stream_name, token)
return self.store.process_replication_rows(stream_name, token, rows)

View File

@ -42,8 +42,8 @@ indicate which side is sending, these are *not* included on the wire::
> POSITION backfill 1
> POSITION caches 1
> RDATA caches 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513]
> RDATA events 14 ["$149019767112vOHxz:localhost:8823",
"!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]
> RDATA events 14 ["ev", ["$149019767112vOHxz:localhost:8823",
"!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]]
< PING 1490197675618
> ERROR server stopping
* connection closed by server *
@ -605,7 +605,7 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
inbound_rdata_count.labels(stream_name).inc()
try:
row = STREAMS_MAP[stream_name].ROW_TYPE(*cmd.row)
row = STREAMS_MAP[stream_name].parse_row(cmd.row)
except Exception:
logger.exception(
"[%s] Failed to parse RDATA: %r %r",

View File

@ -30,7 +30,8 @@ from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.metrics import Measure, measure_func
from .protocol import ServerReplicationStreamProtocol
from .streams import STREAMS_MAP, FederationStream
from .streams import STREAMS_MAP
from .streams.federation import FederationStream
stream_updates_counter = Counter("synapse_replication_tcp_resource_stream_updates",
"", ["stream_name"])

View File

@ -0,0 +1,49 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Defines all the valid streams that clients can subscribe to, and the format
of the rows returned by each stream.
Each stream is defined by the following information:
stream name: The name of the stream
row type: The type that is used to serialise/deserialse the row
current_token: The function that returns the current token for the stream
update_function: The function that returns a list of updates between two tokens
"""
from . import _base, events, federation
STREAMS_MAP = {
stream.NAME: stream
for stream in (
events.EventsStream,
_base.BackfillStream,
_base.PresenceStream,
_base.TypingStream,
_base.ReceiptsStream,
_base.PushRulesStream,
_base.PushersStream,
_base.CachesStream,
_base.PublicRoomsStream,
_base.DeviceListsStream,
_base.ToDeviceStream,
federation.FederationStream,
_base.TagAccountDataStream,
_base.AccountDataStream,
_base.GroupServerStream,
)
}

View File

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -13,16 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Defines all the valid streams that clients can subscribe to, and the format
of the rows returned by each stream.
Each stream is defined by the following information:
stream name: The name of the stream
row type: The type that is used to serialise/deserialse the row
current_token: The function that returns the current token for the stream
update_function: The function that returns a list of updates between two tokens
"""
import itertools
import logging
from collections import namedtuple
@ -34,14 +26,6 @@ logger = logging.getLogger(__name__)
MAX_EVENTS_BEHIND = 10000
EventStreamRow = namedtuple("EventStreamRow", (
"event_id", # str
"room_id", # str
"type", # str
"state_key", # str, optional
"redacts", # str, optional
))
BackfillStreamRow = namedtuple("BackfillStreamRow", (
"event_id", # str
"room_id", # str
@ -96,10 +80,6 @@ DeviceListsStreamRow = namedtuple("DeviceListsStreamRow", (
ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", (
"entity", # str
))
FederationStreamRow = namedtuple("FederationStreamRow", (
"type", # str, the type of data as defined in the BaseFederationRows
"data", # dict, serialization of a federation.send_queue.BaseFederationRow
))
TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow", (
"user_id", # str
"room_id", # str
@ -111,12 +91,6 @@ AccountDataStreamRow = namedtuple("AccountDataStream", (
"data_type", # str
"data", # dict
))
CurrentStateDeltaStreamRow = namedtuple("CurrentStateDeltaStream", (
"room_id", # str
"type", # str
"state_key", # str
"event_id", # str, optional
))
GroupsStreamRow = namedtuple("GroupsStreamRow", (
"group_id", # str
"user_id", # str
@ -132,9 +106,24 @@ class Stream(object):
time it was called up until the point `advance_current_token` was called.
"""
NAME = None # The name of the stream
ROW_TYPE = None # The type of the row
ROW_TYPE = None # The type of the row. Used by the default impl of parse_row.
_LIMITED = True # Whether the update function takes a limit
@classmethod
def parse_row(cls, row):
"""Parse a row received over replication
By default, assumes that the row data is an array object and passes its contents
to the constructor of the ROW_TYPE for this stream.
Args:
row: row data from the incoming RDATA command, after json decoding
Returns:
ROW_TYPE object for this stream
"""
return cls.ROW_TYPE(*row)
def __init__(self, hs):
# The token from which we last asked for updates
self.last_token = self.current_token()
@ -162,8 +151,10 @@ class Stream(object):
until the `upto_token`
Returns:
(list(ROW_TYPE), int): list of updates plus the token used as an
upper bound of the updates (i.e. the "current token")
Deferred[Tuple[List[Tuple[int, Any]], int]:
Resolves to a pair ``(updates, current_token)``, where ``updates`` is a
list of ``(token, row)`` entries. ``row`` will be json-serialised and
sent over the replication steam.
"""
updates, current_token = yield self.get_updates_since(self.last_token)
self.last_token = current_token
@ -176,8 +167,10 @@ class Stream(object):
stream updates
Returns:
(list(ROW_TYPE), int): list of updates plus the token used as an
upper bound of the updates (i.e. the "current token")
Deferred[Tuple[List[Tuple[int, Any]], int]:
Resolves to a pair ``(updates, current_token)``, where ``updates`` is a
list of ``(token, row)`` entries. ``row`` will be json-serialised and
sent over the replication steam.
"""
if from_token in ("NOW", "now"):
defer.returnValue(([], self.upto_token))
@ -202,7 +195,7 @@ class Stream(object):
from_token, current_token,
)
updates = [(row[0], self.ROW_TYPE(*row[1:])) for row in rows]
updates = [(row[0], row[1:]) for row in rows]
# check we didn't get more rows than the limit.
# doing it like this allows the update_function to be a generator.
@ -232,20 +225,6 @@ class Stream(object):
raise NotImplementedError()
class EventsStream(Stream):
"""We received a new event, or an event went from being an outlier to not
"""
NAME = "events"
ROW_TYPE = EventStreamRow
def __init__(self, hs):
store = hs.get_datastore()
self.current_token = store.get_current_events_token
self.update_function = store.get_all_new_forward_event_rows
super(EventsStream, self).__init__(hs)
class BackfillStream(Stream):
"""We fetched some old events and either we had never seen that event before
or it went from being an outlier to not.
@ -400,22 +379,6 @@ class ToDeviceStream(Stream):
super(ToDeviceStream, self).__init__(hs)
class FederationStream(Stream):
"""Data to be sent over federation. Only available when master has federation
sending disabled.
"""
NAME = "federation"
ROW_TYPE = FederationStreamRow
def __init__(self, hs):
federation_sender = hs.get_federation_sender()
self.current_token = federation_sender.get_current_token
self.update_function = federation_sender.get_replication_rows
super(FederationStream, self).__init__(hs)
class TagAccountDataStream(Stream):
"""Someone added/removed a tag for a room
"""
@ -459,21 +422,6 @@ class AccountDataStream(Stream):
defer.returnValue(results)
class CurrentStateDeltaStream(Stream):
"""Current state for a room was changed
"""
NAME = "current_state_deltas"
ROW_TYPE = CurrentStateDeltaStreamRow
def __init__(self, hs):
store = hs.get_datastore()
self.current_token = store.get_max_current_state_delta_stream_id
self.update_function = store.get_all_updated_current_state_deltas
super(CurrentStateDeltaStream, self).__init__(hs)
class GroupServerStream(Stream):
NAME = "groups"
ROW_TYPE = GroupsStreamRow
@ -485,26 +433,3 @@ class GroupServerStream(Stream):
self.update_function = store.get_all_groups_changes
super(GroupServerStream, self).__init__(hs)
STREAMS_MAP = {
stream.NAME: stream
for stream in (
EventsStream,
BackfillStream,
PresenceStream,
TypingStream,
ReceiptsStream,
PushRulesStream,
PushersStream,
CachesStream,
PublicRoomsStream,
DeviceListsStream,
ToDeviceStream,
FederationStream,
TagAccountDataStream,
AccountDataStream,
CurrentStateDeltaStream,
GroupServerStream,
)
}

View File

@ -0,0 +1,146 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import heapq
import attr
from twisted.internet import defer
from ._base import Stream
"""Handling of the 'events' replication stream
This stream contains rows of various types. Each row therefore contains a 'type'
identifier before the real data. For example::
RDATA events batch ["state", ["!room:id", "m.type", "", "$event:id"]]
RDATA events 12345 ["ev", ["$event:id", "!room:id", "m.type", null, null]]
An "ev" row is sent for each new event. The fields in the data part are:
* The new event id
* The room id for the event
* The type of the new event
* The state key of the event, for state events
* The event id of an event which is redacted by this event.
A "state" row is sent whenever the "current state" in a room changes. The fields in the
data part are:
* The room id for the state change
* The event type of the state which has changed
* The state_key of the state which has changed
* The event id of the new state
"""
@attr.s(slots=True, frozen=True)
class EventsStreamRow(object):
"""A parsed row from the events replication stream"""
type = attr.ib() # str: the TypeId of one of the *EventsStreamRows
data = attr.ib() # BaseEventsStreamRow
class BaseEventsStreamRow(object):
"""Base class for rows to be sent in the events stream.
Specifies how to identify, serialize and deserialize the different types.
"""
TypeId = None # Unique string that ids the type. Must be overriden in sub classes.
@classmethod
def from_data(cls, data):
"""Parse the data from the replication stream into a row.
By default we just call the constructor with the data list as arguments
Args:
data: The value of the data object from the replication stream
"""
return cls(*data)
@attr.s(slots=True, frozen=True)
class EventsStreamEventRow(BaseEventsStreamRow):
TypeId = "ev"
event_id = attr.ib() # str
room_id = attr.ib() # str
type = attr.ib() # str
state_key = attr.ib() # str, optional
redacts = attr.ib() # str, optional
@attr.s(slots=True, frozen=True)
class EventsStreamCurrentStateRow(BaseEventsStreamRow):
TypeId = "state"
room_id = attr.ib() # str
type = attr.ib() # str
state_key = attr.ib() # str
event_id = attr.ib() # str, optional
TypeToRow = {
Row.TypeId: Row
for Row in (
EventsStreamEventRow,
EventsStreamCurrentStateRow,
)
}
class EventsStream(Stream):
"""We received a new event, or an event went from being an outlier to not
"""
NAME = "events"
def __init__(self, hs):
self._store = hs.get_datastore()
self.current_token = self._store.get_current_events_token
super(EventsStream, self).__init__(hs)
@defer.inlineCallbacks
def update_function(self, from_token, current_token, limit=None):
event_rows = yield self._store.get_all_new_forward_event_rows(
from_token, current_token, limit,
)
event_updates = (
(row[0], EventsStreamEventRow.TypeId, row[1:])
for row in event_rows
)
state_rows = yield self._store.get_all_updated_current_state_deltas(
from_token, current_token, limit
)
state_updates = (
(row[0], EventsStreamCurrentStateRow.TypeId, row[1:])
for row in state_rows
)
all_updates = heapq.merge(event_updates, state_updates)
defer.returnValue(all_updates)
@classmethod
def parse_row(cls, row):
(typ, data) = row
data = TypeToRow[typ].from_data(data)
return EventsStreamRow(typ, data)

View File

@ -0,0 +1,39 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import namedtuple
from ._base import Stream
FederationStreamRow = namedtuple("FederationStreamRow", (
"type", # str, the type of data as defined in the BaseFederationRows
"data", # dict, serialization of a federation.send_queue.BaseFederationRow
))
class FederationStream(Stream):
"""Data to be sent over federation. Only available when master has federation
sending disabled.
"""
NAME = "federation"
ROW_TYPE = FederationStreamRow
def __init__(self, hs):
federation_sender = hs.get_federation_sender()
self.current_token = federation_sender.get_current_token
self.update_function = federation_sender.get_replication_rows
super(FederationStream, self).__init__(hs)

Some files were not shown because too many files have changed in this diff Show More