From f15ba926ccfb36cad31a19fe22a4cb384850f4dd Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Wed, 11 Nov 2015 17:13:24 +0000 Subject: [PATCH 01/18] Allow guest access to room initialSync --- synapse/handlers/message.py | 55 ++++++++++++++++++++-------------- synapse/rest/client/v1/room.py | 3 +- 2 files changed, 34 insertions(+), 24 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 7d31ff8d4..6720c6a72 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -456,7 +456,7 @@ class MessageHandler(BaseHandler): defer.returnValue(ret) @defer.inlineCallbacks - def room_initial_sync(self, user_id, room_id, pagin_config=None): + def room_initial_sync(self, user_id, room_id, pagin_config=None, is_guest=False): """Capture the a snapshot of a room. If user is currently a member of the room this will be what is currently in the room. If the user left the room this will be what was in the room when they left. @@ -473,15 +473,19 @@ class MessageHandler(BaseHandler): A JSON serialisable dict with the snapshot of the room. """ - member_event = yield self.auth.check_user_was_in_room(room_id, user_id) + membership, member_event_id = yield self._check_in_room_or_world_readable( + room_id, + user_id, + is_guest + ) - if member_event.membership == Membership.JOIN: + if membership == Membership.JOIN: result = yield self._room_initial_sync_joined( - user_id, room_id, pagin_config, member_event + user_id, room_id, pagin_config, membership, is_guest ) - elif member_event.membership == Membership.LEAVE: + elif membership == Membership.LEAVE: result = yield self._room_initial_sync_parted( - user_id, room_id, pagin_config, member_event + user_id, room_id, pagin_config, membership, member_event_id, is_guest ) private_user_data = [] @@ -497,19 +501,19 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def _room_initial_sync_parted(self, user_id, room_id, pagin_config, - member_event): + membership, member_event_id, is_guest): room_state = yield self.store.get_state_for_events( - [member_event.event_id], None + [member_event_id], None ) - room_state = room_state[member_event.event_id] + room_state = room_state[member_event_id] limit = pagin_config.limit if pagin_config else None if limit is None: limit = 10 stream_token = yield self.store.get_stream_token_for_event( - member_event.event_id + member_event_id ) messages, token = yield self.store.get_recent_events_for_room( @@ -519,7 +523,7 @@ class MessageHandler(BaseHandler): ) messages = yield self._filter_events_for_client( - user_id, messages + user_id, messages, is_guest=is_guest ) start_token = StreamToken(token[0], 0, 0, 0, 0) @@ -528,7 +532,7 @@ class MessageHandler(BaseHandler): time_now = self.clock.time_msec() defer.returnValue({ - "membership": member_event.membership, + "membership": membership, "room_id": room_id, "messages": { "chunk": [serialize_event(m, time_now) for m in messages], @@ -542,7 +546,7 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def _room_initial_sync_joined(self, user_id, room_id, pagin_config, - member_event): + membership, is_guest): current_state = yield self.state.get_current_state( room_id=room_id, ) @@ -574,12 +578,14 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def get_presence(): - states = yield presence_handler.get_states( - target_users=[UserID.from_string(m.user_id) for m in room_members], - auth_user=auth_user, - as_event=True, - check_auth=False, - ) + states = {} + if not is_guest: + states = yield presence_handler.get_states( + target_users=[UserID.from_string(m.user_id) for m in room_members], + auth_user=auth_user, + as_event=True, + check_auth=False, + ) defer.returnValue(states.values()) @@ -599,7 +605,7 @@ class MessageHandler(BaseHandler): ).addErrback(unwrapFirstError) messages = yield self._filter_events_for_client( - user_id, messages + user_id, messages, is_guest=is_guest, require_all_visible_for_guests=False ) start_token = now_token.copy_and_replace("room_key", token[0]) @@ -607,8 +613,7 @@ class MessageHandler(BaseHandler): time_now = self.clock.time_msec() - defer.returnValue({ - "membership": member_event.membership, + ret = { "room_id": room_id, "messages": { "chunk": [serialize_event(m, time_now) for m in messages], @@ -618,4 +623,8 @@ class MessageHandler(BaseHandler): "state": state, "presence": presence, "receipts": receipts, - }) + } + if not is_guest: + ret["membership"] = membership + + defer.returnValue(ret) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 03ac07392..df2dc37b2 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -372,12 +372,13 @@ class RoomInitialSyncRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_GET(self, request, room_id): - user, _, _ = yield self.auth.get_user_by_req(request) + user, _, is_guest = yield self.auth.get_user_by_req(request, allow_guest=True) pagination_config = PaginationConfig.from_request(request) content = yield self.handlers.message_handler.room_initial_sync( room_id=room_id, user_id=user.to_string(), pagin_config=pagination_config, + is_guest=is_guest, ) defer.returnValue((200, content)) From 6a9c4cfd0be467e290be52b02e6229fd30367c6e Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Thu, 12 Nov 2015 11:58:48 +0000 Subject: [PATCH 02/18] Fix race creating directories --- synapse/config/_base.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/synapse/config/_base.py b/synapse/config/_base.py index ceef309af..c18e0bdbb 100644 --- a/synapse/config/_base.py +++ b/synapse/config/_base.py @@ -14,6 +14,7 @@ # limitations under the License. import argparse +import errno import os import yaml import sys @@ -91,8 +92,11 @@ class Config(object): @classmethod def ensure_directory(cls, dir_path): dir_path = cls.abspath(dir_path) - if not os.path.exists(dir_path): + try: os.makedirs(dir_path) + except OSError, e: + if e.errno != errno.EEXIST: + raise if not os.path.isdir(dir_path): raise ConfigError( "%s is not a directory" % (dir_path,) From 50f1afbd5b8560012d7beb116fc2da0e8b0e6199 Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Thu, 12 Nov 2015 13:37:07 +0000 Subject: [PATCH 03/18] Consider joined guest users as joined users Otherwise they're inconveniently allowed to write events to the room but not to read them from the room. --- synapse/handlers/message.py | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 7d31ff8d4..92e118026 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -258,20 +258,29 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def _check_in_room_or_world_readable(self, room_id, user_id, is_guest): - if is_guest: - visibility = yield self.state_handler.get_current_state( - room_id, EventTypes.RoomHistoryVisibility, "" - ) - if visibility.content["history_visibility"] == "world_readable": - defer.returnValue((Membership.JOIN, None)) - return - else: - raise AuthError( - 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN - ) - else: + try: + # check_user_was_in_room will return the most recent membership + # event for the user if: + # * The user is a non-guest user, and was ever in the room + # * The user is a guest user, and has joined the room + # else it will throw. member_event = yield self.auth.check_user_was_in_room(room_id, user_id) defer.returnValue((member_event.membership, member_event.event_id)) + return + except AuthError: + if not is_guest: + raise + + visibility = yield self.state_handler.get_current_state( + room_id, EventTypes.RoomHistoryVisibility, "" + ) + if visibility.content["history_visibility"] == "world_readable": + defer.returnValue((Membership.JOIN, None)) + return + else: + raise AuthError( + 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN + ) @defer.inlineCallbacks def get_state_events(self, user_id, room_id, is_guest=False): From 0a93df5f9c4921690fe456ee18bd8d51a7a08744 Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Thu, 12 Nov 2015 13:44:39 +0000 Subject: [PATCH 04/18] Allow guests to set their display names Depends: https://github.com/matrix-org/synapse/pull/363 Tests in https://github.com/matrix-org/sytest/pull/66 --- synapse/rest/client/v1/profile.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/rest/client/v1/profile.py b/synapse/rest/client/v1/profile.py index 6b379e4e5..3218e4702 100644 --- a/synapse/rest/client/v1/profile.py +++ b/synapse/rest/client/v1/profile.py @@ -37,7 +37,7 @@ class ProfileDisplaynameRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_PUT(self, request, user_id): - auth_user, _, _ = yield self.auth.get_user_by_req(request) + auth_user, _, _ = yield self.auth.get_user_by_req(request, allow_guest=True) user = UserID.from_string(user_id) try: From 39de87869c9ad966f382e597986e860d03f3fef2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Nov 2015 14:06:31 +0000 Subject: [PATCH 05/18] Fix bug where assumed dict was namedtuple --- synapse/storage/transactions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 4e0d7c977..ad099775e 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -59,7 +59,7 @@ class TransactionStore(SQLBaseStore): allow_none=True, ) - if result and result.response_code: + if result and result["response_code"]: return result["response_code"], result["response_json"] else: return None From 14a9d805b959b5d2b26fea0d57794cb3ceb28958 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Nov 2015 14:07:25 +0000 Subject: [PATCH 06/18] Use a (hopefully) more efficient SQL query for doing recency based room search --- synapse/storage/search.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 2e88c51ad..eac265b54 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -253,11 +253,13 @@ class SearchStore(BackgroundUpdateStore): ) elif isinstance(self.database_engine, Sqlite3Engine): sql = ( - "SELECT rank(matchinfo(event_search)) as rank, room_id, event_id," + "SELECT rank(matchinfo) as rank, room_id, event_id," " topological_ordering, stream_ordering" - " FROM event_search" - " NATURAL JOIN events" - " WHERE value MATCH ? AND room_id = ?" + " FROM (SELECT event_id, matchinfo(event_search) FROM event_search" + " WHERE value MATCH" + " )" + " CROSS JOIN events USING (event_id)" + " WHERE room_id = ?" ) else: # This should be unreachable. From fb7e260a20847ac2a253a0e7868ecb2284c9a766 Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Thu, 12 Nov 2015 15:02:00 +0000 Subject: [PATCH 07/18] Tweak guest access permissions * Allow world_readable rooms to be read by guests who have joined and left * Allow regular users to access world_readable rooms --- synapse/handlers/message.py | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 92e118026..5f56f9059 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -267,17 +267,18 @@ class MessageHandler(BaseHandler): member_event = yield self.auth.check_user_was_in_room(room_id, user_id) defer.returnValue((member_event.membership, member_event.event_id)) return - except AuthError: + except AuthError, auth_error: + visibility = yield self.state_handler.get_current_state( + room_id, EventTypes.RoomHistoryVisibility, "" + ) + if ( + visibility and + visibility.content["history_visibility"] == "world_readable" + ): + defer.returnValue((Membership.JOIN, None)) + return if not is_guest: - raise - - visibility = yield self.state_handler.get_current_state( - room_id, EventTypes.RoomHistoryVisibility, "" - ) - if visibility.content["history_visibility"] == "world_readable": - defer.returnValue((Membership.JOIN, None)) - return - else: + raise auth_error raise AuthError( 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN ) From 320408ef47eb373c2b8edad7ab3d08c3fa0cb459 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Nov 2015 15:09:45 +0000 Subject: [PATCH 08/18] Fix SQL syntax --- synapse/storage/search.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/synapse/storage/search.py b/synapse/storage/search.py index eac265b54..e1911e248 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -255,8 +255,9 @@ class SearchStore(BackgroundUpdateStore): sql = ( "SELECT rank(matchinfo) as rank, room_id, event_id," " topological_ordering, stream_ordering" - " FROM (SELECT event_id, matchinfo(event_search) FROM event_search" - " WHERE value MATCH" + " FROM (SELECT key, event_id, matchinfo(event_search) as matchinfo" + " FROM event_search" + " WHERE value MATCH ?" " )" " CROSS JOIN events USING (event_id)" " WHERE room_id = ?" From 764e79d0514a0cce9dc456df0355108d40dbeda8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Nov 2015 15:19:56 +0000 Subject: [PATCH 09/18] Comment --- synapse/storage/search.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse/storage/search.py b/synapse/storage/search.py index e1911e248..0b00ddb9d 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -252,6 +252,8 @@ class SearchStore(BackgroundUpdateStore): " WHERE vector @@ query AND room_id = ?" ) elif isinstance(self.database_engine, Sqlite3Engine): + # We use CROSS JOIN here to ensure we use the right indexes. + # https://sqlite.org/optoverview.html#crossjoin sql = ( "SELECT rank(matchinfo) as rank, room_id, event_id," " topological_ordering, stream_ordering" From 78f6010207d5e6908ba584121461af4b02714287 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 12 Nov 2015 13:10:25 +0000 Subject: [PATCH 10/18] Fix an issue with ignoring power_level changes on divergent graphs Changes to m.room.power_levels events are supposed to be handled at a high priority; however a typo meant that the relevant bit of code was never executed, so they were handled just like any other state change - which meant that a bad person could cause room state changes by forking the graph from a point in history when they were allowed to do so. --- synapse/state.py | 16 +++++--- tests/test_state.py | 93 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 104 insertions(+), 5 deletions(-) diff --git a/synapse/state.py b/synapse/state.py index bb225c39c..f893df337 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -307,19 +307,23 @@ class StateHandler(object): We resolve conflicts in the following order: 1. power levels - 2. memberships - 3. other events. + 2. join rules + 3. memberships + 4. other events. """ resolved_state = {} power_key = (EventTypes.PowerLevels, "") - if power_key in conflicted_state.items(): - power_levels = conflicted_state[power_key] - resolved_state[power_key] = self._resolve_auth_events(power_levels) + if power_key in conflicted_state: + events = conflicted_state[power_key] + logger.debug("Resolving conflicted power levels %r", events) + resolved_state[power_key] = self._resolve_auth_events( + events, auth_events) auth_events.update(resolved_state) for key, events in conflicted_state.items(): if key[0] == EventTypes.JoinRules: + logger.debug("Resolving conflicted join rules %r", events) resolved_state[key] = self._resolve_auth_events( events, auth_events @@ -329,6 +333,7 @@ class StateHandler(object): for key, events in conflicted_state.items(): if key[0] == EventTypes.Member: + logger.debug("Resolving conflicted member lists %r", events) resolved_state[key] = self._resolve_auth_events( events, auth_events @@ -338,6 +343,7 @@ class StateHandler(object): for key, events in conflicted_state.items(): if key not in resolved_state: + logger.debug("Resolving conflicted state %r:%r", key, events) resolved_state[key] = self._resolve_normal_events( events, auth_events ) diff --git a/tests/test_state.py b/tests/test_state.py index 0274c4bc1..e4e995b75 100644 --- a/tests/test_state.py +++ b/tests/test_state.py @@ -317,6 +317,99 @@ class StateTestCase(unittest.TestCase): {e.event_id for e in context_store["E"].current_state.values()} ) + @defer.inlineCallbacks + def test_branch_have_perms_conflict(self): + userid1 = "@user_id:example.com" + userid2 = "@user_id2:example.com" + + nodes = { + "A1": DictObj( + type=EventTypes.Create, + state_key="", + content={"creator": userid1}, + depth=1, + ), + "A2": DictObj( + type=EventTypes.Member, + state_key=userid1, + content={"membership": Membership.JOIN}, + membership=Membership.JOIN, + ), + "A3": DictObj( + type=EventTypes.Member, + state_key=userid2, + content={"membership": Membership.JOIN}, + membership=Membership.JOIN, + ), + "A4": DictObj( + type=EventTypes.PowerLevels, + state_key="", + content={ + "events": {"m.room.name": 50}, + "users": {userid1: 100, + userid2: 60}, + }, + ), + "A5": DictObj( + type=EventTypes.Name, + state_key="", + ), + "B": DictObj( + type=EventTypes.PowerLevels, + state_key="", + content={ + "events": {"m.room.name": 50}, + "users": {userid2: 30}, + }, + ), + "C": DictObj( + type=EventTypes.Name, + state_key="", + sender=userid2, + ), + "D": DictObj( + type=EventTypes.Message, + ), + } + edges = { + "A2": ["A1"], + "A3": ["A2"], + "A4": ["A3"], + "A5": ["A4"], + "B": ["A5"], + "C": ["A5"], + "D": ["B", "C"] + } + self._add_depths(nodes, edges) + graph = Graph(nodes, edges) + + store = StateGroupStore() + self.store.get_state_groups.side_effect = store.get_state_groups + + context_store = {} + + for event in graph.walk(): + context = yield self.state.compute_event_context(event) + store.store_state_groups(event, context) + context_store[event.event_id] = context + + self.assertSetEqual( + {"A1", "A2", "A3", "A5", "B"}, + {e.event_id for e in context_store["D"].current_state.values()} + ) + + def _add_depths(self, nodes, edges): + def _get_depth(ev): + node = nodes[ev] + if 'depth' not in node: + prevs = edges[ev] + depth = max(_get_depth(prev) for prev in prevs) + 1 + node['depth'] = depth + return node['depth'] + + for n in nodes: + _get_depth(n) + @defer.inlineCallbacks def test_annotate_with_old_message(self): event = create_event(type="test_message", name="event") From 8fd8e72cec8df94403441664d8eecc25fa8d363f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Nov 2015 15:33:47 +0000 Subject: [PATCH 11/18] Expand comment --- synapse/storage/search.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 0b00ddb9d..dcc5ac65a 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -254,6 +254,12 @@ class SearchStore(BackgroundUpdateStore): elif isinstance(self.database_engine, Sqlite3Engine): # We use CROSS JOIN here to ensure we use the right indexes. # https://sqlite.org/optoverview.html#crossjoin + # + # We want to use the full text search index on event_search to + # extract all possible matches first, then lookup those matches + # in the events table to get the topological ordering. We need + # to use the indexes in this order because sqlite refuses to + # MATCH unless it uses the full text search index sql = ( "SELECT rank(matchinfo) as rank, room_id, event_id," " topological_ordering, stream_ordering" From 3de46c7755ac5e061fc13fb916c13b841b65f2b5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Nov 2015 15:36:43 +0000 Subject: [PATCH 12/18] Trailing whitespace --- synapse/storage/search.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/search.py b/synapse/storage/search.py index dcc5ac65a..380270b00 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -258,7 +258,7 @@ class SearchStore(BackgroundUpdateStore): # We want to use the full text search index on event_search to # extract all possible matches first, then lookup those matches # in the events table to get the topological ordering. We need - # to use the indexes in this order because sqlite refuses to + # to use the indexes in this order because sqlite refuses to # MATCH unless it uses the full text search index sql = ( "SELECT rank(matchinfo) as rank, room_id, event_id," From c0b3554401b821002acfc3989e57d6ac59671aa9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Nov 2015 16:19:55 +0000 Subject: [PATCH 13/18] Fix missing profile data in federation joins There was a regression where we stopped including profile data in initial joins for rooms joined over federation. --- synapse/federation/federation_client.py | 5 ++++- synapse/handlers/federation.py | 11 +++++++---- synapse/handlers/room.py | 3 ++- 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index c0c0b693b..d4f586fae 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -357,7 +357,8 @@ class FederationClient(FederationBase): defer.returnValue(signed_auth) @defer.inlineCallbacks - def make_membership_event(self, destinations, room_id, user_id, membership): + def make_membership_event(self, destinations, room_id, user_id, membership, + content={},): """ Creates an m.room.member event, with context, without participating in the room. @@ -398,6 +399,8 @@ class FederationClient(FederationBase): logger.debug("Got response to make_%s: %s", membership, pdu_dict) + pdu_dict["content"].update(content) + defer.returnValue( (destination, self.event_from_pdu_json(pdu_dict)) ) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index d1589334a..c1bce07e3 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -564,7 +564,7 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks - def do_invite_join(self, target_hosts, room_id, joinee): + def do_invite_join(self, target_hosts, room_id, joinee, content): """ Attempts to join the `joinee` to the room `room_id` via the server `target_host`. @@ -584,7 +584,8 @@ class FederationHandler(BaseHandler): target_hosts, room_id, joinee, - "join" + "join", + content, ) self.room_queues[room_id] = [] @@ -840,12 +841,14 @@ class FederationHandler(BaseHandler): defer.returnValue(None) @defer.inlineCallbacks - def _make_and_verify_event(self, target_hosts, room_id, user_id, membership): + def _make_and_verify_event(self, target_hosts, room_id, user_id, membership, + content={},): origin, pdu = yield self.replication_layer.make_membership_event( target_hosts, room_id, user_id, - membership + membership, + content, ) logger.debug("Got response to make_%s: %s", membership, pdu) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 0266926fc..3f0475258 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -504,7 +504,8 @@ class RoomMemberHandler(BaseHandler): yield handler.do_invite_join( room_hosts, room_id, - event.user_id + event.user_id, + event.content, ) else: logger.debug("Doing normal join") From 468a2ed4ecd06b208611d3b44cd588a184efdfec Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Thu, 12 Nov 2015 16:45:28 +0000 Subject: [PATCH 14/18] Return non-room events from guest /events calls --- synapse/notifier.py | 20 +++++++++++++++++--- tests/rest/client/v1/test_presence.py | 3 +++ 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index 56c4c863b..e3b42e233 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -14,6 +14,8 @@ # limitations under the License. from twisted.internet import defer +from synapse.api.constants import EventTypes +from synapse.api.errors import AuthError from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor, ObservableDeferred @@ -346,9 +348,9 @@ class Notifier(object): room_ids = [] if is_guest: - # TODO(daniel): Deal with non-room events too - only_room_events = True if guest_room_id: + if not self._is_world_readable(guest_room_id): + raise AuthError(403, "Guest access not allowed") room_ids = [guest_room_id] else: rooms = yield self.store.get_rooms_for_user(user.to_string()) @@ -361,6 +363,7 @@ class Notifier(object): events = [] end_token = from_token + for name, source in self.event_sources.sources.items(): keyname = "%s_key" % name before_id = getattr(before_token, keyname) @@ -377,7 +380,7 @@ class Notifier(object): room_ids=room_ids, ) - if is_guest: + if name == "room": room_member_handler = self.hs.get_handlers().room_member_handler new_events = yield room_member_handler._filter_events_for_client( user.to_string(), @@ -403,6 +406,17 @@ class Notifier(object): defer.returnValue(result) + @defer.inlineCallbacks + def _is_world_readable(self, room_id): + state = yield self.hs.get_state_handler().get_current_state( + room_id, + EventTypes.RoomHistoryVisibility + ) + if state and "history_visibility" in state.content: + defer.returnValue(state.content["history_visibility"] == "world_readable") + else: + defer.returnValue(False) + @log_function def remove_expired_streams(self): time_now_ms = self.clock.time_msec() diff --git a/tests/rest/client/v1/test_presence.py b/tests/rest/client/v1/test_presence.py index 7f29d73d9..8581796f7 100644 --- a/tests/rest/client/v1/test_presence.py +++ b/tests/rest/client/v1/test_presence.py @@ -321,6 +321,9 @@ class PresenceEventStreamTestCase(unittest.TestCase): hs.handlers.room_member_handler.get_room_members = ( lambda r: self.room_members if r == "a-room" else [] ) + hs.handlers.room_member_handler._filter_events_for_client = ( + lambda user_id, events, **kwargs: events + ) self.mock_datastore = hs.get_datastore() self.mock_datastore.get_app_service_by_token = Mock(return_value=None) From 5dea4d37d160e5766aac6f1723a8b485c5b6c211 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 13 Nov 2015 10:31:15 +0000 Subject: [PATCH 15/18] Update some comments Add a couple of type annotations, docstrings, and other comments, in the interest of keeping track of what types I have. Merged from pull request #370. --- synapse/handlers/_base.py | 6 +++ synapse/handlers/sync.py | 34 ++++++++++++----- synapse/rest/client/v2_alpha/sync.py | 56 ++++++++++++++++++++++++++++ synapse/state.py | 16 ++++++-- 4 files changed, 98 insertions(+), 14 deletions(-) diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index f4ade1f59..6519f183d 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -29,6 +29,12 @@ logger = logging.getLogger(__name__) class BaseHandler(object): + """ + Common base class for the event handlers. + + :type store: synapse.storage.events.StateStore + :type state_handler: synapse.state.StateHandler + """ def __init__(self, hs): self.store = hs.get_datastore() diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 492c1c17d..ed93e5a2d 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -47,9 +47,9 @@ class TimelineBatch(collections.namedtuple("TimelineBatch", [ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ - "room_id", - "timeline", - "state", + "room_id", # str + "timeline", # TimelineBatch + "state", # list[FrozenEvent] "ephemeral", "private_user_data", ])): @@ -68,9 +68,9 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ class ArchivedSyncResult(collections.namedtuple("JoinedSyncResult", [ - "room_id", - "timeline", - "state", + "room_id", # str + "timeline", # TimelineBatch + "state", # list[FrozenEvent] "private_user_data", ])): __slots__ = [] @@ -87,8 +87,8 @@ class ArchivedSyncResult(collections.namedtuple("JoinedSyncResult", [ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [ - "room_id", - "invite", + "room_id", # str + "invite", # FrozenEvent: the invite event ])): __slots__ = [] @@ -507,6 +507,9 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def load_filtered_recents(self, room_id, sync_config, now_token, since_token=None): + """ + :returns a Deferred TimelineBatch + """ limited = True recents = [] filtering_factor = 2 @@ -680,8 +683,13 @@ class SyncHandler(BaseHandler): def compute_state_delta(self, since_token, previous_state, current_state): """ Works out the differnce in state between the current state and the state the client got when it last performed a sync. - Returns: - A list of events. + + :param str since_token: the point we are comparing against + :param list[synapse.events.FrozenEvent] previous_state: the state to + compare to + :param list[synapse.events.FrozenEvent] current_state: the new state + + :returns: A list of events. """ # TODO(mjark) Check if the state events were received by the server # after the previous sync, since we need to include those state @@ -696,6 +704,12 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def check_joined_room(self, sync_config, room_id, state_delta): + """ + Check if the user has just joined the given room. If so, return the + full state for the room, instead of the delta since the last sync. + + :returns A deferred Tuple (state_delta, limited) + """ joined = False limited = False for event in state_delta: diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index d24507eff..997a61abb 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -165,6 +165,20 @@ class SyncRestServlet(RestServlet): return {"events": filter.filter_presence(formatted)} def encode_joined(self, rooms, filter, time_now, token_id): + """ + Encode the joined rooms in a sync result + + :param list[synapse.handlers.sync.JoinedSyncResult] rooms: list of sync + results for rooms this user is joined to + :param FilterCollection filter: filters to apply to the results + :param int time_now: current time - used as a baseline for age + calculations + :param int token_id: ID of the user's auth token - used for namespacing + of transaction IDs + + :return: the joined rooms list, in our response format + :rtype: dict[str, dict[str, object]] + """ joined = {} for room in rooms: joined[room.room_id] = self.encode_room( @@ -174,6 +188,20 @@ class SyncRestServlet(RestServlet): return joined def encode_invited(self, rooms, filter, time_now, token_id): + """ + Encode the invited rooms in a sync result + + :param list[synapse.handlers.sync.InvitedSyncResult] rooms: list of + sync results for rooms this user is joined to + :param FilterCollection filter: filters to apply to the results + :param int time_now: current time - used as a baseline for age + calculations + :param int token_id: ID of the user's auth token - used for namespacing + of transaction IDs + + :return: the invited rooms list, in our response format + :rtype: dict[str, dict[str, object]] + """ invited = {} for room in rooms: invite = serialize_event( @@ -189,6 +217,20 @@ class SyncRestServlet(RestServlet): return invited def encode_archived(self, rooms, filter, time_now, token_id): + """ + Encode the archived rooms in a sync result + + :param list[synapse.handlers.sync.ArchivedSyncResult] rooms: list of + sync results for rooms this user is joined to + :param FilterCollection filter: filters to apply to the results + :param int time_now: current time - used as a baseline for age + calculations + :param int token_id: ID of the user's auth token - used for namespacing + of transaction IDs + + :return: the invited rooms list, in our response format + :rtype: dict[str, dict[str, object]] + """ joined = {} for room in rooms: joined[room.room_id] = self.encode_room( @@ -199,6 +241,20 @@ class SyncRestServlet(RestServlet): @staticmethod def encode_room(room, filter, time_now, token_id, joined=True): + """ + :param JoinedSyncResult|ArchivedSyncResult room: sync result for a + single room + :param FilterCollection filter: filters to apply to the results + :param int time_now: current time - used as a baseline for age + calculations + :param int token_id: ID of the user's auth token - used for namespacing + of transaction IDs + :param joined: True if the user is joined to this room - will mean + we handle ephemeral events + + :return: the room, encoded in our response format + :rtype: dict[str, object] + """ event_map = {} state_events = filter.filter_room_state(room.state) state_event_ids = [] diff --git a/synapse/state.py b/synapse/state.py index f893df337..8ea2cac5d 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -71,7 +71,7 @@ class StateHandler(object): @defer.inlineCallbacks def get_current_state(self, room_id, event_type=None, state_key=""): - """ Returns the current state for the room as a list. This is done by + """ Retrieves the current state for the room. This is done by calling `get_latest_events_in_room` to get the leading edges of the event graph and then resolving any of the state conflicts. @@ -80,6 +80,8 @@ class StateHandler(object): If `event_type` is specified, then the method returns only the one event (or None) with that `event_type` and `state_key`. + + :returns map from (type, state_key) to event """ event_ids = yield self.store.get_latest_event_ids_in_room(room_id) @@ -177,9 +179,10 @@ class StateHandler(object): """ Given a list of event_ids this method fetches the state at each event, resolves conflicts between them and returns them. - Return format is a tuple: (`state_group`, `state_events`), where the - first is the name of a state group if one and only one is involved, - otherwise `None`. + :returns a Deferred tuple of (`state_group`, `state`, `prev_state`). + `state_group` is the name of a state group if one and only one is + involved. `state` is a map from (type, state_key) to event, and + `prev_state` is a list of event ids. """ logger.debug("resolve_state_groups event_ids %s", event_ids) @@ -255,6 +258,11 @@ class StateHandler(object): return self._resolve_events(state_sets) def _resolve_events(self, state_sets, event_type=None, state_key=""): + """ + :returns a tuple (new_state, prev_states). new_state is a map + from (type, state_key) to event. prev_states is a list of event_ids. + :rtype: (dict[(str, str), synapse.events.FrozenEvent], list[str]) + """ state = {} for st in state_sets: for e in st: From 5ab4b0afe8b5126213cab2be7c3700eb7dd49789 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 12 Nov 2015 16:34:42 +0000 Subject: [PATCH 16/18] Make handlers.sync return a state dictionary, instead of an event list. Basically this moves the process of flattening the existing dictionary into a list up to rest.client.*, instead of doing it in handlers.sync. This simplifies a bit of the code in handlers.sync, but it is also going to be somewhat beneficial in the next stage of my hacking on SPEC-254. Merged from PR #371 --- synapse/handlers/sync.py | 70 ++++++++++++++++------------ synapse/rest/client/v2_alpha/sync.py | 2 +- 2 files changed, 40 insertions(+), 32 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index ed93e5a2d..8b154fa7e 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -49,7 +49,7 @@ class TimelineBatch(collections.namedtuple("TimelineBatch", [ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ "room_id", # str "timeline", # TimelineBatch - "state", # list[FrozenEvent] + "state", # dict[(str, str), FrozenEvent] "ephemeral", "private_user_data", ])): @@ -70,7 +70,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ class ArchivedSyncResult(collections.namedtuple("JoinedSyncResult", [ "room_id", # str "timeline", # TimelineBatch - "state", # list[FrozenEvent] + "state", # dict[(str, str), FrozenEvent] "private_user_data", ])): __slots__ = [] @@ -257,12 +257,11 @@ class SyncHandler(BaseHandler): current_state = yield self.state_handler.get_current_state( room_id ) - current_state_events = current_state.values() defer.returnValue(JoinedSyncResult( room_id=room_id, timeline=batch, - state=current_state_events, + state=current_state, ephemeral=ephemeral_by_room.get(room_id, []), private_user_data=self.private_user_data_for_room( room_id, tags_by_room @@ -361,7 +360,7 @@ class SyncHandler(BaseHandler): defer.returnValue(ArchivedSyncResult( room_id=room_id, timeline=batch, - state=leave_state[leave_event_id].values(), + state=leave_state[leave_event_id], private_user_data=self.private_user_data_for_room( room_id, tags_by_room ), @@ -440,7 +439,10 @@ class SyncHandler(BaseHandler): for room_id in joined_room_ids: recents = events_by_room_id.get(room_id, []) - state = [event for event in recents if event.is_state()] + state = { + (event.type, event.state_key): event + for event in recents if event.is_state()} + if recents: prev_batch = now_token.copy_and_replace( "room_key", recents[0].internal_metadata.before @@ -575,7 +577,6 @@ class SyncHandler(BaseHandler): current_state = yield self.state_handler.get_current_state( room_id ) - current_state_events = current_state.values() state_at_previous_sync = yield self.get_state_at_previous_sync( room_id, since_token=since_token @@ -584,7 +585,7 @@ class SyncHandler(BaseHandler): state_events_delta = yield self.compute_state_delta( since_token=since_token, previous_state=state_at_previous_sync, - current_state=current_state_events, + current_state=current_state, ) state_events_delta, _ = yield self.check_joined_room( @@ -632,7 +633,7 @@ class SyncHandler(BaseHandler): [leave_event.event_id], None ) - state_events_at_leave = leave_state[leave_event.event_id].values() + state_events_at_leave = leave_state[leave_event.event_id] state_at_previous_sync = yield self.get_state_at_previous_sync( leave_event.room_id, since_token=since_token @@ -661,7 +662,7 @@ class SyncHandler(BaseHandler): def get_state_at_previous_sync(self, room_id, since_token): """ Get the room state at the previous sync the client made. Returns: - A Deferred list of Events. + A Deferred map from ((type, state_key)->Event) """ last_events, token = yield self.store.get_recent_events_for_room( room_id, end_token=since_token.room_key, limit=1, @@ -673,11 +674,12 @@ class SyncHandler(BaseHandler): last_event ) if last_event.is_state(): - state = [last_event] + last_context.current_state.values() + state = last_context.current_state.copy() + state[(last_event.type, last_event.state_key)] = last_event else: - state = last_context.current_state.values() + state = last_context.current_state else: - state = () + state = {} defer.returnValue(state) def compute_state_delta(self, since_token, previous_state, current_state): @@ -685,21 +687,23 @@ class SyncHandler(BaseHandler): state the client got when it last performed a sync. :param str since_token: the point we are comparing against - :param list[synapse.events.FrozenEvent] previous_state: the state to - compare to - :param list[synapse.events.FrozenEvent] current_state: the new state + :param dict[(str,str), synapse.events.FrozenEvent] previous_state: the + state to compare to + :param dict[(str,str), synapse.events.FrozenEvent] current_state: the + new state - :returns: A list of events. + :returns A new event dictionary """ # TODO(mjark) Check if the state events were received by the server # after the previous sync, since we need to include those state # updates even if they occured logically before the previous event. # TODO(mjark) Check for new redactions in the state events. - previous_dict = {event.event_id: event for event in previous_state} - state_delta = [] - for event in current_state: - if event.event_id not in previous_dict: - state_delta.append(event) + + state_delta = {} + for key, event in current_state.iteritems(): + if (key not in previous_state or + previous_state[key].event_id != event.event_id): + state_delta[key] = event return state_delta @defer.inlineCallbacks @@ -708,21 +712,25 @@ class SyncHandler(BaseHandler): Check if the user has just joined the given room. If so, return the full state for the room, instead of the delta since the last sync. + :param sync_config: + :param room_id: + :param dict[(str,str), synapse.events.FrozenEvent] state_delta: the + difference in state since the last sync + :returns A deferred Tuple (state_delta, limited) """ joined = False limited = False - for event in state_delta: - if ( - event.type == EventTypes.Member - and event.state_key == sync_config.user.to_string() - ): - if event.content["membership"] == Membership.JOIN: - joined = True + + join_event = state_delta.get(( + EventTypes.Member, sync_config.user.to_string()), None) + if join_event is not None: + if join_event.content["membership"] == Membership.JOIN: + joined = True if joined: - res = yield self.state_handler.get_current_state(room_id) - state_delta = res.values() + state_delta = yield self.state_handler.get_current_state(room_id) + # the timeline is inherently limited if we've just joined limited = True defer.returnValue((state_delta, limited)) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 997a61abb..272a00bc8 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -256,7 +256,7 @@ class SyncRestServlet(RestServlet): :rtype: dict[str, object] """ event_map = {} - state_events = filter.filter_room_state(room.state) + state_events = filter.filter_room_state(room.state.values()) state_event_ids = [] for event in state_events: # TODO(mjark): Respect formatting requirements in the filter. From fddedd51d9ca44f385d412e8c2e3e8d99325ba67 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 10 Nov 2015 18:27:23 +0000 Subject: [PATCH 17/18] Fix a few race conditions in the state calculation Be a bit more careful about how we calculate the state to be returned by /sync. In a few places, it was possible for /sync to return slightly later state than that represented by the next_batch token and the timeline. In particular, the following cases were susceptible: * On a full state sync, for an active room * During a per-room incremental sync with a timeline gap * When the user has just joined a room. (Refactor check_joined_room to make it less magical) Also, use store.get_state_for_events() (and thus the existing stategroups) to calculate the state corresponding to a particular sync position, rather than state_handler.compute_event_context(), which recalculates from first principles (and tends to miss some state). Merged from PR https://github.com/matrix-org/synapse/pull/372 --- synapse/handlers/sync.py | 125 ++++++++++++++++++++------------------- synapse/storage/state.py | 14 +++++ 2 files changed, 78 insertions(+), 61 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 8b154fa7e..6dc9d0fb9 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -254,9 +254,7 @@ class SyncHandler(BaseHandler): room_id, sync_config, now_token, since_token=timeline_since_token ) - current_state = yield self.state_handler.get_current_state( - room_id - ) + current_state = yield self.get_state_at(room_id, now_token) defer.returnValue(JoinedSyncResult( room_id=room_id, @@ -353,14 +351,12 @@ class SyncHandler(BaseHandler): room_id, sync_config, leave_token, since_token=timeline_since_token ) - leave_state = yield self.store.get_state_for_events( - [leave_event_id], None - ) + leave_state = yield self.store.get_state_for_event(leave_event_id) defer.returnValue(ArchivedSyncResult( room_id=room_id, timeline=batch, - state=leave_state[leave_event_id], + state=leave_state, private_user_data=self.private_user_data_for_room( room_id, tags_by_room ), @@ -424,6 +420,9 @@ class SyncHandler(BaseHandler): if len(room_events) <= timeline_limit: # There is no gap in any of the rooms. Therefore we can just # partition the new events by room and return them. + logger.debug("Got %i events for incremental sync - not limited", + len(room_events)) + invite_events = [] leave_events = [] events_by_room_id = {} @@ -439,9 +438,11 @@ class SyncHandler(BaseHandler): for room_id in joined_room_ids: recents = events_by_room_id.get(room_id, []) + logger.debug("Events for room %s: %r", room_id, recents) state = { (event.type, event.state_key): event for event in recents if event.is_state()} + limited = False if recents: prev_batch = now_token.copy_and_replace( @@ -450,9 +451,13 @@ class SyncHandler(BaseHandler): else: prev_batch = now_token - state, limited = yield self.check_joined_room( - sync_config, room_id, state - ) + just_joined = yield self.check_joined_room(sync_config, state) + if just_joined: + logger.debug("User has just joined %s: needs full state", + room_id) + state = yield self.get_state_at(room_id, now_token) + # the timeline is inherently limited if we've just joined + limited = True room_sync = JoinedSyncResult( room_id=room_id, @@ -467,10 +472,15 @@ class SyncHandler(BaseHandler): room_id, tags_by_room ), ) + logger.debug("Result for room %s: %r", room_id, room_sync) + if room_sync: joined.append(room_sync) else: + logger.debug("Got %i events for incremental sync - hit limit", + len(room_events)) + invite_events = yield self.store.get_invites_for_user( sync_config.user.to_string() ) @@ -563,6 +573,8 @@ class SyncHandler(BaseHandler): Returns: A Deferred JoinedSyncResult """ + logger.debug("Doing incremental sync for room %s between %s and %s", + room_id, since_token, now_token) # TODO(mjark): Check for redactions we might have missed. @@ -572,30 +584,26 @@ class SyncHandler(BaseHandler): logging.debug("Recents %r", batch) - # TODO(mjark): This seems racy since this isn't being passed a - # token to indicate what point in the stream this is - current_state = yield self.state_handler.get_current_state( - room_id + current_state = yield self.get_state_at(room_id, now_token) + + state_at_previous_sync = yield self.get_state_at( + room_id, stream_position=since_token ) - state_at_previous_sync = yield self.get_state_at_previous_sync( - room_id, since_token=since_token - ) - - state_events_delta = yield self.compute_state_delta( + state = yield self.compute_state_delta( since_token=since_token, previous_state=state_at_previous_sync, current_state=current_state, ) - state_events_delta, _ = yield self.check_joined_room( - sync_config, room_id, state_events_delta - ) + just_joined = yield self.check_joined_room(sync_config, state) + if just_joined: + state = yield self.get_state_at(room_id, now_token) room_sync = JoinedSyncResult( room_id=room_id, timeline=batch, - state=state_events_delta, + state=state, ephemeral=ephemeral_by_room.get(room_id, []), private_user_data=self.private_user_data_for_room( room_id, tags_by_room @@ -627,16 +635,12 @@ class SyncHandler(BaseHandler): logging.debug("Recents %r", batch) - # TODO(mjark): This seems racy since this isn't being passed a - # token to indicate what point in the stream this is - leave_state = yield self.store.get_state_for_events( - [leave_event.event_id], None + state_events_at_leave = yield self.store.get_state_for_event( + leave_event.event_id ) - state_events_at_leave = leave_state[leave_event.event_id] - - state_at_previous_sync = yield self.get_state_at_previous_sync( - leave_event.room_id, since_token=since_token + state_at_previous_sync = yield self.get_state_at( + leave_event.room_id, stream_position=since_token ) state_events_delta = yield self.compute_state_delta( @@ -659,26 +663,36 @@ class SyncHandler(BaseHandler): defer.returnValue(room_sync) @defer.inlineCallbacks - def get_state_at_previous_sync(self, room_id, since_token): - """ Get the room state at the previous sync the client made. - Returns: - A Deferred map from ((type, state_key)->Event) + def get_state_after_event(self, event): + """ + Get the room state after the given event + + :param synapse.events.EventBase event: event of interest + :return: A Deferred map from ((type, state_key)->Event) + """ + state = yield self.store.get_state_for_event(event.event_id) + if event.is_state(): + state = state.copy() + state[(event.type, event.state_key)] = event + defer.returnValue(state) + + @defer.inlineCallbacks + def get_state_at(self, room_id, stream_position): + """ Get the room state at a particular stream position + :param str room_id: room for which to get state + :param StreamToken stream_position: point at which to get state + :returns: A Deferred map from ((type, state_key)->Event) """ last_events, token = yield self.store.get_recent_events_for_room( - room_id, end_token=since_token.room_key, limit=1, + room_id, end_token=stream_position.room_key, limit=1, ) if last_events: - last_event = last_events[0] - last_context = yield self.state_handler.compute_event_context( - last_event - ) - if last_event.is_state(): - state = last_context.current_state.copy() - state[(last_event.type, last_event.state_key)] = last_event - else: - state = last_context.current_state + last_event = last_events[-1] + state = yield self.get_state_after_event(last_event) + else: + # no events in this room - so presumably no state state = {} defer.returnValue(state) @@ -706,31 +720,20 @@ class SyncHandler(BaseHandler): state_delta[key] = event return state_delta - @defer.inlineCallbacks - def check_joined_room(self, sync_config, room_id, state_delta): + def check_joined_room(self, sync_config, state_delta): """ - Check if the user has just joined the given room. If so, return the - full state for the room, instead of the delta since the last sync. + Check if the user has just joined the given room (so should + be given the full state) :param sync_config: - :param room_id: :param dict[(str,str), synapse.events.FrozenEvent] state_delta: the difference in state since the last sync :returns A deferred Tuple (state_delta, limited) """ - joined = False - limited = False - join_event = state_delta.get(( EventTypes.Member, sync_config.user.to_string()), None) if join_event is not None: if join_event.content["membership"] == Membership.JOIN: - joined = True - - if joined: - state_delta = yield self.state_handler.get_current_state(room_id) - # the timeline is inherently limited if we've just joined - limited = True - - defer.returnValue((state_delta, limited)) + return True + return False diff --git a/synapse/storage/state.py b/synapse/storage/state.py index acfb322a5..80e9b63f5 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -237,6 +237,20 @@ class StateStore(SQLBaseStore): defer.returnValue({event: event_to_state[event] for event in event_ids}) + @defer.inlineCallbacks + def get_state_for_event(self, event_id, types=None): + """ + Get the state dict corresponding to a particular event + + :param str event_id: event whose state should be returned + :param list[(str, str)]|None types: List of (type, state_key) tuples + which are used to filter the state fetched. May be None, which + matches any key + :return: a deferred dict from (type, state_key) -> state_event + """ + state_map = yield self.get_state_for_events([event_id], types) + defer.returnValue(state_map[event_id]) + @cached(num_args=2, lru=True, max_entries=10000) def _get_state_group_for_event(self, room_id, event_id): return self._simple_select_one_onecol( From e4d622aaaf0df503f942d016a5bf798dd52899d1 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 10 Nov 2015 18:29:25 +0000 Subject: [PATCH 18/18] Implementation of state rollback in /sync Implementation of SPEC-254: roll back the state dictionary to how it looked at the start of the timeline. Merged PR https://github.com/matrix-org/synapse/pull/373 --- synapse/rest/client/v2_alpha/sync.py | 67 +++++++++++++++++++++++++++- synapse/storage/events.py | 6 ++- 2 files changed, 69 insertions(+), 4 deletions(-) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 272a00bc8..efd828155 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -20,6 +20,7 @@ from synapse.http.servlet import ( ) from synapse.handlers.sync import SyncConfig from synapse.types import StreamToken +from synapse.events import FrozenEvent from synapse.events.utils import ( serialize_event, format_event_for_client_v2_without_event_id, ) @@ -256,7 +257,13 @@ class SyncRestServlet(RestServlet): :rtype: dict[str, object] """ event_map = {} - state_events = filter.filter_room_state(room.state.values()) + state_dict = room.state + timeline_events = filter.filter_room_timeline(room.timeline.events) + + state_dict = SyncRestServlet._rollback_state_for_timeline( + state_dict, timeline_events) + + state_events = filter.filter_room_state(state_dict.values()) state_event_ids = [] for event in state_events: # TODO(mjark): Respect formatting requirements in the filter. @@ -266,7 +273,6 @@ class SyncRestServlet(RestServlet): ) state_event_ids.append(event.event_id) - timeline_events = filter.filter_room_timeline(room.timeline.events) timeline_event_ids = [] for event in timeline_events: # TODO(mjark): Respect formatting requirements in the filter. @@ -297,6 +303,63 @@ class SyncRestServlet(RestServlet): return result + @staticmethod + def _rollback_state_for_timeline(state, timeline): + """ + Wind the state dictionary backwards, so that it represents the + state at the start of the timeline, rather than at the end. + + :param dict[(str, str), synapse.events.EventBase] state: the + state dictionary. Will be updated to the state before the timeline. + :param list[synapse.events.EventBase] timeline: the event timeline + :return: updated state dictionary + """ + logger.debug("Processing state dict %r; timeline %r", state, + [e.get_dict() for e in timeline]) + + result = state.copy() + + for timeline_event in reversed(timeline): + if not timeline_event.is_state(): + continue + + event_key = (timeline_event.type, timeline_event.state_key) + + logger.debug("Considering %s for removal", event_key) + + state_event = result.get(event_key) + if (state_event is None or + state_event.event_id != timeline_event.event_id): + # the event in the timeline isn't present in the state + # dictionary. + # + # the most likely cause for this is that there was a fork in + # the event graph, and the state is no longer valid. Really, + # the event shouldn't be in the timeline. We're going to ignore + # it for now, however. + logger.warn("Found state event %r in timeline which doesn't " + "match state dictionary", timeline_event) + continue + + prev_event_id = timeline_event.unsigned.get("replaces_state", None) + logger.debug("Replacing %s with %s in state dict", + timeline_event.event_id, prev_event_id) + + if prev_event_id is None: + del result[event_key] + else: + result[event_key] = FrozenEvent({ + "type": timeline_event.type, + "state_key": timeline_event.state_key, + "content": timeline_event.unsigned['prev_content'], + "sender": timeline_event.unsigned['prev_sender'], + "event_id": prev_event_id, + "room_id": timeline_event.room_id, + }) + logger.debug("New value: %r", result.get(event_key)) + + return result + def register_servlets(hs, http_server): SyncRestServlet(hs).register(http_server) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 4a365ff63..5d35ca90b 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -831,7 +831,8 @@ class EventsStore(SQLBaseStore): allow_none=True, ) if prev: - ev.unsigned["prev_content"] = prev.get_dict()["content"] + ev.unsigned["prev_content"] = prev.content + ev.unsigned["prev_sender"] = prev.sender self._get_event_cache.prefill( (ev.event_id, check_redacted, get_prev_content), ev @@ -888,7 +889,8 @@ class EventsStore(SQLBaseStore): get_prev_content=False, ) if prev: - ev.unsigned["prev_content"] = prev.get_dict()["content"] + ev.unsigned["prev_content"] = prev.content + ev.unsigned["prev_sender"] = prev.sender self._get_event_cache.prefill( (ev.event_id, check_redacted, get_prev_content), ev