From e016f4043b81ffdedf71c4459772f66757386e44 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 29 Jan 2015 14:40:28 +0000 Subject: [PATCH] Use get_room_events_stream to get changes to the rooms if the number of changes is small --- synapse/handlers/sync.py | 56 +++++++++++++++++++++++++++++++-------- synapse/storage/stream.py | 7 +++++ 2 files changed, 52 insertions(+), 11 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 9e1188da5..e93dfe005 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -216,23 +216,57 @@ class SyncHandler(BaseHandler): typing_by_room = {event["room_id"]: event for event in typing} logger.debug("Typing %r", typing_by_room) - room_list = yield self.store.get_rooms_for_user_where_membership_is( - user_id=sync_config.user.to_string(), - membership_list=[Membership.INVITE, Membership.JOIN] - ) + rm_handler = self.hs.get_handlers().room_member_handler + room_ids = yield rm_handler.get_rooms_for_user(sync_config.user) # TODO (mjark): Does public mean "published"? published_rooms = yield self.store.get_rooms(is_public=True) published_room_ids = set(r["room_id"] for r in published_rooms) + room_events, _ = yield self.store.get_room_events_stream( + sync_config.user.to_string(), + from_key=since_token.room_key, + to_key=now_token.room_key, + room_id=None, + limit=sync_config.limit + 1, + ) + rooms = [] - for event in room_list: - room_sync = yield self.incremental_sync_with_gap_for_room( - event.room_id, sync_config, since_token, now_token, - published_room_ids, typing_by_room - ) - if room_sync: - rooms.append(room_sync) + if len(room_events) <= sync_config.limit: + # There is no gap in any of the rooms. Therefore we can just + # partition the new events by room and return them. + events_by_room_id = {} + for event in room_events: + events_by_room_id.setdefault(event.room_id, []).append(event) + + for room_id in room_ids: + recents = events_by_room_id.get(room_id, []) + state = [event for event in recents if event.is_state()] + if recents: + prev_batch = now_token.copy_and_replace( + "room_key", recents[0].internal_metadata.before + ) + else: + prev_batch = now_token + room_sync = RoomSyncResult( + room_id=room_id, + published=room_id in published_room_ids, + events=recents, + prev_batch=prev_batch, + state=state, + limited=False, + typing=typing_by_room.get(room_id, None) + ) + if room_sync is not None: + rooms.append(room_sync) + else: + for room_id in room_ids: + room_sync = yield self.incremental_sync_with_gap_for_room( + room_id, sync_config, since_token, now_token, + published_room_ids, typing_by_room + ) + if room_sync: + rooms.append(room_sync) defer.returnValue(SyncResult( public_user_data=presence, diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index db1816ea8..93ccfd8c1 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -181,6 +181,13 @@ class StreamStore(SQLBaseStore): get_prev_content=True ) + for event, row in zip(ret, rows): + stream = row["stream_ordering"] + topo = event.depth + internal = event.internal_metadata + internal.before = str(_StreamToken(topo, stream - 1)) + internal.after = str(_StreamToken(topo, stream)) + if rows: key = "s%d" % max([r["stream_ordering"] for r in rows]) else: