From 827de7cee98d0626f3ca24a04955df2ff1bdfc41 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Wed, 13 Aug 2014 16:55:53 +0100 Subject: [PATCH] Define the concept of a 'federation Query'; creating API for making and handling Queries on the Federation's increasingly-inaccurately-named ReplicationLayer --- synapse/federation/replication.py | 50 +++++++++++++++++++++++++++++ synapse/federation/transport.py | 25 +++++++++++++++ tests/federation/test_federation.py | 38 ++++++++++++++++++++++ 3 files changed, 113 insertions(+) diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index bea5335f8..01020566c 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -66,6 +66,7 @@ class ReplicationLayer(object): self.handler = None self.edu_handlers = {} + self.query_handlers = {} self._order = 0 @@ -84,6 +85,27 @@ class ReplicationLayer(object): self.edu_handlers[edu_type] = handler + def register_query_handler(self, query_type, handler): + """Sets the handler callable that will be used to handle an incoming + federation Query of the given type. + + Args: + query_type (str): Category name of the query, which should match + the string used by make_query. + handler (callable): Invoked to handle incoming queries of this type + + handler is invoked as: + result = handler(args) + + where 'args' is a dict mapping strings to strings of the query + arguments. It should return a Deferred that will eventually yield an + object to encode as JSON. + """ + if query_type in self.query_handlers: + raise KeyError("Already have a Query handler for %s" % (query_type)) + + self.query_handlers[query_type] = handler + @defer.inlineCallbacks @log_function def send_pdu(self, pdu): @@ -137,6 +159,24 @@ class ReplicationLayer(object): # TODO, add errback, etc. self._transaction_queue.enqueue_edu(edu) + @log_function + def make_query(self, destination, query_type, args): + """Sends a federation Query to a remote homeserver of the given type + and arguments. + + Args: + destination (str): Domain name of the remote homeserver + query_type (str): Category of the query type; should match the + handler name used in register_query_handler(). + args (dict): Mapping of strings to strings containing the details + of the query request. + + Returns: + a Deferred which will eventually yield a JSON object from the + response + """ + return self.transport_layer.make_query(destination, query_type, args) + @defer.inlineCallbacks @log_function def paginate(self, dest, context, limit): @@ -340,6 +380,16 @@ class ReplicationLayer(object): (200, self._transaction_from_pdus(response).get_dict()) ) + @defer.inlineCallbacks + def on_query_request(self, query_type, args): + if query_type in self.query_handlers: + response = yield self.query_handlers[query_type](args) + defer.returnValue((200, response)) + else: + defer.returnValue((404, "No handler for Query type '%s'" + % (query_type) + )) + @defer.inlineCallbacks @log_function def _get_persisted_pdu(self, pdu_id, pdu_origin): diff --git a/synapse/federation/transport.py b/synapse/federation/transport.py index ff3fc3441..69166036f 100644 --- a/synapse/federation/transport.py +++ b/synapse/federation/transport.py @@ -180,6 +180,19 @@ class TransportLayer(object): defer.returnValue((code, response)) + @defer.inlineCallbacks + @log_function + def make_query(self, destination, query_type, args): + path = PREFIX + "/query/%s" % query_type + + response = yield self.client.get_json( + destination=destination, + path=path, + args=args + ) + + defer.returnValue(response) + @log_function def register_received_handler(self, handler): """ Register a handler that will be fired when we receive data. @@ -251,6 +264,15 @@ class TransportLayer(object): lambda request, context: handler.on_context_pdus_request(context) ) + # This is when we receive a server-server Query + self.server.register_path( + "GET", + re.compile("^" + PREFIX + "/query/([^/]*)$"), + lambda request, query_type: handler.on_query_request( + query_type, {k: v[0] for k, v in request.args.items()} + ) + ) + @defer.inlineCallbacks @log_function def _on_send_request(self, request, transaction_id): @@ -456,3 +478,6 @@ class TransportRequestHandler(object): of what went wrong. """ pass + + def on_query_request(self): + """ Called on a GET /query/ request. """ diff --git a/tests/federation/test_federation.py b/tests/federation/test_federation.py index f493ee253..a3bcb5ede 100644 --- a/tests/federation/test_federation.py +++ b/tests/federation/test_federation.py @@ -62,6 +62,7 @@ class FederationTestCase(unittest.TestCase): def setUp(self): self.mock_http_server = MockHttpServer() self.mock_http_client = Mock(spec=[ + "get_json", "put_json", ]) self.mock_persistence = Mock(spec=[ @@ -253,3 +254,40 @@ class FederationTestCase(unittest.TestCase): recv_observer.assert_called_with( "remote", {"testing": "reply here"} ) + + @defer.inlineCallbacks + def test_send_query(self): + self.mock_http_client.get_json.return_value = defer.succeed( + {"your": "response"} + ) + + response = yield self.federation.make_query( + destination="remote", + query_type="a-question", + args={"one": "1", "two": "2"} + ) + + self.assertEquals({"your": "response"}, response) + + self.mock_http_client.get_json.assert_called_with( + destination="remote", + path="/matrix/federation/v1/query/a-question", + args={"one": "1", "two": "2"} + ) + + @defer.inlineCallbacks + def test_recv_query(self): + recv_handler = Mock() + recv_handler.return_value = defer.succeed({"another": "response"}) + + self.federation.register_query_handler("a-question", recv_handler) + + code, response = yield self.mock_http_server.trigger("GET", + "/matrix/federation/v1/query/a-question?three=3&four=4", None) + + self.assertEquals(200, code) + self.assertEquals({"another": "response"}, response) + + recv_handler.assert_called_with( + {"three": "3", "four": "4"} + )