Merge remote-tracking branch 'upstream/master' into feat-dockerfile

This commit is contained in:
kaiyou 2018-05-02 20:22:41 +02:00
commit 9a779c2ddb
69 changed files with 1022 additions and 490 deletions

View File

@ -1,3 +1,74 @@
Changes in synapse v0.28.1 (2018-05-01)
=======================================
SECURITY UPDATE
* Clamp the allowed values of event depth received over federation to be
[0, 2^63 - 1]. This mitigates an attack where malicious events
injected with depth = 2^63 - 1 render rooms unusable. Depth is used to
determine the cosmetic ordering of events within a room, and so the ordering
of events in such a room will default to using stream_ordering rather than depth
(topological_ordering).
This is a temporary solution to mitigate abuse in the wild, whilst a long term solution
is being implemented to improve how the depth parameter is used.
Full details at
https://docs.google.com/document/d/1I3fi2S-XnpO45qrpCsowZv8P8dHcNZ4fsBsbOW7KABI
* Pin Twisted to <18.4 until we stop using the private _OpenSSLECCurve API.
Changes in synapse v0.28.0 (2018-04-26)
=======================================
Bug Fixes:
* Fix quarantine media admin API and search reindex (PR #3130)
* Fix media admin APIs (PR #3134)
Changes in synapse v0.28.0-rc1 (2018-04-24)
===========================================
Minor performance improvement to federation sending and bug fixes.
(Note: This release does not include the delta state resolution implementation discussed in matrix live)
Features:
* Add metrics for event processing lag (PR #3090)
* Add metrics for ResponseCache (PR #3092)
Changes:
* Synapse on PyPy (PR #2760) Thanks to @Valodim!
* move handling of auto_join_rooms to RegisterHandler (PR #2996) Thanks to @krombel!
* Improve handling of SRV records for federation connections (PR #3016) Thanks to @silkeh!
* Document the behaviour of ResponseCache (PR #3059)
* Preparation for py3 (PR #3061, #3073, #3074, #3075, #3103, #3104, #3106, #3107, #3109, #3110) Thanks to @NotAFile!
* update prometheus dashboard to use new metric names (PR #3069) Thanks to @krombel!
* use python3-compatible prints (PR #3074) Thanks to @NotAFile!
* Send federation events concurrently (PR #3078)
* Limit concurrent event sends for a room (PR #3079)
* Improve R30 stat definition (PR #3086)
* Send events to ASes concurrently (PR #3088)
* Refactor ResponseCache usage (PR #3093)
* Clarify that SRV may not point to a CNAME (PR #3100) Thanks to @silkeh!
* Use str(e) instead of e.message (PR #3103) Thanks to @NotAFile!
* Use six.itervalues in some places (PR #3106) Thanks to @NotAFile!
* Refactor store.have_events (PR #3117)
Bug Fixes:
* Return 401 for invalid access_token on logout (PR #2938) Thanks to @dklug!
* Return a 404 rather than a 500 on rejoining empty rooms (PR #3080)
* fix federation_domain_whitelist (PR #3099)
* Avoid creating events with huge numbers of prev_events (PR #3113)
* Reject events which have lots of prev_events (PR #3118)
Changes in synapse v0.27.4 (2018-04-13)
======================================
@ -61,7 +132,6 @@ Bug fixes:
* Add room_id to the response of `rooms/{roomId}/join` (PR #2986) Thanks to @jplatte!
* Fix replication after switch to simplejson (PR #3015)
* Fix replication after switch to simplejson (PR #3015)
* 404 correctly on missing paths via NoResource (PR #3022)
* Fix error when claiming e2e keys from offline servers (PR #3034)
* fix tests/storage/test_user_directory.py (PR #3042)

View File

@ -614,6 +614,9 @@ should have the format ``_matrix._tcp.<yourdomain.com> <ttl> IN SRV 10 0 <port>
$ dig -t srv _matrix._tcp.example.com
_matrix._tcp.example.com. 3600 IN SRV 10 0 8448 synapse.example.com.
Note that the server hostname cannot be an alias (CNAME record): it has to point
directly to the server hosting the synapse instance.
You can then configure your homeserver to use ``<yourdomain.com>`` as the domain in
its user-ids, by setting ``server_name``::

10
contrib/README.rst Normal file
View File

@ -0,0 +1,10 @@
Community Contributions
=======================
Everything in this directory are projects submitted by the community that may be useful
to others. As such, the project maintainers cannot guarantee support, stability
or backwards compatibility of these projects.
Files in this directory should *not* be relied on directly, as they may not
continue to work or exist in future. If you wish to use any of these files then
they should be copied to avoid them breaking from underneath you.

View File

@ -22,6 +22,8 @@ import argparse
from synapse.events import FrozenEvent
from synapse.util.frozenutils import unfreeze
from six import string_types
def make_graph(file_name, room_id, file_prefix, limit):
print "Reading lines"
@ -58,7 +60,7 @@ def make_graph(file_name, room_id, file_prefix, limit):
for key, value in unfreeze(event.get_dict()["content"]).items():
if value is None:
value = "<null>"
elif isinstance(value, basestring):
elif isinstance(value, string_types):
pass
else:
value = json.dumps(value)

View File

@ -202,11 +202,11 @@ new PromConsole.Graph({
<h1>Requests</h1>
<h3>Requests by Servlet</h3>
<div id="synapse_http_server_requests_servlet"></div>
<div id="synapse_http_server_request_count_servlet"></div>
<script>
new PromConsole.Graph({
node: document.querySelector("#synapse_http_server_requests_servlet"),
expr: "rate(synapse_http_server_requests:servlet[2m])",
node: document.querySelector("#synapse_http_server_request_count_servlet"),
expr: "rate(synapse_http_server_request_count:servlet[2m])",
name: "[[servlet]]",
yAxisFormatter: PromConsole.NumberFormatter.humanize,
yHoverFormatter: PromConsole.NumberFormatter.humanize,
@ -215,11 +215,11 @@ new PromConsole.Graph({
})
</script>
<h4>&nbsp;(without <tt>EventStreamRestServlet</tt> or <tt>SyncRestServlet</tt>)</h4>
<div id="synapse_http_server_requests_servlet_minus_events"></div>
<div id="synapse_http_server_request_count_servlet_minus_events"></div>
<script>
new PromConsole.Graph({
node: document.querySelector("#synapse_http_server_requests_servlet_minus_events"),
expr: "rate(synapse_http_server_requests:servlet{servlet!=\"EventStreamRestServlet\", servlet!=\"SyncRestServlet\"}[2m])",
node: document.querySelector("#synapse_http_server_request_count_servlet_minus_events"),
expr: "rate(synapse_http_server_request_count:servlet{servlet!=\"EventStreamRestServlet\", servlet!=\"SyncRestServlet\"}[2m])",
name: "[[servlet]]",
yAxisFormatter: PromConsole.NumberFormatter.humanize,
yHoverFormatter: PromConsole.NumberFormatter.humanize,
@ -233,7 +233,7 @@ new PromConsole.Graph({
<script>
new PromConsole.Graph({
node: document.querySelector("#synapse_http_server_response_time_avg"),
expr: "rate(synapse_http_server_response_time:total[2m]) / rate(synapse_http_server_response_time:count[2m]) / 1000",
expr: "rate(synapse_http_server_response_time_seconds[2m]) / rate(synapse_http_server_response_count[2m]) / 1000",
name: "[[servlet]]",
yAxisFormatter: PromConsole.NumberFormatter.humanize,
yHoverFormatter: PromConsole.NumberFormatter.humanize,
@ -276,7 +276,7 @@ new PromConsole.Graph({
<script>
new PromConsole.Graph({
node: document.querySelector("#synapse_http_server_response_ru_utime"),
expr: "rate(synapse_http_server_response_ru_utime:total[2m])",
expr: "rate(synapse_http_server_response_ru_utime_seconds[2m])",
name: "[[servlet]]",
yAxisFormatter: PromConsole.NumberFormatter.humanize,
yHoverFormatter: PromConsole.NumberFormatter.humanize,
@ -291,7 +291,7 @@ new PromConsole.Graph({
<script>
new PromConsole.Graph({
node: document.querySelector("#synapse_http_server_response_db_txn_duration"),
expr: "rate(synapse_http_server_response_db_txn_duration:total[2m])",
expr: "rate(synapse_http_server_response_db_txn_duration_seconds[2m])",
name: "[[servlet]]",
yAxisFormatter: PromConsole.NumberFormatter.humanize,
yHoverFormatter: PromConsole.NumberFormatter.humanize,
@ -306,7 +306,7 @@ new PromConsole.Graph({
<script>
new PromConsole.Graph({
node: document.querySelector("#synapse_http_server_send_time_avg"),
expr: "rate(synapse_http_server_response_time:total{servlet='RoomSendEventRestServlet'}[2m]) / rate(synapse_http_server_response_time:count{servlet='RoomSendEventRestServlet'}[2m]) / 1000",
expr: "rate(synapse_http_server_response_time_second{servlet='RoomSendEventRestServlet'}[2m]) / rate(synapse_http_server_response_count{servlet='RoomSendEventRestServlet'}[2m]) / 1000",
name: "[[servlet]]",
yAxisFormatter: PromConsole.NumberFormatter.humanize,
yHoverFormatter: PromConsole.NumberFormatter.humanize,

View File

@ -1,10 +1,10 @@
synapse_federation_transaction_queue_pendingEdus:total = sum(synapse_federation_transaction_queue_pendingEdus or absent(synapse_federation_transaction_queue_pendingEdus)*0)
synapse_federation_transaction_queue_pendingPdus:total = sum(synapse_federation_transaction_queue_pendingPdus or absent(synapse_federation_transaction_queue_pendingPdus)*0)
synapse_http_server_requests:method{servlet=""} = sum(synapse_http_server_requests) by (method)
synapse_http_server_requests:servlet{method=""} = sum(synapse_http_server_requests) by (servlet)
synapse_http_server_request_count:method{servlet=""} = sum(synapse_http_server_request_count) by (method)
synapse_http_server_request_count:servlet{method=""} = sum(synapse_http_server_request_count) by (servlet)
synapse_http_server_requests:total{servlet=""} = sum(synapse_http_server_requests:by_method) by (servlet)
synapse_http_server_request_count:total{servlet=""} = sum(synapse_http_server_request_count:by_method) by (servlet)
synapse_cache:hit_ratio_5m = rate(synapse_util_caches_cache:hits[5m]) / rate(synapse_util_caches_cache:total[5m])
synapse_cache:hit_ratio_30s = rate(synapse_util_caches_cache:hits[30s]) / rate(synapse_util_caches_cache:total[30s])

View File

@ -5,19 +5,19 @@ groups:
expr: "sum(synapse_federation_transaction_queue_pendingEdus or absent(synapse_federation_transaction_queue_pendingEdus)*0)"
- record: "synapse_federation_transaction_queue_pendingPdus:total"
expr: "sum(synapse_federation_transaction_queue_pendingPdus or absent(synapse_federation_transaction_queue_pendingPdus)*0)"
- record: 'synapse_http_server_requests:method'
- record: 'synapse_http_server_request_count:method'
labels:
servlet: ""
expr: "sum(synapse_http_server_requests) by (method)"
- record: 'synapse_http_server_requests:servlet'
expr: "sum(synapse_http_server_request_count) by (method)"
- record: 'synapse_http_server_request_count:servlet'
labels:
method: ""
expr: 'sum(synapse_http_server_requests) by (servlet)'
expr: 'sum(synapse_http_server_request_count) by (servlet)'
- record: 'synapse_http_server_requests:total'
- record: 'synapse_http_server_request_count:total'
labels:
servlet: ""
expr: 'sum(synapse_http_server_requests:by_method) by (servlet)'
expr: 'sum(synapse_http_server_request_count:by_method) by (servlet)'
- record: 'synapse_cache:hit_ratio_5m'
expr: 'rate(synapse_util_caches_cache:hits[5m]) / rate(synapse_util_caches_cache:total[5m])'

View File

@ -2,6 +2,9 @@
# (e.g. https://www.archlinux.org/packages/community/any/matrix-synapse/ for ArchLinux)
# rather than in a user home directory or similar under virtualenv.
# **NOTE:** This is an example service file that may change in the future. If you
# wish to use this please copy rather than symlink it.
[Unit]
Description=Synapse Matrix homeserver
@ -12,6 +15,7 @@ Group=synapse
WorkingDirectory=/var/lib/synapse
ExecStart=/usr/bin/python2.7 -m synapse.app.homeserver --config-path=/etc/synapse/homeserver.yaml
ExecStop=/usr/bin/synctl stop /etc/synapse/homeserver.yaml
# EnvironmentFile=-/etc/sysconfig/synapse # Can be used to e.g. set SYNAPSE_CACHE_FACTOR
[Install]
WantedBy=multi-user.target

View File

@ -30,6 +30,8 @@ import time
import traceback
import yaml
from six import string_types
logger = logging.getLogger("synapse_port_db")
@ -574,7 +576,7 @@ class Porter(object):
def conv(j, col):
if j in bool_cols:
return bool(col)
elif isinstance(col, basestring) and "\0" in col:
elif isinstance(col, string_types) and "\0" in col:
logger.warn("DROPPING ROW: NUL value in table %s col %s: %r", table, headers[j], col)
raise BadValueException();
return col

View File

@ -16,4 +16,4 @@
""" This is a reference implementation of a Matrix home server.
"""
__version__ = "0.27.4"
__version__ = "0.28.1"

View File

@ -204,8 +204,8 @@ class Auth(object):
ip_addr = self.hs.get_ip_from_request(request)
user_agent = request.requestHeaders.getRawHeaders(
"User-Agent",
default=[""]
b"User-Agent",
default=[b""]
)[0]
if user and access_token and ip_addr:
self.store.insert_client_ip(
@ -672,7 +672,7 @@ def has_access_token(request):
bool: False if no access_token was given, True otherwise.
"""
query_params = request.args.get("access_token")
auth_headers = request.requestHeaders.getRawHeaders("Authorization")
auth_headers = request.requestHeaders.getRawHeaders(b"Authorization")
return bool(query_params) or bool(auth_headers)
@ -692,8 +692,8 @@ def get_access_token_from_request(request, token_not_found_http_status=401):
AuthError: If there isn't an access_token in the request.
"""
auth_headers = request.requestHeaders.getRawHeaders("Authorization")
query_params = request.args.get("access_token")
auth_headers = request.requestHeaders.getRawHeaders(b"Authorization")
query_params = request.args.get(b"access_token")
if auth_headers:
# Try the get the access_token from a "Authorization: Bearer"
# header

View File

@ -16,6 +16,9 @@
"""Contains constants from the specification."""
# the "depth" field on events is limited to 2**63 - 1
MAX_DEPTH = 2**63 - 1
class Membership(object):

View File

@ -18,6 +18,7 @@
import logging
import simplejson as json
from six import iteritems
logger = logging.getLogger(__name__)
@ -297,7 +298,7 @@ def cs_error(msg, code=Codes.UNKNOWN, **kwargs):
A dict representing the error response JSON.
"""
err = {"error": msg, "errcode": code}
for key, value in kwargs.iteritems():
for key, value in iteritems(kwargs):
err[key] = value
return err

View File

@ -90,7 +90,7 @@ class KeyUploadServlet(RestServlet):
# They're actually trying to upload something, proxy to main synapse.
# Pass through the auth headers, if any, in case the access token
# is there.
auth_headers = request.requestHeaders.getRawHeaders("Authorization", [])
auth_headers = request.requestHeaders.getRawHeaders(b"Authorization", [])
headers = {
"Authorization": auth_headers,
}

View File

@ -58,6 +58,8 @@ from synapse.util.versionstring import get_version_string
from twisted.internet import defer, reactor
from twisted.web.resource import NoResource
from six import iteritems
logger = logging.getLogger("synapse.app.synchrotron")
@ -211,7 +213,7 @@ class SynchrotronPresence(object):
def get_currently_syncing_users(self):
return [
user_id for user_id, count in self.user_to_num_current_syncs.iteritems()
user_id for user_id, count in iteritems(self.user_to_num_current_syncs)
if count > 0
]

View File

@ -21,6 +21,8 @@ from twisted.internet import defer
import logging
import re
from six import string_types
logger = logging.getLogger(__name__)
@ -146,7 +148,7 @@ class ApplicationService(object):
)
regex = regex_obj.get("regex")
if isinstance(regex, basestring):
if isinstance(regex, string_types):
regex_obj["regex"] = re.compile(regex) # Pre-compile regex
else:
raise ValueError(

View File

@ -18,7 +18,6 @@ from synapse.api.constants import ThirdPartyEntityKind
from synapse.api.errors import CodeMessageException
from synapse.http.client import SimpleHttpClient
from synapse.events.utils import serialize_event
from synapse.util.logcontext import preserve_fn, make_deferred_yieldable
from synapse.util.caches.response_cache import ResponseCache
from synapse.types import ThirdPartyInstanceID
@ -73,7 +72,8 @@ class ApplicationServiceApi(SimpleHttpClient):
super(ApplicationServiceApi, self).__init__(hs)
self.clock = hs.get_clock()
self.protocol_meta_cache = ResponseCache(hs, timeout_ms=HOUR_IN_MS)
self.protocol_meta_cache = ResponseCache(hs, "as_protocol_meta",
timeout_ms=HOUR_IN_MS)
@defer.inlineCallbacks
def query_user(self, service, user_id):
@ -193,12 +193,7 @@ class ApplicationServiceApi(SimpleHttpClient):
defer.returnValue(None)
key = (service.id, protocol)
result = self.protocol_meta_cache.get(key)
if not result:
result = self.protocol_meta_cache.set(
key, preserve_fn(_get)()
)
return make_deferred_yieldable(result)
return self.protocol_meta_cache.wrap(key, _get)
@defer.inlineCallbacks
def push_bulk(self, service, events, txn_id=None):

View File

@ -19,6 +19,8 @@ import os
import yaml
from textwrap import dedent
from six import integer_types
class ConfigError(Exception):
pass
@ -49,7 +51,7 @@ Missing mandatory `server_name` config option.
class Config(object):
@staticmethod
def parse_size(value):
if isinstance(value, int) or isinstance(value, long):
if isinstance(value, integer_types):
return value
sizes = {"K": 1024, "M": 1024 * 1024}
size = 1
@ -61,7 +63,7 @@ class Config(object):
@staticmethod
def parse_duration(value):
if isinstance(value, int) or isinstance(value, long):
if isinstance(value, integer_types):
return value
second = 1000
minute = 60 * second
@ -288,22 +290,22 @@ class Config(object):
)
obj.invoke_all("generate_files", config)
config_file.write(config_bytes)
print (
print((
"A config file has been generated in %r for server name"
" %r with corresponding SSL keys and self-signed"
" certificates. Please review this file and customise it"
" to your needs."
) % (config_path, server_name)
print (
) % (config_path, server_name))
print(
"If this server name is incorrect, you will need to"
" regenerate the SSL certificates"
)
return
else:
print (
print((
"Config file %r already exists. Generating any missing key"
" files."
) % (config_path,)
) % (config_path,))
generate_keys = True
parser = argparse.ArgumentParser(

View File

@ -21,6 +21,8 @@ import urllib
import yaml
import logging
from six import string_types
logger = logging.getLogger(__name__)
@ -89,14 +91,14 @@ def _load_appservice(hostname, as_info, config_filename):
"id", "as_token", "hs_token", "sender_localpart"
]
for field in required_string_fields:
if not isinstance(as_info.get(field), basestring):
if not isinstance(as_info.get(field), string_types):
raise KeyError("Required string field: '%s' (%s)" % (
field, config_filename,
))
# 'url' must either be a string or explicitly null, not missing
# to avoid accidentally turning off push for ASes.
if (not isinstance(as_info.get("url"), basestring) and
if (not isinstance(as_info.get("url"), string_types) and
as_info.get("url", "") is not None):
raise KeyError(
"Required string field or explicit null: 'url' (%s)" % (config_filename,)
@ -128,7 +130,7 @@ def _load_appservice(hostname, as_info, config_filename):
"Expected namespace entry in %s to be an object,"
" but got %s", ns, regex_obj
)
if not isinstance(regex_obj.get("regex"), basestring):
if not isinstance(regex_obj.get("regex"), string_types):
raise ValueError(
"Missing/bad type 'regex' key in %s", regex_obj
)

View File

@ -352,7 +352,7 @@ class Keyring(object):
logger.exception(
"Unable to get key from %r: %s %s",
perspective_name,
type(e).__name__, str(e.message),
type(e).__name__, str(e),
)
defer.returnValue({})
@ -384,7 +384,7 @@ class Keyring(object):
logger.info(
"Unable to get key %r for %r directly: %s %s",
key_ids, server_name,
type(e).__name__, str(e.message),
type(e).__name__, str(e),
)
if not keys:
@ -734,7 +734,7 @@ def _handle_key_deferred(verify_request):
except IOError as e:
logger.warn(
"Got IOError when downloading keys for %s: %s %s",
server_name, type(e).__name__, str(e.message),
server_name, type(e).__name__, str(e),
)
raise SynapseError(
502,
@ -744,7 +744,7 @@ def _handle_key_deferred(verify_request):
except Exception as e:
logger.exception(
"Got Exception when downloading keys for %s: %s %s",
server_name, type(e).__name__, str(e.message),
server_name, type(e).__name__, str(e),
)
raise SynapseError(
401,

View File

@ -14,7 +14,10 @@
# limitations under the License.
import logging
from synapse.api.errors import SynapseError
import six
from synapse.api.constants import MAX_DEPTH
from synapse.api.errors import SynapseError, Codes
from synapse.crypto.event_signing import check_event_content_hash
from synapse.events import FrozenEvent
from synapse.events.utils import prune_event
@ -190,11 +193,23 @@ def event_from_pdu_json(pdu_json, outlier=False):
FrozenEvent
Raises:
SynapseError: if the pdu is missing required fields
SynapseError: if the pdu is missing required fields or is otherwise
not a valid matrix event
"""
# we could probably enforce a bunch of other fields here (room_id, sender,
# origin, etc etc)
assert_params_in_request(pdu_json, ('event_id', 'type'))
assert_params_in_request(pdu_json, ('event_id', 'type', 'depth'))
depth = pdu_json['depth']
if not isinstance(depth, six.integer_types):
raise SynapseError(400, "Depth %r not an intger" % (depth, ),
Codes.BAD_JSON)
if depth < 0:
raise SynapseError(400, "Depth too small", Codes.BAD_JSON)
elif depth > MAX_DEPTH:
raise SynapseError(400, "Depth too large", Codes.BAD_JSON)
event = FrozenEvent(
pdu_json
)

View File

@ -394,7 +394,7 @@ class FederationClient(FederationBase):
seen_events = yield self.store.get_events(event_ids, allow_rejected=True)
signed_events = seen_events.values()
else:
seen_events = yield self.store.have_events(event_ids)
seen_events = yield self.store.have_seen_events(event_ids)
signed_events = []
failed_to_fetch = set()

View File

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -30,9 +31,10 @@ import synapse.metrics
from synapse.types import get_domain_from_id
from synapse.util import async
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.logutils import log_function
from six import iteritems
# when processing incoming transactions, we try to handle multiple rooms in
# parallel, up to this limit.
TRANSACTION_CONCURRENCY_LIMIT = 10
@ -65,7 +67,7 @@ class FederationServer(FederationBase):
# We cache responses to state queries, as they take a while and often
# come in waves.
self._state_resp_cache = ResponseCache(hs, timeout_ms=30000)
self._state_resp_cache = ResponseCache(hs, "state_resp", timeout_ms=30000)
@defer.inlineCallbacks
@log_function
@ -212,16 +214,17 @@ class FederationServer(FederationBase):
if not in_room:
raise AuthError(403, "Host not in room.")
result = self._state_resp_cache.get((room_id, event_id))
if not result:
with (yield self._server_linearizer.queue((origin, room_id))):
d = self._state_resp_cache.set(
(room_id, event_id),
preserve_fn(self._on_context_state_request_compute)(room_id, event_id)
)
resp = yield make_deferred_yieldable(d)
else:
resp = yield make_deferred_yieldable(result)
# we grab the linearizer to protect ourselves from servers which hammer
# us. In theory we might already have the response to this query
# in the cache so we could return it without waiting for the linearizer
# - but that's non-trivial to get right, and anyway somewhat defeats
# the point of the linearizer.
with (yield self._server_linearizer.queue((origin, room_id))):
resp = yield self._state_resp_cache.wrap(
(room_id, event_id),
self._on_context_state_request_compute,
room_id, event_id,
)
defer.returnValue((200, resp))
@ -425,9 +428,9 @@ class FederationServer(FederationBase):
"Claimed one-time-keys: %s",
",".join((
"%s for %s:%s" % (key_id, user_id, device_id)
for user_id, user_keys in json_result.iteritems()
for device_id, device_keys in user_keys.iteritems()
for key_id, _ in device_keys.iteritems()
for user_id, user_keys in iteritems(json_result)
for device_id, device_keys in iteritems(user_keys)
for key_id, _ in iteritems(device_keys)
)),
)
@ -494,13 +497,33 @@ class FederationServer(FederationBase):
def _handle_received_pdu(self, origin, pdu):
""" Process a PDU received in a federation /send/ transaction.
If the event is invalid, then this method throws a FederationError.
(The error will then be logged and sent back to the sender (which
probably won't do anything with it), and other events in the
transaction will be processed as normal).
It is likely that we'll then receive other events which refer to
this rejected_event in their prev_events, etc. When that happens,
we'll attempt to fetch the rejected event again, which will presumably
fail, so those second-generation events will also get rejected.
Eventually, we get to the point where there are more than 10 events
between any new events and the original rejected event. Since we
only try to backfill 10 events deep on received pdu, we then accept the
new event, possibly introducing a discontinuity in the DAG, with new
forward extremities, so normal service is approximately returned,
until we try to backfill across the discontinuity.
Args:
origin (str): server which sent the pdu
pdu (FrozenEvent): received pdu
Returns (Deferred): completes with None
Raises: FederationError if the signatures / hash do not match
"""
Raises: FederationError if the signatures / hash do not match, or
if the event was unacceptable for any other reason (eg, too large,
too many prev_events, couldn't find the prev_events)
"""
# check that it's actually being sent from a valid destination to
# workaround bug #1753 in 0.18.5 and 0.18.6
if origin != get_domain_from_id(pdu.event_id):

View File

@ -40,6 +40,8 @@ from collections import namedtuple
import logging
from six import itervalues, iteritems
logger = logging.getLogger(__name__)
@ -122,7 +124,7 @@ class FederationRemoteSendQueue(object):
user_ids = set(
user_id
for uids in self.presence_changed.itervalues()
for uids in itervalues(self.presence_changed)
for user_id in uids
)
@ -276,7 +278,7 @@ class FederationRemoteSendQueue(object):
# stream position.
keyed_edus = {self.keyed_edu_changed[k]: k for k in keys[i:j]}
for ((destination, edu_key), pos) in keyed_edus.iteritems():
for ((destination, edu_key), pos) in iteritems(keyed_edus):
rows.append((pos, KeyedEduRow(
key=edu_key,
edu=self.keyed_edu[(destination, edu_key)],
@ -309,7 +311,7 @@ class FederationRemoteSendQueue(object):
j = keys.bisect_right(to_token) + 1
device_messages = {self.device_messages[k]: k for k in keys[i:j]}
for (destination, pos) in device_messages.iteritems():
for (destination, pos) in iteritems(device_messages):
rows.append((pos, DeviceRow(
destination=destination,
)))
@ -528,19 +530,19 @@ def process_rows_for_federation(transaction_queue, rows):
if buff.presence:
transaction_queue.send_presence(buff.presence)
for destination, edu_map in buff.keyed_edus.iteritems():
for destination, edu_map in iteritems(buff.keyed_edus):
for key, edu in edu_map.items():
transaction_queue.send_edu(
edu.destination, edu.edu_type, edu.content, key=key,
)
for destination, edu_list in buff.edus.iteritems():
for destination, edu_list in iteritems(buff.edus):
for edu in edu_list:
transaction_queue.send_edu(
edu.destination, edu.edu_type, edu.content, key=None,
)
for destination, failure_list in buff.failures.iteritems():
for destination, failure_list in iteritems(buff.failures):
for failure in failure_list:
transaction_queue.send_failure(destination, failure)

View File

@ -169,7 +169,7 @@ class TransactionQueue(object):
while True:
last_token = yield self.store.get_federation_out_pos("events")
next_token, events = yield self.store.get_all_new_events_stream(
last_token, self._last_poked_id, limit=20,
last_token, self._last_poked_id, limit=100,
)
logger.debug("Handling %s -> %s", last_token, next_token)
@ -177,24 +177,33 @@ class TransactionQueue(object):
if not events and next_token >= self._last_poked_id:
break
for event in events:
@defer.inlineCallbacks
def handle_event(event):
# Only send events for this server.
send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
is_mine = self.is_mine_id(event.event_id)
if not is_mine and send_on_behalf_of is None:
continue
return
try:
# Get the state from before the event.
# We need to make sure that this is the state from before
# the event and not from after it.
# Otherwise if the last member on a server in a room is
# banned then it won't receive the event because it won't
# be in the room after the ban.
destinations = yield self.state.get_current_hosts_in_room(
event.room_id, latest_event_ids=[
prev_id for prev_id, _ in event.prev_events
],
)
except Exception:
logger.exception(
"Failed to calculate hosts in room for event: %s",
event.event_id,
)
return
# Get the state from before the event.
# We need to make sure that this is the state from before
# the event and not from after it.
# Otherwise if the last member on a server in a room is
# banned then it won't receive the event because it won't
# be in the room after the ban.
destinations = yield self.state.get_current_hosts_in_room(
event.room_id, latest_event_ids=[
prev_id for prev_id, _ in event.prev_events
],
)
destinations = set(destinations)
if send_on_behalf_of is not None:
@ -207,12 +216,44 @@ class TransactionQueue(object):
self._send_pdu(event, destinations)
events_processed_counter.inc_by(len(events))
@defer.inlineCallbacks
def handle_room_events(events):
for event in events:
yield handle_event(event)
events_by_room = {}
for event in events:
events_by_room.setdefault(event.room_id, []).append(event)
yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
logcontext.run_in_background(handle_room_events, evs)
for evs in events_by_room.itervalues()
],
consumeErrors=True
))
yield self.store.update_federation_out_pos(
"events", next_token
)
if events:
now = self.clock.time_msec()
ts = yield self.store.get_received_ts(events[-1].event_id)
synapse.metrics.event_processing_lag.set(
now - ts, "federation_sender",
)
synapse.metrics.event_processing_last_ts.set(
ts, "federation_sender",
)
events_processed_counter.inc_by(len(events))
synapse.metrics.event_processing_positions.set(
next_token, "federation_sender",
)
finally:
self._is_processing = False

View File

@ -94,12 +94,6 @@ class Authenticator(object):
"signatures": {},
}
if (
self.federation_domain_whitelist is not None and
self.server_name not in self.federation_domain_whitelist
):
raise FederationDeniedError(self.server_name)
if content is not None:
json_request["content"] = content
@ -138,6 +132,12 @@ class Authenticator(object):
json_request["origin"] = origin
json_request["signatures"].setdefault(origin, {})[key] = sig
if (
self.federation_domain_whitelist is not None and
origin not in self.federation_domain_whitelist
):
raise FederationDeniedError(origin)
if not json_request["signatures"]:
raise NoAuthenticationError(
401, "Missing Authorization headers", Codes.UNAUTHORIZED,

View File

@ -18,7 +18,9 @@ from twisted.internet import defer
import synapse
from synapse.api.constants import EventTypes
from synapse.util.metrics import Measure
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.logcontext import (
make_deferred_yieldable, preserve_fn, run_in_background,
)
import logging
@ -84,11 +86,16 @@ class ApplicationServicesHandler(object):
if not events:
break
events_by_room = {}
for event in events:
events_by_room.setdefault(event.room_id, []).append(event)
@defer.inlineCallbacks
def handle_event(event):
# Gather interested services
services = yield self._get_services_for_event(event)
if len(services) == 0:
continue # no services need notifying
return # no services need notifying
# Do we know this user exists? If not, poke the user
# query API for all services which match that user regex.
@ -108,9 +115,33 @@ class ApplicationServicesHandler(object):
service, event
)
events_processed_counter.inc_by(len(events))
@defer.inlineCallbacks
def handle_room_events(events):
for event in events:
yield handle_event(event)
yield make_deferred_yieldable(defer.gatherResults([
run_in_background(handle_room_events, evs)
for evs in events_by_room.itervalues()
], consumeErrors=True))
yield self.store.set_appservice_last_pos(upper_bound)
now = self.clock.time_msec()
ts = yield self.store.get_received_ts(events[-1].event_id)
synapse.metrics.event_processing_positions.set(
upper_bound, "appservice_sender",
)
events_processed_counter.inc_by(len(events))
synapse.metrics.event_processing_lag.set(
now - ts, "appservice_sender",
)
synapse.metrics.event_processing_last_ts.set(
ts, "appservice_sender",
)
finally:
self.is_processing = False

View File

@ -15,8 +15,14 @@
# limitations under the License.
"""Contains handlers for federation events."""
import httplib
import itertools
import logging
from signedjson.key import decode_verify_key_bytes
from signedjson.sign import verify_signed_json
from twisted.internet import defer
from unpaddedbase64 import decode_base64
from ._base import BaseHandler
@ -43,10 +49,6 @@ from synapse.util.retryutils import NotRetryingDestination
from synapse.util.distributor import user_joined_room
from twisted.internet import defer
import itertools
import logging
logger = logging.getLogger(__name__)
@ -115,6 +117,19 @@ class FederationHandler(BaseHandler):
logger.debug("Already seen pdu %s", pdu.event_id)
return
# do some initial sanity-checking of the event. In particular, make
# sure it doesn't have hundreds of prev_events or auth_events, which
# could cause a huge state resolution or cascade of event fetches.
try:
self._sanity_check_event(pdu)
except SynapseError as err:
raise FederationError(
"ERROR",
err.code,
err.msg,
affected=pdu.event_id,
)
# If we are currently in the process of joining this room, then we
# queue up events for later processing.
if pdu.room_id in self.room_queues:
@ -149,10 +164,6 @@ class FederationHandler(BaseHandler):
auth_chain = []
have_seen = yield self.store.have_events(
[ev for ev, _ in pdu.prev_events]
)
fetch_state = False
# Get missing pdus if necessary.
@ -168,7 +179,7 @@ class FederationHandler(BaseHandler):
)
prevs = {e_id for e_id, _ in pdu.prev_events}
seen = set(have_seen.keys())
seen = yield self.store.have_seen_events(prevs)
if min_depth and pdu.depth < min_depth:
# This is so that we don't notify the user about this
@ -196,8 +207,7 @@ class FederationHandler(BaseHandler):
# Update the set of things we've seen after trying to
# fetch the missing stuff
have_seen = yield self.store.have_events(prevs)
seen = set(have_seen.iterkeys())
seen = yield self.store.have_seen_events(prevs)
if not prevs - seen:
logger.info(
@ -248,8 +258,7 @@ class FederationHandler(BaseHandler):
min_depth (int): Minimum depth of events to return.
"""
# We recalculate seen, since it may have changed.
have_seen = yield self.store.have_events(prevs)
seen = set(have_seen.keys())
seen = yield self.store.have_seen_events(prevs)
if not prevs - seen:
return
@ -361,9 +370,7 @@ class FederationHandler(BaseHandler):
if auth_chain:
event_ids |= {e.event_id for e in auth_chain}
seen_ids = set(
(yield self.store.have_events(event_ids)).keys()
)
seen_ids = yield self.store.have_seen_events(event_ids)
if state and auth_chain is not None:
# If we have any state or auth_chain given to us by the replication
@ -527,9 +534,16 @@ class FederationHandler(BaseHandler):
def backfill(self, dest, room_id, limit, extremities):
""" Trigger a backfill request to `dest` for the given `room_id`
This will attempt to get more events from the remote. This may return
be successfull and still return no events if the other side has no new
events to offer.
This will attempt to get more events from the remote. If the other side
has no new events to offer, this will return an empty list.
As the events are received, we check their signatures, and also do some
sanity-checking on them. If any of the backfilled events are invalid,
this method throws a SynapseError.
TODO: make this more useful to distinguish failures of the remote
server from invalid events (there is probably no point in trying to
re-fetch invalid events from every other HS in the room.)
"""
if dest == self.server_name:
raise SynapseError(400, "Can't backfill from self.")
@ -541,6 +555,16 @@ class FederationHandler(BaseHandler):
extremities=extremities,
)
# ideally we'd sanity check the events here for excess prev_events etc,
# but it's hard to reject events at this point without completely
# breaking backfill in the same way that it is currently broken by
# events whose signature we cannot verify (#3121).
#
# So for now we accept the events anyway. #3124 tracks this.
#
# for ev in events:
# self._sanity_check_event(ev)
# Don't bother processing events we already have.
seen_events = yield self.store.have_events_in_timeline(
set(e.event_id for e in events)
@ -633,7 +657,7 @@ class FederationHandler(BaseHandler):
failed_to_fetch = missing_auth - set(auth_events)
seen_events = yield self.store.have_events(
seen_events = yield self.store.have_seen_events(
set(auth_events.keys()) | set(state_events.keys())
)
@ -843,6 +867,38 @@ class FederationHandler(BaseHandler):
defer.returnValue(False)
def _sanity_check_event(self, ev):
"""
Do some early sanity checks of a received event
In particular, checks it doesn't have an excessive number of
prev_events or auth_events, which could cause a huge state resolution
or cascade of event fetches.
Args:
ev (synapse.events.EventBase): event to be checked
Returns: None
Raises:
SynapseError if the event does not pass muster
"""
if len(ev.prev_events) > 20:
logger.warn("Rejecting event %s which has %i prev_events",
ev.event_id, len(ev.prev_events))
raise SynapseError(
httplib.BAD_REQUEST,
"Too many prev_events",
)
if len(ev.auth_events) > 10:
logger.warn("Rejecting event %s which has %i auth_events",
ev.event_id, len(ev.auth_events))
raise SynapseError(
httplib.BAD_REQUEST,
"Too many auth_events",
)
@defer.inlineCallbacks
def send_invite(self, target_host, event):
""" Sends the invite to the remote server for signing.
@ -1736,7 +1792,8 @@ class FederationHandler(BaseHandler):
event_key = None
if event_auth_events - current_state:
have_events = yield self.store.have_events(
# TODO: can we use store.have_seen_events here instead?
have_events = yield self.store.get_seen_events_with_rejections(
event_auth_events - current_state
)
else:
@ -1759,12 +1816,12 @@ class FederationHandler(BaseHandler):
origin, event.room_id, event.event_id
)
seen_remotes = yield self.store.have_events(
seen_remotes = yield self.store.have_seen_events(
[e.event_id for e in remote_auth_chain]
)
for e in remote_auth_chain:
if e.event_id in seen_remotes.keys():
if e.event_id in seen_remotes:
continue
if e.event_id == event.event_id:
@ -1791,7 +1848,7 @@ class FederationHandler(BaseHandler):
except AuthError:
pass
have_events = yield self.store.have_events(
have_events = yield self.store.get_seen_events_with_rejections(
[e_id for e_id, _ in event.auth_events]
)
seen_events = set(have_events.keys())
@ -1876,13 +1933,13 @@ class FederationHandler(BaseHandler):
local_auth_chain,
)
seen_remotes = yield self.store.have_events(
seen_remotes = yield self.store.have_seen_events(
[e.event_id for e in result["auth_chain"]]
)
# 3. Process any remote auth chain events we haven't seen.
for ev in result["auth_chain"]:
if ev.event_id in seen_remotes.keys():
if ev.event_id in seen_remotes:
continue
if ev.event_id == event.event_id:

View File

@ -16,7 +16,7 @@
from twisted.internet import defer, reactor
from twisted.python.failure import Failure
from synapse.api.constants import EventTypes, Membership
from synapse.api.constants import EventTypes, Membership, MAX_DEPTH
from synapse.api.errors import AuthError, Codes, SynapseError
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events.utils import serialize_event
@ -37,7 +37,6 @@ from ._base import BaseHandler
from canonicaljson import encode_canonical_json
import logging
import random
import simplejson
logger = logging.getLogger(__name__)
@ -433,7 +432,7 @@ class EventCreationHandler(object):
@defer.inlineCallbacks
def create_event(self, requester, event_dict, token_id=None, txn_id=None,
prev_event_ids=None):
prev_events_and_hashes=None):
"""
Given a dict from a client, create a new event.
@ -447,47 +446,52 @@ class EventCreationHandler(object):
event_dict (dict): An entire event
token_id (str)
txn_id (str)
prev_event_ids (list): The prev event ids to use when creating the event
prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
the forward extremities to use as the prev_events for the
new event. For each event, a tuple of (event_id, hashes, depth)
where *hashes* is a map from algorithm to hash.
If None, they will be requested from the database.
Returns:
Tuple of created event (FrozenEvent), Context
"""
builder = self.event_builder_factory.new(event_dict)
with (yield self.limiter.queue(builder.room_id)):
self.validator.validate_new(builder)
self.validator.validate_new(builder)
if builder.type == EventTypes.Member:
membership = builder.content.get("membership", None)
target = UserID.from_string(builder.state_key)
if builder.type == EventTypes.Member:
membership = builder.content.get("membership", None)
target = UserID.from_string(builder.state_key)
if membership in {Membership.JOIN, Membership.INVITE}:
# If event doesn't include a display name, add one.
profile = self.profile_handler
content = builder.content
if membership in {Membership.JOIN, Membership.INVITE}:
# If event doesn't include a display name, add one.
profile = self.profile_handler
content = builder.content
try:
if "displayname" not in content:
content["displayname"] = yield profile.get_displayname(target)
if "avatar_url" not in content:
content["avatar_url"] = yield profile.get_avatar_url(target)
except Exception as e:
logger.info(
"Failed to get profile information for %r: %s",
target, e
)
try:
if "displayname" not in content:
content["displayname"] = yield profile.get_displayname(target)
if "avatar_url" not in content:
content["avatar_url"] = yield profile.get_avatar_url(target)
except Exception as e:
logger.info(
"Failed to get profile information for %r: %s",
target, e
)
if token_id is not None:
builder.internal_metadata.token_id = token_id
if token_id is not None:
builder.internal_metadata.token_id = token_id
if txn_id is not None:
builder.internal_metadata.txn_id = txn_id
if txn_id is not None:
builder.internal_metadata.txn_id = txn_id
event, context = yield self.create_new_client_event(
builder=builder,
requester=requester,
prev_event_ids=prev_event_ids,
)
event, context = yield self.create_new_client_event(
builder=builder,
requester=requester,
prev_events_and_hashes=prev_events_and_hashes,
)
defer.returnValue((event, context))
@ -557,64 +561,80 @@ class EventCreationHandler(object):
See self.create_event and self.send_nonmember_event.
"""
event, context = yield self.create_event(
requester,
event_dict,
token_id=requester.access_token_id,
txn_id=txn_id
)
spam_error = self.spam_checker.check_event_for_spam(event)
if spam_error:
if not isinstance(spam_error, basestring):
spam_error = "Spam is not permitted here"
raise SynapseError(
403, spam_error, Codes.FORBIDDEN
# We limit the number of concurrent event sends in a room so that we
# don't fork the DAG too much. If we don't limit then we can end up in
# a situation where event persistence can't keep up, causing
# extremities to pile up, which in turn leads to state resolution
# taking longer.
with (yield self.limiter.queue(event_dict["room_id"])):
event, context = yield self.create_event(
requester,
event_dict,
token_id=requester.access_token_id,
txn_id=txn_id
)
yield self.send_nonmember_event(
requester,
event,
context,
ratelimit=ratelimit,
)
spam_error = self.spam_checker.check_event_for_spam(event)
if spam_error:
if not isinstance(spam_error, basestring):
spam_error = "Spam is not permitted here"
raise SynapseError(
403, spam_error, Codes.FORBIDDEN
)
yield self.send_nonmember_event(
requester,
event,
context,
ratelimit=ratelimit,
)
defer.returnValue(event)
@measure_func("create_new_client_event")
@defer.inlineCallbacks
def create_new_client_event(self, builder, requester=None, prev_event_ids=None):
if prev_event_ids:
prev_events = yield self.store.add_event_hashes(prev_event_ids)
prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids)
depth = prev_max_depth + 1
else:
latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room(
builder.room_id,
def create_new_client_event(self, builder, requester=None,
prev_events_and_hashes=None):
"""Create a new event for a local client
Args:
builder (EventBuilder):
requester (synapse.types.Requester|None):
prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
the forward extremities to use as the prev_events for the
new event. For each event, a tuple of (event_id, hashes, depth)
where *hashes* is a map from algorithm to hash.
If None, they will be requested from the database.
Returns:
Deferred[(synapse.events.EventBase, synapse.events.snapshot.EventContext)]
"""
if prev_events_and_hashes is not None:
assert len(prev_events_and_hashes) <= 10, \
"Attempting to create an event with %i prev_events" % (
len(prev_events_and_hashes),
)
else:
prev_events_and_hashes = \
yield self.store.get_prev_events_for_room(builder.room_id)
# We want to limit the max number of prev events we point to in our
# new event
if len(latest_ret) > 10:
# Sort by reverse depth, so we point to the most recent.
latest_ret.sort(key=lambda a: -a[2])
new_latest_ret = latest_ret[:5]
if prev_events_and_hashes:
depth = max([d for _, _, d in prev_events_and_hashes]) + 1
# we cap depth of generated events, to ensure that they are not
# rejected by other servers (and so that they can be persisted in
# the db)
depth = min(depth, MAX_DEPTH)
else:
depth = 1
# We also randomly point to some of the older events, to make
# sure that we don't completely ignore the older events.
if latest_ret[5:]:
sample_size = min(5, len(latest_ret[5:]))
new_latest_ret.extend(random.sample(latest_ret[5:], sample_size))
latest_ret = new_latest_ret
if latest_ret:
depth = max([d for _, _, d in latest_ret]) + 1
else:
depth = 1
prev_events = [
(event_id, prev_hashes)
for event_id, prev_hashes, _ in latest_ret
]
prev_events = [
(event_id, prev_hashes)
for event_id, prev_hashes, _ in prev_events_and_hashes
]
builder.prev_events = prev_events
builder.depth = depth

View File

@ -23,7 +23,7 @@ from synapse.api.errors import (
)
from synapse.http.client import CaptchaServerHttpClient
from synapse import types
from synapse.types import UserID
from synapse.types import UserID, create_requester, RoomID, RoomAlias
from synapse.util.async import run_on_reactor, Linearizer
from synapse.util.threepids import check_3pid_allowed
from ._base import BaseHandler
@ -205,10 +205,17 @@ class RegistrationHandler(BaseHandler):
token = None
attempts += 1
# auto-join the user to any rooms we're supposed to dump them into
fake_requester = create_requester(user_id)
for r in self.hs.config.auto_join_rooms:
try:
yield self._join_user_to_room(fake_requester, r)
except Exception as e:
logger.error("Failed to join new user to %r: %r", r, e)
# We used to generate default identicons here, but nowadays
# we want clients to generate their own as part of their branding
# rather than there being consistent matrix-wide ones, so we don't.
defer.returnValue((user_id, token))
@defer.inlineCallbacks
@ -483,3 +490,28 @@ class RegistrationHandler(BaseHandler):
)
defer.returnValue((user_id, access_token))
@defer.inlineCallbacks
def _join_user_to_room(self, requester, room_identifier):
room_id = None
room_member_handler = self.hs.get_room_member_handler()
if RoomID.is_valid(room_identifier):
room_id = room_identifier
elif RoomAlias.is_valid(room_identifier):
room_alias = RoomAlias.from_string(room_identifier)
room_id, remote_room_hosts = (
yield room_member_handler.lookup_room_alias(room_alias)
)
room_id = room_id.to_string()
else:
raise SynapseError(400, "%s was not legal room ID or room alias" % (
room_identifier,
))
yield room_member_handler.update_membership(
requester=requester,
target=requester.user,
room_id=room_id,
remote_room_hosts=remote_room_hosts,
action="join",
)

View File

@ -20,7 +20,6 @@ from ._base import BaseHandler
from synapse.api.constants import (
EventTypes, JoinRules,
)
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.async import concurrently_execute
from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.util.caches.response_cache import ResponseCache
@ -44,8 +43,9 @@ EMTPY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
class RoomListHandler(BaseHandler):
def __init__(self, hs):
super(RoomListHandler, self).__init__(hs)
self.response_cache = ResponseCache(hs)
self.remote_response_cache = ResponseCache(hs, timeout_ms=30 * 1000)
self.response_cache = ResponseCache(hs, "room_list")
self.remote_response_cache = ResponseCache(hs, "remote_room_list",
timeout_ms=30 * 1000)
def get_local_public_room_list(self, limit=None, since_token=None,
search_filter=None,
@ -77,18 +77,11 @@ class RoomListHandler(BaseHandler):
)
key = (limit, since_token, network_tuple)
result = self.response_cache.get(key)
if not result:
logger.info("No cached result, calculating one.")
result = self.response_cache.set(
key,
preserve_fn(self._get_public_room_list)(
limit, since_token, network_tuple=network_tuple
)
)
else:
logger.info("Using cached deferred result.")
return make_deferred_yieldable(result)
return self.response_cache.wrap(
key,
self._get_public_room_list,
limit, since_token, network_tuple=network_tuple,
)
@defer.inlineCallbacks
def _get_public_room_list(self, limit=None, since_token=None,
@ -422,18 +415,14 @@ class RoomListHandler(BaseHandler):
server_name, limit, since_token, include_all_networks,
third_party_instance_id,
)
result = self.remote_response_cache.get(key)
if not result:
result = self.remote_response_cache.set(
key,
repl_layer.get_public_rooms(
server_name, limit=limit, since_token=since_token,
search_filter=search_filter,
include_all_networks=include_all_networks,
third_party_instance_id=third_party_instance_id,
)
)
return result
return self.remote_response_cache.wrap(
key,
repl_layer.get_public_rooms,
server_name, limit=limit, since_token=since_token,
search_filter=search_filter,
include_all_networks=include_all_networks,
third_party_instance_id=third_party_instance_id,
)
class RoomListNextBatch(namedtuple("RoomListNextBatch", (

View File

@ -149,7 +149,7 @@ class RoomMemberHandler(object):
@defer.inlineCallbacks
def _local_membership_update(
self, requester, target, room_id, membership,
prev_event_ids,
prev_events_and_hashes,
txn_id=None,
ratelimit=True,
content=None,
@ -175,7 +175,7 @@ class RoomMemberHandler(object):
},
token_id=requester.access_token_id,
txn_id=txn_id,
prev_event_ids=prev_event_ids,
prev_events_and_hashes=prev_events_and_hashes,
)
# Check if this event matches the previous membership event for the user.
@ -314,7 +314,12 @@ class RoomMemberHandler(object):
403, "Invites have been disabled on this server",
)
latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
prev_events_and_hashes = yield self.store.get_prev_events_for_room(
room_id,
)
latest_event_ids = (
event_id for (event_id, _, _) in prev_events_and_hashes
)
current_state_ids = yield self.state_handler.get_current_state_ids(
room_id, latest_event_ids=latest_event_ids,
)
@ -403,7 +408,7 @@ class RoomMemberHandler(object):
membership=effective_membership_state,
txn_id=txn_id,
ratelimit=ratelimit,
prev_event_ids=latest_event_ids,
prev_events_and_hashes=prev_events_and_hashes,
content=content,
)
defer.returnValue(res)
@ -852,6 +857,14 @@ class RoomMemberMasterHandler(RoomMemberHandler):
def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
"""Implements RoomMemberHandler._remote_join
"""
# filter ourselves out of remote_room_hosts: do_invite_join ignores it
# and if it is the only entry we'd like to return a 404 rather than a
# 500.
remote_room_hosts = [
host for host in remote_room_hosts if host != self.hs.hostname
]
if len(remote_room_hosts) == 0:
raise SynapseError(404, "No known servers")

View File

@ -15,7 +15,7 @@
from synapse.api.constants import Membership, EventTypes
from synapse.util.async import concurrently_execute
from synapse.util.logcontext import LoggingContext, make_deferred_yieldable, preserve_fn
from synapse.util.logcontext import LoggingContext
from synapse.util.metrics import Measure, measure_func
from synapse.util.caches.response_cache import ResponseCache
from synapse.push.clientformat import format_push_rules_for_user
@ -52,6 +52,7 @@ class TimelineBatch(collections.namedtuple("TimelineBatch", [
to tell if room needs to be part of the sync result.
"""
return bool(self.events)
__bool__ = __nonzero__ # python3
class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
@ -76,6 +77,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
# nb the notification count does not, er, count: if there's nothing
# else in the result, we don't need to send it.
)
__bool__ = __nonzero__ # python3
class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [
@ -95,6 +97,7 @@ class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [
or self.state
or self.account_data
)
__bool__ = __nonzero__ # python3
class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
@ -106,6 +109,7 @@ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
def __nonzero__(self):
"""Invited rooms should always be reported to the client"""
return True
__bool__ = __nonzero__ # python3
class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [
@ -117,6 +121,7 @@ class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [
def __nonzero__(self):
return bool(self.join or self.invite or self.leave)
__bool__ = __nonzero__ # python3
class DeviceLists(collections.namedtuple("DeviceLists", [
@ -127,6 +132,7 @@ class DeviceLists(collections.namedtuple("DeviceLists", [
def __nonzero__(self):
return bool(self.changed or self.left)
__bool__ = __nonzero__ # python3
class SyncResult(collections.namedtuple("SyncResult", [
@ -159,6 +165,7 @@ class SyncResult(collections.namedtuple("SyncResult", [
self.device_lists or
self.groups
)
__bool__ = __nonzero__ # python3
class SyncHandler(object):
@ -169,7 +176,7 @@ class SyncHandler(object):
self.presence_handler = hs.get_presence_handler()
self.event_sources = hs.get_event_sources()
self.clock = hs.get_clock()
self.response_cache = ResponseCache(hs)
self.response_cache = ResponseCache(hs, "sync")
self.state = hs.get_state_handler()
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
@ -180,15 +187,11 @@ class SyncHandler(object):
Returns:
A Deferred SyncResult.
"""
result = self.response_cache.get(sync_config.request_key)
if not result:
result = self.response_cache.set(
sync_config.request_key,
preserve_fn(self._wait_for_sync_for_user)(
sync_config, since_token, timeout, full_state
)
)
return make_deferred_yieldable(result)
return self.response_cache.wrap(
sync_config.request_key,
self._wait_for_sync_for_user,
sync_config, since_token, timeout, full_state,
)
@defer.inlineCallbacks
def _wait_for_sync_for_user(self, sync_config, since_token, timeout,

View File

@ -12,8 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import socket
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
from twisted.internet import defer, reactor
from twisted.internet.error import ConnectError
@ -33,7 +31,7 @@ SERVER_CACHE = {}
# our record of an individual server which can be tried to reach a destination.
#
# "host" is actually a dotted-quad or ipv6 address string. Except when there's
# "host" is the hostname acquired from the SRV record. Except when there's
# no SRV record, in which case it is the original hostname.
_Server = collections.namedtuple(
"_Server", "priority weight host port expires"
@ -297,20 +295,13 @@ def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE, clock=t
payload = answer.payload
hosts = yield _get_hosts_for_srv_record(
dns_client, str(payload.target)
)
for (ip, ttl) in hosts:
host_ttl = min(answer.ttl, ttl)
servers.append(_Server(
host=ip,
port=int(payload.port),
priority=int(payload.priority),
weight=int(payload.weight),
expires=int(clock.time()) + host_ttl,
))
servers.append(_Server(
host=str(payload.target),
port=int(payload.port),
priority=int(payload.priority),
weight=int(payload.weight),
expires=int(clock.time()) + answer.ttl,
))
servers.sort()
cache[service_name] = list(servers)
@ -328,81 +319,3 @@ def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE, clock=t
raise e
defer.returnValue(servers)
@defer.inlineCallbacks
def _get_hosts_for_srv_record(dns_client, host):
"""Look up each of the hosts in a SRV record
Args:
dns_client (twisted.names.dns.IResolver):
host (basestring): host to look up
Returns:
Deferred[list[(str, int)]]: a list of (host, ttl) pairs
"""
ip4_servers = []
ip6_servers = []
def cb(res):
# lookupAddress and lookupIP6Address return a three-tuple
# giving the answer, authority, and additional sections of the
# response.
#
# we only care about the answers.
return res[0]
def eb(res, record_type):
if res.check(DNSNameError):
return []
logger.warn("Error looking up %s for %s: %s", record_type, host, res)
return res
# no logcontexts here, so we can safely fire these off and gatherResults
d1 = dns_client.lookupAddress(host).addCallbacks(
cb, eb, errbackArgs=("A", ))
d2 = dns_client.lookupIPV6Address(host).addCallbacks(
cb, eb, errbackArgs=("AAAA", ))
results = yield defer.DeferredList(
[d1, d2], consumeErrors=True)
# if all of the lookups failed, raise an exception rather than blowing out
# the cache with an empty result.
if results and all(s == defer.FAILURE for (s, _) in results):
defer.returnValue(results[0][1])
for (success, result) in results:
if success == defer.FAILURE:
continue
for answer in result:
if not answer.payload:
continue
try:
if answer.type == dns.A:
ip = answer.payload.dottedQuad()
ip4_servers.append((ip, answer.ttl))
elif answer.type == dns.AAAA:
ip = socket.inet_ntop(
socket.AF_INET6, answer.payload.address,
)
ip6_servers.append((ip, answer.ttl))
else:
# the most likely candidate here is a CNAME record.
# rfc2782 says srvs may not point to aliases.
logger.warn(
"Ignoring unexpected DNS record type %s for %s",
answer.type, host,
)
continue
except Exception as e:
logger.warn("Ignoring invalid DNS response for %s: %s",
host, e)
continue
# keep the ipv4 results before the ipv6 results, mostly to match historical
# behaviour.
defer.returnValue(ip4_servers + ip6_servers)

View File

@ -329,7 +329,7 @@ class JsonResource(HttpServer, resource.Resource):
register_paths, so will return (possibly via Deferred) either
None, or a tuple of (http code, response body).
"""
if request.method == "OPTIONS":
if request.method == b"OPTIONS":
return _options_handler, {}
# Loop through all the registered callbacks to check if the method
@ -543,7 +543,7 @@ def finish_request(request):
def _request_user_agent_is_curl(request):
user_agents = request.requestHeaders.getRawHeaders(
"User-Agent", default=[]
b"User-Agent", default=[]
)
for user_agent in user_agents:
if "curl" in user_agent:

View File

@ -20,7 +20,7 @@ import logging
import re
import time
ACCESS_TOKEN_RE = re.compile(r'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
class SynapseRequest(Request):
@ -43,12 +43,12 @@ class SynapseRequest(Request):
def get_redacted_uri(self):
return ACCESS_TOKEN_RE.sub(
r'\1<redacted>\3',
br'\1<redacted>\3',
self.uri
)
def get_user_agent(self):
return self.requestHeaders.getRawHeaders("User-Agent", [None])[-1]
return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1]
def started_processing(self):
self.site.access_logger.info(

View File

@ -17,12 +17,13 @@ import logging
import functools
import time
import gc
import platform
from twisted.internet import reactor
from .metric import (
CounterMetric, CallbackMetric, DistributionMetric, CacheMetric,
MemoryUsageMetric,
MemoryUsageMetric, GaugeMetric,
)
from .process_collector import register_process_collector
@ -30,6 +31,7 @@ from .process_collector import register_process_collector
logger = logging.getLogger(__name__)
running_on_pypy = platform.python_implementation() == 'PyPy'
all_metrics = []
all_collectors = []
@ -63,6 +65,13 @@ class Metrics(object):
"""
return self._register(CounterMetric, *args, **kwargs)
def register_gauge(self, *args, **kwargs):
"""
Returns:
GaugeMetric
"""
return self._register(GaugeMetric, *args, **kwargs)
def register_callback(self, *args, **kwargs):
"""
Returns:
@ -142,6 +151,32 @@ reactor_metrics = get_metrics_for("python.twisted.reactor")
tick_time = reactor_metrics.register_distribution("tick_time")
pending_calls_metric = reactor_metrics.register_distribution("pending_calls")
synapse_metrics = get_metrics_for("synapse")
# Used to track where various components have processed in the event stream,
# e.g. federation sending, appservice sending, etc.
event_processing_positions = synapse_metrics.register_gauge(
"event_processing_positions", labels=["name"],
)
# Used to track the current max events stream position
event_persisted_position = synapse_metrics.register_gauge(
"event_persisted_position",
)
# Used to track the received_ts of the last event processed by various
# components
event_processing_last_ts = synapse_metrics.register_gauge(
"event_processing_last_ts", labels=["name"],
)
# Used to track the lag processing events. This is the time difference
# between the last processed event's received_ts and the time it was
# finished being processed.
event_processing_lag = synapse_metrics.register_gauge(
"event_processing_lag", labels=["name"],
)
def runUntilCurrentTimer(func):
@ -174,6 +209,9 @@ def runUntilCurrentTimer(func):
tick_time.inc_by(end - start)
pending_calls_metric.inc_by(num_pending)
if running_on_pypy:
return ret
# Check if we need to do a manual GC (since its been disabled), and do
# one if necessary.
threshold = gc.get_threshold()
@ -206,6 +244,7 @@ try:
# We manually run the GC each reactor tick so that we can get some metrics
# about time spent doing GC,
gc.disable()
if not running_on_pypy:
gc.disable()
except AttributeError:
pass

View File

@ -115,7 +115,7 @@ class CounterMetric(BaseMetric):
# dict[list[str]]: value for each set of label values. the keys are the
# label values, in the same order as the labels in self.labels.
#
# (if the metric is a scalar, the (single) key is the empty list).
# (if the metric is a scalar, the (single) key is the empty tuple).
self.counts = {}
# Scalar metrics are never empty
@ -145,6 +145,36 @@ class CounterMetric(BaseMetric):
)
class GaugeMetric(BaseMetric):
"""A metric that can go up or down
"""
def __init__(self, *args, **kwargs):
super(GaugeMetric, self).__init__(*args, **kwargs)
# dict[list[str]]: value for each set of label values. the keys are the
# label values, in the same order as the labels in self.labels.
#
# (if the metric is a scalar, the (single) key is the empty tuple).
self.guages = {}
def set(self, v, *values):
if len(values) != self.dimension():
raise ValueError(
"Expected as many values to inc() as labels (%d)" % (self.dimension())
)
# TODO: should assert that the tag values are all strings
self.guages[values] = v
def render(self):
return flatten(
self._render_for_labels(k, self.guages[k])
for k in sorted(self.guages.keys())
)
class CallbackMetric(BaseMetric):
"""A metric that returns the numeric value returned by a callback whenever
it is rendered. Typically this is used to implement gauges that yield the

View File

@ -144,6 +144,7 @@ class _NotifierUserStream(object):
class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))):
def __nonzero__(self):
return bool(self.events)
__bool__ = __nonzero__ # python3
class Notifier(object):

View File

@ -1,5 +1,6 @@
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2017 Vector Creations Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -18,6 +19,18 @@ from distutils.version import LooseVersion
logger = logging.getLogger(__name__)
# this dict maps from python package name to a list of modules we expect it to
# provide.
#
# the key is a "requirement specifier", as used as a parameter to `pip
# install`[1], or an `install_requires` argument to `setuptools.setup` [2].
#
# the value is a sequence of strings; each entry should be the name of the
# python module, optionally followed by a version assertion which can be either
# ">=<ver>" or "==<ver>".
#
# [1] https://pip.pypa.io/en/stable/reference/pip_install/#requirement-specifiers.
# [2] https://setuptools.readthedocs.io/en/latest/setuptools.html#declaring-dependencies
REQUIREMENTS = {
"jsonschema>=2.5.1": ["jsonschema>=2.5.1"],
"frozendict>=0.4": ["frozendict"],
@ -26,7 +39,11 @@ REQUIREMENTS = {
"signedjson>=1.0.0": ["signedjson>=1.0.0"],
"pynacl>=1.2.1": ["nacl>=1.2.1", "nacl.bindings"],
"service_identity>=1.0.0": ["service_identity>=1.0.0"],
"Twisted>=16.0.0": ["twisted>=16.0.0"],
# we break under Twisted 18.4
# (https://github.com/matrix-org/synapse/issues/3135)
"Twisted>=16.0.0,<18.4": ["twisted>=16.0.0"],
"pyopenssl>=0.14": ["OpenSSL>=0.14"],
"pyyaml": ["yaml"],
"pyasn1": ["pyasn1"],
@ -39,6 +56,7 @@ REQUIREMENTS = {
"pymacaroons-pynacl": ["pymacaroons"],
"msgpack-python>=0.3.0": ["msgpack"],
"phonenumbers>=8.2.0": ["phonenumbers"],
"six": ["six"],
}
CONDITIONAL_REQUIREMENTS = {
"web_client": {

View File

@ -23,7 +23,6 @@ from synapse.events.snapshot import EventContext
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.util.async import sleep
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.metrics import Measure
from synapse.types import Requester, UserID
@ -115,20 +114,15 @@ class ReplicationSendEventRestServlet(RestServlet):
self.clock = hs.get_clock()
# The responses are tiny, so we may as well cache them for a while
self.response_cache = ResponseCache(hs, timeout_ms=30 * 60 * 1000)
self.response_cache = ResponseCache(hs, "send_event", timeout_ms=30 * 60 * 1000)
def on_PUT(self, request, event_id):
result = self.response_cache.get(event_id)
if not result:
result = self.response_cache.set(
event_id,
self._handle_request(request)
)
else:
logger.warn("Returning cached response")
return make_deferred_yieldable(result)
return self.response_cache.wrap(
event_id,
self._handle_request,
request
)
@preserve_fn
@defer.inlineCallbacks
def _handle_request(self, request):
with Measure(self.clock, "repl_send_event_parse"):

View File

@ -44,7 +44,10 @@ class LogoutRestServlet(ClientV1RestServlet):
requester = yield self.auth.get_user_by_req(request)
except AuthError:
# this implies the access token has already been deleted.
pass
defer.returnValue((401, {
"errcode": "M_UNKNOWN_TOKEN",
"error": "Access Token unknown or expired"
}))
else:
if requester.device_id is None:
# the acccess token wasn't associated with a device.

View File

@ -348,9 +348,9 @@ class RegisterRestServlet(ClientV1RestServlet):
admin = register_json.get("admin", None)
# Its important to check as we use null bytes as HMAC field separators
if "\x00" in user:
if b"\x00" in user:
raise SynapseError(400, "Invalid user")
if "\x00" in password:
if b"\x00" in password:
raise SynapseError(400, "Invalid password")
# str() because otherwise hmac complains that 'unicode' does not

View File

@ -165,17 +165,12 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
content=content,
)
else:
event, context = yield self.event_creation_hander.create_event(
event = yield self.event_creation_hander.create_and_send_nonmember_event(
requester,
event_dict,
token_id=requester.access_token_id,
txn_id=txn_id,
)
yield self.event_creation_hander.send_nonmember_event(
requester, event, context,
)
ret = {}
if event:
ret = {"event_id": event.event_id}

View File

@ -20,7 +20,6 @@ import synapse
import synapse.types
from synapse.api.auth import get_access_token_from_request, has_access_token
from synapse.api.constants import LoginType
from synapse.types import RoomID, RoomAlias
from synapse.api.errors import SynapseError, Codes, UnrecognizedRequestError
from synapse.http.servlet import (
RestServlet, parse_json_object_from_request, assert_params_in_request, parse_string
@ -405,14 +404,6 @@ class RegisterRestServlet(RestServlet):
generate_token=False,
)
# auto-join the user to any rooms we're supposed to dump them into
fake_requester = synapse.types.create_requester(registered_user_id)
for r in self.hs.config.auto_join_rooms:
try:
yield self._join_user_to_room(fake_requester, r)
except Exception as e:
logger.error("Failed to join new user to %r: %r", r, e)
# remember that we've now registered that user account, and with
# what user ID (since the user may not have specified)
self.auth_handler.set_session_data(
@ -445,29 +436,6 @@ class RegisterRestServlet(RestServlet):
def on_OPTIONS(self, _):
return 200, {}
@defer.inlineCallbacks
def _join_user_to_room(self, requester, room_identifier):
room_id = None
if RoomID.is_valid(room_identifier):
room_id = room_identifier
elif RoomAlias.is_valid(room_identifier):
room_alias = RoomAlias.from_string(room_identifier)
room_id, remote_room_hosts = (
yield self.room_member_handler.lookup_room_alias(room_alias)
)
room_id = room_id.to_string()
else:
raise SynapseError(400, "%s was not legal room ID or room alias" % (
room_identifier,
))
yield self.room_member_handler.update_membership(
requester=requester,
target=requester.user,
room_id=room_id,
action="join",
)
@defer.inlineCallbacks
def _do_appservice_registration(self, username, as_token, body):
user_id = yield self.registration_handler.appservice_register(

View File

@ -16,6 +16,8 @@
from twisted.internet import defer, threads
from twisted.protocols.basic import FileSender
import six
from ._base import Responder
from synapse.util.file_consumer import BackgroundFileConsumer
@ -119,7 +121,7 @@ class MediaStorage(object):
os.remove(fname)
except Exception:
pass
raise t, v, tb
six.reraise(t, v, tb)
if not finished_called:
raise Exception("Finished callback not called")

View File

@ -266,9 +266,9 @@ class DataStore(RoomMemberStore, RoomStore,
def count_r30_users(self):
"""
Counts the number of 30 day retained users, defined as:-
* Users who have created their accounts more than 30 days
* Users who have created their accounts more than 30 days ago
* Where last seen at most 30 days ago
* Where account creation and last_seen are > 30 days
* Where account creation and last_seen are > 30 days apart
Returns counts globaly for a given user as well as breaking
by platform

View File

@ -376,7 +376,7 @@ class SQLBaseStore(object):
Returns:
A list of dicts where the key is the column header.
"""
col_headers = list(intern(column[0]) for column in cursor.description)
col_headers = list(intern(str(column[0])) for column in cursor.description)
results = list(
dict(zip(col_headers, row)) for row in cursor
)

View File

@ -18,6 +18,7 @@ from .postgres import PostgresEngine
from .sqlite3 import Sqlite3Engine
import importlib
import platform
SUPPORTED_MODULE = {
@ -31,6 +32,10 @@ def create_engine(database_config):
engine_class = SUPPORTED_MODULE.get(name, None)
if engine_class:
# pypy requires psycopg2cffi rather than psycopg2
if (name == "psycopg2" and
platform.python_implementation() == "PyPy"):
name = "psycopg2cffi"
module = importlib.import_module(name)
return engine_class(module, database_config)

View File

@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
from twisted.internet import defer
@ -24,7 +25,9 @@ from synapse.util.caches.descriptors import cached
from unpaddedbase64 import encode_base64
import logging
from Queue import PriorityQueue, Empty
from six.moves.queue import PriorityQueue, Empty
from six.moves import range
logger = logging.getLogger(__name__)
@ -78,7 +81,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
front_list = list(front)
chunks = [
front_list[x:x + 100]
for x in xrange(0, len(front), 100)
for x in range(0, len(front), 100)
]
for chunk in chunks:
txn.execute(
@ -133,7 +136,47 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
retcol="event_id",
)
@defer.inlineCallbacks
def get_prev_events_for_room(self, room_id):
"""
Gets a subset of the current forward extremities in the given room.
Limits the result to 10 extremities, so that we can avoid creating
events which refer to hundreds of prev_events.
Args:
room_id (str): room_id
Returns:
Deferred[list[(str, dict[str, str], int)]]
for each event, a tuple of (event_id, hashes, depth)
where *hashes* is a map from algorithm to hash.
"""
res = yield self.get_latest_event_ids_and_hashes_in_room(room_id)
if len(res) > 10:
# Sort by reverse depth, so we point to the most recent.
res.sort(key=lambda a: -a[2])
# we use half of the limit for the actual most recent events, and
# the other half to randomly point to some of the older events, to
# make sure that we don't completely ignore the older events.
res = res[0:5] + random.sample(res[5:], 5)
defer.returnValue(res)
def get_latest_event_ids_and_hashes_in_room(self, room_id):
"""
Gets the current forward extremities in the given room
Args:
room_id (str): room_id
Returns:
Deferred[list[(str, dict[str, str], int)]]
for each event, a tuple of (event_id, hashes, depth)
where *hashes* is a map from algorithm to hash.
"""
return self.runInteraction(
"get_latest_event_ids_and_hashes_in_room",
self._get_latest_event_ids_and_hashes_in_room,
@ -182,22 +225,6 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
room_id,
)
@defer.inlineCallbacks
def get_max_depth_of_events(self, event_ids):
sql = (
"SELECT MAX(depth) FROM events WHERE event_id IN (%s)"
) % (",".join(["?"] * len(event_ids)),)
rows = yield self._execute(
"get_max_depth_of_events", None,
sql, *event_ids
)
if rows:
defer.returnValue(rows[0][0])
else:
defer.returnValue(1)
def _get_min_depth_interaction(self, txn, room_id):
min_depth = self._simple_select_one_onecol_txn(
txn,

View File

@ -16,6 +16,7 @@
from collections import OrderedDict, deque, namedtuple
from functools import wraps
import itertools
import logging
import simplejson as json
@ -444,6 +445,9 @@ class EventsStore(EventsWorkerStore):
new_forward_extremeties=new_forward_extremeties,
)
persist_event_counter.inc_by(len(chunk))
synapse.metrics.event_persisted_position.set(
chunk[-1][0].internal_metadata.stream_ordering,
)
for event, context in chunk:
if context.app_service:
origin_type = "local"
@ -1317,13 +1321,49 @@ class EventsStore(EventsWorkerStore):
defer.returnValue(set(r["event_id"] for r in rows))
def have_events(self, event_ids):
@defer.inlineCallbacks
def have_seen_events(self, event_ids):
"""Given a list of event ids, check if we have already processed them.
Args:
event_ids (iterable[str]):
Returns:
dict: Has an entry for each event id we already have seen. Maps to
the rejected reason string if we rejected the event, else maps to
None.
Deferred[set[str]]: The events we have already seen.
"""
results = set()
def have_seen_events_txn(txn, chunk):
sql = (
"SELECT event_id FROM events as e WHERE e.event_id IN (%s)"
% (",".join("?" * len(chunk)), )
)
txn.execute(sql, chunk)
for (event_id, ) in txn:
results.add(event_id)
# break the input up into chunks of 100
input_iterator = iter(event_ids)
for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)),
[]):
yield self.runInteraction(
"have_seen_events",
have_seen_events_txn,
chunk,
)
defer.returnValue(results)
def get_seen_events_with_rejections(self, event_ids):
"""Given a list of event ids, check if we rejected them.
Args:
event_ids (list[str])
Returns:
Deferred[dict[str, str|None):
Has an entry for each event id we already have seen. Maps to
the rejected reason string if we rejected the event, else maps
to None.
"""
if not event_ids:
return defer.succeed({})
@ -1345,9 +1385,7 @@ class EventsStore(EventsWorkerStore):
return res
return self.runInteraction(
"have_events", f,
)
return self.runInteraction("get_rejection_reasons", f)
@defer.inlineCallbacks
def count_daily_messages(self):

View File

@ -51,6 +51,26 @@ _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event"))
class EventsWorkerStore(SQLBaseStore):
def get_received_ts(self, event_id):
"""Get received_ts (when it was persisted) for the event.
Raises an exception for unknown events.
Args:
event_id (str)
Returns:
Deferred[int|None]: Timestamp in milliseconds, or None for events
that were persisted before received_ts was implemented.
"""
return self._simple_select_one_onecol(
table="events",
keyvalues={
"event_id": event_id,
},
retcol="received_ts",
desc="get_received_ts",
)
@defer.inlineCallbacks
def get_event(self, event_id, check_redacted=True,

View File

@ -530,7 +530,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
# Convert the IDs to MXC URIs
for media_id in local_mxcs:
local_media_mxcs.append("mxc://%s/%s" % (self.hostname, media_id))
local_media_mxcs.append("mxc://%s/%s" % (self.hs.hostname, media_id))
for hostname, media_id in remote_mxcs:
remote_media_mxcs.append("mxc://%s/%s" % (hostname, media_id))
@ -595,7 +595,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
while next_token:
sql = """
SELECT stream_ordering, json FROM events
JOIN event_json USING (event_id)
JOIN event_json USING (room_id, event_id)
WHERE room_id = ?
AND stream_ordering < ?
AND contains_url = ? AND outlier = ?
@ -619,7 +619,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
if matches:
hostname = matches.group(1)
media_id = matches.group(2)
if hostname == self.hostname:
if hostname == self.hs.hostname:
local_media_mxcs.append(media_id)
else:
remote_media_mxcs.append((hostname, media_id))

View File

@ -77,7 +77,7 @@ class SearchStore(BackgroundUpdateStore):
sql = (
"SELECT stream_ordering, event_id, room_id, type, json, "
" origin_server_ts FROM events"
" JOIN event_json USING (event_id)"
" JOIN event_json USING (room_id, event_id)"
" WHERE ? <= stream_ordering AND stream_ordering < ?"
" AND (%s)"
" ORDER BY stream_ordering DESC"

View File

@ -169,7 +169,7 @@ class DomainSpecificString(
except Exception:
return False
__str__ = to_string
__repr__ = to_string
class UserID(DomainSpecificString):

View File

@ -12,8 +12,15 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from twisted.internet import defer
from synapse.util.async import ObservableDeferred
from synapse.util.caches import metrics as cache_metrics
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
logger = logging.getLogger(__name__)
class ResponseCache(object):
@ -24,20 +31,68 @@ class ResponseCache(object):
used rather than trying to compute a new response.
"""
def __init__(self, hs, timeout_ms=0):
def __init__(self, hs, name, timeout_ms=0):
self.pending_result_cache = {} # Requests that haven't finished yet.
self.clock = hs.get_clock()
self.timeout_sec = timeout_ms / 1000.
self._name = name
self._metrics = cache_metrics.register_cache(
"response_cache",
size_callback=lambda: self.size(),
cache_name=name,
)
def size(self):
return len(self.pending_result_cache)
def get(self, key):
"""Look up the given key.
Can return either a new Deferred (which also doesn't follow the synapse
logcontext rules), or, if the request has completed, the actual
result. You will probably want to make_deferred_yieldable the result.
If there is no entry for the key, returns None. It is worth noting that
this means there is no way to distinguish a completed result of None
from an absent cache entry.
Args:
key (hashable):
Returns:
twisted.internet.defer.Deferred|None|E: None if there is no entry
for this key; otherwise either a deferred result or the result
itself.
"""
result = self.pending_result_cache.get(key)
if result is not None:
self._metrics.inc_hits()
return result.observe()
else:
self._metrics.inc_misses()
return None
def set(self, key, deferred):
"""Set the entry for the given key to the given deferred.
*deferred* should run its callbacks in the sentinel logcontext (ie,
you should wrap normal synapse deferreds with
logcontext.run_in_background).
Can return either a new Deferred (which also doesn't follow the synapse
logcontext rules), or, if *deferred* was already complete, the actual
result. You will probably want to make_deferred_yieldable the result.
Args:
key (hashable):
deferred (twisted.internet.defer.Deferred[T):
Returns:
twisted.internet.defer.Deferred[T]|T: a new deferred, or the actual
result.
"""
result = ObservableDeferred(deferred, consumeErrors=True)
self.pending_result_cache[key] = result
@ -53,3 +108,52 @@ class ResponseCache(object):
result.addBoth(remove)
return result.observe()
def wrap(self, key, callback, *args, **kwargs):
"""Wrap together a *get* and *set* call, taking care of logcontexts
First looks up the key in the cache, and if it is present makes it
follow the synapse logcontext rules and returns it.
Otherwise, makes a call to *callback(*args, **kwargs)*, which should
follow the synapse logcontext rules, and adds the result to the cache.
Example usage:
@defer.inlineCallbacks
def handle_request(request):
# etc
defer.returnValue(result)
result = yield response_cache.wrap(
key,
handle_request,
request,
)
Args:
key (hashable): key to get/set in the cache
callback (callable): function to call if the key is not found in
the cache
*args: positional parameters to pass to the callback, if it is used
**kwargs: named paramters to pass to the callback, if it is used
Returns:
twisted.internet.defer.Deferred: yieldable result
"""
result = self.get(key)
if not result:
logger.info("[%s]: no cached result for [%s], calculating new one",
self._name, key)
d = run_in_background(callback, *args, **kwargs)
result = self.set(key, d)
elif not isinstance(result, defer.Deferred) or result.called:
logger.info("[%s]: using completed cached result for [%s]",
self._name, key)
else:
logger.info("[%s]: using incomplete cached result for [%s]",
self._name, key)
return make_deferred_yieldable(result)

View File

@ -17,7 +17,7 @@ from twisted.internet import threads, reactor
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
import Queue
from six.moves import queue
class BackgroundFileConsumer(object):
@ -49,7 +49,7 @@ class BackgroundFileConsumer(object):
# Queue of slices of bytes to be written. When producer calls
# unregister a final None is sent.
self._bytes_queue = Queue.Queue()
self._bytes_queue = queue.Queue()
# Deferred that is resolved when finished writing
self._finished_deferred = None

View File

@ -92,6 +92,7 @@ class LoggingContext(object):
def __nonzero__(self):
return False
__bool__ = __nonzero__ # python3
sentinel = Sentinel()

View File

@ -24,7 +24,7 @@ class ConfigLoadingTestCase(unittest.TestCase):
def setUp(self):
self.dir = tempfile.mkdtemp()
print self.dir
print(self.dir)
self.file = os.path.join(self.dir, "homeserver.yaml")
def tearDown(self):

View File

@ -183,7 +183,7 @@ class KeyringTestCase(unittest.TestCase):
res_deferreds_2 = kr.verify_json_objects_for_server(
[("server10", json1)],
)
yield async.sleep(01)
yield async.sleep(1)
self.http_client.post_json.assert_not_called()
res_deferreds_2[0].addBoth(self.check_context, None)

View File

@ -31,6 +31,7 @@ class AppServiceHandlerTestCase(unittest.TestCase):
self.mock_scheduler = Mock()
hs = Mock()
hs.get_datastore = Mock(return_value=self.mock_store)
self.mock_store.get_received_ts.return_value = 0
hs.get_application_service_api = Mock(return_value=self.mock_as_api)
hs.get_application_service_scheduler = Mock(return_value=self.mock_scheduler)
hs.get_clock.return_value = MockClock()

View File

@ -123,6 +123,7 @@ class EventStreamPermissionsTestCase(RestTestCase):
self.ratelimiter.send_message.return_value = (True, 0)
hs.config.enable_registration_captcha = False
hs.config.enable_registration = True
hs.config.auto_join_rooms = []
hs.get_handlers().federation_handler = Mock()

View File

@ -480,9 +480,9 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase):
ApplicationServiceStore(None, hs)
e = cm.exception
self.assertIn(f1, e.message)
self.assertIn(f2, e.message)
self.assertIn("id", e.message)
self.assertIn(f1, str(e))
self.assertIn(f2, str(e))
self.assertIn("id", str(e))
@defer.inlineCallbacks
def test_duplicate_as_tokens(self):
@ -504,6 +504,6 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase):
ApplicationServiceStore(None, hs)
e = cm.exception
self.assertIn(f1, e.message)
self.assertIn(f2, e.message)
self.assertIn("as_token", e.message)
self.assertIn(f1, str(e))
self.assertIn(f2, str(e))
self.assertIn("as_token", str(e))

View File

@ -0,0 +1,68 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer
import tests.unittest
import tests.utils
class EventFederationWorkerStoreTestCase(tests.unittest.TestCase):
@defer.inlineCallbacks
def setUp(self):
hs = yield tests.utils.setup_test_homeserver()
self.store = hs.get_datastore()
@defer.inlineCallbacks
def test_get_prev_events_for_room(self):
room_id = '@ROOM:local'
# add a bunch of events and hashes to act as forward extremities
def insert_event(txn, i):
event_id = '$event_%i:local' % i
txn.execute((
"INSERT INTO events ("
" room_id, event_id, type, depth, topological_ordering,"
" content, processed, outlier) "
"VALUES (?, ?, 'm.test', ?, ?, 'test', ?, ?)"
), (room_id, event_id, i, i, True, False))
txn.execute((
'INSERT INTO event_forward_extremities (room_id, event_id) '
'VALUES (?, ?)'
), (room_id, event_id))
txn.execute((
'INSERT INTO event_reference_hashes '
'(event_id, algorithm, hash) '
"VALUES (?, 'sha256', ?)"
), (event_id, 'ffff'))
for i in range(0, 11):
yield self.store.runInteraction("insert", insert_event, i)
# this should get the last five and five others
r = yield self.store.get_prev_events_for_room(room_id)
self.assertEqual(10, len(r))
for i in range(0, 5):
el = r[i]
depth = el[2]
self.assertEqual(10 - i, depth)
for i in range(5, 5):
el = r[i]
depth = el[2]
self.assertLessEqual(5, depth)

View File

@ -62,7 +62,7 @@ class DistributorTestCase(unittest.TestCase):
def test_signal_catch(self):
self.dist.declare("alarm")
observers = [Mock() for i in 1, 2]
observers = [Mock() for i in (1, 2)]
for o in observers:
self.dist.observe("alarm", o)

View File

@ -33,8 +33,6 @@ class DnsTestCase(unittest.TestCase):
service_name = "test_service.example.com"
host_name = "example.com"
ip_address = "127.0.0.1"
ip6_address = "::1"
answer_srv = dns.RRHeader(
type=dns.SRV,
@ -43,29 +41,9 @@ class DnsTestCase(unittest.TestCase):
)
)
answer_a = dns.RRHeader(
type=dns.A,
payload=dns.Record_A(
address=ip_address,
)
)
answer_aaaa = dns.RRHeader(
type=dns.AAAA,
payload=dns.Record_AAAA(
address=ip6_address,
)
)
dns_client_mock.lookupService.return_value = defer.succeed(
([answer_srv], None, None),
)
dns_client_mock.lookupAddress.return_value = defer.succeed(
([answer_a], None, None),
)
dns_client_mock.lookupIPV6Address.return_value = defer.succeed(
([answer_aaaa], None, None),
)
cache = {}
@ -74,13 +52,10 @@ class DnsTestCase(unittest.TestCase):
)
dns_client_mock.lookupService.assert_called_once_with(service_name)
dns_client_mock.lookupAddress.assert_called_once_with(host_name)
dns_client_mock.lookupIPV6Address.assert_called_once_with(host_name)
self.assertEquals(len(servers), 2)
self.assertEquals(len(servers), 1)
self.assertEquals(servers, cache[service_name])
self.assertEquals(servers[0].host, ip_address)
self.assertEquals(servers[1].host, ip6_address)
self.assertEquals(servers[0].host, host_name)
@defer.inlineCallbacks
def test_from_cache_expired_and_dns_fail(self):

View File

@ -20,7 +20,7 @@ from mock import NonCallableMock
from synapse.util.file_consumer import BackgroundFileConsumer
from tests import unittest
from StringIO import StringIO
from six import StringIO
import threading

View File

@ -18,6 +18,7 @@ from tests import unittest
from twisted.internet import defer
from synapse.util.async import Linearizer
from six.moves import range
class LinearizerTestCase(unittest.TestCase):
@ -58,7 +59,7 @@ class LinearizerTestCase(unittest.TestCase):
logcontext.LoggingContext.current_context(), lc)
func(0, sleep=True)
for i in xrange(1, 100):
for i in range(1, 100):
func(i)
return func(1000)

View File

@ -59,6 +59,10 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs):
config.email_enable_notifs = False
config.block_non_admin_invites = False
config.federation_domain_whitelist = None
config.federation_rc_reject_limit = 10
config.federation_rc_sleep_limit = 10
config.federation_rc_concurrent = 10
config.filter_timeline_limit = 5000
config.user_directory_search_all_users = False
# disable user directory updates, because they get done in the
@ -212,7 +216,7 @@ class MockHttpResource(HttpServer):
headers = {}
if federation_auth:
headers["Authorization"] = ["X-Matrix origin=test,key=,sig="]
headers[b"Authorization"] = ["X-Matrix origin=test,key=,sig="]
mock_request.requestHeaders.getRawHeaders = mock_getRawHeaders(headers)
# return the right path if the event requires it