From 5810cffd335f96ac448497e7caf46c5cbf29d6a8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Sep 2016 10:36:19 +0100 Subject: [PATCH] Pass since/from parameters over federation --- synapse/federation/federation_client.py | 22 ++-------- synapse/federation/transport/client.py | 9 +++- synapse/federation/transport/server.py | 10 ++++- synapse/handlers/room_list.py | 55 ++++++++++++------------- synapse/http/servlet.py | 18 ++++++-- synapse/rest/client/v1/room.py | 6 +-- 6 files changed, 63 insertions(+), 57 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 91bed4746..f0a684fc1 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -24,7 +24,6 @@ from synapse.api.errors import ( CodeMessageException, HttpResponseException, SynapseError, ) from synapse.util import unwrapFirstError -from synapse.util.async import concurrently_execute from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.logutils import log_function from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred @@ -719,24 +718,11 @@ class FederationClient(FederationBase): raise RuntimeError("Failed to send to any server.") - @defer.inlineCallbacks - def get_public_rooms(self, destinations): - results_by_server = {} + def get_public_rooms(self, destination, limit=None, since_token=None): + if destination == self.server_name: + return - @defer.inlineCallbacks - def _get_result(s): - if s == self.server_name: - defer.returnValue() - - try: - result = yield self.transport_layer.get_public_rooms(s) - results_by_server[s] = result - except: - logger.exception("Error getting room list from server %r", s) - - yield concurrently_execute(_get_result, destinations, 3) - - defer.returnValue(results_by_server) + return self.transport_layer.get_public_rooms(destination, limit, since_token) @defer.inlineCallbacks def query_auth(self, destination, room_id, event_id, local_auth): diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 2b138526b..f508b70f1 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -248,12 +248,19 @@ class TransportLayerClient(object): @defer.inlineCallbacks @log_function - def get_public_rooms(self, remote_server): + def get_public_rooms(self, remote_server, limit, since_token): path = PREFIX + "/publicRooms" + args = {} + if limit: + args["limit"] = [str(limit)] + if since_token: + args["since"] = [since_token] + response = yield self.client.get_json( destination=remote_server, path=path, + args=args, ) defer.returnValue(response) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 37c0d4fbc..fec337be6 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -18,7 +18,9 @@ from twisted.internet import defer from synapse.api.urls import FEDERATION_PREFIX as PREFIX from synapse.api.errors import Codes, SynapseError from synapse.http.server import JsonResource -from synapse.http.servlet import parse_json_object_from_request +from synapse.http.servlet import ( + parse_json_object_from_request, parse_integer_from_args, parse_string_from_args, +) from synapse.util.ratelimitutils import FederationRateLimiter from synapse.util.versionstring import get_version_string @@ -554,7 +556,11 @@ class PublicRoomList(BaseFederationServlet): @defer.inlineCallbacks def on_GET(self, origin, content, query): - data = yield self.room_list_handler.get_local_public_room_list() + limit = parse_integer_from_args(query, "limit", 0) + since_token = parse_string_from_args(query, "since", None) + data = yield self.room_list_handler.get_local_public_room_list( + limit, since_token + ) defer.returnValue((200, data)) diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 94a5e7f51..6a62f3c27 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -20,7 +20,6 @@ from ._base import BaseHandler from synapse.api.constants import ( EventTypes, JoinRules, ) -from synapse.api.errors import SynapseError from synapse.util.async import concurrently_execute from synapse.util.caches.response_cache import ResponseCache @@ -40,21 +39,21 @@ class RoomListHandler(BaseHandler): super(RoomListHandler, self).__init__(hs) self.response_cache = ResponseCache(hs) - def get_local_public_room_list(self, limit=None, next_batch=None): - result = self.response_cache.get((limit, next_batch)) + def get_local_public_room_list(self, limit=None, since_token=None): + result = self.response_cache.get((limit, since_token)) if not result: result = self.response_cache.set( - (limit, next_batch), - self._get_public_room_list(limit, next_batch) + (limit, since_token), + self._get_public_room_list(limit, since_token) ) return result @defer.inlineCallbacks - def _get_public_room_list(self, limit=None, next_batch=None): - if next_batch and next_batch != "END": - next_batch = RoomListNextBatch.from_token(next_batch) + def _get_public_room_list(self, limit=None, since_token=None): + if since_token and since_token != "END": + since_token = RoomListNextBatch.from_token(since_token) else: - next_batch = None + since_token = None room_ids = yield self.store.get_public_room_ids() @@ -62,8 +61,8 @@ class RoomListHandler(BaseHandler): rooms_to_num_joined = {} rooms_to_latest_event_ids = {} - if next_batch: - current_stream_token = next_batch.stream_ordering + if since_token: + current_stream_token = since_token.stream_ordering else: current_stream_token = yield self.store.get_room_max_stream_ordering() @@ -99,22 +98,22 @@ class RoomListHandler(BaseHandler): sorted_entries = sorted(rooms_to_order_value.items(), key=lambda e: e[1]) sorted_rooms = [room_id for room_id, _ in sorted_entries] - if next_batch: - if next_batch.direction_is_forward: - sorted_rooms = sorted_rooms[next_batch.current_limit:] + if since_token: + if since_token.direction_is_forward: + sorted_rooms = sorted_rooms[since_token.current_limit:] else: - sorted_rooms = sorted_rooms[:next_batch.current_limit] + sorted_rooms = sorted_rooms[:since_token.current_limit] sorted_rooms.reverse() new_limit = None if limit: if sorted_rooms[limit:]: new_limit = limit - if next_batch: - if next_batch.direction_is_forward: - new_limit += next_batch.current_limit + if since_token: + if since_token.direction_is_forward: + new_limit += since_token.current_limit else: - new_limit = next_batch.current_limit - new_limit + new_limit = since_token.current_limit - new_limit new_limit = max(0, new_limit) sorted_rooms = sorted_rooms[:limit] @@ -208,7 +207,7 @@ class RoomListHandler(BaseHandler): "chunk": chunk, } - if not next_batch or next_batch.direction_is_forward: + if not since_token or since_token.direction_is_forward: if new_limit: results["next_batch"] = RoomListNextBatch( stream_ordering=current_stream_token, @@ -216,8 +215,8 @@ class RoomListHandler(BaseHandler): direction_is_forward=True, ).to_token() - if next_batch: - results["prev_batch"] = next_batch.copy_and_replace( + if since_token: + results["prev_batch"] = since_token.copy_and_replace( direction_is_forward=False, ).to_token() else: @@ -228,22 +227,20 @@ class RoomListHandler(BaseHandler): direction_is_forward=False, ).to_token() - if next_batch: - results["next_batch"] = next_batch.copy_and_replace( + if since_token: + results["next_batch"] = since_token.copy_and_replace( direction_is_forward=True, ).to_token() defer.returnValue(results) @defer.inlineCallbacks - def get_remote_public_room_list(self, server_name, limit=None, next_batch=None): + def get_remote_public_room_list(self, server_name, limit=None, since_token=None): res = yield self.hs.get_replication_layer().get_public_rooms( - [server_name] + server_name, limit=limit, since_token=since_token, ) - if server_name not in res: - raise SynapseError(404, "Server not found") - defer.returnValue(res[server_name]) + defer.returnValue(res) class RoomListNextBatch(namedtuple("RoomListNextBatch", ( diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py index e41afeab8..934638623 100644 --- a/synapse/http/servlet.py +++ b/synapse/http/servlet.py @@ -41,9 +41,13 @@ def parse_integer(request, name, default=None, required=False): SynapseError: if the parameter is absent and required, or if the parameter is present and not an integer. """ - if name in request.args: + return parse_integer_from_args(request.args, name, default, required) + + +def parse_integer_from_args(args, name, default=None, required=False): + if name in args: try: - return int(request.args[name][0]) + return int(args[name][0]) except: message = "Query parameter %r must be an integer" % (name,) raise SynapseError(400, message) @@ -116,9 +120,15 @@ def parse_string(request, name, default=None, required=False, parameter is present, must be one of a list of allowed values and is not one of those allowed values. """ + return parse_string_from_args( + request.args, name, default, required, allowed_values, param_type, + ) - if name in request.args: - value = request.args[name][0] + +def parse_string_from_args(args, name, default=None, required=False, + allowed_values=None, param_type="string"): + if name in args: + value = args[name][0] if allowed_values is not None and value not in allowed_values: message = "Query parameter %r must be one of [%s]" % ( name, ", ".join(repr(v) for v in allowed_values) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 00b7738e0..db0cd4380 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -320,19 +320,19 @@ class PublicRoomListRestServlet(ClientV1RestServlet): pass limit = parse_integer(request, "limit", 0) - next_batch = parse_string(request, "since", None) + since_token = parse_string(request, "since", None) handler = self.hs.get_room_list_handler() if server: data = yield handler.get_remote_public_room_list( server, limit=limit, - next_batch=next_batch, + since_token=since_token, ) else: data = yield handler.get_local_public_room_list( limit=limit, - next_batch=next_batch, + since_token=since_token, ) defer.returnValue((200, data))