From f31014b18f618d81cb667c2b01146b246d32760c Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 1 Oct 2015 17:53:07 +0100 Subject: [PATCH 01/22] Start updating the sync API to match the specification --- synapse/api/filtering.py | 5 +- synapse/handlers/sync.py | 64 +++++++++++------------- synapse/rest/client/v2_alpha/sync.py | 75 ++++++++-------------------- 3 files changed, 54 insertions(+), 90 deletions(-) diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index 4d570b74f..c066ce89e 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -54,7 +54,7 @@ class Filtering(object): ] room_level_definitions = [ - "state", "events", "ephemeral" + "state", "timeline", "ephemeral" ] for key in top_level_definitions: @@ -135,6 +135,9 @@ class Filter(object): def __init__(self, filter_json): self.filter_json = filter_json + def timeline_limit(self): + return self.filter_json.get("room", {}).get("timeline", {}).get(limit, 10) + def filter_public_user_data(self, events): return self._filter_on_key(events, ["public_user_data"]) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 9914ff6f9..2a0e04543 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -28,21 +28,26 @@ logger = logging.getLogger(__name__) SyncConfig = collections.namedtuple("SyncConfig", [ "user", - "limit", - "gap", - "sort", - "backfill", "filter", ]) +class TimelineBatch(collections.namedtuple("TimelineBatch", [ + "prev_batch", + "events", + "limited", +])): + __slots__ = [] + + def __nonzero__(self): + """Make the result appear empty if there are no updates. This is used + to tell if room needs to be part of the sync result. + """ + return bool(self.events) class RoomSyncResult(collections.namedtuple("RoomSyncResult", [ "room_id", - "limited", - "published", - "events", + "timeline", "state", - "prev_batch", "ephemeral", ])): __slots__ = [] @@ -51,13 +56,12 @@ class RoomSyncResult(collections.namedtuple("RoomSyncResult", [ """Make the result appear empty if there are no updates. This is used to tell if room needs to be part of the sync result. """ - return bool(self.events or self.state or self.ephemeral) + return bool(self.timeline or self.state or self.ephemeral) class SyncResult(collections.namedtuple("SyncResult", [ "next_batch", # Token for the next sync - "private_user_data", # List of private events for the user. - "public_user_data", # List of public events for all users. + "presence", # List of presence events for the user. "rooms", # RoomSyncResult for each room. ])): __slots__ = [] @@ -133,12 +137,6 @@ class SyncHandler(BaseHandler): Returns: A Deferred SyncResult. """ - if sync_config.sort == "timeline,desc": - # TODO(mjark): Handle going through events in reverse order?. - # What does "most recent events" mean when applying the limits mean - # in this case? - raise NotImplementedError() - now_token = yield self.event_sources.get_current_token() presence_stream = self.event_sources.sources["presence"] @@ -155,20 +153,15 @@ class SyncHandler(BaseHandler): membership_list=[Membership.INVITE, Membership.JOIN] ) - # TODO (mjark): Does public mean "published"? 
- published_rooms = yield self.store.get_rooms(is_public=True) - published_room_ids = set(r["room_id"] for r in published_rooms) - rooms = [] for event in room_list: room_sync = yield self.initial_sync_for_room( - event.room_id, sync_config, now_token, published_room_ids + event.room_id, sync_config, now_token, ) rooms.append(room_sync) defer.returnValue(SyncResult( - public_user_data=presence, - private_user_data=[], + presence=presence, rooms=rooms, next_batch=now_token, )) @@ -192,7 +185,6 @@ class SyncHandler(BaseHandler): defer.returnValue(RoomSyncResult( room_id=room_id, - published=room_id in published_room_ids, events=recents, prev_batch=prev_batch_token, state=current_state_events, @@ -219,7 +211,6 @@ class SyncHandler(BaseHandler): presence, presence_key = yield presence_source.get_new_events_for_user( user=sync_config.user, from_key=since_token.presence_key, - limit=sync_config.limit, ) now_token = now_token.copy_and_replace("presence_key", presence_key) @@ -227,7 +218,6 @@ class SyncHandler(BaseHandler): typing, typing_key = yield typing_source.get_new_events_for_user( user=sync_config.user, from_key=since_token.typing_key, - limit=sync_config.limit, ) now_token = now_token.copy_and_replace("typing_key", typing_key) @@ -252,16 +242,18 @@ class SyncHandler(BaseHandler): published_rooms = yield self.store.get_rooms(is_public=True) published_room_ids = set(r["room_id"] for r in published_rooms) + timeline_limit = sync_config.filter.timeline_limit() + room_events, _ = yield self.store.get_room_events_stream( sync_config.user.to_string(), from_key=since_token.room_key, to_key=now_token.room_key, room_id=None, - limit=sync_config.limit + 1, + limit=timeline_limit + 1, ) rooms = [] - if len(room_events) <= sync_config.limit: + if len(room_events) <= timeline_limit: # There is no gap in any of the rooms. Therefore we can just # partition the new events by room and return them. events_by_room_id = {} @@ -365,8 +357,9 @@ class SyncHandler(BaseHandler): max_repeat = 3 # Only try a few times per room, otherwise room_key = now_token.room_key end_key = room_key + timeline_limit = sync_config.filter.timeline_limit() - while limited and len(recents) < sync_config.limit and max_repeat: + while limited and len(recents) < timeline_limit and max_repeat: events, keys = yield self.store.get_recent_events_for_room( room_id, limit=load_limit + 1, @@ -393,7 +386,9 @@ class SyncHandler(BaseHandler): "room_key", room_key ) - defer.returnValue((recents, prev_batch_token, limited)) + defer.returnValue(TimelineBatch( + events=recents, prev_batch=prev_batch_token, limited=limited + )) @defer.inlineCallbacks def incremental_sync_with_gap_for_room(self, room_id, sync_config, @@ -408,7 +403,7 @@ class SyncHandler(BaseHandler): # TODO(mjark): Check for redactions we might have missed. 
- recents, prev_batch_token, limited = yield self.load_filtered_recents( + batch = yield self.load_filtered_recents( room_id, sync_config, now_token, since_token, ) @@ -437,11 +432,8 @@ class SyncHandler(BaseHandler): room_sync = RoomSyncResult( room_id=room_id, - published=room_id in published_room_ids, - events=recents, - prev_batch=prev_batch_token, + timeline=batch, state=state_events_delta, - limited=limited, ephemeral=typing_by_room.get(room_id, []) ) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index cac28b47b..ea6600b1d 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -36,47 +36,35 @@ class SyncRestServlet(RestServlet): GET parameters:: timeout(int): How long to wait for new events in milliseconds. - limit(int): Maxiumum number of events per room to return. - gap(bool): Create gaps the message history if limit is exceeded to - ensure that the client has the most recent messages. Defaults to - "true". - sort(str,str): tuple of sort key (e.g. "timeline") and direction - (e.g. "asc", "desc"). Defaults to "timeline,asc". since(batch_token): Batch token when asking for incremental deltas. set_presence(str): What state the device presence should be set to. default is "online". - backfill(bool): Should the HS request message history from other - servers. This may take a long time making it unsuitable for clients - expecting a prompt response. Defaults to "true". filter(filter_id): A filter to apply to the events returned. - filter_*: Filter override parameters. Response JSON:: { - "next_batch": // batch token for the next /sync - "private_user_data": // private events for this user. - "public_user_data": // public events for all users including the - // public events for this user. - "rooms": [{ // List of rooms with updates. - "room_id": // Id of the room being updated - "limited": // Was the per-room event limit exceeded? - "published": // Is the room published by our HS? - "event_map": // Map of EventID -> event JSON. - "events": { // The recent events in the room if gap is "true" - // otherwise the next events in the room. - "batch": [] // list of EventIDs in the "event_map". - "prev_batch": // back token for getting previous events. - } - "state": [] // list of EventIDs updating the current state to - // be what it should be at the end of the batch. - "ephemeral": [] + "next_batch": // batch token for the next /sync + "presence": // presence data for the user. + "rooms": { + "roomlist": [{ // List of rooms with updates. + "room_id": // Id of the room being updated + "event_map": // Map of EventID -> event JSON. + "timeline": { // The recent events in the room if gap is "true" + "limited": // Was the per-room event limit exceeded? + // otherwise the next events in the room. + "batch": [] // list of EventIDs in the "event_map". + "prev_batch": // back token for getting previous events. + } + "state": [] // list of EventIDs updating the current state to + // be what it should be at the end of the batch. 
+ "ephemeral": [] }] + } } """ PATTERN = client_v2_pattern("/sync$") - ALLOWED_SORT = set(["timeline,asc", "timeline,desc"]) - ALLOWED_PRESENCE = set(["online", "offline", "idle"]) + ALLOWED_PRESENCE = set(["online", "offline"]) def __init__(self, hs): super(SyncRestServlet, self).__init__() @@ -90,45 +78,29 @@ class SyncRestServlet(RestServlet): user, token_id = yield self.auth.get_user_by_req(request) timeout = parse_integer(request, "timeout", default=0) - limit = parse_integer(request, "limit", required=True) - gap = parse_boolean(request, "gap", default=True) - sort = parse_string( - request, "sort", default="timeline,asc", - allowed_values=self.ALLOWED_SORT - ) since = parse_string(request, "since") set_presence = parse_string( request, "set_presence", default="online", allowed_values=self.ALLOWED_PRESENCE ) - backfill = parse_boolean(request, "backfill", default=False) filter_id = parse_string(request, "filter", default=None) logger.info( - "/sync: user=%r, timeout=%r, limit=%r, gap=%r, sort=%r, since=%r," - " set_presence=%r, backfill=%r, filter_id=%r" % ( - user, timeout, limit, gap, sort, since, set_presence, - backfill, filter_id + "/sync: user=%r, timeout=%r, since=%r," + " set_presence=%r, filter_id=%r" % ( + user, timeout, since, set_presence, filter_id ) ) - # TODO(mjark): Load filter and apply overrides. try: filter = yield self.filtering.get_user_filter( user.localpart, filter_id ) except: filter = Filter({}) - # filter = filter.apply_overrides(http_request) - # if filter.matches(event): - # # stuff sync_config = SyncConfig( user=user, - gap=gap, - limit=limit, - sort=sort, - backfill=backfill, filter=filter, ) @@ -144,11 +116,8 @@ class SyncRestServlet(RestServlet): time_now = self.clock.time_msec() response_content = { - "public_user_data": self.encode_user_data( - sync_result.public_user_data, filter, time_now - ), - "private_user_data": self.encode_user_data( - sync_result.private_user_data, filter, time_now + "presence": self.encode_user_data( + sync_result.presence, filter, time_now ), "rooms": self.encode_rooms( sync_result.rooms, filter, time_now, token_id From 471555b3a815968c4d7e41a1b99390c6a7917a21 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 5 Oct 2015 16:39:22 +0100 Subject: [PATCH 02/22] Move the rooms out into a room_map mapping from room_id to room. 
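A note on the limit accessors added below: they are plain dictionary walks
over the client's filter JSON with a default of 10, and this change also fixes
the accessor from PATCH 01, which looked up the bare name `limit` rather than
the string "limit" (a NameError as soon as it was called). A minimal sketch of
the behaviour against a hypothetical client filter (the values here are
illustrative, not defaults):

    filter_json = {
        "room": {
            "timeline": {"limit": 20},
            "ephemeral": {"limit": 5},
        },
        # no "presence" section, so that accessor falls back to its default
    }

    # Equivalent to Filter.timeline_limit() etc. in the diff below:
    timeline = filter_json.get("room", {}).get("timeline", {}).get("limit", 10)    # 20
    ephemeral = filter_json.get("room", {}).get("ephemeral", {}).get("limit", 10)  # 5
    presence = filter_json.get("presence", {}).get("limit", 10)                    # 10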
--- synapse/api/filtering.py | 8 ++++- synapse/handlers/sync.py | 27 ++++++----------- synapse/rest/client/v2_alpha/sync.py | 44 ++++++++++++++++++++-------- 3 files changed, 47 insertions(+), 32 deletions(-) diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index c066ce89e..2d5431ba6 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -136,7 +136,13 @@ class Filter(object): self.filter_json = filter_json def timeline_limit(self): - return self.filter_json.get("room", {}).get("timeline", {}).get(limit, 10) + return self.filter_json.get("room", {}).get("timeline", {}).get("limit", 10) + + def presence_limit(self): + return self.filter_json.get("presence", {}).get("limit", 10) + + def ephemeral_limit(self): + return self.filter_json.get("room", {}).get("ephemeral", {}).get("limit", 10) def filter_public_user_data(self, events): return self._filter_on_key(events, ["public_user_data"]) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 2a0e04543..9d488fa25 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -31,6 +31,7 @@ SyncConfig = collections.namedtuple("SyncConfig", [ "filter", ]) + class TimelineBatch(collections.namedtuple("TimelineBatch", [ "prev_batch", "events", @@ -44,6 +45,7 @@ class TimelineBatch(collections.namedtuple("TimelineBatch", [ """ return bool(self.events) + class RoomSyncResult(collections.namedtuple("RoomSyncResult", [ "room_id", "timeline", @@ -125,11 +127,7 @@ class SyncHandler(BaseHandler): if since_token is None: return self.initial_sync(sync_config) else: - if sync_config.gap: - return self.incremental_sync_with_gap(sync_config, since_token) - else: - # TODO(mjark): Handle gapless sync - raise NotImplementedError() + return self.incremental_sync_with_gap(sync_config, since_token) @defer.inlineCallbacks def initial_sync(self, sync_config): @@ -174,7 +172,7 @@ class SyncHandler(BaseHandler): A Deferred RoomSyncResult. """ - recents, prev_batch_token, limited = yield self.load_filtered_recents( + batch = yield self.load_filtered_recents( room_id, sync_config, now_token, ) @@ -185,10 +183,8 @@ class SyncHandler(BaseHandler): defer.returnValue(RoomSyncResult( room_id=room_id, - events=recents, - prev_batch=prev_batch_token, + timeline=batch, state=current_state_events, - limited=limited, ephemeral=[], )) @@ -199,18 +195,13 @@ class SyncHandler(BaseHandler): Returns: A Deferred SyncResult. """ - if sync_config.sort == "timeline,desc": - # TODO(mjark): Handle going through events in reverse order?. - # What does "most recent events" mean when applying the limits mean - # in this case? 
- raise NotImplementedError() - now_token = yield self.event_sources.get_current_token() presence_source = self.event_sources.sources["presence"] presence, presence_key = yield presence_source.get_new_events_for_user( user=sync_config.user, from_key=since_token.presence_key, + limit=sync_config.filter.presence_limit(), ) now_token = now_token.copy_and_replace("presence_key", presence_key) @@ -218,6 +209,7 @@ class SyncHandler(BaseHandler): typing, typing_key = yield typing_source.get_new_events_for_user( user=sync_config.user, from_key=since_token.typing_key, + limit=sync_config.filter.ephemeral_limit(), ) now_token = now_token.copy_and_replace("typing_key", typing_key) @@ -295,8 +287,7 @@ class SyncHandler(BaseHandler): rooms.append(room_sync) defer.returnValue(SyncResult( - public_user_data=presence, - private_user_data=[], + presence=presence, rooms=rooms, next_batch=now_token, )) @@ -407,7 +398,7 @@ class SyncHandler(BaseHandler): room_id, sync_config, now_token, since_token, ) - logging.debug("Recents %r", recents) + logging.debug("Recents %r", batch) # TODO(mjark): This seems racy since this isn't being passed a # token to indicate what point in the stream this is diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index ea6600b1d..1f3824d92 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -16,7 +16,7 @@ from twisted.internet import defer from synapse.http.servlet import ( - RestServlet, parse_string, parse_integer, parse_boolean + RestServlet, parse_string, parse_integer ) from synapse.handlers.sync import SyncConfig from synapse.types import StreamToken @@ -46,8 +46,14 @@ class SyncRestServlet(RestServlet): "next_batch": // batch token for the next /sync "presence": // presence data for the user. "rooms": { - "roomlist": [{ // List of rooms with updates. - "room_id": // Id of the room being updated + "default": { + "invited": [], // Ids of invited rooms being updated. + "joined": [], // Ids of joined rooms being updated. + "archived": [] // Ids of archived rooms being updated. + } + } + "room_map": { + "${room_id}": { // Id of the room being updated "event_map": // Map of EventID -> event JSON. "timeline": { // The recent events in the room if gap is "true" "limited": // Was the per-room event limit exceeded? @@ -58,7 +64,7 @@ class SyncRestServlet(RestServlet): "state": [] // list of EventIDs updating the current state to // be what it should be at the end of the batch. 
"ephemeral": [] - }] + } } } """ @@ -115,13 +121,16 @@ class SyncRestServlet(RestServlet): time_now = self.clock.time_msec() + room_map, rooms = self.encode_rooms( + sync_result.rooms, filter, time_now, token_id + ) + response_content = { "presence": self.encode_user_data( sync_result.presence, filter, time_now ), - "rooms": self.encode_rooms( - sync_result.rooms, filter, time_now, token_id - ), + "room_map": room_map, + "rooms": rooms, "next_batch": sync_result.next_batch.to_string(), } @@ -131,10 +140,21 @@ class SyncRestServlet(RestServlet): return events def encode_rooms(self, rooms, filter, time_now, token_id): - return [ - self.encode_room(room, filter, time_now, token_id) - for room in rooms - ] + room_map = {} + joined = [] + for room in rooms: + room_map[room.room_id] = self.encode_room( + room, filter, time_now, token_id + ) + joined.append(room.room_id) + + return room_map, { + "default": { + "joined": joined, + "invited": [], + "archived": [], + } + } @staticmethod def encode_room(room, filter, time_now, token_id): @@ -159,7 +179,6 @@ class SyncRestServlet(RestServlet): ) recent_event_ids.append(event.event_id) result = { - "room_id": room.room_id, "event_map": event_map, "events": { "batch": recent_event_ids, @@ -167,7 +186,6 @@ class SyncRestServlet(RestServlet): }, "state": state_event_ids, "limited": room.limited, - "published": room.published, "ephemeral": room.ephemeral, } return result From e3d3205cd953342ce84b8a148c4f469ce7b79b7a Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 7 Oct 2015 15:55:20 +0100 Subject: [PATCH 03/22] Update the sync response to match the latest spec --- synapse/rest/client/v2_alpha/sync.py | 46 +++++++++++++--------------- 1 file changed, 22 insertions(+), 24 deletions(-) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 1f3824d92..84011918a 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -45,26 +45,29 @@ class SyncRestServlet(RestServlet): { "next_batch": // batch token for the next /sync "presence": // presence data for the user. - "rooms": { - "default": { "invited": [], // Ids of invited rooms being updated. "joined": [], // Ids of joined rooms being updated. "archived": [] // Ids of archived rooms being updated. } } - "room_map": { - "${room_id}": { // Id of the room being updated - "event_map": // Map of EventID -> event JSON. - "timeline": { // The recent events in the room if gap is "true" + "rooms": { + "joined": { // Joined rooms being updated. + "${room_id}": { // Id of the room being updated + "event_map": // Map of EventID -> event JSON. + "timeline": { // The recent events in the room if gap is "true" "limited": // Was the per-room event limit exceeded? - // otherwise the next events in the room. - "batch": [] // list of EventIDs in the "event_map". + // otherwise the next events in the room. + "events": [] // list of EventIDs in the "event_map". "prev_batch": // back token for getting previous events. + } + "state": {"events": []} // list of EventIDs updating the + // current state to be what it should + // be at the end of the batch. + "ephemeral": {"events": []} // list of event objects } - "state": [] // list of EventIDs updating the current state to - // be what it should be at the end of the batch. - "ephemeral": [] - } + }, + "invited": {}, // Ids of invited rooms being updated. + "archived": {} // Ids of archived rooms being updated. 
} } """ @@ -121,7 +124,7 @@ class SyncRestServlet(RestServlet): time_now = self.clock.time_msec() - room_map, rooms = self.encode_rooms( + rooms = self.encode_rooms( sync_result.rooms, filter, time_now, token_id ) @@ -129,7 +132,6 @@ class SyncRestServlet(RestServlet): "presence": self.encode_user_data( sync_result.presence, filter, time_now ), - "room_map": room_map, "rooms": rooms, "next_batch": sync_result.next_batch.to_string(), } @@ -140,20 +142,16 @@ class SyncRestServlet(RestServlet): return events def encode_rooms(self, rooms, filter, time_now, token_id): - room_map = {} - joined = [] + joined = {} for room in rooms: - room_map[room.room_id] = self.encode_room( + joined[room.room_id] = self.encode_room( room, filter, time_now, token_id ) - joined.append(room.room_id) - return room_map, { - "default": { - "joined": joined, - "invited": [], - "archived": [], - } + return { + "joined": joined, + "invited": {}, + "archived": {}, } @staticmethod From dfef2b41aa3202b130661c3c423b2cf7d0dbba97 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 8 Oct 2015 15:17:43 +0100 Subject: [PATCH 04/22] Update the v2 room sync format to match the current v2 spec --- synapse/handlers/sync.py | 25 +++++++++++-------------- synapse/rest/client/v2_alpha/sync.py | 14 +++++++------- 2 files changed, 18 insertions(+), 21 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 9d488fa25..76cca7c62 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -165,8 +165,7 @@ class SyncHandler(BaseHandler): )) @defer.inlineCallbacks - def initial_sync_for_room(self, room_id, sync_config, now_token, - published_room_ids): + def initial_sync_for_room(self, room_id, sync_config, now_token): """Sync a room for a client which is starting without any state Returns: A Deferred RoomSyncResult. @@ -230,10 +229,6 @@ class SyncHandler(BaseHandler): sync_config.user ) - # TODO (mjark): Does public mean "published"? 
- published_rooms = yield self.store.get_rooms(is_public=True) - published_room_ids = set(r["room_id"] for r in published_rooms) - timeline_limit = sync_config.filter.timeline_limit() room_events, _ = yield self.store.get_room_events_stream( @@ -268,11 +263,12 @@ class SyncHandler(BaseHandler): room_sync = RoomSyncResult( room_id=room_id, - published=room_id in published_room_ids, - events=recents, - prev_batch=prev_batch, + timeline=TimelineBatch( + events=recents, + prev_batch=prev_batch, + limited=False, + ), state=state, - limited=False, ephemeral=typing_by_room.get(room_id, []) ) if room_sync: @@ -344,11 +340,11 @@ class SyncHandler(BaseHandler): limited = True recents = [] filtering_factor = 2 - load_limit = max(sync_config.limit * filtering_factor, 100) + timeline_limit = sync_config.filter.timeline_limit() + load_limit = max(timeline_limit * filtering_factor, 100) max_repeat = 3 # Only try a few times per room, otherwise room_key = now_token.room_key end_key = room_key - timeline_limit = sync_config.filter.timeline_limit() while limited and len(recents) < timeline_limit and max_repeat: events, keys = yield self.store.get_recent_events_for_room( @@ -369,8 +365,9 @@ class SyncHandler(BaseHandler): limited = False max_repeat -= 1 - if len(recents) > sync_config.limit: - recents = recents[-sync_config.limit:] + if len(recents) > timeline_limit: + limited = True + recents = recents[-timeline_limit:] room_key = recents[0].internal_metadata.before prev_batch_token = now_token.copy_and_replace( diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 84011918a..97bf95acf 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -158,7 +158,7 @@ class SyncRestServlet(RestServlet): def encode_room(room, filter, time_now, token_id): event_map = {} state_events = filter.filter_room_state(room.state) - recent_events = filter.filter_room_events(room.events) + recent_events = filter.filter_room_events(room.timeline.events) state_event_ids = [] recent_event_ids = [] for event in state_events: @@ -178,13 +178,13 @@ class SyncRestServlet(RestServlet): recent_event_ids.append(event.event_id) result = { "event_map": event_map, - "events": { - "batch": recent_event_ids, - "prev_batch": room.prev_batch.to_string(), + "timeline": { + "events": recent_event_ids, + "prev_batch": room.timeline.prev_batch.to_string(), + "limited": room.timeline.limited, }, - "state": state_event_ids, - "limited": room.limited, - "ephemeral": room.ephemeral, + "state": {"events": state_event_ids}, + "ephemeral": {"events": room.ephemeral}, } return result From c15cf6ac069386df3095b5c69af96f0c76ce5276 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 9 Oct 2015 18:50:15 +0100 Subject: [PATCH 05/22] Format the presence events correctly for v2 --- synapse/rest/client/v2_alpha/sync.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 97bf95acf..f20b830ed 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -26,6 +26,7 @@ from synapse.events.utils import ( from synapse.api.filtering import Filter from ._base import client_v2_pattern +import copy import logging logger = logging.getLogger(__name__) @@ -129,7 +130,7 @@ class SyncRestServlet(RestServlet): ) response_content = { - "presence": self.encode_user_data( + "presence": self.encode_presence( sync_result.presence, filter, time_now ), "rooms": 
rooms, @@ -138,8 +139,13 @@ class SyncRestServlet(RestServlet): defer.returnValue((200, response_content)) - def encode_user_data(self, events, filter, time_now): - return events + def encode_presence(self, events, filter, time_now): + formatted = [] + for event in events: + event = copy.deepcopy(event) + event['sender'] = event['content'].pop('user_id'); + formatted.append(event) + return {"events": formatted} def encode_rooms(self, rooms, filter, time_now, token_id): joined = {} From 0a96a9a02371bd36970db7dfdb2d4c6e98e0200e Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 9 Oct 2015 19:57:50 +0100 Subject: [PATCH 06/22] Set the user as online if they start polling the v2 sync --- synapse/rest/client/v2_alpha/sync.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index f20b830ed..3348b46c1 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -79,6 +79,7 @@ class SyncRestServlet(RestServlet): def __init__(self, hs): super(SyncRestServlet, self).__init__() self.auth = hs.get_auth() + self.event_stream_handler = hs.get_handlers().event_stream_handler self.sync_handler = hs.get_handlers().sync_handler self.clock = hs.get_clock() self.filtering = hs.get_filtering() @@ -119,9 +120,16 @@ class SyncRestServlet(RestServlet): else: since_token = None - sync_result = yield self.sync_handler.wait_for_sync_for_user( - sync_config, since_token=since_token, timeout=timeout - ) + if set_presence == "online": + yield self.event_stream_handler.started_stream(user) + + try: + sync_result = yield self.sync_handler.wait_for_sync_for_user( + sync_config, since_token=since_token, timeout=timeout + ) + finally: + if set_presence == "online": + self.event_stream_handler.stopped_stream(user) time_now = self.clock.time_msec() From 586beb8318bd259581918a8b47f5981f0b90b7e9 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 12 Oct 2015 16:54:58 +0100 Subject: [PATCH 07/22] Update the filters to match the latest spec. Apply the filter the 'timeline' and 'ephemeral' keys of rooms. Apply the filter to the 'presence' key of a sync response. 
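Two details of the rewritten definition checks below are worth restating.
First, _passes_definition now accepts plain dicts as well as Event objects, so
the same filter code can run over presence events, which this series handles
as dicts. Second, a "types"/"not_types" entry ending in '*' is a prefix match
and anything else is an exact match. A standalone sketch of that matching
rule, mirroring _event_matches_type in the diff below (inputs hypothetical):

    def event_matches_type(event_type, def_type):
        # "m.room.*" matches any event type with that prefix
        if def_type.endswith("*"):
            return event_type.startswith(def_type[:-1])
        # otherwise the event type must match exactly
        return event_type == def_type

    assert event_matches_type("m.room.message", "m.room.*")
    assert event_matches_type("m.presence", "m.presence")
    assert not event_matches_type("m.presence", "m.room.*")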
--- synapse/api/filtering.py | 53 ++++++++++++++++++---------- synapse/handlers/sync.py | 6 ++-- synapse/rest/client/v2_alpha/sync.py | 26 ++++++-------- tests/api/test_filtering.py | 12 +++---- 4 files changed, 55 insertions(+), 42 deletions(-) diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index 2d5431ba6..e79e91e7e 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -144,17 +144,14 @@ class Filter(object): def ephemeral_limit(self): return self.filter_json.get("room", {}).get("ephemeral", {}).get("limit", 10) - def filter_public_user_data(self, events): - return self._filter_on_key(events, ["public_user_data"]) - - def filter_private_user_data(self, events): - return self._filter_on_key(events, ["private_user_data"]) + def filter_presence(self, events): + return self._filter_on_key(events, ["presence"]) def filter_room_state(self, events): return self._filter_on_key(events, ["room", "state"]) - def filter_room_events(self, events): - return self._filter_on_key(events, ["room", "events"]) + def filter_room_timeline(self, events): + return self._filter_on_key(events, ["room", "timeline"]) def filter_room_ephemeral(self, events): return self._filter_on_key(events, ["room", "ephemeral"]) @@ -178,11 +175,34 @@ class Filter(object): return [e for e in events if self._passes_definition(definition, e)] def _passes_definition(self, definition, event): + """Check if the event passes the filter definition + Args: + definition(dict): The filter definition to check against + event(dict or Event): The event to check + Returns: + True if the event passes the filter in the definition + """ + if type(event) is dict: + room_id = event.get("room_id") + sender = event.get("sender") + event_type = event["type"] + else: + room_id = getattr(event, "room_id", None) + sender = getattr(event, "sender", None) + event_type = event.type + return self._event_passes_definition( + definition, room_id, sender, event_type + ) + + def _event_passes_definition(self, definition, room_id, sender, + event_type): """Check if the event passes through the given definition. Args: definition(dict): The definition to check against. - event(Event): The event to check. + room_id(str): The id of the room this event is in or None. + sender(str): The sender of the event + event_type(str): The type of the event. Returns: True if the event passes through the filter. """ @@ -194,8 +214,7 @@ class Filter(object): # and 'not_types' then it is treated as only being in 'not_types') # room checks - if hasattr(event, "room_id"): - room_id = event.room_id + if room_id is not None: allow_rooms = definition.get("rooms", None) reject_rooms = definition.get("not_rooms", None) if reject_rooms and room_id in reject_rooms: @@ -204,9 +223,7 @@ class Filter(object): return False # sender checks - if hasattr(event, "sender"): - # Should we be including event.state_key for some event types? 
- sender = event.sender + if sender is not None: allow_senders = definition.get("senders", None) reject_senders = definition.get("not_senders", None) if reject_senders and sender in reject_senders: @@ -217,12 +234,12 @@ class Filter(object): # type checks if "not_types" in definition: for def_type in definition["not_types"]: - if self._event_matches_type(event, def_type): + if self._event_matches_type(event_type, def_type): return False if "types" in definition: included = False for def_type in definition["types"]: - if self._event_matches_type(event, def_type): + if self._event_matches_type(event_type, def_type): included = True break if not included: @@ -230,9 +247,9 @@ class Filter(object): return True - def _event_matches_type(self, event, def_type): + def _event_matches_type(self, event_type, def_type): if def_type.endswith("*"): type_prefix = def_type[:-1] - return event.type.startswith(type_prefix) + return event_type.startswith(type_prefix) else: - return event.type == def_type + return event_type == def_type diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 76cca7c62..edc728ece 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -277,7 +277,7 @@ class SyncHandler(BaseHandler): for room_id in room_ids: room_sync = yield self.incremental_sync_with_gap_for_room( room_id, sync_config, since_token, now_token, - published_room_ids, typing_by_room + typing_by_room ) if room_sync: rooms.append(room_sync) @@ -355,7 +355,7 @@ class SyncHandler(BaseHandler): ) (room_key, _) = keys end_key = "s" + room_key.split('-')[-1] - loaded_recents = sync_config.filter.filter_room_events(events) + loaded_recents = sync_config.filter.filter_room_timeline(events) loaded_recents = yield self._filter_events_for_client( sync_config.user.to_string(), room_id, loaded_recents, ) @@ -381,7 +381,7 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def incremental_sync_with_gap_for_room(self, room_id, sync_config, since_token, now_token, - published_room_ids, typing_by_room): + typing_by_room): """ Get the incremental delta needed to bring the client up to date for the room. Gives the client the most recent events and the changes to state. diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 3348b46c1..1223a4a7f 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -46,11 +46,6 @@ class SyncRestServlet(RestServlet): { "next_batch": // batch token for the next /sync "presence": // presence data for the user. - "invited": [], // Ids of invited rooms being updated. - "joined": [], // Ids of joined rooms being updated. - "archived": [] // Ids of archived rooms being updated. - } - } "rooms": { "joined": { // Joined rooms being updated. "${room_id}": { // Id of the room being updated @@ -67,8 +62,8 @@ class SyncRestServlet(RestServlet): "ephemeral": {"events": []} // list of event objects } }, - "invited": {}, // Ids of invited rooms being updated. - "archived": {} // Ids of archived rooms being updated. + "invited": {}, // Invited rooms being updated. + "archived": {} // Archived rooms being updated. 
} } """ @@ -151,9 +146,9 @@ class SyncRestServlet(RestServlet): formatted = [] for event in events: event = copy.deepcopy(event) - event['sender'] = event['content'].pop('user_id'); + event['sender'] = event['content'].pop('user_id') formatted.append(event) - return {"events": formatted} + return {"events": filter.filter_presence(formatted)} def encode_rooms(self, rooms, filter, time_now, token_id): joined = {} @@ -172,9 +167,10 @@ class SyncRestServlet(RestServlet): def encode_room(room, filter, time_now, token_id): event_map = {} state_events = filter.filter_room_state(room.state) - recent_events = filter.filter_room_events(room.timeline.events) + timeline_events = filter.filter_room_timeline(room.timeline.events) + ephemeral_events = filter.filter_room_ephemeral(room.ephemeral) state_event_ids = [] - recent_event_ids = [] + timeline_event_ids = [] for event in state_events: # TODO(mjark): Respect formatting requirements in the filter. event_map[event.event_id] = serialize_event( @@ -183,22 +179,22 @@ class SyncRestServlet(RestServlet): ) state_event_ids.append(event.event_id) - for event in recent_events: + for event in timeline_events: # TODO(mjark): Respect formatting requirements in the filter. event_map[event.event_id] = serialize_event( event, time_now, token_id=token_id, event_format=format_event_for_client_v2_without_event_id, ) - recent_event_ids.append(event.event_id) + timeline_event_ids.append(event.event_id) result = { "event_map": event_map, "timeline": { - "events": recent_event_ids, + "events": timeline_event_ids, "prev_batch": room.timeline.prev_batch.to_string(), "limited": room.timeline.limited, }, "state": {"events": state_event_ids}, - "ephemeral": {"events": room.ephemeral}, + "ephemeral": {"events": ephemeral_events}, } return result diff --git a/tests/api/test_filtering.py b/tests/api/test_filtering.py index 65b2f590c..6942cdac5 100644 --- a/tests/api/test_filtering.py +++ b/tests/api/test_filtering.py @@ -345,9 +345,9 @@ class FilteringTestCase(unittest.TestCase): ) @defer.inlineCallbacks - def test_filter_public_user_data_match(self): + def test_filter_presence_match(self): user_filter_json = { - "public_user_data": { + "presence": { "types": ["m.*"] } } @@ -368,13 +368,13 @@ class FilteringTestCase(unittest.TestCase): filter_id=filter_id, ) - results = user_filter.filter_public_user_data(events=events) + results = user_filter.filter_presence(events=events) self.assertEquals(events, results) @defer.inlineCallbacks - def test_filter_public_user_data_no_match(self): + def test_filter_presence_no_match(self): user_filter_json = { - "public_user_data": { + "presence": { "types": ["m.*"] } } @@ -395,7 +395,7 @@ class FilteringTestCase(unittest.TestCase): filter_id=filter_id, ) - results = user_filter.filter_public_user_data(events=events) + results = user_filter.filter_presence(events=events) self.assertEquals([], results) @defer.inlineCallbacks From 956509dfecccca944d89bc9e9f002e5039cf81fc Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 13 Oct 2015 10:24:51 +0100 Subject: [PATCH 08/22] Start spliting out the rooms into joined and invited in v2 sync --- synapse/handlers/sync.py | 58 +++++++++++++++++++--------- synapse/rest/client/v2_alpha/sync.py | 18 ++++----- 2 files changed, 49 insertions(+), 27 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index edc728ece..e693e7c80 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -46,7 +46,7 @@ class TimelineBatch(collections.namedtuple("TimelineBatch", [ return 
bool(self.events) -class RoomSyncResult(collections.namedtuple("RoomSyncResult", [ +class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ "room_id", "timeline", "state", @@ -61,10 +61,24 @@ class RoomSyncResult(collections.namedtuple("RoomSyncResult", [ return bool(self.timeline or self.state or self.ephemeral) +class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [ + "room_id", + "invite_state", +])): + __slots__ = [] + + def __nonzero__(self): + """Make the result appear empty if there are no updates. This is used + to tell if room needs to be part of the sync result. + """ + return bool(self.invite_state) + + class SyncResult(collections.namedtuple("SyncResult", [ "next_batch", # Token for the next sync "presence", # List of presence events for the user. - "rooms", # RoomSyncResult for each room. + "joined", # JoinedSyncResult for each joined room. + "invited", # InvitedSyncResult for each invited room. ])): __slots__ = [] @@ -151,24 +165,31 @@ class SyncHandler(BaseHandler): membership_list=[Membership.INVITE, Membership.JOIN] ) - rooms = [] + joined = [] for event in room_list: - room_sync = yield self.initial_sync_for_room( - event.room_id, sync_config, now_token, - ) - rooms.append(room_sync) + if event.membership == Membership.JOIN: + room_sync = yield self.initial_sync_for_room( + event.room_id, sync_config, now_token, + ) + joined.append(room_sync) + elif event.membership == Membership.INVITE: + invited.append(InvitedSyncResult( + room_id=event.room_id, + invited_state=[event], + ) defer.returnValue(SyncResult( presence=presence, - rooms=rooms, + joined=joined, + invited=[], next_batch=now_token, )) @defer.inlineCallbacks - def initial_sync_for_room(self, room_id, sync_config, now_token): + def initial_sync_for_joined_room(self, room_id, sync_config, now_token): """Sync a room for a client which is starting without any state Returns: - A Deferred RoomSyncResult. + A Deferred JoinedSyncResult. """ batch = yield self.load_filtered_recents( @@ -180,7 +201,7 @@ class SyncHandler(BaseHandler): ) current_state_events = current_state.values() - defer.returnValue(RoomSyncResult( + defer.returnValue(JoinedSyncResult( room_id=room_id, timeline=batch, state=current_state_events, @@ -239,7 +260,7 @@ class SyncHandler(BaseHandler): limit=timeline_limit + 1, ) - rooms = [] + joined = [] if len(room_events) <= timeline_limit: # There is no gap in any of the rooms. Therefore we can just # partition the new events by room and return them. @@ -261,7 +282,7 @@ class SyncHandler(BaseHandler): sync_config, room_id, state ) - room_sync = RoomSyncResult( + room_sync = JoinedSyncResult( room_id=room_id, timeline=TimelineBatch( events=recents, @@ -272,7 +293,7 @@ class SyncHandler(BaseHandler): ephemeral=typing_by_room.get(room_id, []) ) if room_sync: - rooms.append(room_sync) + joined.append(room_sync) else: for room_id in room_ids: room_sync = yield self.incremental_sync_with_gap_for_room( @@ -280,11 +301,12 @@ class SyncHandler(BaseHandler): typing_by_room ) if room_sync: - rooms.append(room_sync) + joined.append(room_sync) defer.returnValue(SyncResult( presence=presence, - rooms=rooms, + joined=joined, + invited=[], next_batch=now_token, )) @@ -386,7 +408,7 @@ class SyncHandler(BaseHandler): the room. Gives the client the most recent events and the changes to state. Returns: - A Deferred RoomSyncResult + A Deferred JoinedSyncResult """ # TODO(mjark): Check for redactions we might have missed. 
@@ -418,7 +440,7 @@ class SyncHandler(BaseHandler): sync_config, room_id, state_events_delta ) - room_sync = RoomSyncResult( + room_sync = JoinedSyncResult( room_id=room_id, timeline=batch, state=state_events_delta, diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 1223a4a7f..9b87879f5 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -128,15 +128,19 @@ class SyncRestServlet(RestServlet): time_now = self.clock.time_msec() - rooms = self.encode_rooms( - sync_result.rooms, filter, time_now, token_id + joined = self.encode_joined( + sync_result.joined, filter, time_now, token_id ) response_content = { "presence": self.encode_presence( sync_result.presence, filter, time_now ), - "rooms": rooms, + "rooms": { + "joined": joined, + "invited": {}, + "archived": {}, + }, "next_batch": sync_result.next_batch.to_string(), } @@ -150,18 +154,14 @@ class SyncRestServlet(RestServlet): formatted.append(event) return {"events": filter.filter_presence(formatted)} - def encode_rooms(self, rooms, filter, time_now, token_id): + def encode_joined(self, rooms, filter, time_now, token_id): joined = {} for room in rooms: joined[room.room_id] = self.encode_room( room, filter, time_now, token_id ) - return { - "joined": joined, - "invited": {}, - "archived": {}, - } + return joined @staticmethod def encode_room(room, filter, time_now, token_id): From ab9cf732585244781ba67f4bb4c235ded3d4661a Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 13 Oct 2015 11:03:48 +0100 Subject: [PATCH 09/22] Include invited rooms in the initial sync --- synapse/handlers/sync.py | 16 ++++++---------- synapse/rest/client/v2_alpha/sync.py | 21 ++++++++++++++++++++- 2 files changed, 26 insertions(+), 11 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index e693e7c80..574412d6b 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -63,16 +63,10 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [ "room_id", - "invite_state", + "invite", ])): __slots__ = [] - def __nonzero__(self): - """Make the result appear empty if there are no updates. This is used - to tell if room needs to be part of the sync result. 
- """ - return bool(self.invite_state) - class SyncResult(collections.namedtuple("SyncResult", [ "next_batch", # Token for the next sync @@ -166,6 +160,7 @@ class SyncHandler(BaseHandler): ) joined = [] + invited = [] for event in room_list: if event.membership == Membership.JOIN: room_sync = yield self.initial_sync_for_room( @@ -173,15 +168,16 @@ class SyncHandler(BaseHandler): ) joined.append(room_sync) elif event.membership == Membership.INVITE: + invite = yield self.store.get_event(event.event_id) invited.append(InvitedSyncResult( room_id=event.room_id, - invited_state=[event], - ) + invite=invite, + )) defer.returnValue(SyncResult( presence=presence, joined=joined, - invited=[], + invited=invited, next_batch=now_token, )) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 9b87879f5..399df9e77 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -132,13 +132,17 @@ class SyncRestServlet(RestServlet): sync_result.joined, filter, time_now, token_id ) + invited = self.encode_invited( + sync_result.invited, filter, time_now, token_id + ) + response_content = { "presence": self.encode_presence( sync_result.presence, filter, time_now ), "rooms": { "joined": joined, - "invited": {}, + "invited": invited, "archived": {}, }, "next_batch": sync_result.next_batch.to_string(), @@ -163,6 +167,21 @@ class SyncRestServlet(RestServlet): return joined + def encode_invited(self, rooms, filter, time_now, token_id): + invited = {} + for room in rooms: + invite = serialize_event( + room.invite, time_now, token_id=token_id, + event_format=format_event_for_client_v2_without_event_id, + ) + invited_state = invite.get("unsigned", {}).pop("invite_room_state", []) + invited_state.append(invite) + invited[room.room_id] = { + "invite_state": { "events": invited_state } + } + + return invited + @staticmethod def encode_room(room, filter, time_now, token_id): event_map = {} From 40b6a5aad1309fed9d1e32be387798dd46b2cf4f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Oct 2015 11:38:48 +0100 Subject: [PATCH 10/22] Split out the schema preparation and update logic into its own module --- synapse/storage/__init__.py | 378 +------------------------- synapse/storage/_schema_prepare.py | 395 ++++++++++++++++++++++++++++ synapse/storage/engines/postgres.py | 2 +- synapse/storage/engines/sqlite3.py | 4 +- 4 files changed, 402 insertions(+), 377 deletions(-) create mode 100644 synapse/storage/_schema_prepare.py diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 340e59afc..4be629bff 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -41,23 +41,16 @@ from .end_to_end_keys import EndToEndKeyStore from .receipts import ReceiptsStore +from ._schema_prepare import UpgradeDatabaseException + +__all__ = [UpgradeDatabaseException] -import fnmatch -import imp import logging -import os -import re logger = logging.getLogger(__name__) -# Remember to update this number every time a change is made to database -# schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 24 - -dir_path = os.path.abspath(os.path.dirname(__file__)) - # Number of msec of granularity to store the user IP 'last seen' time. Smaller # times give more inserts into the database even for readonly API hits # 120 seconds == 2 minutes @@ -158,371 +151,6 @@ class DataStore(RoomMemberStore, RoomStore, ) -def read_schema(path): - """ Read the named database schema. 
- - Args: - path: Path of the database schema. - Returns: - A string containing the database schema. - """ - with open(path) as schema_file: - return schema_file.read() - - -class PrepareDatabaseException(Exception): - pass - - -class UpgradeDatabaseException(PrepareDatabaseException): - pass - - -def prepare_database(db_conn, database_engine): - """Prepares a database for usage. Will either create all necessary tables - or upgrade from an older schema version. - """ - try: - cur = db_conn.cursor() - version_info = _get_or_create_schema_state(cur, database_engine) - - if version_info: - user_version, delta_files, upgraded = version_info - _upgrade_existing_database( - cur, user_version, delta_files, upgraded, database_engine - ) - else: - _setup_new_database(cur, database_engine) - - # cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,)) - - cur.close() - db_conn.commit() - except: - db_conn.rollback() - raise - - -def _setup_new_database(cur, database_engine): - """Sets up the database by finding a base set of "full schemas" and then - applying any necessary deltas. - - The "full_schemas" directory has subdirectories named after versions. This - function searches for the highest version less than or equal to - `SCHEMA_VERSION` and executes all .sql files in that directory. - - The function will then apply all deltas for all versions after the base - version. - - Example directory structure: - - schema/ - delta/ - ... - full_schemas/ - 3/ - test.sql - ... - 11/ - foo.sql - bar.sql - ... - - In the example foo.sql and bar.sql would be run, and then any delta files - for versions strictly greater than 11. - """ - current_dir = os.path.join(dir_path, "schema", "full_schemas") - directory_entries = os.listdir(current_dir) - - valid_dirs = [] - pattern = re.compile(r"^\d+(\.sql)?$") - for filename in directory_entries: - match = pattern.match(filename) - abs_path = os.path.join(current_dir, filename) - if match and os.path.isdir(abs_path): - ver = int(match.group(0)) - if ver <= SCHEMA_VERSION: - valid_dirs.append((ver, abs_path)) - else: - logger.warn("Unexpected entry in 'full_schemas': %s", filename) - - if not valid_dirs: - raise PrepareDatabaseException( - "Could not find a suitable base set of full schemas" - ) - - max_current_ver, sql_dir = max(valid_dirs, key=lambda x: x[0]) - - logger.debug("Initialising schema v%d", max_current_ver) - - directory_entries = os.listdir(sql_dir) - - for filename in fnmatch.filter(directory_entries, "*.sql"): - sql_loc = os.path.join(sql_dir, filename) - logger.debug("Applying schema %s", sql_loc) - executescript(cur, sql_loc) - - cur.execute( - database_engine.convert_param_style( - "INSERT INTO schema_version (version, upgraded)" - " VALUES (?,?)" - ), - (max_current_ver, False,) - ) - - _upgrade_existing_database( - cur, - current_version=max_current_ver, - applied_delta_files=[], - upgraded=False, - database_engine=database_engine, - ) - - -def _upgrade_existing_database(cur, current_version, applied_delta_files, - upgraded, database_engine): - """Upgrades an existing database. - - Delta files can either be SQL stored in *.sql files, or python modules - in *.py. - - There can be multiple delta files per version. Synapse will keep track of - which delta files have been applied, and will apply any that haven't been - even if there has been no version bump. This is useful for development - where orthogonal schema changes may happen on separate branches. 
- - Different delta files for the same version *must* be orthogonal and give - the same result when applied in any order. No guarantees are made on the - order of execution of these scripts. - - This is a no-op of current_version == SCHEMA_VERSION. - - Example directory structure: - - schema/ - delta/ - 11/ - foo.sql - ... - 12/ - foo.sql - bar.py - ... - full_schemas/ - ... - - In the example, if current_version is 11, then foo.sql will be run if and - only if `upgraded` is True. Then `foo.sql` and `bar.py` would be run in - some arbitrary order. - - Args: - cur (Cursor) - current_version (int): The current version of the schema. - applied_delta_files (list): A list of deltas that have already been - applied. - upgraded (bool): Whether the current version was generated by having - applied deltas or from full schema file. If `True` the function - will never apply delta files for the given `current_version`, since - the current_version wasn't generated by applying those delta files. - """ - - if current_version > SCHEMA_VERSION: - raise ValueError( - "Cannot use this database as it is too " + - "new for the server to understand" - ) - - start_ver = current_version - if not upgraded: - start_ver += 1 - - logger.debug("applied_delta_files: %s", applied_delta_files) - - for v in range(start_ver, SCHEMA_VERSION + 1): - logger.debug("Upgrading schema to v%d", v) - - delta_dir = os.path.join(dir_path, "schema", "delta", str(v)) - - try: - directory_entries = os.listdir(delta_dir) - except OSError: - logger.exception("Could not open delta dir for version %d", v) - raise UpgradeDatabaseException( - "Could not open delta dir for version %d" % (v,) - ) - - directory_entries.sort() - for file_name in directory_entries: - relative_path = os.path.join(str(v), file_name) - logger.debug("Found file: %s", relative_path) - if relative_path in applied_delta_files: - continue - - absolute_path = os.path.join( - dir_path, "schema", "delta", relative_path, - ) - root_name, ext = os.path.splitext(file_name) - if ext == ".py": - # This is a python upgrade module. We need to import into some - # package and then execute its `run_upgrade` function. - module_name = "synapse.storage.v%d_%s" % ( - v, root_name - ) - with open(absolute_path) as python_file: - module = imp.load_source( - module_name, absolute_path, python_file - ) - logger.debug("Running script %s", relative_path) - module.run_upgrade(cur, database_engine) - elif ext == ".pyc": - # Sometimes .pyc files turn up anyway even though we've - # disabled their generation; e.g. from distribution package - # installers. Silently skip it - pass - elif ext == ".sql": - # A plain old .sql file, just read and execute it - logger.debug("Applying schema %s", relative_path) - executescript(cur, absolute_path) - else: - # Not a valid delta file. - logger.warn( - "Found directory entry that did not end in .py or" - " .sql: %s", - relative_path, - ) - continue - - # Mark as done. - cur.execute( - database_engine.convert_param_style( - "INSERT INTO applied_schema_deltas (version, file)" - " VALUES (?,?)", - ), - (v, relative_path) - ) - - cur.execute("DELETE FROM schema_version") - cur.execute( - database_engine.convert_param_style( - "INSERT INTO schema_version (version, upgraded)" - " VALUES (?,?)", - ), - (v, True) - ) - - -def get_statements(f): - statement_buffer = "" - in_comment = False # If we're in a /* ... 
*/ style comment - - for line in f: - line = line.strip() - - if in_comment: - # Check if this line contains an end to the comment - comments = line.split("*/", 1) - if len(comments) == 1: - continue - line = comments[1] - in_comment = False - - # Remove inline block comments - line = re.sub(r"/\*.*\*/", " ", line) - - # Does this line start a comment? - comments = line.split("/*", 1) - if len(comments) > 1: - line = comments[0] - in_comment = True - - # Deal with line comments - line = line.split("--", 1)[0] - line = line.split("//", 1)[0] - - # Find *all* semicolons. We need to treat first and last entry - # specially. - statements = line.split(";") - - # We must prepend statement_buffer to the first statement - first_statement = "%s %s" % ( - statement_buffer.strip(), - statements[0].strip() - ) - statements[0] = first_statement - - # Every entry, except the last, is a full statement - for statement in statements[:-1]: - yield statement.strip() - - # The last entry did *not* end in a semicolon, so we store it for the - # next semicolon we find - statement_buffer = statements[-1].strip() - - -def executescript(txn, schema_path): - with open(schema_path, 'r') as f: - for statement in get_statements(f): - txn.execute(statement) - - -def _get_or_create_schema_state(txn, database_engine): - # Bluntly try creating the schema_version tables. - schema_path = os.path.join( - dir_path, "schema", "schema_version.sql", - ) - executescript(txn, schema_path) - - txn.execute("SELECT version, upgraded FROM schema_version") - row = txn.fetchone() - current_version = int(row[0]) if row else None - upgraded = bool(row[1]) if row else None - - if current_version: - txn.execute( - database_engine.convert_param_style( - "SELECT file FROM applied_schema_deltas WHERE version >= ?" - ), - (current_version,) - ) - applied_deltas = [d for d, in txn.fetchall()] - return current_version, applied_deltas, upgraded - - return None - - -def prepare_sqlite3_database(db_conn): - """This function should be called before `prepare_database` on sqlite3 - databases. - - Since we changed the way we store the current schema version and handle - updates to schemas, we need a way to upgrade from the old method to the - new. This only affects sqlite databases since they were the only ones - supported at the time. - """ - with db_conn: - schema_path = os.path.join( - dir_path, "schema", "schema_version.sql", - ) - create_schema = read_schema(schema_path) - db_conn.executescript(create_schema) - - c = db_conn.execute("SELECT * FROM schema_version") - rows = c.fetchall() - c.close() - - if not rows: - c = db_conn.execute("PRAGMA user_version") - row = c.fetchone() - c.close() - - if row and row[0]: - db_conn.execute( - "REPLACE INTO schema_version (version, upgraded)" - " VALUES (?,?)", - (row[0], False) - ) - - def are_all_users_on_domain(txn, database_engine, domain): sql = database_engine.convert_param_style( "SELECT COUNT(*) FROM users WHERE name NOT LIKE ?" diff --git a/synapse/storage/_schema_prepare.py b/synapse/storage/_schema_prepare.py new file mode 100644 index 000000000..1ddf55be4 --- /dev/null +++ b/synapse/storage/_schema_prepare.py @@ -0,0 +1,395 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import fnmatch +import imp +import logging +import os +import re + + +logger = logging.getLogger(__name__) + + +# Remember to update this number every time a change is made to database +# schema files, so the users will be informed on server restarts. +SCHEMA_VERSION = 24 + +dir_path = os.path.abspath(os.path.dirname(__file__)) + + +def read_schema(path): + """ Read the named database schema. + + Args: + path: Path of the database schema. + Returns: + A string containing the database schema. + """ + with open(path) as schema_file: + return schema_file.read() + + +class PrepareDatabaseException(Exception): + pass + + +class UpgradeDatabaseException(PrepareDatabaseException): + pass + + +def prepare_database(db_conn, database_engine): + """Prepares a database for usage. Will either create all necessary tables + or upgrade from an older schema version. + """ + try: + cur = db_conn.cursor() + version_info = _get_or_create_schema_state(cur, database_engine) + + if version_info: + user_version, delta_files, upgraded = version_info + _upgrade_existing_database( + cur, user_version, delta_files, upgraded, database_engine + ) + else: + _setup_new_database(cur, database_engine) + + # cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,)) + + cur.close() + db_conn.commit() + except: + db_conn.rollback() + raise + + +def _setup_new_database(cur, database_engine): + """Sets up the database by finding a base set of "full schemas" and then + applying any necessary deltas. + + The "full_schemas" directory has subdirectories named after versions. This + function searches for the highest version less than or equal to + `SCHEMA_VERSION` and executes all .sql files in that directory. + + The function will then apply all deltas for all versions after the base + version. + + Example directory structure: + + schema/ + delta/ + ... + full_schemas/ + 3/ + test.sql + ... + 11/ + foo.sql + bar.sql + ... + + In the example foo.sql and bar.sql would be run, and then any delta files + for versions strictly greater than 11. 
+ """ + current_dir = os.path.join(dir_path, "schema", "full_schemas") + directory_entries = os.listdir(current_dir) + + valid_dirs = [] + pattern = re.compile(r"^\d+(\.sql)?$") + for filename in directory_entries: + match = pattern.match(filename) + abs_path = os.path.join(current_dir, filename) + if match and os.path.isdir(abs_path): + ver = int(match.group(0)) + if ver <= SCHEMA_VERSION: + valid_dirs.append((ver, abs_path)) + else: + logger.warn("Unexpected entry in 'full_schemas': %s", filename) + + if not valid_dirs: + raise PrepareDatabaseException( + "Could not find a suitable base set of full schemas" + ) + + max_current_ver, sql_dir = max(valid_dirs, key=lambda x: x[0]) + + logger.debug("Initialising schema v%d", max_current_ver) + + directory_entries = os.listdir(sql_dir) + + for filename in fnmatch.filter(directory_entries, "*.sql"): + sql_loc = os.path.join(sql_dir, filename) + logger.debug("Applying schema %s", sql_loc) + executescript(cur, sql_loc) + + cur.execute( + database_engine.convert_param_style( + "INSERT INTO schema_version (version, upgraded)" + " VALUES (?,?)" + ), + (max_current_ver, False,) + ) + + _upgrade_existing_database( + cur, + current_version=max_current_ver, + applied_delta_files=[], + upgraded=False, + database_engine=database_engine, + ) + + +def _upgrade_existing_database(cur, current_version, applied_delta_files, + upgraded, database_engine): + """Upgrades an existing database. + + Delta files can either be SQL stored in *.sql files, or python modules + in *.py. + + There can be multiple delta files per version. Synapse will keep track of + which delta files have been applied, and will apply any that haven't been + even if there has been no version bump. This is useful for development + where orthogonal schema changes may happen on separate branches. + + Different delta files for the same version *must* be orthogonal and give + the same result when applied in any order. No guarantees are made on the + order of execution of these scripts. + + This is a no-op of current_version == SCHEMA_VERSION. + + Example directory structure: + + schema/ + delta/ + 11/ + foo.sql + ... + 12/ + foo.sql + bar.py + ... + full_schemas/ + ... + + In the example, if current_version is 11, then foo.sql will be run if and + only if `upgraded` is True. Then `foo.sql` and `bar.py` would be run in + some arbitrary order. + + Args: + cur (Cursor) + current_version (int): The current version of the schema. + applied_delta_files (list): A list of deltas that have already been + applied. + upgraded (bool): Whether the current version was generated by having + applied deltas or from full schema file. If `True` the function + will never apply delta files for the given `current_version`, since + the current_version wasn't generated by applying those delta files. 
+ """ + + if current_version > SCHEMA_VERSION: + raise ValueError( + "Cannot use this database as it is too " + + "new for the server to understand" + ) + + start_ver = current_version + if not upgraded: + start_ver += 1 + + logger.debug("applied_delta_files: %s", applied_delta_files) + + for v in range(start_ver, SCHEMA_VERSION + 1): + logger.debug("Upgrading schema to v%d", v) + + delta_dir = os.path.join(dir_path, "schema", "delta", str(v)) + + try: + directory_entries = os.listdir(delta_dir) + except OSError: + logger.exception("Could not open delta dir for version %d", v) + raise UpgradeDatabaseException( + "Could not open delta dir for version %d" % (v,) + ) + + directory_entries.sort() + for file_name in directory_entries: + relative_path = os.path.join(str(v), file_name) + logger.debug("Found file: %s", relative_path) + if relative_path in applied_delta_files: + continue + + absolute_path = os.path.join( + dir_path, "schema", "delta", relative_path, + ) + root_name, ext = os.path.splitext(file_name) + if ext == ".py": + # This is a python upgrade module. We need to import into some + # package and then execute its `run_upgrade` function. + module_name = "synapse.storage.v%d_%s" % ( + v, root_name + ) + with open(absolute_path) as python_file: + module = imp.load_source( + module_name, absolute_path, python_file + ) + logger.debug("Running script %s", relative_path) + module.run_upgrade(cur, database_engine) + elif ext == ".pyc": + # Sometimes .pyc files turn up anyway even though we've + # disabled their generation; e.g. from distribution package + # installers. Silently skip it + pass + elif ext == ".sql": + # A plain old .sql file, just read and execute it + logger.debug("Applying schema %s", relative_path) + executescript(cur, absolute_path) + else: + # Not a valid delta file. + logger.warn( + "Found directory entry that did not end in .py or" + " .sql: %s", + relative_path, + ) + continue + + # Mark as done. + cur.execute( + database_engine.convert_param_style( + "INSERT INTO applied_schema_deltas (version, file)" + " VALUES (?,?)", + ), + (v, relative_path) + ) + + cur.execute("DELETE FROM schema_version") + cur.execute( + database_engine.convert_param_style( + "INSERT INTO schema_version (version, upgraded)" + " VALUES (?,?)", + ), + (v, True) + ) + + +def get_statements(f): + statement_buffer = "" + in_comment = False # If we're in a /* ... */ style comment + + for line in f: + line = line.strip() + + if in_comment: + # Check if this line contains an end to the comment + comments = line.split("*/", 1) + if len(comments) == 1: + continue + line = comments[1] + in_comment = False + + # Remove inline block comments + line = re.sub(r"/\*.*\*/", " ", line) + + # Does this line start a comment? + comments = line.split("/*", 1) + if len(comments) > 1: + line = comments[0] + in_comment = True + + # Deal with line comments + line = line.split("--", 1)[0] + line = line.split("//", 1)[0] + + # Find *all* semicolons. We need to treat first and last entry + # specially. 
+ statements = line.split(";") + + # We must prepend statement_buffer to the first statement + first_statement = "%s %s" % ( + statement_buffer.strip(), + statements[0].strip() + ) + statements[0] = first_statement + + # Every entry, except the last, is a full statement + for statement in statements[:-1]: + yield statement.strip() + + # The last entry did *not* end in a semicolon, so we store it for the + # next semicolon we find + statement_buffer = statements[-1].strip() + + +def executescript(txn, schema_path): + with open(schema_path, 'r') as f: + for statement in get_statements(f): + txn.execute(statement) + + +def _get_or_create_schema_state(txn, database_engine): + # Bluntly try creating the schema_version tables. + schema_path = os.path.join( + dir_path, "schema", "schema_version.sql", + ) + executescript(txn, schema_path) + + txn.execute("SELECT version, upgraded FROM schema_version") + row = txn.fetchone() + current_version = int(row[0]) if row else None + upgraded = bool(row[1]) if row else None + + if current_version: + txn.execute( + database_engine.convert_param_style( + "SELECT file FROM applied_schema_deltas WHERE version >= ?" + ), + (current_version,) + ) + applied_deltas = [d for d, in txn.fetchall()] + return current_version, applied_deltas, upgraded + + return None + + +def prepare_sqlite3_database(db_conn): + """This function should be called before `prepare_database` on sqlite3 + databases. + + Since we changed the way we store the current schema version and handle + updates to schemas, we need a way to upgrade from the old method to the + new. This only affects sqlite databases since they were the only ones + supported at the time. + """ + with db_conn: + schema_path = os.path.join( + dir_path, "schema", "schema_version.sql", + ) + create_schema = read_schema(schema_path) + db_conn.executescript(create_schema) + + c = db_conn.execute("SELECT * FROM schema_version") + rows = c.fetchall() + c.close() + + if not rows: + c = db_conn.execute("PRAGMA user_version") + row = c.fetchone() + c.close() + + if row and row[0]: + db_conn.execute( + "REPLACE INTO schema_version (version, upgraded)" + " VALUES (?,?)", + (row[0], False) + ) diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 4a855ffd5..949396044 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.storage import prepare_database +from synapse.storage._schema_prepare import prepare_database from ._base import IncorrectDatabaseSetup diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py index d18e2808d..a66815ef2 100644 --- a/synapse/storage/engines/sqlite3.py +++ b/synapse/storage/engines/sqlite3.py @@ -13,7 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from synapse.storage import prepare_database, prepare_sqlite3_database +from synapse.storage._schema_prepare import ( + prepare_database, prepare_sqlite3_database +) class Sqlite3Engine(object): From 54414221e4ced47e632144afa7d768a7e252214c Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 13 Oct 2015 11:43:12 +0100 Subject: [PATCH 11/22] Include invites in incremental sync --- synapse/handlers/sync.py | 31 ++++++++++++++++++++++------ synapse/rest/client/v2_alpha/sync.py | 2 +- 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 574412d6b..d9e55d8a5 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -163,7 +163,7 @@ class SyncHandler(BaseHandler): invited = [] for event in room_list: if event.membership == Membership.JOIN: - room_sync = yield self.initial_sync_for_room( + room_sync = yield self.initial_sync_for_joined_room( event.room_id, sync_config, now_token, ) joined.append(room_sync) @@ -240,9 +240,9 @@ class SyncHandler(BaseHandler): ) if app_service: rooms = yield self.store.get_app_service_rooms(app_service) - room_ids = set(r.room_id for r in rooms) + joined_room_ids = set(r.room_id for r in rooms) else: - room_ids = yield rm_handler.get_joined_rooms_for_user( + joined_room_ids = yield rm_handler.get_joined_rooms_for_user( sync_config.user ) @@ -260,11 +260,17 @@ class SyncHandler(BaseHandler): if len(room_events) <= timeline_limit: # There is no gap in any of the rooms. Therefore we can just # partition the new events by room and return them. + invite_events = [] events_by_room_id = {} for event in room_events: events_by_room_id.setdefault(event.room_id, []).append(event) + if event.room_id not in joined_room_ids: + if (event.type == EventTypes.Member + and event.membership == Membership.INVITE + and event.state_key == sync_config.user.to_string()): + invite_events.append(event) - for room_id in room_ids: + for room_id in joined_room_ids: recents = events_by_room_id.get(room_id, []) state = [event for event in recents if event.is_state()] if recents: @@ -291,7 +297,15 @@ class SyncHandler(BaseHandler): if room_sync: joined.append(room_sync) else: - for room_id in room_ids: + invites = yield self.store.get_rooms_for_user_where_membership_is( + user_id=sync_config.user.to_string(), + membership_list=[Membership.INVITE], + ) + invite_events = yield self.store.get_events( + [invite.event_id for invite in invites] + ) + + for room_id in joined_room_ids: room_sync = yield self.incremental_sync_with_gap_for_room( room_id, sync_config, since_token, now_token, typing_by_room @@ -299,10 +313,15 @@ class SyncHandler(BaseHandler): if room_sync: joined.append(room_sync) + invited = [ + InvitedSyncResult(room_id=event.room_id, invite=event) + for event in invite_events + ] + defer.returnValue(SyncResult( presence=presence, joined=joined, - invited=[], + invited=invited, next_batch=now_token, )) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 399df9e77..fffecb24f 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -177,7 +177,7 @@ class SyncRestServlet(RestServlet): invited_state = invite.get("unsigned", {}).pop("invite_room_state", []) invited_state.append(invite) invited[room.room_id] = { - "invite_state": { "events": invited_state } + "invite_state": {"events": invited_state} } return invited From ec398af41c4d276abb02279efbcbb0aa08a4cbc8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Oct 2015 
11:41:04 +0100 Subject: [PATCH 12/22] Expose error more nicely --- synapse/app/homeserver.py | 5 ++--- synapse/storage/__init__.py | 3 --- synapse/storage/engines/postgres.py | 2 +- synapse/storage/engines/sqlite3.py | 2 +- synapse/storage/{_schema_prepare.py => schema_prepare.py} | 0 tests/utils.py | 2 +- 6 files changed, 5 insertions(+), 9 deletions(-) rename synapse/storage/{_schema_prepare.py => schema_prepare.py} (100%) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 190b03e2f..b284d07cf 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -35,9 +35,8 @@ if __name__ == '__main__': from synapse.storage.engines import create_engine, IncorrectDatabaseSetup -from synapse.storage import ( - are_all_users_on_domain, UpgradeDatabaseException, -) +from synapse.storage import are_all_users_on_domain +from synapse.storage.schema_prepare import UpgradeDatabaseException from synapse.server import HomeServer diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 4be629bff..48a063374 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -41,9 +41,6 @@ from .end_to_end_keys import EndToEndKeyStore from .receipts import ReceiptsStore -from ._schema_prepare import UpgradeDatabaseException - -__all__ = [UpgradeDatabaseException] import logging diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 949396044..7e45dabf4 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.storage._schema_prepare import prepare_database +from synapse.storage.schema_prepare import prepare_database from ._base import IncorrectDatabaseSetup diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py index a66815ef2..0eeaa45d1 100644 --- a/synapse/storage/engines/sqlite3.py +++ b/synapse/storage/engines/sqlite3.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from synapse.storage._schema_prepare import ( +from synapse.storage.schema_prepare import ( prepare_database, prepare_sqlite3_database ) diff --git a/synapse/storage/_schema_prepare.py b/synapse/storage/schema_prepare.py similarity index 100% rename from synapse/storage/_schema_prepare.py rename to synapse/storage/schema_prepare.py diff --git a/tests/utils.py b/tests/utils.py index dd19a16fc..6eb575bd0 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -16,7 +16,7 @@ from synapse.http.server import HttpServer from synapse.api.errors import cs_error, CodeMessageException, StoreError from synapse.api.constants import EventTypes -from synapse.storage import prepare_database +from synapse.storage.schema_prepare import prepare_database from synapse.storage.engines import create_engine from synapse.server import HomeServer From 17c80c8a3d92acca5bda9b0fc7d9898547476563 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Oct 2015 13:56:22 +0100 Subject: [PATCH 13/22] rename schema_prepare to prepare_database --- synapse/app/homeserver.py | 2 +- synapse/storage/engines/postgres.py | 2 +- synapse/storage/engines/sqlite3.py | 2 +- synapse/storage/{schema_prepare.py => prepare_database.py} | 0 tests/utils.py | 2 +- 5 files changed, 4 insertions(+), 4 deletions(-) rename synapse/storage/{schema_prepare.py => prepare_database.py} (100%) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index b284d07cf..af53acb36 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -36,7 +36,7 @@ if __name__ == '__main__': from synapse.storage.engines import create_engine, IncorrectDatabaseSetup from synapse.storage import are_all_users_on_domain -from synapse.storage.schema_prepare import UpgradeDatabaseException +from synapse.storage.prepare_database import UpgradeDatabaseException from synapse.server import HomeServer diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 7e45dabf4..98d66e0a8 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.storage.schema_prepare import prepare_database +from synapse.storage.prepare_database import prepare_database from ._base import IncorrectDatabaseSetup diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py index 0eeaa45d1..bad3b5c5a 100644 --- a/synapse/storage/engines/sqlite3.py +++ b/synapse/storage/engines/sqlite3.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from synapse.storage.schema_prepare import (
+from synapse.storage.prepare_database import (
     prepare_database, prepare_sqlite3_database
 )
 
diff --git a/synapse/storage/schema_prepare.py b/synapse/storage/prepare_database.py
similarity index 100%
rename from synapse/storage/schema_prepare.py
rename to synapse/storage/prepare_database.py
diff --git a/tests/utils.py b/tests/utils.py
index 6eb575bd0..4da51291a 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -16,7 +16,7 @@
 from synapse.http.server import HttpServer
 from synapse.api.errors import cs_error, CodeMessageException, StoreError
 from synapse.api.constants import EventTypes
-from synapse.storage.schema_prepare import prepare_database
+from synapse.storage.prepare_database import prepare_database
 from synapse.storage.engines import create_engine
 from synapse.server import HomeServer
 
From cacf0688c691517dab55c3cff294b6bac7f0d6e3 Mon Sep 17 00:00:00 2001
From: Mark Haines
Date: Tue, 13 Oct 2015 14:08:38 +0100
Subject: [PATCH 14/22] Add a get_invites_for_user method to the storage to
 find out the rooms a user is invited to

---
 synapse/handlers/sync.py      |  8 ++------
 synapse/storage/roommember.py | 14 ++++++++++++++
 2 files changed, 16 insertions(+), 6 deletions(-)

diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index d9e55d8a5..380798b7a 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -297,12 +297,8 @@ class SyncHandler(BaseHandler):
             if room_sync:
                 joined.append(room_sync)
         else:
-            invites = yield self.store.get_rooms_for_user_where_membership_is(
-                user_id=sync_config.user.to_string(),
-                membership_list=[Membership.INVITE],
-            )
-            invite_events = yield self.store.get_events(
-                [invite.event_id for invite in invites]
+            invite_events = yield self.store.get_invites_for_user(
+                sync_config.user.to_string()
             )
 
             for room_id in joined_room_ids:
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 8c40d9a8a..dd98dcfda 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -110,6 +110,20 @@ class RoomMemberStore(SQLBaseStore):
             membership=membership,
         ).addCallback(self._get_events)
 
+    def get_invites_for_user(self, user_id):
+        """ Get all the invite events for a user
+        Args:
+            user_id (str): The user ID.
+        Returns:
+            A deferred list of event objects.
+        """
+
+        return self.get_rooms_for_user_where_membership_is(
+            user_id, [Membership.INVITE]
+        ).addCallback(lambda invites: self._get_events([
+            invite.event_id for invite in invites
+        ]))
+
     def get_rooms_for_user_where_membership_is(self, user_id, membership_list):
         """ Get all the rooms for this user where the membership for this user
         matches one in the membership list.
From 2fa9e23e04a9bc2c9a192309db59fa8aae495432 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 13 Oct 2015 14:12:43 +0100 Subject: [PATCH 15/22] Update the v2 filters to support filtering presence and remove support for public/private user data --- synapse/api/filtering.py | 62 ++++++++++++++++++++++++++++------------ 1 file changed, 44 insertions(+), 18 deletions(-) diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index 4d570b74f..e79e91e7e 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -54,7 +54,7 @@ class Filtering(object): ] room_level_definitions = [ - "state", "events", "ephemeral" + "state", "timeline", "ephemeral" ] for key in top_level_definitions: @@ -135,17 +135,23 @@ class Filter(object): def __init__(self, filter_json): self.filter_json = filter_json - def filter_public_user_data(self, events): - return self._filter_on_key(events, ["public_user_data"]) + def timeline_limit(self): + return self.filter_json.get("room", {}).get("timeline", {}).get("limit", 10) - def filter_private_user_data(self, events): - return self._filter_on_key(events, ["private_user_data"]) + def presence_limit(self): + return self.filter_json.get("presence", {}).get("limit", 10) + + def ephemeral_limit(self): + return self.filter_json.get("room", {}).get("ephemeral", {}).get("limit", 10) + + def filter_presence(self, events): + return self._filter_on_key(events, ["presence"]) def filter_room_state(self, events): return self._filter_on_key(events, ["room", "state"]) - def filter_room_events(self, events): - return self._filter_on_key(events, ["room", "events"]) + def filter_room_timeline(self, events): + return self._filter_on_key(events, ["room", "timeline"]) def filter_room_ephemeral(self, events): return self._filter_on_key(events, ["room", "ephemeral"]) @@ -169,11 +175,34 @@ class Filter(object): return [e for e in events if self._passes_definition(definition, e)] def _passes_definition(self, definition, event): + """Check if the event passes the filter definition + Args: + definition(dict): The filter definition to check against + event(dict or Event): The event to check + Returns: + True if the event passes the filter in the definition + """ + if type(event) is dict: + room_id = event.get("room_id") + sender = event.get("sender") + event_type = event["type"] + else: + room_id = getattr(event, "room_id", None) + sender = getattr(event, "sender", None) + event_type = event.type + return self._event_passes_definition( + definition, room_id, sender, event_type + ) + + def _event_passes_definition(self, definition, room_id, sender, + event_type): """Check if the event passes through the given definition. Args: definition(dict): The definition to check against. - event(Event): The event to check. + room_id(str): The id of the room this event is in or None. + sender(str): The sender of the event + event_type(str): The type of the event. Returns: True if the event passes through the filter. """ @@ -185,8 +214,7 @@ class Filter(object): # and 'not_types' then it is treated as only being in 'not_types') # room checks - if hasattr(event, "room_id"): - room_id = event.room_id + if room_id is not None: allow_rooms = definition.get("rooms", None) reject_rooms = definition.get("not_rooms", None) if reject_rooms and room_id in reject_rooms: @@ -195,9 +223,7 @@ class Filter(object): return False # sender checks - if hasattr(event, "sender"): - # Should we be including event.state_key for some event types? 
- sender = event.sender + if sender is not None: allow_senders = definition.get("senders", None) reject_senders = definition.get("not_senders", None) if reject_senders and sender in reject_senders: @@ -208,12 +234,12 @@ class Filter(object): # type checks if "not_types" in definition: for def_type in definition["not_types"]: - if self._event_matches_type(event, def_type): + if self._event_matches_type(event_type, def_type): return False if "types" in definition: included = False for def_type in definition["types"]: - if self._event_matches_type(event, def_type): + if self._event_matches_type(event_type, def_type): included = True break if not included: @@ -221,9 +247,9 @@ class Filter(object): return True - def _event_matches_type(self, event, def_type): + def _event_matches_type(self, event_type, def_type): if def_type.endswith("*"): type_prefix = def_type[:-1] - return event.type.startswith(type_prefix) + return event_type.startswith(type_prefix) else: - return event.type == def_type + return event_type == def_type From 889778155811277585debda837c359a4ae471706 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 13 Oct 2015 14:13:51 +0100 Subject: [PATCH 16/22] update filtering tests --- tests/api/test_filtering.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/api/test_filtering.py b/tests/api/test_filtering.py index 65b2f590c..6942cdac5 100644 --- a/tests/api/test_filtering.py +++ b/tests/api/test_filtering.py @@ -345,9 +345,9 @@ class FilteringTestCase(unittest.TestCase): ) @defer.inlineCallbacks - def test_filter_public_user_data_match(self): + def test_filter_presence_match(self): user_filter_json = { - "public_user_data": { + "presence": { "types": ["m.*"] } } @@ -368,13 +368,13 @@ class FilteringTestCase(unittest.TestCase): filter_id=filter_id, ) - results = user_filter.filter_public_user_data(events=events) + results = user_filter.filter_presence(events=events) self.assertEquals(events, results) @defer.inlineCallbacks - def test_filter_public_user_data_no_match(self): + def test_filter_presence_no_match(self): user_filter_json = { - "public_user_data": { + "presence": { "types": ["m.*"] } } @@ -395,7 +395,7 @@ class FilteringTestCase(unittest.TestCase): filter_id=filter_id, ) - results = user_filter.filter_public_user_data(events=events) + results = user_filter.filter_presence(events=events) self.assertEquals([], results) @defer.inlineCallbacks From 7639c3d9e53cdb6222df6a8e1b12bc2a40612367 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 13 Oct 2015 17:13:04 +0100 Subject: [PATCH 17/22] Bounce all deferreds through the reactor to make debugging easier. If all deferreds wait a reactor tick before resolving then there is always a chance to add an errback to the deferred so that stacktraces get reported, rather than being discarded. 
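For illustration, a minimal sketch (not part of the patch) of the failure
mode this works around, using only stock Twisted. A Deferred that fails
before the caller can attach an errback only reports "Unhandled error in
Deferred" at garbage-collection time, with most of the useful context gone;
bouncing results through reactor.callLater(0, ...) gives callers a full
reactor tick in which to attach errbacks first. The name `load_thing` is
hypothetical.

    from twisted.internet import defer, reactor

    def load_thing():
        d = defer.Deferred()
        # Fails synchronously, before the caller ever sees the Deferred.
        d.errback(ValueError("boom"))
        return d

    def bounce(result):
        # Resolve via the reactor instead: the caller's addCallbacks /
        # addErrback calls run before the tick that delivers `result`.
        bouncer = defer.Deferred()
        reactor.callLater(0, bouncer.callback, result)
        return bouncer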
--- synapse/app/homeserver.py | 2 ++ synapse/util/debug.py | 68 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+) create mode 100644 synapse/util/debug.py diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index af53acb36..1c84242aa 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -33,6 +33,8 @@ if __name__ == '__main__': sys.stderr.writelines(message) sys.exit(1) + from synapse.util.debug import debug_deferreds + debug_deferreds() from synapse.storage.engines import create_engine, IncorrectDatabaseSetup from synapse.storage import are_all_users_on_domain diff --git a/synapse/util/debug.py b/synapse/util/debug.py new file mode 100644 index 000000000..66ac12c29 --- /dev/null +++ b/synapse/util/debug.py @@ -0,0 +1,68 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer, reactor +from functools import wraps +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext + +def with_logging_context(fn): + context = LoggingContext.current_context() + def restore_context_callback(x): + with PreserveLoggingContext(): + LoggingContext.thread_local.current_context = context + return fn(x) + return restore_context_callback + +def debug_deferreds(): + """Cause all deferreds to wait for a reactor tick before running their + callbacks. This increases the chance of getting a stack trace out of + a defer.inlineCallback since the code waiting on the deferred will get + a chance to add an errback before the deferred runs.""" + + # We are going to modify the __init__ method of defer.Deferred so we + # need to get a copy of the old method so we can still call it. + old__init__ = defer.Deferred.__init__ + + # We need to create a deferred to bounce the callbacks through the reactor + # but we don't want to add a callback when we create that deferred so we + # we create a new type of deferred that uses the old __init__ method. + # This is safe as long as the old __init__ method doesn't invoke an + # __init__ using super. + class Bouncer(defer.Deferred): + __init__ = old__init__ + + # We'll add this as a callback to all Deferreds. Twisted will wait until + # the bouncer deferred resolves before calling the callbacks of the + # original deferred. + def bounce_callback(x): + bouncer = Bouncer() + reactor.callLater(0, with_logging_context(bouncer.callback), x) + return bouncer + + # We'll add this as an errback to all Deferreds. Twisted will wait until + # the bouncer deferred resolves before calling the errbacks of the + # original deferred. 
+ def bounce_errback(x): + bouncer = Bouncer() + reactor.callLater(0, with_logging_context(bouncer.errback), x) + return bouncer + + @wraps(old__init__) + def new__init__(self, *args, **kargs): + old__init__(self, *args, **kargs) + self.addCallbacks(bounce_callback, bounce_errback) + + defer.Deferred.__init__ = new__init__ + From 32d66738b0229aa7f011d203d0cb7963f950bb95 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 13 Oct 2015 17:18:29 +0100 Subject: [PATCH 18/22] Fix pep8 warnings. --- synapse/util/debug.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/synapse/util/debug.py b/synapse/util/debug.py index 66ac12c29..f6a5a841a 100644 --- a/synapse/util/debug.py +++ b/synapse/util/debug.py @@ -17,13 +17,6 @@ from twisted.internet import defer, reactor from functools import wraps from synapse.util.logcontext import LoggingContext, PreserveLoggingContext -def with_logging_context(fn): - context = LoggingContext.current_context() - def restore_context_callback(x): - with PreserveLoggingContext(): - LoggingContext.thread_local.current_context = context - return fn(x) - return restore_context_callback def debug_deferreds(): """Cause all deferreds to wait for a reactor tick before running their @@ -31,6 +24,18 @@ def debug_deferreds(): a defer.inlineCallback since the code waiting on the deferred will get a chance to add an errback before the deferred runs.""" + # Helper method for retrieving and restoring the current logging context + # around a callback. + def with_logging_context(fn): + context = LoggingContext.current_context() + + def restore_context_callback(x): + with PreserveLoggingContext(): + LoggingContext.thread_local.current_context = context + return fn(x) + + return restore_context_callback + # We are going to modify the __init__ method of defer.Deferred so we # need to get a copy of the old method so we can still call it. 
old__init__ = defer.Deferred.__init__ @@ -65,4 +70,3 @@ def debug_deferreds(): self.addCallbacks(bounce_callback, bounce_errback) defer.Deferred.__init__ = new__init__ - From 9020860479a9f70ae4d05ddcdc231d7e336474e3 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 13 Oct 2015 17:50:44 +0100 Subject: [PATCH 19/22] Only turn on the twisted deferred debugging if full_twisted_stacktraces is set in the config --- synapse/app/homeserver.py | 3 --- synapse/config/logger.py | 8 ++++++++ 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 1c84242aa..cf2fa221d 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -33,9 +33,6 @@ if __name__ == '__main__': sys.stderr.writelines(message) sys.exit(1) - from synapse.util.debug import debug_deferreds - debug_deferreds() - from synapse.storage.engines import create_engine, IncorrectDatabaseSetup from synapse.storage import are_all_users_on_domain from synapse.storage.prepare_database import UpgradeDatabaseException diff --git a/synapse/config/logger.py b/synapse/config/logger.py index bd0c17c86..a13dc170c 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -22,6 +22,7 @@ import yaml from string import Template import os import signal +from synapse.util.debug import debug_deferreds DEFAULT_LOG_CONFIG = Template(""" @@ -69,6 +70,8 @@ class LoggingConfig(Config): self.verbosity = config.get("verbose", 0) self.log_config = self.abspath(config.get("log_config")) self.log_file = self.abspath(config.get("log_file")) + if config.get("full_twisted_stacktraces"): + debug_deferreds() def default_config(self, config_dir_path, server_name, **kwargs): log_file = self.abspath("homeserver.log") @@ -84,6 +87,11 @@ class LoggingConfig(Config): # A yaml python logging config file log_config: "%(log_config)s" + + # Stop twisted from discarding the stack traces of exceptions in + # deferreds by waiting a reactor tick before running a deferred's + # callbacks. 
+ # full_twisted_stacktraces: true """ % locals() def read_arguments(self, args): From 1941eb315d692c44b0e21fb3fbf1b95eed138d53 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 13 Oct 2015 18:00:02 +0100 Subject: [PATCH 20/22] Enable stack traces for the demo scripts --- demo/start.sh | 3 +++ 1 file changed, 3 insertions(+) diff --git a/demo/start.sh b/demo/start.sh index a90561488..d90115ec9 100755 --- a/demo/start.sh +++ b/demo/start.sh @@ -38,6 +38,9 @@ for port in 8080 8081 8082; do perl -p -i -e 's/^enable_registration:.*/enable_registration: true/g' $DIR/etc/$port.config + echo "full_twisted_stacktraces: true" >> $DIR/etc/$port.config + echo "report_stats: false" >> $DIR/etc/$port.config + python -m synapse.app.homeserver \ --config-path "$DIR/etc/$port.config" \ -D \ From 858634e1d0ca489bb546851d6ee052d548870b06 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 14 Oct 2015 09:29:08 +0100 Subject: [PATCH 21/22] Remove unused room_id arg --- synapse/handlers/federation.py | 2 +- synapse/handlers/message.py | 10 +++++----- synapse/handlers/sync.py | 2 +- synapse/storage/state.py | 10 +++++----- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 3882ba79e..a710bdcfd 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -242,7 +242,7 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def _filter_events_for_server(self, server_name, room_id, events): event_to_state = yield self.store.get_state_for_events( - room_id, frozenset(e.event_id for e in events), + frozenset(e.event_id for e in events), types=( (EventTypes.RoomHistoryVisibility, ""), (EventTypes.Member, None), diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index b70258697..dfeeae76d 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -164,7 +164,7 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def _filter_events_for_client(self, user_id, room_id, events): event_id_to_state = yield self.store.get_state_for_events( - room_id, frozenset(e.event_id for e in events), + frozenset(e.event_id for e in events), types=( (EventTypes.RoomHistoryVisibility, ""), (EventTypes.Member, user_id), @@ -290,7 +290,7 @@ class MessageHandler(BaseHandler): elif member_event.membership == Membership.LEAVE: key = (event_type, state_key) room_state = yield self.store.get_state_for_events( - room_id, [member_event.event_id], [key] + [member_event.event_id], [key] ) data = room_state[member_event.event_id].get(key) @@ -314,7 +314,7 @@ class MessageHandler(BaseHandler): room_state = yield self.state_handler.get_current_state(room_id) elif member_event.membership == Membership.LEAVE: room_state = yield self.store.get_state_for_events( - room_id, [member_event.event_id], None + [member_event.event_id], None ) room_state = room_state[member_event.event_id] @@ -406,7 +406,7 @@ class MessageHandler(BaseHandler): elif event.membership == Membership.LEAVE: room_end_token = "s%d" % (event.stream_ordering,) deferred_room_state = self.store.get_state_for_events( - event.room_id, [event.event_id], None + [event.event_id], None ) deferred_room_state.addCallback( lambda states: states[event.event_id] @@ -499,7 +499,7 @@ class MessageHandler(BaseHandler): def _room_initial_sync_parted(self, user_id, room_id, pagin_config, member_event): room_state = yield self.store.get_state_for_events( - member_event.room_id, [member_event.event_id], None + [member_event.event_id], None ) 
room_state = room_state[member_event.event_id] diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 9914ff6f9..a8940de16 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -312,7 +312,7 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def _filter_events_for_client(self, user_id, room_id, events): event_id_to_state = yield self.store.get_state_for_events( - room_id, frozenset(e.event_id for e in events), + frozenset(e.event_id for e in events), types=( (EventTypes.RoomHistoryVisibility, ""), (EventTypes.Member, user_id), diff --git a/synapse/storage/state.py b/synapse/storage/state.py index e935b9443..6f2a50d58 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -54,7 +54,7 @@ class StateStore(SQLBaseStore): defer.returnValue({}) event_to_groups = yield self._get_state_group_for_events( - room_id, event_ids, + event_ids, ) groups = set(event_to_groups.values()) @@ -208,7 +208,7 @@ class StateStore(SQLBaseStore): ) @defer.inlineCallbacks - def get_state_for_events(self, room_id, event_ids, types): + def get_state_for_events(self, event_ids, types): """Given a list of event_ids and type tuples, return a list of state dicts for each event. The state dicts will only have the type/state_keys that are in the `types` list. @@ -225,7 +225,7 @@ class StateStore(SQLBaseStore): The dicts are mappings from (type, state_key) -> state_events """ event_to_groups = yield self._get_state_group_for_events( - room_id, event_ids, + event_ids, ) groups = set(event_to_groups.values()) @@ -251,8 +251,8 @@ class StateStore(SQLBaseStore): ) @cachedList(cache=_get_state_group_for_event.cache, list_name="event_ids", - num_args=2) - def _get_state_group_for_events(self, room_id, event_ids): + num_args=1) + def _get_state_group_for_events(self, event_ids): """Returns mapping event_id -> state_group """ def f(txn): From c185c1c4139f9ea1ad5b586a2cfd9f658cffcbb3 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 14 Oct 2015 13:16:53 +0100 Subject: [PATCH 22/22] Fix v2 sync polling --- synapse/handlers/sync.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 380798b7a..7b2d6e345 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -82,7 +82,7 @@ class SyncResult(collections.namedtuple("SyncResult", [ events. """ return bool( - self.private_user_data or self.public_user_data or self.rooms + self.presence or self.joined or self.invited ) @@ -122,8 +122,8 @@ class SyncHandler(BaseHandler): ) result = yield self.notifier.wait_for_events( - sync_config.user, room_ids, - sync_config.filter, timeout, current_sync_callback + sync_config.user, room_ids, timeout, current_sync_callback, + from_token=since_token ) defer.returnValue(result)
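As a closing illustration, a minimal sketch of how the pieces added above fit
together, assuming the Filter accessors from PATCH 15 and the SyncResult
fields from PATCHES 11 and 22; the stream token string is a placeholder:

    from synapse.api.filtering import Filter
    from synapse.handlers.sync import SyncResult

    f = Filter({
        "room": {"timeline": {"limit": 20}},
        "presence": {"limit": 5},
    })
    assert f.timeline_limit() == 20   # explicit limit from the filter JSON
    assert f.presence_limit() == 5
    assert f.ephemeral_limit() == 10  # no value supplied, default of 10

    # An all-empty SyncResult is falsy, so the long-poll keeps waiting on
    # the notifier instead of returning an empty response to the client.
    empty = SyncResult(
        next_batch="s0_0_0",  # placeholder stream token
        presence=[], joined=[], invited=[],
    )
    assert not empty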