From e1625d62a8313ff34662aa72ae4d0574e540cc2b Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 31 May 2016 11:55:57 +0100 Subject: [PATCH 1/8] Add federation room list servlet --- synapse/config/server.py | 6 +++ synapse/federation/transport/server.py | 65 +++++++++++++++++++++++++- 2 files changed, 70 insertions(+), 1 deletion(-) diff --git a/synapse/config/server.py b/synapse/config/server.py index 0b5f462e4..8d554d749 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -29,6 +29,7 @@ class ServerConfig(Config): self.user_agent_suffix = config.get("user_agent_suffix") self.use_frozen_dicts = config.get("use_frozen_dicts", True) self.public_baseurl = config.get("public_baseurl") + self.secondary_directory_servers = config.get("secondary_directory_servers", []) if self.public_baseurl is not None: if self.public_baseurl[-1] != '/': @@ -156,6 +157,11 @@ class ServerConfig(Config): # hard limit. soft_file_limit: 0 + # A list of other Home Servers to fetch the public room directory from + # and include in the public room directory of this home server + # secondary_directory_servers: + # - matrix.org + # List of ports that Synapse should listen on, their purpose and their # configuration. listeners: diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 5b6c7d11d..b82f72fd5 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -134,10 +134,11 @@ class Authenticator(object): class BaseFederationServlet(object): - def __init__(self, handler, authenticator, ratelimiter, server_name): + def __init__(self, handler, authenticator, ratelimiter, server_name, room_list_handler): self.handler = handler self.authenticator = authenticator self.ratelimiter = ratelimiter + self.room_list_handler = room_list_handler def _wrap(self, code): authenticator = self.authenticator @@ -491,6 +492,66 @@ class OpenIdUserInfo(BaseFederationServlet): def _wrap(self, code): return code +class PublicRoomList(BaseFederationServlet): + """ + Fetch the public room list for this server. + + This API returns information in the same format as /publicRooms on the + client API, but will only ever include local public rooms and hence is + intended for consumption by other home servers. + + GET /publicRooms HTTP/1.1 + + HTTP/1.1 200 OK + Content-Type: application/json + + { + "chunk": [ + { + "aliases": [ + "#test:localhost" + ], + "guest_can_join": false, + "name": "test room", + "num_joined_members": 3, + "room_id": "!whkydVegtvatLfXmPN:localhost", + "world_readable": false + } + ], + "end": "END", + "start": "START" + } + """ + + PATH = "/publicRooms" + + @defer.inlineCallbacks + def on_GET(self, request): + data = yield self.room_list_handler.get_public_room_list() + defer.returnValue((200, data)) + + token = parse_string(request, "access_token") + if token is None: + defer.returnValue((401, { + "errcode": "M_MISSING_TOKEN", "error": "Access Token required" + })) + return + + user_id = yield self.handler.on_openid_userinfo(token) + + if user_id is None: + defer.returnValue((401, { + "errcode": "M_UNKNOWN_TOKEN", + "error": "Access Token unknown or expired" + })) + + defer.returnValue((200, {"sub": user_id})) + + # Avoid doing remote HS authorization checks which are done by default by + # BaseFederationServlet. + def _wrap(self, code): + return code + SERVLET_CLASSES = ( FederationSendServlet, @@ -513,6 +574,7 @@ SERVLET_CLASSES = ( FederationThirdPartyInviteExchangeServlet, On3pidBindServlet, OpenIdUserInfo, + PublicRoomList, ) @@ -523,4 +585,5 @@ def register_servlets(hs, resource, authenticator, ratelimiter): authenticator=authenticator, ratelimiter=ratelimiter, server_name=hs.hostname, + room_list_handler=hs.get_room_list_handler(), ).register(resource) From 70ecb415f553e5de86833034ce184a8a905b7ed5 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 31 May 2016 12:00:54 +0100 Subject: [PATCH 2/8] Fix c+p fail --- synapse/federation/transport/server.py | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index b82f72fd5..f23c02efd 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -530,23 +530,6 @@ class PublicRoomList(BaseFederationServlet): data = yield self.room_list_handler.get_public_room_list() defer.returnValue((200, data)) - token = parse_string(request, "access_token") - if token is None: - defer.returnValue((401, { - "errcode": "M_MISSING_TOKEN", "error": "Access Token required" - })) - return - - user_id = yield self.handler.on_openid_userinfo(token) - - if user_id is None: - defer.returnValue((401, { - "errcode": "M_UNKNOWN_TOKEN", - "error": "Access Token unknown or expired" - })) - - defer.returnValue((200, {"sub": user_id})) - # Avoid doing remote HS authorization checks which are done by default by # BaseFederationServlet. def _wrap(self, code): From d240796dedcfae1f6929c1501e7e335df417cfaf Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 31 May 2016 17:20:07 +0100 Subject: [PATCH 3/8] Basic, un-cached support for secondary_directory_servers --- synapse/federation/federation_client.py | 21 ++++++++++++++++ synapse/federation/transport/client.py | 12 +++++++++ synapse/federation/transport/server.py | 2 +- synapse/handlers/room.py | 33 ++++++++++++++++++++++++- synapse/rest/client/v1/room.py | 3 ++- 5 files changed, 68 insertions(+), 3 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 37ee469fa..ba8d71c05 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -24,6 +24,7 @@ from synapse.api.errors import ( CodeMessageException, HttpResponseException, SynapseError, ) from synapse.util import unwrapFirstError +from synapse.util.async import concurrently_execute from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.logutils import log_function from synapse.events import FrozenEvent @@ -550,6 +551,26 @@ class FederationClient(FederationBase): raise RuntimeError("Failed to send to any server.") + @defer.inlineCallbacks + def get_public_rooms(self, destinations): + results_by_server = {} + + @defer.inlineCallbacks + def _get_result(s): + if s == self.server_name: + defer.returnValue() + + try: + result = yield self.transport_layer.get_public_rooms(s) + results_by_server[s] = result + except: + logger.exception("Error getting room list from server %r", s) + + + yield concurrently_execute(_get_result, destinations, 3) + + defer.returnValue(results_by_server) + @defer.inlineCallbacks def query_auth(self, destination, room_id, event_id, local_auth): """ diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index cd2841c4d..ebb698e27 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -224,6 +224,18 @@ class TransportLayerClient(object): defer.returnValue(response) + @defer.inlineCallbacks + @log_function + def get_public_rooms(self, remote_server): + path = PREFIX + "/publicRooms" + + response = yield self.client.get_json( + destination=remote_server, + path=path, + ) + + defer.returnValue(response) + @defer.inlineCallbacks @log_function def exchange_third_party_invite(self, destination, room_id, event_dict): diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index f23c02efd..da9e7a326 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -527,7 +527,7 @@ class PublicRoomList(BaseFederationServlet): @defer.inlineCallbacks def on_GET(self, request): - data = yield self.room_list_handler.get_public_room_list() + data = yield self.room_list_handler.get_local_public_room_list() defer.returnValue((200, data)) # Avoid doing remote HS authorization checks which are done by default by diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 3d63b3c51..b0aa9fb51 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -345,7 +345,7 @@ class RoomListHandler(BaseHandler): super(RoomListHandler, self).__init__(hs) self.response_cache = ResponseCache() - def get_public_room_list(self): + def get_local_public_room_list(self): result = self.response_cache.get(()) if not result: result = self.response_cache.set((), self._get_public_room_list()) @@ -427,6 +427,37 @@ class RoomListHandler(BaseHandler): # FIXME (erikj): START is no longer a valid value defer.returnValue({"start": "START", "end": "END", "chunk": results}) + @defer.inlineCallbacks + def get_aggregated_public_room_list(self): + """ + Get the public room list from this server and the servers + specified in the secondary_directory_servers config option. + XXX: Pagination... + """ + federated_by_server = yield self.hs.get_replication_layer().get_public_rooms( + self.hs.config.secondary_directory_servers + ) + public_rooms = yield self.get_local_public_room_list() + + # keep track of which room IDs we've seen so we can de-dup + room_ids = set() + + # tag all the ones in our list with our server name. + # Also add the them to the de-deping set + for room in public_rooms['chunk']: + room["server_name"] = self.hs.hostname + room_ids.add(room["room_id"]) + + # Now add the results from federation + for server_name, server_result in federated_by_server.items(): + for room in server_result["chunk"]: + if room["room_id"] not in room_ids: + room["server_name"] = server_name + public_rooms["chunk"].append(room) + room_ids.add(room["room_id"]) + + defer.returnValue(public_rooms) + class RoomContextHandler(BaseHandler): @defer.inlineCallbacks diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 2d22bbdaa..db52a1fc3 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -280,7 +280,8 @@ class PublicRoomListRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_GET(self, request): handler = self.hs.get_room_list_handler() - data = yield handler.get_public_room_list() + data = yield handler.get_aggregated_public_room_list() + defer.returnValue((200, data)) From 963e3ed2828f6d1e704678af971ceffff3076115 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 31 May 2016 17:22:53 +0100 Subject: [PATCH 4/8] Apparently I am not permitted to have two blank lines here --- synapse/federation/federation_client.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index ba8d71c05..d835c1b03 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -566,7 +566,6 @@ class FederationClient(FederationBase): except: logger.exception("Error getting room list from server %r", s) - yield concurrently_execute(_get_result, destinations, 3) defer.returnValue(results_by_server) From 6ca4d3ae9add2cf2bab3cf074072d7a7bb0b6553 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 31 May 2016 17:24:50 +0100 Subject: [PATCH 5/8] Add vector.im to default secondary_directory_servers and add comment explaining it's not a permanent solution --- synapse/config/server.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/synapse/config/server.py b/synapse/config/server.py index 8d554d749..c2d8f8a52 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -159,8 +159,12 @@ class ServerConfig(Config): # A list of other Home Servers to fetch the public room directory from # and include in the public room directory of this home server + # This is a temporary stopgap solution to populate new server with a + # list of rooms until there exists a good solution of a decentralized + # room directory. # secondary_directory_servers: # - matrix.org + # - vector.im # List of ports that Synapse should listen on, their purpose and their # configuration. From 2a449fec4d03b7739269e832683905a18459e4c3 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 31 May 2016 18:27:23 +0100 Subject: [PATCH 6/8] Add cache to remote room lists Poll for updates from remote servers, waiting for the poll if there's no cache entry. --- synapse/handlers/room.py | 34 ++++++++++++++++++++++++++++++---- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index b0aa9fb51..77063b021 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -36,6 +36,8 @@ import string logger = logging.getLogger(__name__) +REMOTE_ROOM_LIST_POLL_INTERVAL = 60 * 1000 + id_server_scheme = "https://" @@ -344,6 +346,12 @@ class RoomListHandler(BaseHandler): def __init__(self, hs): super(RoomListHandler, self).__init__(hs) self.response_cache = ResponseCache() + self.remote_list_request_cache = ResponseCache() + self.remote_list_cache = {} + self.fetch_looping_call = hs.get_clock().looping_call( + self.fetch_all_remote_lists, REMOTE_ROOM_LIST_POLL_INTERVAL + ) + self.fetch_all_remote_lists() def get_local_public_room_list(self): result = self.response_cache.get(()) @@ -427,6 +435,14 @@ class RoomListHandler(BaseHandler): # FIXME (erikj): START is no longer a valid value defer.returnValue({"start": "START", "end": "END", "chunk": results}) + @defer.inlineCallbacks + def fetch_all_remote_lists(self): + deferred = self.hs.get_replication_layer().get_public_rooms( + self.hs.config.secondary_directory_servers + ) + self.remote_list_request_cache.set((), deferred) + yield deferred + @defer.inlineCallbacks def get_aggregated_public_room_list(self): """ @@ -434,9 +450,19 @@ class RoomListHandler(BaseHandler): specified in the secondary_directory_servers config option. XXX: Pagination... """ - federated_by_server = yield self.hs.get_replication_layer().get_public_rooms( - self.hs.config.secondary_directory_servers - ) + # We return the results from out cache which is updated by a looping call, + # unless we're missing a cache entry, in which case wait for the result + # of the fetch if there's one in progress. If not, omit that server. + wait = False + for s in self.hs.config.secondary_directory_servers: + if s not in self.remote_list_cache: + logger.warn("No cached room list from %s: waiting for fetch", s) + wait = True + break + + if wait and self.remote_list_request_cache.get(()): + yield self.remote_list_request_cache.get(()) + public_rooms = yield self.get_local_public_room_list() # keep track of which room IDs we've seen so we can de-dup @@ -449,7 +475,7 @@ class RoomListHandler(BaseHandler): room_ids.add(room["room_id"]) # Now add the results from federation - for server_name, server_result in federated_by_server.items(): + for server_name, server_result in self.remote_list_cache.items(): for room in server_result["chunk"]: if room["room_id"] not in room_ids: room["server_name"] = server_name From 6ecb2ca4ec3fae8c6f2e837b4ec99cc6929de638 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 1 Jun 2016 09:48:55 +0100 Subject: [PATCH 7/8] pep8 --- synapse/federation/transport/server.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index da9e7a326..a1a334955 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -134,7 +134,8 @@ class Authenticator(object): class BaseFederationServlet(object): - def __init__(self, handler, authenticator, ratelimiter, server_name, room_list_handler): + def __init__(self, handler, authenticator, ratelimiter, server_name, + room_list_handler): self.handler = handler self.authenticator = authenticator self.ratelimiter = ratelimiter @@ -492,6 +493,7 @@ class OpenIdUserInfo(BaseFederationServlet): def _wrap(self, code): return code + class PublicRoomList(BaseFederationServlet): """ Fetch the public room list for this server. From 195254cae80f4748c3fc0ac3b46000047c2e6cc0 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 1 Jun 2016 11:14:16 +0100 Subject: [PATCH 8/8] Inject fake room list handler in tests Otherwise it tries to start the remote public room list updating looping call which breaks. --- tests/utils.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/utils.py b/tests/utils.py index 59d985b5f..006abedbc 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -67,6 +67,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs): version_string="Synapse/tests", database_engine=create_engine(config.database_config), get_db_conn=db_pool.get_db_conn, + room_list_handler=object(), **kargs ) hs.setup() @@ -75,6 +76,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs): name, db_pool=None, datastore=datastore, config=config, version_string="Synapse/tests", database_engine=create_engine(config.database_config), + room_list_handler=object(), **kargs )