Fix origin handling for pushed transactions

Use the actual origin for push transactions, rather than whatever the remote
server claimed.
This commit is contained in:
Richard van der Hoff 2018-09-04 01:23:18 +01:00
parent 804dd41e18
commit c127c8d042
5 changed files with 32 additions and 29 deletions

View File

@ -99,7 +99,7 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def on_incoming_transaction(self, transaction_data): def on_incoming_transaction(self, origin, transaction_data):
# keep this as early as possible to make the calculated origin ts as # keep this as early as possible to make the calculated origin ts as
# accurate as possible. # accurate as possible.
request_time = self._clock.time_msec() request_time = self._clock.time_msec()
@ -108,34 +108,33 @@ class FederationServer(FederationBase):
if not transaction.transaction_id: if not transaction.transaction_id:
raise Exception("Transaction missing transaction_id") raise Exception("Transaction missing transaction_id")
if not transaction.origin:
raise Exception("Transaction missing origin")
logger.debug("[%s] Got transaction", transaction.transaction_id) logger.debug("[%s] Got transaction", transaction.transaction_id)
# use a linearizer to ensure that we don't process the same transaction # use a linearizer to ensure that we don't process the same transaction
# multiple times in parallel. # multiple times in parallel.
with (yield self._transaction_linearizer.queue( with (yield self._transaction_linearizer.queue(
(transaction.origin, transaction.transaction_id), (origin, transaction.transaction_id),
)): )):
result = yield self._handle_incoming_transaction( result = yield self._handle_incoming_transaction(
transaction, request_time, origin, transaction, request_time,
) )
defer.returnValue(result) defer.returnValue(result)
@defer.inlineCallbacks @defer.inlineCallbacks
def _handle_incoming_transaction(self, transaction, request_time): def _handle_incoming_transaction(self, origin, transaction, request_time):
""" Process an incoming transaction and return the HTTP response """ Process an incoming transaction and return the HTTP response
Args: Args:
origin (unicode): the server making the request
transaction (Transaction): incoming transaction transaction (Transaction): incoming transaction
request_time (int): timestamp that the HTTP request arrived at request_time (int): timestamp that the HTTP request arrived at
Returns: Returns:
Deferred[(int, object)]: http response code and body Deferred[(int, object)]: http response code and body
""" """
response = yield self.transaction_actions.have_responded(transaction) response = yield self.transaction_actions.have_responded(origin, transaction)
if response: if response:
logger.debug( logger.debug(
@ -149,7 +148,7 @@ class FederationServer(FederationBase):
received_pdus_counter.inc(len(transaction.pdus)) received_pdus_counter.inc(len(transaction.pdus))
origin_host, _ = parse_server_name(transaction.origin) origin_host, _ = parse_server_name(origin)
pdus_by_room = {} pdus_by_room = {}
@ -190,7 +189,7 @@ class FederationServer(FederationBase):
event_id = pdu.event_id event_id = pdu.event_id
try: try:
yield self._handle_received_pdu( yield self._handle_received_pdu(
transaction.origin, pdu origin, pdu
) )
pdu_results[event_id] = {} pdu_results[event_id] = {}
except FederationError as e: except FederationError as e:
@ -212,7 +211,7 @@ class FederationServer(FederationBase):
if hasattr(transaction, "edus"): if hasattr(transaction, "edus"):
for edu in (Edu(**x) for x in transaction.edus): for edu in (Edu(**x) for x in transaction.edus):
yield self.received_edu( yield self.received_edu(
transaction.origin, origin,
edu.edu_type, edu.edu_type,
edu.content edu.content
) )
@ -224,6 +223,7 @@ class FederationServer(FederationBase):
logger.debug("Returning: %s", str(response)) logger.debug("Returning: %s", str(response))
yield self.transaction_actions.set_response( yield self.transaction_actions.set_response(
origin,
transaction, transaction,
200, response 200, response
) )

View File

@ -36,7 +36,7 @@ class TransactionActions(object):
self.store = datastore self.store = datastore
@log_function @log_function
def have_responded(self, transaction): def have_responded(self, origin, transaction):
""" Have we already responded to a transaction with the same id and """ Have we already responded to a transaction with the same id and
origin? origin?
@ -50,11 +50,11 @@ class TransactionActions(object):
"transaction_id") "transaction_id")
return self.store.get_received_txn_response( return self.store.get_received_txn_response(
transaction.transaction_id, transaction.origin transaction.transaction_id, origin
) )
@log_function @log_function
def set_response(self, transaction, code, response): def set_response(self, origin, transaction, code, response):
""" Persist how we responded to a transaction. """ Persist how we responded to a transaction.
Returns: Returns:
@ -66,7 +66,7 @@ class TransactionActions(object):
return self.store.set_received_txn_response( return self.store.set_received_txn_response(
transaction.transaction_id, transaction.transaction_id,
transaction.origin, origin,
code, code,
response, response,
) )

View File

@ -353,7 +353,7 @@ class FederationSendServlet(BaseFederationServlet):
try: try:
code, response = yield self.handler.on_incoming_transaction( code, response = yield self.handler.on_incoming_transaction(
transaction_data origin, transaction_data,
) )
except Exception: except Exception:
logger.exception("on_incoming_transaction failed") logger.exception("on_incoming_transaction failed")

View File

@ -33,7 +33,7 @@ from ..utils import (
) )
def _expect_edu(destination, edu_type, content, origin="test"): def _expect_edu_transaction(edu_type, content, origin="test"):
return { return {
"origin": origin, "origin": origin,
"origin_server_ts": 1000000, "origin_server_ts": 1000000,
@ -42,8 +42,8 @@ def _expect_edu(destination, edu_type, content, origin="test"):
} }
def _make_edu_json(origin, edu_type, content): def _make_edu_transaction_json(edu_type, content):
return json.dumps(_expect_edu("test", edu_type, content, origin=origin)).encode( return json.dumps(_expect_edu_transaction(edu_type, content)).encode(
'utf8' 'utf8'
) )
@ -190,8 +190,7 @@ class TypingNotificationsTestCase(unittest.TestCase):
call( call(
"farm", "farm",
path="/_matrix/federation/v1/send/1000000/", path="/_matrix/federation/v1/send/1000000/",
data=_expect_edu( data=_expect_edu_transaction(
"farm",
"m.typing", "m.typing",
content={ content={
"room_id": self.room_id, "room_id": self.room_id,
@ -221,11 +220,10 @@ class TypingNotificationsTestCase(unittest.TestCase):
self.assertEquals(self.event_source.get_current_key(), 0) self.assertEquals(self.event_source.get_current_key(), 0)
yield self.mock_federation_resource.trigger( (code, response) = yield self.mock_federation_resource.trigger(
"PUT", "PUT",
"/_matrix/federation/v1/send/1000000/", "/_matrix/federation/v1/send/1000000/",
_make_edu_json( _make_edu_transaction_json(
"farm",
"m.typing", "m.typing",
content={ content={
"room_id": self.room_id, "room_id": self.room_id,
@ -233,7 +231,7 @@ class TypingNotificationsTestCase(unittest.TestCase):
"typing": True, "typing": True,
}, },
), ),
federation_auth=True, federation_auth_origin=b'farm',
) )
self.on_new_event.assert_has_calls( self.on_new_event.assert_has_calls(
@ -264,8 +262,7 @@ class TypingNotificationsTestCase(unittest.TestCase):
call( call(
"farm", "farm",
path="/_matrix/federation/v1/send/1000000/", path="/_matrix/federation/v1/send/1000000/",
data=_expect_edu( data=_expect_edu_transaction(
"farm",
"m.typing", "m.typing",
content={ content={
"room_id": self.room_id, "room_id": self.room_id,

View File

@ -306,7 +306,10 @@ class MockHttpResource(HttpServer):
@patch('twisted.web.http.Request') @patch('twisted.web.http.Request')
@defer.inlineCallbacks @defer.inlineCallbacks
def trigger(self, http_method, path, content, mock_request, federation_auth=False): def trigger(
self, http_method, path, content, mock_request,
federation_auth_origin=None,
):
""" Fire an HTTP event. """ Fire an HTTP event.
Args: Args:
@ -315,6 +318,7 @@ class MockHttpResource(HttpServer):
content : The HTTP body content : The HTTP body
mock_request : Mocked request to pass to the event so it can get mock_request : Mocked request to pass to the event so it can get
content. content.
federation_auth_origin (bytes|None): domain to authenticate as, for federation
Returns: Returns:
A tuple of (code, response) A tuple of (code, response)
Raises: Raises:
@ -335,8 +339,10 @@ class MockHttpResource(HttpServer):
mock_request.getClientIP.return_value = "-" mock_request.getClientIP.return_value = "-"
headers = {} headers = {}
if federation_auth: if federation_auth_origin is not None:
headers[b"Authorization"] = [b"X-Matrix origin=test,key=,sig="] headers[b"Authorization"] = [
b"X-Matrix origin=%s,key=,sig=" % (federation_auth_origin, )
]
mock_request.requestHeaders.getRawHeaders = mock_getRawHeaders(headers) mock_request.requestHeaders.getRawHeaders = mock_getRawHeaders(headers)
# return the right path if the event requires it # return the right path if the event requires it