Merge remote-tracking branch 'origin/develop' into erikj/destination_retry_cache

This commit is contained in:
Richard van der Hoff 2018-09-28 10:51:09 +01:00
commit 9c8cec5dab
51 changed files with 696 additions and 426 deletions

View File

@ -1,5 +1,21 @@
version: 2
jobs:
dockerhubuploadrelease:
machine: true
steps:
- checkout
- run: docker build -f docker/Dockerfile -t matrixdotorg/synapse:$CIRCLE_TAG .
- run: docker login --username $DOCKER_HUB_USERNAME --password $DOCKER_HUB_PASSWORD
- run: docker push matrixdotorg/synapse:$CIRCLE_TAG
dockerhubuploadlatest:
machine: true
steps:
- checkout
- run: docker build -f docker/Dockerfile -t matrixdotorg/synapse:$CIRCLE_SHA1 .
- run: docker login --username $DOCKER_HUB_USERNAME --password $DOCKER_HUB_PASSWORD
- run: docker tag matrixdotorg/synapse:$CIRCLE_SHA1 matrixdotorg/synapse:latest
- run: docker push matrixdotorg/synapse:$CIRCLE_SHA1
- run: docker push matrixdotorg/synapse:latest
sytestpy2:
machine: true
steps:
@ -99,23 +115,45 @@ workflows:
version: 2
build:
jobs:
- sytestpy2
- sytestpy2postgres
- sytestpy3
- sytestpy3postgres
- sytestpy2:
filters:
branches:
only: /develop|master|release-.*/
- sytestpy2postgres:
filters:
branches:
only: /develop|master|release-.*/
- sytestpy3:
filters:
branches:
only: /develop|master|release-.*/
- sytestpy3postgres:
filters:
branches:
only: /develop|master|release-.*/
- sytestpy2merged:
filters:
branches:
ignore: /develop|master/
ignore: /develop|master|release-.*/
- sytestpy2postgresmerged:
filters:
branches:
ignore: /develop|master/
ignore: /develop|master|release-.*/
- sytestpy3merged:
filters:
branches:
ignore: /develop|master/
ignore: /develop|master|release-.*/
- sytestpy3postgresmerged:
filters:
branches:
ignore: /develop|master/
ignore: /develop|master|release-.*/
- dockerhubuploadrelease:
filters:
tags:
only: /^v[0-9].[0-9]+.[0-9]+(.[0-9]+)?/
branches:
ignore: /.*/
- dockerhubuploadlatest:
filters:
branches:
only: master

View File

@ -20,6 +20,9 @@ matrix:
- python: 2.7
env: TOX_ENV=py27
- python: 2.7
env: TOX_ENV=py27-old
- python: 2.7
env: TOX_ENV=py27-postgres TRIAL_FLAGS="-j 4"
services:

1
changelog.d/3794.misc Normal file
View File

@ -0,0 +1 @@
Speed up calculation of typing updates for replication

1
changelog.d/3924.misc Normal file
View File

@ -0,0 +1 @@
Comments and interface cleanup for on_receive_pdu

1
changelog.d/3946.misc Normal file
View File

@ -0,0 +1 @@
Automate pushes to docker hub

1
changelog.d/3948.misc Normal file
View File

@ -0,0 +1 @@
Fix incompatibility with python3 on alpine

1
changelog.d/3952.misc Normal file
View File

@ -0,0 +1 @@
Run the test suite on the oldest supported versions of our dependencies in CI.

1
changelog.d/3957.misc Normal file
View File

@ -0,0 +1 @@
CircleCI now only runs merged jobs on PRs, and commit jobs on develop, master, and release branches.

1
changelog.d/3958.misc Normal file
View File

@ -0,0 +1 @@
Fix docstrings and add tests for state store methods

1
changelog.d/3959.feature Normal file
View File

@ -0,0 +1 @@
Include eventid in log lines when processing incoming federation transactions

1
changelog.d/3961.bugfix Normal file
View File

@ -0,0 +1 @@
Fix errors due to concurrent monthly_active_user upserts

1
changelog.d/3963.misc Normal file
View File

@ -0,0 +1 @@
fix docstring for FederationClient.get_state_for_room

1
changelog.d/3965.misc Normal file
View File

@ -0,0 +1 @@
Run notify_app_services as a bg process

1
changelog.d/3966.misc Normal file
View File

@ -0,0 +1 @@
Improve the logging when handling a federation transaction

1
changelog.d/3967.misc Normal file
View File

@ -0,0 +1 @@
Clarifications in FederationHandler

1
changelog.d/3970.bugfix Normal file
View File

@ -0,0 +1 @@
Replaced all occurences of e.message with str(e). Contributed by Schnuffle

View File

@ -21,4 +21,4 @@ try:
verifier.verify(macaroon, key)
print "Signature is correct"
except Exception as e:
print e.message
print str(e)

View File

@ -0,0 +1,9 @@
#!/bin/bash
set -e
# Fetch the current GitHub issue number, add one to it -- presto! The likely
# next PR number.
CURRENT_NUMBER=`curl -s "https://api.github.com/repos/matrix-org/synapse/issues?state=all&per_page=1" | jq -r ".[0].number"`
CURRENT_NUMBER=$((CURRENT_NUMBER+1))
echo $CURRENT_NUMBER

View File

@ -226,7 +226,7 @@ class Filtering(object):
jsonschema.validate(user_filter_json, USER_FILTER_SCHEMA,
format_checker=FormatChecker())
except jsonschema.ValidationError as e:
raise SynapseError(400, e.message)
raise SynapseError(400, str(e))
class FilterCollection(object):

View File

@ -24,7 +24,7 @@ try:
python_dependencies.check_requirements()
except python_dependencies.MissingRequirementError as e:
message = "\n".join([
"Missing Requirement: %s" % (e.message,),
"Missing Requirement: %s" % (str(e),),
"To install run:",
" pip install --upgrade --force \"%s\"" % (e.dependency,),
"",

View File

@ -136,7 +136,7 @@ def start(config_options):
"Synapse appservice", config_options
)
except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n")
sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.appservice"

View File

@ -153,7 +153,7 @@ def start(config_options):
"Synapse client reader", config_options
)
except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n")
sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.client_reader"

View File

@ -169,7 +169,7 @@ def start(config_options):
"Synapse event creator", config_options
)
except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n")
sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.event_creator"

View File

@ -140,7 +140,7 @@ def start(config_options):
"Synapse federation reader", config_options
)
except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n")
sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.federation_reader"

View File

@ -160,7 +160,7 @@ def start(config_options):
"Synapse federation sender", config_options
)
except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n")
sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.federation_sender"

View File

@ -228,7 +228,7 @@ def start(config_options):
"Synapse frontend proxy", config_options
)
except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n")
sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.frontend_proxy"

View File

@ -301,7 +301,7 @@ class SynapseHomeServer(HomeServer):
try:
database_engine.check_database(db_conn.cursor())
except IncorrectDatabaseSetup as e:
quit_with_error(e.message)
quit_with_error(str(e))
# Gauges to expose monthly active user control metrics
@ -328,7 +328,7 @@ def setup(config_options):
config_options,
)
except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n")
sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
if not config:

View File

@ -133,7 +133,7 @@ def start(config_options):
"Synapse media repository", config_options
)
except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n")
sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.media_repository"

View File

@ -191,7 +191,7 @@ def start(config_options):
"Synapse pusher", config_options
)
except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n")
sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.pusher"

View File

@ -410,7 +410,7 @@ def start(config_options):
"Synapse synchrotron", config_options
)
except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n")
sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.synchrotron"

View File

@ -1,284 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import collections
import errno
import glob
import os
import os.path
import signal
import subprocess
import sys
import time
from six import iteritems
import yaml
SYNAPSE = [sys.executable, "-B", "-m", "synapse.app.homeserver"]
GREEN = "\x1b[1;32m"
YELLOW = "\x1b[1;33m"
RED = "\x1b[1;31m"
NORMAL = "\x1b[m"
def pid_running(pid):
try:
os.kill(pid, 0)
return True
except OSError as err:
if err.errno == errno.EPERM:
return True
return False
def write(message, colour=NORMAL, stream=sys.stdout):
if colour == NORMAL:
stream.write(message + "\n")
else:
stream.write(colour + message + NORMAL + "\n")
def abort(message, colour=RED, stream=sys.stderr):
write(message, colour, stream)
sys.exit(1)
def start(configfile):
write("Starting ...")
args = SYNAPSE
args.extend(["--daemonize", "-c", configfile])
try:
subprocess.check_call(args)
write("started synapse.app.homeserver(%r)" %
(configfile,), colour=GREEN)
except subprocess.CalledProcessError as e:
write(
"error starting (exit code: %d); see above for logs" % e.returncode,
colour=RED,
)
def start_worker(app, configfile, worker_configfile):
args = [
"python", "-B",
"-m", app,
"-c", configfile,
"-c", worker_configfile
]
try:
subprocess.check_call(args)
write("started %s(%r)" % (app, worker_configfile), colour=GREEN)
except subprocess.CalledProcessError as e:
write(
"error starting %s(%r) (exit code: %d); see above for logs" % (
app, worker_configfile, e.returncode,
),
colour=RED,
)
def stop(pidfile, app):
if os.path.exists(pidfile):
pid = int(open(pidfile).read())
try:
os.kill(pid, signal.SIGTERM)
write("stopped %s" % (app,), colour=GREEN)
except OSError as err:
if err.errno == errno.ESRCH:
write("%s not running" % (app,), colour=YELLOW)
elif err.errno == errno.EPERM:
abort("Cannot stop %s: Operation not permitted" % (app,))
else:
abort("Cannot stop %s: Unknown error" % (app,))
Worker = collections.namedtuple("Worker", [
"app", "configfile", "pidfile", "cache_factor"
])
def main():
parser = argparse.ArgumentParser()
parser.add_argument(
"action",
choices=["start", "stop", "restart"],
help="whether to start, stop or restart the synapse",
)
parser.add_argument(
"configfile",
nargs="?",
default="homeserver.yaml",
help="the homeserver config file, defaults to homeserver.yaml",
)
parser.add_argument(
"-w", "--worker",
metavar="WORKERCONFIG",
help="start or stop a single worker",
)
parser.add_argument(
"-a", "--all-processes",
metavar="WORKERCONFIGDIR",
help="start or stop all the workers in the given directory"
" and the main synapse process",
)
options = parser.parse_args()
if options.worker and options.all_processes:
write(
'Cannot use "--worker" with "--all-processes"',
stream=sys.stderr
)
sys.exit(1)
configfile = options.configfile
if not os.path.exists(configfile):
write(
"No config file found\n"
"To generate a config file, run '%s -c %s --generate-config"
" --server-name=<server name>'\n" % (
" ".join(SYNAPSE), options.configfile
),
stream=sys.stderr,
)
sys.exit(1)
with open(configfile) as stream:
config = yaml.load(stream)
pidfile = config["pid_file"]
cache_factor = config.get("synctl_cache_factor")
start_stop_synapse = True
if cache_factor:
os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
cache_factors = config.get("synctl_cache_factors", {})
for cache_name, factor in iteritems(cache_factors):
os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
worker_configfiles = []
if options.worker:
start_stop_synapse = False
worker_configfile = options.worker
if not os.path.exists(worker_configfile):
write(
"No worker config found at %r" % (worker_configfile,),
stream=sys.stderr,
)
sys.exit(1)
worker_configfiles.append(worker_configfile)
if options.all_processes:
# To start the main synapse with -a you need to add a worker file
# with worker_app == "synapse.app.homeserver"
start_stop_synapse = False
worker_configdir = options.all_processes
if not os.path.isdir(worker_configdir):
write(
"No worker config directory found at %r" % (worker_configdir,),
stream=sys.stderr,
)
sys.exit(1)
worker_configfiles.extend(sorted(glob.glob(
os.path.join(worker_configdir, "*.yaml")
)))
workers = []
for worker_configfile in worker_configfiles:
with open(worker_configfile) as stream:
worker_config = yaml.load(stream)
worker_app = worker_config["worker_app"]
if worker_app == "synapse.app.homeserver":
# We need to special case all of this to pick up options that may
# be set in the main config file or in this worker config file.
worker_pidfile = (
worker_config.get("pid_file")
or pidfile
)
worker_cache_factor = worker_config.get("synctl_cache_factor") or cache_factor
daemonize = worker_config.get("daemonize") or config.get("daemonize")
assert daemonize, "Main process must have daemonize set to true"
# The master process doesn't support using worker_* config.
for key in worker_config:
if key == "worker_app": # But we allow worker_app
continue
assert not key.startswith("worker_"), \
"Main process cannot use worker_* config"
else:
worker_pidfile = worker_config["worker_pid_file"]
worker_daemonize = worker_config["worker_daemonize"]
assert worker_daemonize, "In config %r: expected '%s' to be True" % (
worker_configfile, "worker_daemonize")
worker_cache_factor = worker_config.get("synctl_cache_factor")
workers.append(Worker(
worker_app, worker_configfile, worker_pidfile, worker_cache_factor,
))
action = options.action
if action == "stop" or action == "restart":
for worker in workers:
stop(worker.pidfile, worker.app)
if start_stop_synapse:
stop(pidfile, "synapse.app.homeserver")
# Wait for synapse to actually shutdown before starting it again
if action == "restart":
running_pids = []
if start_stop_synapse and os.path.exists(pidfile):
running_pids.append(int(open(pidfile).read()))
for worker in workers:
if os.path.exists(worker.pidfile):
running_pids.append(int(open(worker.pidfile).read()))
if len(running_pids) > 0:
write("Waiting for process to exit before restarting...")
for running_pid in running_pids:
while pid_running(running_pid):
time.sleep(0.2)
write("All processes exited; now restarting...")
if action == "start" or action == "restart":
if start_stop_synapse:
# Check if synapse is already running
if os.path.exists(pidfile) and pid_running(int(open(pidfile).read())):
abort("synapse.app.homeserver already running")
start(configfile)
for worker in workers:
if worker.cache_factor:
os.environ["SYNAPSE_CACHE_FACTOR"] = str(worker.cache_factor)
start_worker(worker.app, configfile, worker.configfile)
if cache_factor:
os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
else:
os.environ.pop("SYNAPSE_CACHE_FACTOR", None)
if __name__ == "__main__":
main()

View File

@ -188,7 +188,7 @@ def start(config_options):
"Synapse user directory", config_options
)
except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n")
sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.user_dir"

View File

@ -25,7 +25,7 @@ if __name__ == "__main__":
try:
config = HomeServerConfig.load_config("", sys.argv[3:])
except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n")
sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
print (getattr(config, key))

View File

@ -209,8 +209,6 @@ class FederationClient(FederationBase):
Will attempt to get the PDU from each destination in the list until
one succeeds.
This will persist the PDU locally upon receipt.
Args:
destinations (list): Which home servers to query
event_id (str): event to fetch
@ -289,8 +287,7 @@ class FederationClient(FederationBase):
@defer.inlineCallbacks
@log_function
def get_state_for_room(self, destination, room_id, event_id):
"""Requests all of the `current` state PDUs for a given room from
a remote home server.
"""Requests all of the room state at a given event from a remote home server.
Args:
destination (str): The remote homeserver to query for the state.
@ -298,9 +295,10 @@ class FederationClient(FederationBase):
event_id (str): The id of the event we want the state at.
Returns:
Deferred: Results in a list of PDUs.
Deferred[Tuple[List[EventBase], List[EventBase]]]:
A list of events in the state, and a list of events in the auth chain
for the given event.
"""
try:
# First we try and ask for just the IDs, as thats far quicker if
# we have most of the state and auth_chain already.

View File

@ -46,6 +46,7 @@ from synapse.replication.http.federation import (
from synapse.types import get_domain_from_id
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.logcontext import nested_logging_context
from synapse.util.logutils import log_function
# when processing incoming transactions, we try to handle multiple rooms in
@ -187,21 +188,22 @@ class FederationServer(FederationBase):
for pdu in pdus_by_room[room_id]:
event_id = pdu.event_id
try:
yield self._handle_received_pdu(
origin, pdu
)
pdu_results[event_id] = {}
except FederationError as e:
logger.warn("Error handling PDU %s: %s", event_id, e)
pdu_results[event_id] = {"error": str(e)}
except Exception as e:
f = failure.Failure()
pdu_results[event_id] = {"error": str(e)}
logger.error(
"Failed to handle PDU %s: %s",
event_id, f.getTraceback().rstrip(),
)
with nested_logging_context(event_id):
try:
yield self._handle_received_pdu(
origin, pdu
)
pdu_results[event_id] = {}
except FederationError as e:
logger.warn("Error handling PDU %s: %s", event_id, e)
pdu_results[event_id] = {"error": str(e)}
except Exception as e:
f = failure.Failure()
pdu_results[event_id] = {"error": str(e)}
logger.error(
"Failed to handle PDU %s: %s",
event_id, f.getTraceback().rstrip(),
)
yield concurrently_execute(
process_pdus_for_room, pdus_by_room.keys(),
@ -618,7 +620,7 @@ class FederationServer(FederationBase):
)
yield self.handler.on_receive_pdu(
origin, pdu, get_missing=True, sent_to_us_directly=True,
origin, pdu, sent_to_us_directly=True,
)
def __str__(self):

View File

@ -341,7 +341,7 @@ class E2eKeysHandler(object):
def _exception_to_failure(e):
if isinstance(e, CodeMessageException):
return {
"status": e.code, "message": e.message,
"status": e.code, "message": str(e),
}
if isinstance(e, NotRetryingDestination):

View File

@ -136,7 +136,7 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
def on_receive_pdu(
self, origin, pdu, get_missing=True, sent_to_us_directly=False,
self, origin, pdu, sent_to_us_directly=False,
):
""" Process a PDU received via a federation /send/ transaction, or
via backfill of missing prev_events
@ -145,7 +145,8 @@ class FederationHandler(BaseHandler):
origin (str): server which initiated the /send/ transaction. Will
be used to fetch missing events or state.
pdu (FrozenEvent): received PDU
get_missing (bool): True if we should fetch missing prev_events
sent_to_us_directly (bool): True if this event was pushed to us; False if
we pulled it as the result of a missing prev_event.
Returns (Deferred): completes with None
"""
@ -250,7 +251,7 @@ class FederationHandler(BaseHandler):
pdu.internal_metadata.outlier = True
elif min_depth and pdu.depth > min_depth:
missing_prevs = prevs - seen
if get_missing and missing_prevs:
if sent_to_us_directly and missing_prevs:
# If we're missing stuff, ensure we only fetch stuff one
# at a time.
logger.info(
@ -282,24 +283,46 @@ class FederationHandler(BaseHandler):
room_id, event_id, len(missing_prevs), shortstr(missing_prevs),
)
if sent_to_us_directly and prevs - seen:
# If they have sent it to us directly, and the server
# isn't telling us about the auth events that it's
# made a message referencing, we explode
logger.warn(
"[%s %s] Failed to fetch %d prev events: rejecting",
room_id, event_id, len(prevs - seen),
)
raise FederationError(
"ERROR",
403,
(
"Your server isn't divulging details about prev_events "
"referenced in this event."
),
affected=pdu.event_id,
)
elif prevs - seen:
if prevs - seen:
# We've still not been able to get all of the prev_events for this event.
#
# In this case, we need to fall back to asking another server in the
# federation for the state at this event. That's ok provided we then
# resolve the state against other bits of the DAG before using it (which
# will ensure that you can't just take over a room by sending an event,
# withholding its prev_events, and declaring yourself to be an admin in
# the subsequent state request).
#
# Now, if we're pulling this event as a missing prev_event, then clearly
# this event is not going to become the only forward-extremity and we are
# guaranteed to resolve its state against our existing forward
# extremities, so that should be fine.
#
# On the other hand, if this event was pushed to us, it is possible for
# it to become the only forward-extremity in the room, and we would then
# trust its state to be the state for the whole room. This is very bad.
# Further, if the event was pushed to us, there is no excuse for us not to
# have all the prev_events. We therefore reject any such events.
#
# XXX this really feels like it could/should be merged with the above,
# but there is an interaction with min_depth that I'm not really
# following.
if sent_to_us_directly:
logger.warn(
"[%s %s] Failed to fetch %d prev events: rejecting",
room_id, event_id, len(prevs - seen),
)
raise FederationError(
"ERROR",
403,
(
"Your server isn't divulging details about prev_events "
"referenced in this event."
),
affected=pdu.event_id,
)
# Calculate the state of the previous events, and
# de-conflict them to find the current state.
state_groups = []
@ -316,14 +339,26 @@ class FederationHandler(BaseHandler):
"[%s %s] Requesting state at missing prev_event %s",
room_id, event_id, p,
)
state, got_auth_chain = (
yield self.federation_client.get_state_for_room(
origin, room_id, p,
with logcontext.nested_logging_context(p):
# note that if any of the missing prevs share missing state or
# auth events, the requests to fetch those events are deduped
# by the get_pdu_cache in federation_client.
remote_state, got_auth_chain = (
yield self.federation_client.get_state_for_room(
origin, room_id, p,
)
)
)
auth_chains.update(got_auth_chain)
state_group = {(x.type, x.state_key): x.event_id for x in state}
state_groups.append(state_group)
# XXX hrm I'm not convinced that duplicate events will compare
# for equality, so I'm not sure this does what the author
# hoped.
auth_chains.update(got_auth_chain)
state_group = {
(x.type, x.state_key): x.event_id for x in remote_state
}
state_groups.append(state_group)
# Resolve any conflicting state
def fetch(ev_ids):
@ -460,20 +495,21 @@ class FederationHandler(BaseHandler):
"[%s %s] Handling received prev_event %s",
room_id, event_id, ev.event_id,
)
try:
yield self.on_receive_pdu(
origin,
ev,
get_missing=False
)
except FederationError as e:
if e.code == 403:
logger.warn(
"[%s %s] Received prev_event %s failed history check.",
room_id, event_id, ev.event_id,
with logcontext.nested_logging_context(ev.event_id):
try:
yield self.on_receive_pdu(
origin,
ev,
sent_to_us_directly=False,
)
else:
raise
except FederationError as e:
if e.code == 403:
logger.warn(
"[%s %s] Received prev_event %s failed history check.",
room_id, event_id, ev.event_id,
)
else:
raise
@defer.inlineCallbacks
def _process_received_pdu(self, origin, event, state, auth_chain):
@ -549,6 +585,10 @@ class FederationHandler(BaseHandler):
})
seen_ids.add(e.event_id)
logger.info(
"[%s %s] persisting newly-received auth/state events %s",
room_id, event_id, [e["event"].event_id for e in event_infos]
)
yield self._handle_new_events(origin, event_infos)
try:
@ -1112,7 +1152,8 @@ class FederationHandler(BaseHandler):
try:
logger.info("Processing queued PDU %s which was received "
"while we were joining %s", p.event_id, p.room_id)
yield self.on_receive_pdu(origin, p)
with logcontext.nested_logging_context(p.event_id):
yield self.on_receive_pdu(origin, p, sent_to_us_directly=True)
except Exception as e:
logger.warn(
"Error handling queued PDU %s from %s: %s",
@ -1558,15 +1599,22 @@ class FederationHandler(BaseHandler):
Notifies about the events where appropriate.
"""
contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
logcontext.run_in_background(
self._prep_event,
@defer.inlineCallbacks
def prep(ev_info):
event = ev_info["event"]
with logcontext.nested_logging_context(suffix=event.event_id):
res = yield self._prep_event(
origin,
ev_info["event"],
event,
state=ev_info.get("state"),
auth_events=ev_info.get("auth_events"),
)
defer.returnValue(res)
contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
logcontext.run_in_background(prep, ev_info)
for ev_info in event_infos
], consumeErrors=True,
))

View File

@ -278,7 +278,7 @@ class BaseProfileHandler(BaseHandler):
except Exception as e:
logger.warn(
"Failed to update join event for room %s - %s",
room_id, str(e.message)
room_id, str(e)
)

View File

@ -20,6 +20,7 @@ from twisted.internet import defer
from synapse.api.errors import AuthError, SynapseError
from synapse.types import UserID, get_domain_from_id
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.logcontext import run_in_background
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
@ -68,6 +69,11 @@ class TypingHandler(object):
# map room IDs to sets of users currently typing
self._room_typing = {}
# caches which room_ids changed at which serials
self._typing_stream_change_cache = StreamChangeCache(
"TypingStreamChangeCache", self._latest_room_serial,
)
self.clock.looping_call(
self._handle_timeouts,
5000,
@ -274,19 +280,29 @@ class TypingHandler(object):
self._latest_room_serial += 1
self._room_serials[member.room_id] = self._latest_room_serial
self._typing_stream_change_cache.entity_has_changed(
member.room_id, self._latest_room_serial,
)
self.notifier.on_new_event(
"typing_key", self._latest_room_serial, rooms=[member.room_id]
)
def get_all_typing_updates(self, last_id, current_id):
# TODO: Work out a way to do this without scanning the entire state.
if last_id == current_id:
return []
changed_rooms = self._typing_stream_change_cache.get_all_entities_changed(
last_id,
)
if changed_rooms is None:
changed_rooms = self._room_serials
rows = []
for room_id, serial in self._room_serials.items():
if last_id < serial and serial <= current_id:
for room_id in changed_rooms:
serial = self._room_serials[room_id]
if last_id < serial <= current_id:
typing = self._room_typing[room_id]
rows.append((serial, room_id, list(typing)))
rows.sort()

View File

@ -24,9 +24,10 @@ from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError
from synapse.handlers.presence import format_user_presence_state
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import StreamToken
from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
from synapse.util.logcontext import PreserveLoggingContext, run_in_background
from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
from synapse.visibility import filter_events_for_client
@ -248,7 +249,10 @@ class Notifier(object):
def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
"""Notify any user streams that are interested in this room event"""
# poke any interested application service.
run_in_background(self._notify_app_services, room_stream_id)
run_as_background_process(
"notify_app_services",
self._notify_app_services, room_stream_id,
)
if self.federation_sender:
self.federation_sender.notify_new_events(room_stream_id)

View File

@ -33,31 +33,32 @@ logger = logging.getLogger(__name__)
# [2] https://setuptools.readthedocs.io/en/latest/setuptools.html#declaring-dependencies
REQUIREMENTS = {
"jsonschema>=2.5.1": ["jsonschema>=2.5.1"],
"frozendict>=0.4": ["frozendict"],
"frozendict>=1": ["frozendict"],
"unpaddedbase64>=1.1.0": ["unpaddedbase64>=1.1.0"],
"canonicaljson>=1.1.3": ["canonicaljson>=1.1.3"],
"signedjson>=1.0.0": ["signedjson>=1.0.0"],
"pynacl>=1.2.1": ["nacl>=1.2.1", "nacl.bindings"],
"service_identity>=1.0.0": ["service_identity>=1.0.0"],
"service_identity>=16.0.0": ["service_identity>=16.0.0"],
"Twisted>=17.1.0": ["twisted>=17.1.0"],
"treq>=15.1": ["treq>=15.1"],
# Twisted has required pyopenssl 16.0 since about Twisted 16.6.
"pyopenssl>=16.0.0": ["OpenSSL>=16.0.0"],
"pyyaml": ["yaml"],
"pyasn1": ["pyasn1"],
"daemonize": ["daemonize"],
"bcrypt": ["bcrypt>=3.1.0"],
"pillow": ["PIL"],
"pydenticon": ["pydenticon"],
"sortedcontainers": ["sortedcontainers"],
"pysaml2>=3.0.0": ["saml2>=3.0.0"],
"pymacaroons-pynacl": ["pymacaroons"],
"pyyaml>=3.11": ["yaml"],
"pyasn1>=0.1.9": ["pyasn1"],
"pyasn1-modules>=0.0.7": ["pyasn1_modules"],
"daemonize>=2.3.1": ["daemonize"],
"bcrypt>=3.1.0": ["bcrypt>=3.1.0"],
"pillow>=3.1.2": ["PIL"],
"pydenticon>=0.2": ["pydenticon"],
"sortedcontainers>=1.4.4": ["sortedcontainers"],
"pysaml2>=3.0.0": ["saml2"],
"pymacaroons-pynacl>=0.9.3": ["pymacaroons"],
"msgpack-python>=0.3.0": ["msgpack"],
"phonenumbers>=8.2.0": ["phonenumbers"],
"six": ["six"],
"prometheus_client": ["prometheus_client"],
"six>=1.10": ["six"],
"prometheus_client>=0.0.18": ["prometheus_client"],
# we use attr.s(slots), which arrived in 16.0.0
"attrs>=16.0.0": ["attr>=16.0.0"],

View File

@ -65,10 +65,15 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory):
for event_ids in itervalues(conflicted_state)
for event_id in event_ids
)
needed_event_count = len(needed_events)
if event_map is not None:
needed_events -= set(iterkeys(event_map))
logger.info("Asking for %d conflicted events", len(needed_events))
logger.info(
"Asking for %d/%d conflicted events",
len(needed_events),
needed_event_count,
)
# dict[str, FrozenEvent]: a map from state event id to event. Only includes
# the state events which are in conflict (and those in event_map)
@ -85,11 +90,16 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory):
)
new_needed_events = set(itervalues(auth_events))
new_needed_event_count = len(new_needed_events)
new_needed_events -= needed_events
if event_map is not None:
new_needed_events -= set(iterkeys(event_map))
logger.info("Asking for %d auth events", len(new_needed_events))
logger.info(
"Asking for %d/%d auth events",
len(new_needed_events),
new_needed_event_count,
)
state_map_new = yield state_map_factory(new_needed_events)
state_map.update(state_map_new)

View File

@ -172,6 +172,10 @@ class MonthlyActiveUsersStore(SQLBaseStore):
Deferred[bool]: True if a new entry was created, False if an
existing one was updated.
"""
# Am consciously deciding to lock the table on the basis that is ought
# never be a big table and alternative approaches (batching multiple
# upserts into a single txn) introduced a lot of extra complexity.
# See https://github.com/matrix-org/synapse/issues/3854 for more
is_insert = yield self._simple_upsert(
desc="upsert_monthly_active_user",
table="monthly_active_users",
@ -181,7 +185,6 @@ class MonthlyActiveUsersStore(SQLBaseStore):
values={
"timestamp": int(self._clock.time_msec()),
},
lock=False,
)
if is_insert:
self.user_last_seen_monthly_active.invalidate((user_id,))

View File

@ -255,7 +255,17 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
)
@defer.inlineCallbacks
def get_state_groups_ids(self, room_id, event_ids):
def get_state_groups_ids(self, _room_id, event_ids):
"""Get the event IDs of all the state for the state groups for the given events
Args:
_room_id (str): id of the room for these events
event_ids (iterable[str]): ids of the events
Returns:
Deferred[dict[int, dict[tuple[str, str], str]]]:
dict of state_group_id -> (dict of (type, state_key) -> event id)
"""
if not event_ids:
defer.returnValue({})
@ -270,7 +280,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
@defer.inlineCallbacks
def get_state_ids_for_group(self, state_group):
"""Get the state IDs for the given state group
"""Get the event IDs of all the state in the given state group
Args:
state_group (int)
@ -286,7 +296,9 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
def get_state_groups(self, room_id, event_ids):
""" Get the state groups for the given list of event_ids
The return value is a dict mapping group names to lists of events.
Returns:
Deferred[dict[int, list[EventBase]]]:
dict of state_group_id -> list of state events.
"""
if not event_ids:
defer.returnValue({})
@ -324,7 +336,9 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
member events (if True), or to exclude member events (if False)
Returns:
dictionary state_group -> (dict of (type, state_key) -> event id)
Returns:
Deferred[dict[int, dict[tuple[str, str], str]]]:
dict of state_group_id -> (dict of (type, state_key) -> event id)
"""
results = {}
@ -732,8 +746,8 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
If None, `types` filtering is applied to all events.
Returns:
Deferred[dict[int, dict[(type, state_key), EventBase]]]
a dictionary mapping from state group to state dictionary.
Deferred[dict[int, dict[tuple[str, str], str]]]:
dict of state_group_id -> (dict of (type, state_key) -> event id)
"""
if types is not None:
non_member_types = [t for t in types if t[0] != EventTypes.Member]
@ -788,8 +802,8 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
If None, `types` filtering is applied to all events.
Returns:
Deferred[dict[int, dict[(type, state_key), EventBase]]]
a dictionary mapping from state group to state dictionary.
Deferred[dict[int, dict[tuple[str, str], str]]]:
dict of state_group_id -> (dict of (type, state_key) -> event id)
"""
if types:
types = frozenset(types)

View File

@ -200,7 +200,7 @@ class LoggingContext(object):
sentinel = Sentinel()
def __init__(self, name=None, parent_context=None):
def __init__(self, name=None, parent_context=None, request=None):
self.previous_context = LoggingContext.current_context()
self.name = name
@ -218,6 +218,13 @@ class LoggingContext(object):
self.parent_context = parent_context
if self.parent_context is not None:
self.parent_context.copy_to(self)
if request is not None:
# the request param overrides the request from the parent context
self.request = request
def __str__(self):
return "%s@%x" % (self.name, id(self))
@ -256,9 +263,6 @@ class LoggingContext(object):
)
self.alive = True
if self.parent_context is not None:
self.parent_context.copy_to(self)
return self
def __exit__(self, type, value, traceback):
@ -439,6 +443,35 @@ class PreserveLoggingContext(object):
)
def nested_logging_context(suffix, parent_context=None):
"""Creates a new logging context as a child of another.
The nested logging context will have a 'request' made up of the parent context's
request, plus the given suffix.
CPU/db usage stats will be added to the parent context's on exit.
Normal usage looks like:
with nested_logging_context(suffix):
# ... do stuff
Args:
suffix (str): suffix to add to the parent context's 'request'.
parent_context (LoggingContext|None): parent context. Will use the current context
if None.
Returns:
LoggingContext: new logging context.
"""
if parent_context is None:
parent_context = LoggingContext.current_context()
return LoggingContext(
parent_context=parent_context,
request=parent_context.request + "-" + suffix,
)
def preserve_fn(f):
"""Function decorator which wraps the function with run_in_background"""
def g(*args, **kwargs):

1
synctl
View File

@ -1 +0,0 @@
./synapse/app/synctl.py

294
synctl Executable file
View File

@ -0,0 +1,294 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import collections
import errno
import glob
import os
import os.path
import signal
import subprocess
import sys
import time
from six import iteritems
import yaml
SYNAPSE = [sys.executable, "-B", "-m", "synapse.app.homeserver"]
GREEN = "\x1b[1;32m"
YELLOW = "\x1b[1;33m"
RED = "\x1b[1;31m"
NORMAL = "\x1b[m"
def pid_running(pid):
try:
os.kill(pid, 0)
return True
except OSError as err:
if err.errno == errno.EPERM:
return True
return False
def write(message, colour=NORMAL, stream=sys.stdout):
if colour == NORMAL:
stream.write(message + "\n")
else:
stream.write(colour + message + NORMAL + "\n")
def abort(message, colour=RED, stream=sys.stderr):
write(message, colour, stream)
sys.exit(1)
def start(configfile):
write("Starting ...")
args = SYNAPSE
args.extend(["--daemonize", "-c", configfile])
try:
subprocess.check_call(args)
write("started synapse.app.homeserver(%r)" %
(configfile,), colour=GREEN)
except subprocess.CalledProcessError as e:
write(
"error starting (exit code: %d); see above for logs" % e.returncode,
colour=RED,
)
def start_worker(app, configfile, worker_configfile):
args = [
"python", "-B",
"-m", app,
"-c", configfile,
"-c", worker_configfile
]
try:
subprocess.check_call(args)
write("started %s(%r)" % (app, worker_configfile), colour=GREEN)
except subprocess.CalledProcessError as e:
write(
"error starting %s(%r) (exit code: %d); see above for logs" % (
app, worker_configfile, e.returncode,
),
colour=RED,
)
def stop(pidfile, app):
if os.path.exists(pidfile):
pid = int(open(pidfile).read())
try:
os.kill(pid, signal.SIGTERM)
write("stopped %s" % (app,), colour=GREEN)
except OSError as err:
if err.errno == errno.ESRCH:
write("%s not running" % (app,), colour=YELLOW)
elif err.errno == errno.EPERM:
abort("Cannot stop %s: Operation not permitted" % (app,))
else:
abort("Cannot stop %s: Unknown error" % (app,))
Worker = collections.namedtuple("Worker", [
"app", "configfile", "pidfile", "cache_factor", "cache_factors",
])
def main():
parser = argparse.ArgumentParser()
parser.add_argument(
"action",
choices=["start", "stop", "restart"],
help="whether to start, stop or restart the synapse",
)
parser.add_argument(
"configfile",
nargs="?",
default="homeserver.yaml",
help="the homeserver config file, defaults to homeserver.yaml",
)
parser.add_argument(
"-w", "--worker",
metavar="WORKERCONFIG",
help="start or stop a single worker",
)
parser.add_argument(
"-a", "--all-processes",
metavar="WORKERCONFIGDIR",
help="start or stop all the workers in the given directory"
" and the main synapse process",
)
options = parser.parse_args()
if options.worker and options.all_processes:
write(
'Cannot use "--worker" with "--all-processes"',
stream=sys.stderr
)
sys.exit(1)
configfile = options.configfile
if not os.path.exists(configfile):
write(
"No config file found\n"
"To generate a config file, run '%s -c %s --generate-config"
" --server-name=<server name>'\n" % (
" ".join(SYNAPSE), options.configfile
),
stream=sys.stderr,
)
sys.exit(1)
with open(configfile) as stream:
config = yaml.load(stream)
pidfile = config["pid_file"]
cache_factor = config.get("synctl_cache_factor")
start_stop_synapse = True
if cache_factor:
os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
cache_factors = config.get("synctl_cache_factors", {})
for cache_name, factor in iteritems(cache_factors):
os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
worker_configfiles = []
if options.worker:
start_stop_synapse = False
worker_configfile = options.worker
if not os.path.exists(worker_configfile):
write(
"No worker config found at %r" % (worker_configfile,),
stream=sys.stderr,
)
sys.exit(1)
worker_configfiles.append(worker_configfile)
if options.all_processes:
# To start the main synapse with -a you need to add a worker file
# with worker_app == "synapse.app.homeserver"
start_stop_synapse = False
worker_configdir = options.all_processes
if not os.path.isdir(worker_configdir):
write(
"No worker config directory found at %r" % (worker_configdir,),
stream=sys.stderr,
)
sys.exit(1)
worker_configfiles.extend(sorted(glob.glob(
os.path.join(worker_configdir, "*.yaml")
)))
workers = []
for worker_configfile in worker_configfiles:
with open(worker_configfile) as stream:
worker_config = yaml.load(stream)
worker_app = worker_config["worker_app"]
if worker_app == "synapse.app.homeserver":
# We need to special case all of this to pick up options that may
# be set in the main config file or in this worker config file.
worker_pidfile = (
worker_config.get("pid_file")
or pidfile
)
worker_cache_factor = worker_config.get("synctl_cache_factor") or cache_factor
worker_cache_factors = (
worker_config.get("synctl_cache_factors")
or cache_factors
)
daemonize = worker_config.get("daemonize") or config.get("daemonize")
assert daemonize, "Main process must have daemonize set to true"
# The master process doesn't support using worker_* config.
for key in worker_config:
if key == "worker_app": # But we allow worker_app
continue
assert not key.startswith("worker_"), \
"Main process cannot use worker_* config"
else:
worker_pidfile = worker_config["worker_pid_file"]
worker_daemonize = worker_config["worker_daemonize"]
assert worker_daemonize, "In config %r: expected '%s' to be True" % (
worker_configfile, "worker_daemonize")
worker_cache_factor = worker_config.get("synctl_cache_factor")
worker_cache_factors = worker_config.get("synctl_cache_factors", {})
workers.append(Worker(
worker_app, worker_configfile, worker_pidfile, worker_cache_factor,
worker_cache_factors,
))
action = options.action
if action == "stop" or action == "restart":
for worker in workers:
stop(worker.pidfile, worker.app)
if start_stop_synapse:
stop(pidfile, "synapse.app.homeserver")
# Wait for synapse to actually shutdown before starting it again
if action == "restart":
running_pids = []
if start_stop_synapse and os.path.exists(pidfile):
running_pids.append(int(open(pidfile).read()))
for worker in workers:
if os.path.exists(worker.pidfile):
running_pids.append(int(open(worker.pidfile).read()))
if len(running_pids) > 0:
write("Waiting for process to exit before restarting...")
for running_pid in running_pids:
while pid_running(running_pid):
time.sleep(0.2)
write("All processes exited; now restarting...")
if action == "start" or action == "restart":
if start_stop_synapse:
# Check if synapse is already running
if os.path.exists(pidfile) and pid_running(int(open(pidfile).read())):
abort("synapse.app.homeserver already running")
start(configfile)
for worker in workers:
env = os.environ.copy()
if worker.cache_factor:
os.environ["SYNAPSE_CACHE_FACTOR"] = str(worker.cache_factor)
for cache_name, factor in worker.cache_factors.iteritems():
os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
start_worker(worker.app, configfile, worker.configfile)
# Reset env back to the original
os.environ.clear()
os.environ.update(env)
if __name__ == "__main__":
main()

View File

@ -74,6 +74,45 @@ class StateStoreTestCase(tests.unittest.TestCase):
self.assertEqual(s1[t].event_id, s2[t].event_id)
self.assertEqual(len(s1), len(s2))
@defer.inlineCallbacks
def test_get_state_groups_ids(self):
e1 = yield self.inject_state_event(
self.room, self.u_alice, EventTypes.Create, '', {}
)
e2 = yield self.inject_state_event(
self.room, self.u_alice, EventTypes.Name, '', {"name": "test room"}
)
state_group_map = yield self.store.get_state_groups_ids(self.room, [e2.event_id])
self.assertEqual(len(state_group_map), 1)
state_map = list(state_group_map.values())[0]
self.assertDictEqual(
state_map,
{
(EventTypes.Create, ''): e1.event_id,
(EventTypes.Name, ''): e2.event_id,
},
)
@defer.inlineCallbacks
def test_get_state_groups(self):
e1 = yield self.inject_state_event(
self.room, self.u_alice, EventTypes.Create, '', {}
)
e2 = yield self.inject_state_event(
self.room, self.u_alice, EventTypes.Name, '', {"name": "test room"}
)
state_group_map = yield self.store.get_state_groups(
self.room, [e2.event_id])
self.assertEqual(len(state_group_map), 1)
state_list = list(state_group_map.values())[0]
self.assertEqual(
{ev.event_id for ev in state_list},
{e1.event_id, e2.event_id},
)
@defer.inlineCallbacks
def test_get_state_for_event(self):

View File

@ -6,6 +6,7 @@ from twisted.internet.defer import maybeDeferred, succeed
from synapse.events import FrozenEvent
from synapse.types import Requester, UserID
from synapse.util import Clock
from synapse.util.logcontext import LoggingContext
from tests import unittest
from tests.server import ThreadedMemoryReactorClock, setup_test_homeserver
@ -117,9 +118,10 @@ class MessageAcceptTests(unittest.TestCase):
}
)
d = self.handler.on_receive_pdu(
"test.serv", lying_event, sent_to_us_directly=True
)
with LoggingContext(request="lying_event"):
d = self.handler.on_receive_pdu(
"test.serv", lying_event, sent_to_us_directly=True
)
# Step the reactor, so the database fetches come back
self.reactor.advance(1)
@ -209,11 +211,12 @@ class MessageAcceptTests(unittest.TestCase):
}
)
d = self.handler.on_receive_pdu(
"test.serv", good_event, sent_to_us_directly=True
)
self.reactor.advance(1)
self.assertEqual(self.successResultOf(d), None)
with LoggingContext(request="good_event"):
d = self.handler.on_receive_pdu(
"test.serv", good_event, sent_to_us_directly=True
)
self.reactor.advance(1)
self.assertEqual(self.successResultOf(d), None)
bad_event = FrozenEvent(
{
@ -230,10 +233,11 @@ class MessageAcceptTests(unittest.TestCase):
}
)
d = self.handler.on_receive_pdu(
"test.serv", bad_event, sent_to_us_directly=True
)
self.reactor.advance(1)
with LoggingContext(request="bad_event"):
d = self.handler.on_receive_pdu(
"test.serv", bad_event, sent_to_us_directly=True
)
self.reactor.advance(1)
extrem = maybeDeferred(
self.homeserver.datastore.get_latest_event_ids_in_room, self.room_id

View File

@ -121,7 +121,7 @@ class TestCase(unittest.TestCase):
try:
self.assertEquals(attrs[key], getattr(obj, key))
except AssertionError as e:
raise (type(e))(e.message + " for '.%s'" % key)
raise (type(e))(str(e) + " for '.%s'" % key)
def assert_dict(self, required, actual):
"""Does a partial assert of a dict.

View File

@ -159,6 +159,11 @@ class LoggingContextTestCase(unittest.TestCase):
self.assertEqual(r, "bum")
self._check_test_key("one")
def test_nested_logging_context(self):
with LoggingContext(request="foo"):
nested_context = logcontext.nested_logging_context(suffix="bar")
self.assertEqual(nested_context.request, "foo-bar")
# a function which returns a deferred which has been "called", but
# which had a function which returned another incomplete deferred on

20
tox.ini
View File

@ -64,6 +64,26 @@ setenv =
{[base]setenv}
SYNAPSE_POSTGRES = 1
# A test suite for the oldest supported versions of Python libraries, to catch
# any uses of APIs not available in them.
[testenv:py27-old]
skip_install=True
deps =
# Old automat version for Twisted
Automat == 0.3.0
mock
lxml
commands =
/usr/bin/find "{toxinidir}" -name '*.pyc' -delete
# Make all greater-thans equals so we test the oldest version of our direct
# dependencies, but make the pyopenssl 17.0, which can work against an
# OpenSSL 1.1 compiled cryptography (as older ones don't compile on Travis).
/bin/sh -c 'python -m synapse.python_dependencies | sed -e "s/>=/==/g" -e "s/psycopg2==2.6//" -e "s/pyopenssl==16.0.0/pyopenssl==17.0.0/" | xargs pip install'
# Install Synapse itself. This won't update any libraries.
pip install -e .
{envbindir}/trial {env:TRIAL_FLAGS:} {posargs:tests} {env:TOXSUFFIX:}
[testenv:py35]
usedevelop=true