Merge branch 'develop' into rav/deferred_timeout

Richard van der Hoff 2018-04-27 12:54:43 +01:00
commit 3d1ae61399
11 changed files with 148 additions and 21 deletions


@@ -1,3 +1,44 @@
+Changes in synapse v0.28.0-rc1 (2018-04-24)
+===========================================
+
+Minor performance improvement to federation sending and bug fixes.
+
+(Note: this release does not include the state resolution changes discussed in Matrix Live)
+
+Features:
+
+* Add metrics for event processing lag (PR #3090)
+* Add metrics for ResponseCache (PR #3092)
+
+Changes:
+
+* Synapse on PyPy (PR #2760) Thanks to @Valodim!
+* move handling of auto_join_rooms to RegisterHandler (PR #2996) Thanks to @krombel!
+* Improve handling of SRV records for federation connections (PR #3016) Thanks to @silkeh!
+* Document the behaviour of ResponseCache (PR #3059)
+* Preparation for py3 (PR #3061, #3073, #3074, #3075, #3103, #3104, #3106, #3107,
+  #3109, #3110) Thanks to @NotAFile!
+* update prometheus dashboard to use new metric names (PR #3069) Thanks to @krombel!
+* use python3-compatible prints (PR #3074) Thanks to @NotAFile!
+* Send federation events concurrently (PR #3078)
+* Limit concurrent event sends for a room (PR #3079)
+* Improve R30 stat definition (PR #3086)
+* Send events to ASes concurrently (PR #3088)
+* Refactor ResponseCache usage (PR #3093)
+* Clarify that SRV may not point to a CNAME (PR #3100) Thanks to @silkeh!
+* Use str(e) instead of e.message (PR #3103) Thanks to @NotAFile!
+* Use six.itervalues in some places (PR #3106) Thanks to @NotAFile!
+* Refactor store.have_events (PR #3117)
+
+Bug Fixes:
+
+* Return 401 for invalid access_token on logout (PR #2938) Thanks to @dklug!
+* Return a 404 rather than a 500 on rejoining empty rooms (PR #3080)
+* fix federation_domain_whitelist (PR #3099)
+* Avoid creating events with huge numbers of prev_events (PR #3113)
+* Reject events which have lots of prev_events (PR #3118)
+
 Changes in synapse v0.27.4 (2018-04-13)
 ======================================


@@ -16,4 +16,4 @@
 """ This is a reference implementation of a Matrix home server.
 """
 
-__version__ = "0.27.4"
+__version__ = "0.28.0-rc1"


@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.

@@ -496,13 +497,33 @@ class FederationServer(FederationBase):
     def _handle_received_pdu(self, origin, pdu):
         """ Process a PDU received in a federation /send/ transaction.
 
+        If the event is invalid, then this method throws a FederationError.
+        (The error will then be logged and sent back to the sender (which
+        probably won't do anything with it), and other events in the
+        transaction will be processed as normal).
+
+        It is likely that we'll then receive other events which refer to
+        this rejected event in their prev_events, etc. When that happens,
+        we'll attempt to fetch the rejected event again, which will presumably
+        fail, so those second-generation events will also get rejected.
+
+        Eventually, we get to the point where there are more than 10 events
+        between any new events and the original rejected event. Since we
+        only try to backfill 10 events deep on a received pdu, we then accept
+        the new event, possibly introducing a discontinuity in the DAG, with
+        new forward extremities, so normal service is approximately restored
+        until we try to backfill across the discontinuity.
+
         Args:
             origin (str): server which sent the pdu
             pdu (FrozenEvent): received pdu
 
         Returns (Deferred): completes with None
 
-        Raises: FederationError if the signatures / hash do not match
-        """
+        Raises: FederationError if the signatures / hash do not match, or
+            if the event was unacceptable for any other reason (eg, too large,
+            too many prev_events, couldn't find the prev_events)
+        """
+
         # check that it's actually being sent from a valid destination to
         # workaround bug #1753 in 0.18.5 and 0.18.6
         if origin != get_domain_from_id(pdu.event_id):
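The docstring added above describes a self-healing property: descendants of a rejected event keep getting rejected until the rejected event is more than ten events back, at which point new events are accepted and the DAG becomes discontinuous. A minimal sketch of that acceptance rule, not Synapse code (the constant and function names here are invented for illustration):

# Hypothetical model of the behaviour the docstring describes: while a
# rejected ancestor is within the backfill window, attempts to re-fetch it
# fail and its descendants are rejected too; once it is more than 10 events
# back, the new event is accepted, creating a discontinuity in the DAG.

BACKFILL_DEPTH = 10  # "we only try to backfill 10 events deep on a received pdu"

def accepted_despite_rejected_ancestor(distance_to_rejected_event):
    """Return True once the rejected ancestor is too far back to chase."""
    return distance_to_rejected_event > BACKFILL_DEPTH

assert not accepted_despite_rejected_ancestor(2)   # second-generation: rejected
assert accepted_despite_rejected_ancestor(11)      # accepted; new forward extremity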


@@ -15,8 +15,14 @@
 # limitations under the License.
 
 """Contains handlers for federation events."""
 
+import httplib
+import itertools
+import logging
+
 from signedjson.key import decode_verify_key_bytes
 from signedjson.sign import verify_signed_json
+from twisted.internet import defer
 from unpaddedbase64 import decode_base64
 
 from ._base import BaseHandler

@@ -43,10 +49,6 @@ from synapse.util.retryutils import NotRetryingDestination
 from synapse.util.distributor import user_joined_room
 
-from twisted.internet import defer
-
-import itertools
-import logging
 
 logger = logging.getLogger(__name__)
@@ -115,6 +117,19 @@ class FederationHandler(BaseHandler):
             logger.debug("Already seen pdu %s", pdu.event_id)
             return
 
+        # do some initial sanity-checking of the event. In particular, make
+        # sure it doesn't have hundreds of prev_events or auth_events, which
+        # could cause a huge state resolution or cascade of event fetches.
+        try:
+            self._sanity_check_event(pdu)
+        except SynapseError as err:
+            raise FederationError(
+                "ERROR",
+                err.code,
+                err.msg,
+                affected=pdu.event_id,
+            )
+
         # If we are currently in the process of joining this room, then we
         # queue up events for later processing.
         if pdu.room_id in self.room_queues:
@@ -519,9 +534,16 @@
     def backfill(self, dest, room_id, limit, extremities):
         """ Trigger a backfill request to `dest` for the given `room_id`
 
-        This will attempt to get more events from the remote. This may return
-        be successfull and still return no events if the other side has no new
-        events to offer.
+        This will attempt to get more events from the remote. If the other side
+        has no new events to offer, this will return an empty list.
+
+        As the events are received, we check their signatures, and also do some
+        sanity-checking on them. If any of the backfilled events are invalid,
+        this method throws a SynapseError.
+
+        TODO: make this more useful to distinguish failures of the remote
+        server from invalid events (there is probably no point in trying to
+        re-fetch invalid events from every other HS in the room.)
         """
         if dest == self.server_name:
             raise SynapseError(400, "Can't backfill from self.")
@@ -533,6 +555,16 @@
             extremities=extremities,
         )
 
+        # ideally we'd sanity check the events here for excess prev_events etc,
+        # but it's hard to reject events at this point without completely
+        # breaking backfill in the same way that it is currently broken by
+        # events whose signature we cannot verify (#3121).
+        #
+        # So for now we accept the events anyway. #3124 tracks this.
+        #
+        # for ev in events:
+        #     self._sanity_check_event(ev)
+
         # Don't bother processing events we already have.
         seen_events = yield self.store.have_events_in_timeline(
             set(e.event_id for e in events)
@@ -835,6 +867,38 @@
             defer.returnValue(False)
 
+    def _sanity_check_event(self, ev):
+        """
+        Do some early sanity checks of a received event
+
+        In particular, checks it doesn't have an excessive number of
+        prev_events or auth_events, which could cause a huge state resolution
+        or cascade of event fetches.
+
+        Args:
+            ev (synapse.events.EventBase): event to be checked
+
+        Returns: None
+
+        Raises:
+            SynapseError if the event does not pass muster
+        """
+        if len(ev.prev_events) > 20:
+            logger.warn("Rejecting event %s which has %i prev_events",
+                        ev.event_id, len(ev.prev_events))
+            raise SynapseError(
+                httplib.BAD_REQUEST,
+                "Too many prev_events",
+            )
+
+        if len(ev.auth_events) > 10:
+            logger.warn("Rejecting event %s which has %i auth_events",
+                        ev.event_id, len(ev.auth_events))
+            raise SynapseError(
+                httplib.BAD_REQUEST,
+                "Too many auth_events",
+            )
+
     @defer.inlineCallbacks
     def send_invite(self, target_host, event):
         """ Sends the invite to the remote server for signing.


@@ -530,7 +530,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
         # Convert the IDs to MXC URIs
         for media_id in local_mxcs:
-            local_media_mxcs.append("mxc://%s/%s" % (self.hostname, media_id))
+            local_media_mxcs.append("mxc://%s/%s" % (self.hs.hostname, media_id))
 
         for hostname, media_id in remote_mxcs:
             remote_media_mxcs.append("mxc://%s/%s" % (hostname, media_id))
@@ -595,7 +595,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
         while next_token:
             sql = """
                 SELECT stream_ordering, json FROM events
-                JOIN event_json USING (event_id)
+                JOIN event_json USING (room_id, event_id)
                 WHERE room_id = ?
                     AND stream_ordering < ?
                     AND contains_url = ? AND outlier = ?
@@ -619,7 +619,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
                 if matches:
                     hostname = matches.group(1)
                     media_id = matches.group(2)
-                    if hostname == self.hostname:
+                    if hostname == self.hs.hostname:
                         local_media_mxcs.append(media_id)
                     else:
                         remote_media_mxcs.append((hostname, media_id))
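Both storage fixes in this commit change the event_json join from USING (event_id) to the compound USING (room_id, event_id), presumably so the join matches on the table's compound key. A toy sqlite3 demonstration of a two-column USING join (the schema here is invented for illustration and is not Synapse's real schema):

# Two-column USING join, as in the corrected SQL above (toy schema).
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE events (room_id TEXT, event_id TEXT, stream_ordering INT);
    CREATE TABLE event_json (room_id TEXT, event_id TEXT, json TEXT);
    INSERT INTO events VALUES ('!room:a', '$ev1', 1);
    INSERT INTO event_json VALUES ('!room:a', '$ev1', '{"type": "m.room.message"}');
""")

rows = conn.execute(
    "SELECT stream_ordering, json FROM events"
    " JOIN event_json USING (room_id, event_id)"  # rows must agree on both columns
).fetchall()
print(rows)  # [(1, '{"type": "m.room.message"}')]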


@@ -77,7 +77,7 @@ class SearchStore(BackgroundUpdateStore):
             sql = (
                 "SELECT stream_ordering, event_id, room_id, type, json, "
                 " origin_server_ts FROM events"
-                " JOIN event_json USING (event_id)"
+                " JOIN event_json USING (room_id, event_id)"
                 " WHERE ? <= stream_ordering AND stream_ordering < ?"
                 " AND (%s)"
                 " ORDER BY stream_ordering DESC"


@@ -24,7 +24,7 @@ class ConfigLoadingTestCase(unittest.TestCase):
     def setUp(self):
         self.dir = tempfile.mkdtemp()
-        print self.dir
+        print(self.dir)
         self.file = os.path.join(self.dir, "homeserver.yaml")
 
     def tearDown(self):


@@ -183,7 +183,7 @@ class KeyringTestCase(unittest.TestCase):
         res_deferreds_2 = kr.verify_json_objects_for_server(
             [("server10", json1)],
         )
-        yield async.sleep(01)
+        yield async.sleep(1)
         self.http_client.post_json.assert_not_called()
         res_deferreds_2[0].addBoth(self.check_context, None)


@@ -62,7 +62,7 @@ class DistributorTestCase(unittest.TestCase):
     def test_signal_catch(self):
         self.dist.declare("alarm")
 
-        observers = [Mock() for i in 1, 2]
+        observers = [Mock() for i in (1, 2)]
         for o in observers:
             self.dist.observe("alarm", o)


@@ -20,7 +20,7 @@ from mock import NonCallableMock
 from synapse.util.file_consumer import BackgroundFileConsumer
 from tests import unittest
 
-from StringIO import StringIO
+from six import StringIO
 
 import threading


@@ -18,6 +18,7 @@ from tests import unittest
 from twisted.internet import defer
 
 from synapse.util.async import Linearizer
+from six.moves import range
 
 
 class LinearizerTestCase(unittest.TestCase):
@@ -58,7 +59,7 @@ class LinearizerTestCase(unittest.TestCase):
                 logcontext.LoggingContext.current_context(), lc)
 
             func(0, sleep=True)
-            for i in xrange(1, 100):
+            for i in range(1, 100):
                 func(i)
 
             return func(1000)
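The test changes above are Python 3 compatibility fixes: print becomes a function call, the py2-only StringIO module gives way to six's StringIO, the bare tuple in the list comprehension gains parentheses, the py2 octal literal 01 becomes a plain 1, and xrange is replaced by six.moves.range. A compact sketch of the six idioms involved, assuming six is installed:

# py2/py3-compatible idioms matching the test fixes above (requires six).
from __future__ import print_function

from six import StringIO      # replaces the py2-only `from StringIO import StringIO`
from six.moves import range   # py2: xrange; py3: the builtin range

buf = StringIO()
for i in range(3):            # a lazy sequence on both interpreters
    print(i, file=buf)
assert buf.getvalue() == "0\n1\n2\n"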