Merge pull request #3456 from matrix-org/hawkowl/federation-prevevent-checking

Check the state of prev_events a bit more thoroughly when coming over federation
This commit is contained in:
Erik Johnston 2018-06-29 13:55:02 +01:00 committed by GitHub
commit e3b4043800
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 319 additions and 30 deletions

1
changelog.d/3456.bugfix Normal file
View File

@ -0,0 +1 @@
Synapse is now stricter regarding accepting events which it cannot retrieve the prev_events for.

View File

@ -549,7 +549,9 @@ class FederationServer(FederationBase):
affected=pdu.event_id, affected=pdu.event_id,
) )
yield self.handler.on_receive_pdu(origin, pdu, get_missing=True) yield self.handler.on_receive_pdu(
origin, pdu, get_missing=True, sent_to_us_directly=True,
)
def __str__(self): def __str__(self):
return "<ReplicationLayer(%s)>" % self.server_name return "<ReplicationLayer(%s)>" % self.server_name

View File

@ -44,6 +44,7 @@ from synapse.util.frozenutils import unfreeze
from synapse.crypto.event_signing import ( from synapse.crypto.event_signing import (
compute_event_signature, add_hashes_and_signatures, compute_event_signature, add_hashes_and_signatures,
) )
from synapse.state import resolve_events_with_factory
from synapse.types import UserID, get_domain_from_id from synapse.types import UserID, get_domain_from_id
from synapse.events.utils import prune_event from synapse.events.utils import prune_event
@ -89,7 +90,9 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def on_receive_pdu(self, origin, pdu, get_missing=True): def on_receive_pdu(
self, origin, pdu, get_missing=True, sent_to_us_directly=False,
):
""" Process a PDU received via a federation /send/ transaction, or """ Process a PDU received via a federation /send/ transaction, or
via backfill of missing prev_events via backfill of missing prev_events
@ -163,14 +166,11 @@ class FederationHandler(BaseHandler):
"Ignoring PDU %s for room %s from %s as we've left the room!", "Ignoring PDU %s for room %s from %s as we've left the room!",
pdu.event_id, pdu.room_id, origin, pdu.event_id, pdu.room_id, origin,
) )
return defer.returnValue(None)
state = None state = None
auth_chain = [] auth_chain = []
fetch_state = False
# Get missing pdus if necessary. # Get missing pdus if necessary.
if not pdu.internal_metadata.is_outlier(): if not pdu.internal_metadata.is_outlier():
# We only backfill backwards to the min depth. # We only backfill backwards to the min depth.
@ -225,26 +225,60 @@ class FederationHandler(BaseHandler):
list(prevs - seen)[:5], list(prevs - seen)[:5],
) )
if prevs - seen: if sent_to_us_directly and prevs - seen:
logger.info( # If they have sent it to us directly, and the server
"Still missing %d events for room %r: %r...", # isn't telling us about the auth events that it's
len(prevs - seen), pdu.room_id, list(prevs - seen)[:5] # made a message referencing, we explode
raise FederationError(
"ERROR",
403,
(
"Your server isn't divulging details about prev_events "
"referenced in this event."
),
affected=pdu.event_id,
) )
fetch_state = True elif prevs - seen:
# Calculate the state of the previous events, and
# de-conflict them to find the current state.
state_groups = []
auth_chains = set()
try:
# Get the state of the events we know about
ours = yield self.store.get_state_groups(pdu.room_id, list(seen))
state_groups.append(ours)
if fetch_state: # Ask the remote server for the states we don't
# We need to get the state at this event, since we haven't # know about
# processed all the prev events. for p in prevs - seen:
logger.debug( state, got_auth_chain = (
"_handle_new_pdu getting state for %s", yield self.replication_layer.get_state_for_room(
pdu.room_id origin, pdu.room_id, p
) )
try: )
state, auth_chain = yield self.replication_layer.get_state_for_room( auth_chains.update(got_auth_chain)
origin, pdu.room_id, pdu.event_id, state_group = {(x.type, x.state_key): x.event_id for x in state}
) state_groups.append(state_group)
except Exception:
logger.exception("Failed to get state for event: %s", pdu.event_id) # Resolve any conflicting state
def fetch(ev_ids):
return self.store.get_events(
ev_ids, get_prev_content=False, check_redacted=False
)
state_map = yield resolve_events_with_factory(
state_groups, {pdu.event_id: pdu}, fetch
)
state = (yield self.store.get_events(state_map.values())).values()
auth_chain = list(auth_chains)
except Exception:
raise FederationError(
"ERROR",
403,
"We can't get valid state history.",
affected=pdu.event_id,
)
yield self._process_received_pdu( yield self._process_received_pdu(
origin, origin,
@ -322,11 +356,17 @@ class FederationHandler(BaseHandler):
for e in missing_events: for e in missing_events:
logger.info("Handling found event %s", e.event_id) logger.info("Handling found event %s", e.event_id)
yield self.on_receive_pdu( try:
origin, yield self.on_receive_pdu(
e, origin,
get_missing=False e,
) get_missing=False
)
except FederationError as e:
if e.code == 403:
logger.warn("Event %s failed history check.")
else:
raise
@log_function @log_function
@defer.inlineCallbacks @defer.inlineCallbacks

243
tests/test_federation.py Normal file
View File

@ -0,0 +1,243 @@
from twisted.internet.defer import succeed, maybeDeferred
from synapse.util import Clock
from synapse.events import FrozenEvent
from synapse.types import Requester, UserID
from tests import unittest
from tests.server import setup_test_homeserver, ThreadedMemoryReactorClock
from mock import Mock
class MessageAcceptTests(unittest.TestCase):
def setUp(self):
self.http_client = Mock()
self.reactor = ThreadedMemoryReactorClock()
self.hs_clock = Clock(self.reactor)
self.homeserver = setup_test_homeserver(
http_client=self.http_client, clock=self.hs_clock, reactor=self.reactor
)
user_id = UserID("us", "test")
our_user = Requester(user_id, None, False, None, None)
room_creator = self.homeserver.get_room_creation_handler()
room = room_creator.create_room(
our_user, room_creator.PRESETS_DICT["public_chat"], ratelimit=False
)
self.reactor.advance(0.1)
self.room_id = self.successResultOf(room)["room_id"]
# Figure out what the most recent event is
most_recent = self.successResultOf(
maybeDeferred(
self.homeserver.datastore.get_latest_event_ids_in_room, self.room_id
)
)[0]
join_event = FrozenEvent(
{
"room_id": self.room_id,
"sender": "@baduser:test.serv",
"state_key": "@baduser:test.serv",
"event_id": "$join:test.serv",
"depth": 1000,
"origin_server_ts": 1,
"type": "m.room.member",
"origin": "test.servx",
"content": {"membership": "join"},
"auth_events": [],
"prev_state": [(most_recent, {})],
"prev_events": [(most_recent, {})],
}
)
self.handler = self.homeserver.get_handlers().federation_handler
self.handler.do_auth = lambda *a, **b: succeed(True)
self.client = self.homeserver.get_federation_client()
self.client._check_sigs_and_hash_and_fetch = lambda dest, pdus, **k: succeed(
pdus
)
# Send the join, it should return None (which is not an error)
d = self.handler.on_receive_pdu(
"test.serv", join_event, sent_to_us_directly=True
)
self.reactor.advance(1)
self.assertEqual(self.successResultOf(d), None)
# Make sure we actually joined the room
self.assertEqual(
self.successResultOf(
maybeDeferred(
self.homeserver.datastore.get_latest_event_ids_in_room, self.room_id
)
)[0],
"$join:test.serv",
)
def test_cant_hide_direct_ancestors(self):
"""
If you send a message, you must be able to provide the direct
prev_events that said event references.
"""
def post_json(destination, path, data, headers=None, timeout=0):
# If it asks us for new missing events, give them NOTHING
if path.startswith("/_matrix/federation/v1/get_missing_events/"):
return {"events": []}
self.http_client.post_json = post_json
# Figure out what the most recent event is
most_recent = self.successResultOf(
maybeDeferred(
self.homeserver.datastore.get_latest_event_ids_in_room, self.room_id
)
)[0]
# Now lie about an event
lying_event = FrozenEvent(
{
"room_id": self.room_id,
"sender": "@baduser:test.serv",
"event_id": "one:test.serv",
"depth": 1000,
"origin_server_ts": 1,
"type": "m.room.message",
"origin": "test.serv",
"content": "hewwo?",
"auth_events": [],
"prev_events": [("two:test.serv", {}), (most_recent, {})],
}
)
d = self.handler.on_receive_pdu(
"test.serv", lying_event, sent_to_us_directly=True
)
# Step the reactor, so the database fetches come back
self.reactor.advance(1)
# on_receive_pdu should throw an error
failure = self.failureResultOf(d)
self.assertEqual(
failure.value.args[0],
(
"ERROR 403: Your server isn't divulging details about prev_events "
"referenced in this event."
),
)
# Make sure the invalid event isn't there
extrem = maybeDeferred(
self.homeserver.datastore.get_latest_event_ids_in_room, self.room_id
)
self.assertEqual(self.successResultOf(extrem)[0], "$join:test.serv")
@unittest.DEBUG
def test_cant_hide_past_history(self):
"""
If you send a message, you must be able to provide the direct
prev_events that said event references.
"""
def post_json(destination, path, data, headers=None, timeout=0):
if path.startswith("/_matrix/federation/v1/get_missing_events/"):
return {
"events": [
{
"room_id": self.room_id,
"sender": "@baduser:test.serv",
"event_id": "three:test.serv",
"depth": 1000,
"origin_server_ts": 1,
"type": "m.room.message",
"origin": "test.serv",
"content": "hewwo?",
"auth_events": [],
"prev_events": [("four:test.serv", {})],
}
]
}
self.http_client.post_json = post_json
def get_json(destination, path, args, headers=None):
if path.startswith("/_matrix/federation/v1/state_ids/"):
d = self.successResultOf(
self.homeserver.datastore.get_state_ids_for_event("one:test.serv")
)
return succeed(
{
"pdu_ids": [
y
for x, y in d.items()
if x == ("m.room.member", "@us:test")
],
"auth_chain_ids": d.values(),
}
)
self.http_client.get_json = get_json
# Figure out what the most recent event is
most_recent = self.successResultOf(
maybeDeferred(
self.homeserver.datastore.get_latest_event_ids_in_room, self.room_id
)
)[0]
# Make a good event
good_event = FrozenEvent(
{
"room_id": self.room_id,
"sender": "@baduser:test.serv",
"event_id": "one:test.serv",
"depth": 1000,
"origin_server_ts": 1,
"type": "m.room.message",
"origin": "test.serv",
"content": "hewwo?",
"auth_events": [],
"prev_events": [(most_recent, {})],
}
)
d = self.handler.on_receive_pdu(
"test.serv", good_event, sent_to_us_directly=True
)
self.reactor.advance(1)
self.assertEqual(self.successResultOf(d), None)
bad_event = FrozenEvent(
{
"room_id": self.room_id,
"sender": "@baduser:test.serv",
"event_id": "two:test.serv",
"depth": 1000,
"origin_server_ts": 1,
"type": "m.room.message",
"origin": "test.serv",
"content": "hewwo?",
"auth_events": [],
"prev_events": [("one:test.serv", {}), ("three:test.serv", {})],
}
)
d = self.handler.on_receive_pdu(
"test.serv", bad_event, sent_to_us_directly=True
)
self.reactor.advance(1)
extrem = maybeDeferred(
self.homeserver.datastore.get_latest_event_ids_in_room, self.room_id
)
self.assertEqual(self.successResultOf(extrem)[0], "two:test.serv")
state = self.homeserver.get_state_handler().get_current_state_ids(self.room_id)
self.reactor.advance(1)
self.assertIn(("m.room.member", "@us:test"), self.successResultOf(state).keys())

View File

@ -35,7 +35,10 @@ class ToTwistedHandler(logging.Handler):
def emit(self, record): def emit(self, record):
log_entry = self.format(record) log_entry = self.format(record)
log_level = record.levelname.lower().replace('warning', 'warn') log_level = record.levelname.lower().replace('warning', 'warn')
self.tx_log.emit(twisted.logger.LogLevel.levelWithName(log_level), log_entry) self.tx_log.emit(
twisted.logger.LogLevel.levelWithName(log_level),
log_entry.replace("{", r"(").replace("}", r")"),
)
handler = ToTwistedHandler() handler = ToTwistedHandler()