Start implementing the non-incremental sync portion of the v2 /sync API

This commit is contained in:
Mark Haines 2015-01-26 18:53:31 +00:00
parent 3186c5bdbc
commit 436513068d
4 changed files with 145 additions and 53 deletions

View File

@ -89,7 +89,7 @@ def prune_event(event):
return type(event)(allowed_fields) return type(event)(allowed_fields)
def serialize_event(e, time_now_ms, client_event=True): def serialize_event(e, time_now_ms, client_event=True, strip_ids=False):
# FIXME(erikj): To handle the case of presence events and the like # FIXME(erikj): To handle the case of presence events and the like
if not isinstance(e, EventBase): if not isinstance(e, EventBase):
return e return e
@ -138,4 +138,8 @@ def serialize_event(e, time_now_ms, client_event=True):
d.pop("unsigned", None) d.pop("unsigned", None)
d.pop("origin", None) d.pop("origin", None)
if strip_ids:
d.pop("room_id", None)
d.pop("event_id", None)
return d return d

View File

@ -26,6 +26,7 @@ from .presence import PresenceHandler
from .directory import DirectoryHandler from .directory import DirectoryHandler
from .typing import TypingNotificationHandler from .typing import TypingNotificationHandler
from .admin import AdminHandler from .admin import AdminHandler
from .sync import SyncHandler
class Handlers(object): class Handlers(object):
@ -51,3 +52,4 @@ class Handlers(object):
self.directory_handler = DirectoryHandler(hs) self.directory_handler = DirectoryHandler(hs)
self.typing_notification_handler = TypingNotificationHandler(hs) self.typing_notification_handler = TypingNotificationHandler(hs)
self.admin_handler = AdminHandler(hs) self.admin_handler = AdminHandler(hs)
self.sync_handler = SyncHandler(hs)

View File

@ -1,26 +1,49 @@
# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import BaseHandler
from synapse.streams.config import PaginationConfig
from synapse.api.constants import Membership
from twisted.internet import defer
import collections import collections
import logging
logger = logging.getLogger(__name__)
SyncConfig = collections.namedtuple("SyncConfig", [ SyncConfig = collections.namedtuple("SyncConfig", [
"user", "user",
"device", "device",
"since",
"limit", "limit",
"gap", "gap",
"sort" "sort",
"backfill" "backfill",
"filter", "filter",
) ])
RoomSyncResult = collections.namedtuple("RoomSyncResult", [ RoomSyncResult = collections.namedtuple("RoomSyncResult", [
"room_id", "room_id",
"limited", "limited",
"published", "published",
"prev_batch", "events", # dict of event
"events",
"state", "state",
"event_map", "prev_batch",
]) ])
@ -41,10 +64,11 @@ class SyncHandler(BaseHandler):
def __init__(self, hs): def __init__(self, hs):
super(SyncHandler, self).__init__(hs) super(SyncHandler, self).__init__(hs)
self.event_sources = hs.get_event_sources() self.event_sources = hs.get_event_sources()
self.clock = hs.get_clock()
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0): def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0):
if timeout == 0: if timeout == 0:
return self.current_sync_for_user(sync_config, since) return self.current_sync_for_user(sync_config, since_token)
else: else:
def current_sync_callback(since_token): def current_sync_callback(since_token):
return self.current_sync_for_user( return self.current_sync_for_user(
@ -53,58 +77,71 @@ class SyncHandler(BaseHandler):
return self.notifier.wait_for_events( return self.notifier.wait_for_events(
sync_config.filter, since_token, current_sync_callback sync_config.filter, since_token, current_sync_callback
) )
defer.returnValue(result)
def current_sync_for_user(self, sync_config, since_token=None): def current_sync_for_user(self, sync_config, since_token=None):
if since_token is None: if since_token is None:
return self.inital_sync(sync_config) return self.initial_sync(sync_config)
else: else:
return self.incremental_sync(sync_config) return self.incremental_sync(sync_config)
@defer.inlineCallbacks @defer.inlineCallbacks
def initial_sync(self, sync_config): def initial_sync(self, sync_config):
if sync_config.sort == "timeline,desc":
# TODO(mjark): Handle going through events in reverse order?.
# What does "most recent events" mean when applying the limits mean
# in this case?
raise NotImplementedError()
now_token = yield self.event_sources.get_current_token() now_token = yield self.event_sources.get_current_token()
presence_stream = self.event_sources.sources["presence"] presence_stream = self.event_sources.sources["presence"]
# TODO (markjh): This looks wrong, shouldn't we be getting the presence # TODO (mjark): This looks wrong, shouldn't we be getting the presence
# UP to the present rather than after the present? # UP to the present rather than after the present?
pagination_config = PaginationConfig(from_token=now_token) pagination_config = PaginationConfig(from_token=now_token)
presence, _ = yield presence_stream.get_pagination_rows( presence, _ = yield presence_stream.get_pagination_rows(
user, pagination_config.get_source_config("presence"), None user=sync_config.user,
pagination_config=pagination_config.get_source_config("presence"),
key=None
) )
room_list = yield self.store.get_rooms_for_user_where_membership_is( room_list = yield self.store.get_rooms_for_user_where_membership_is(
user_id=user_id, user_id=sync_config.user.to_string(),
membership_list=[Membership.INVITE, Membership.JOIN] membership_list=[Membership.INVITE, Membership.JOIN]
) )
# TODO (markjh): Does public mean "published"? # TODO (mjark): Does public mean "published"?
published_rooms = yield self.store.get_rooms(is_public=True) published_rooms = yield self.store.get_rooms(is_public=True)
published_room_ids = set(r["room_id"] for r in public_rooms) published_room_ids = set(r["room_id"] for r in published_rooms)
rooms = []
for event in room_list: for event in room_list:
#TODO (mjark): Apply the event filter in sync_config.
messages, token = yield self.store.get_recent_events_for_room( recent_events, token = yield self.store.get_recent_events_for_room(
event.room_id, event.room_id,
limit=sync_config.limit, limit=sync_config.limit,
end_token=now_token.room_key, end_token=now_token.room_key,
) )
prev_batch_token = now_token.copy_and_replace("room_key", token[0]) prev_batch_token = now_token.copy_and_replace("room_key", token[0])
current_state = yield self.state_handler.get_current_state( current_state_events = yield self.state_handler.get_current_state(
event.room_id event.room_id
) )
rooms.append(RoomSyncResult( rooms.append(RoomSyncResult(
room_id=event.room_id, room_id=event.room_id,
published=event.room_id in published_room_ids, published=event.room_id in published_room_ids,
events=recent_events,
prev_batch=prev_batch_token,
state=current_state_events,
limited=True,
))
defer.returnValue(SyncResult(
public_user_data=presence,
private_user_data=[],
rooms=rooms,
next_batch=now_token,
))
@defer.inlineCallbacks @defer.inlineCallbacks
def incremental_sync(self, sync_config): def incremental_sync(self, sync_config):
pass

View File

@ -1,5 +1,5 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Copyright 2014, 2015 OpenMarket Ltd # Copyright 2015 OpenMarket Ltd
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -16,6 +16,9 @@
from twisted.internet import defer from twisted.internet import defer
from synapse.http.servlet import RestServlet from synapse.http.servlet import RestServlet
from synapse.handlers.sync import SyncConfig
from synapse.types import StreamToken
from synapse.events.utils import serialize_event
from ._base import client_v2_pattern from ._base import client_v2_pattern
import logging import logging
@ -73,14 +76,15 @@ class SyncRestServlet(RestServlet):
def __init__(self, hs): def __init__(self, hs):
super(SyncRestServlet, self).__init__() super(SyncRestServlet, self).__init__()
self.auth = hs.get_auth() self.auth = hs.get_auth()
#self.sync_handler = hs.get_handlers().sync_hanlder self.sync_handler = hs.get_handlers().sync_handler
self.clock = hs.get_clock()
@defer.inlineCallbacks @defer.inlineCallbacks
def on_GET(self, request): def on_GET(self, request):
user = yield self.auth.get_user_by_req(request) user = yield self.auth.get_user_by_req(request)
timeout = self.parse_integer(request, "timeout", default=0) timeout = self.parse_integer(request, "timeout", default=0)
limit = self.parse_integer(request, "limit", default=None) limit = self.parse_integer(request, "limit", required=True)
gap = self.parse_boolean(request, "gap", default=True) gap = self.parse_boolean(request, "gap", default=True)
sort = self.parse_string( sort = self.parse_string(
request, "sort", default="timeline,asc", request, "sort", default="timeline,asc",
@ -91,7 +95,7 @@ class SyncRestServlet(RestServlet):
request, "set_presence", default="online", request, "set_presence", default="online",
allowed_values=self.ALLOWED_PRESENCE allowed_values=self.ALLOWED_PRESENCE
) )
backfill = self.parse_boolean(request, "backfill", default=True) backfill = self.parse_boolean(request, "backfill", default=False)
filter_id = self.parse_string(request, "filter", default=None) filter_id = self.parse_string(request, "filter", default=None)
logger.info( logger.info(
@ -108,36 +112,81 @@ class SyncRestServlet(RestServlet):
# if filter.matches(event): # if filter.matches(event):
# # stuff # # stuff
# if timeout != 0: sync_config = SyncConfig(
# register for updates from the event stream user=user,
device="TODO", # TODO(mjark) Get the device_id from access_token
gap=gap,
limit=limit,
sort=sort,
backfill=backfill,
filter="TODO", # TODO(mjark) Add the filter to the config.
)
#rooms = [] if since is not None:
since_token = StreamToken.from_string(since)
if gap:
pass
# now_stream_token = get_current_stream_token
# for room_id in get_rooms_for_user(user, filter=filter):
# state, events, start, end, limited, published = updates_for_room(
# from=since, to=now_stream_token, limit=limit,
# anchor_to_start=False
# )
# rooms[room_id] = (state, events, start, limited, published)
# next_stream_token = now.
else: else:
pass since_token = None
# now_stream_token = get_current_stream_token
# for room_id in get_rooms_for_user(user, filter=filter)
# state, events, start, end, limited, published = updates_for_room(
# from=since, to=now_stream_token, limit=limit,
# anchor_to_start=False
# )
# next_stream_token = min(next_stream_token, end)
sync_result = yield self.sync_handler.wait_for_sync_for_user(
sync_config, since_token=since_token, timeout=timeout
)
response_content = {} time_now = self.clock.time_msec()
response_content = {
"public_user_data": self.encode_events(
sync_result.public_user_data, filter, time_now
),
"private_user_data": self.encode_events(
sync_result.private_user_data, filter, time_now
),
"rooms": self.encode_rooms(sync_result.rooms, filter, time_now),
"next_batch": sync_result.next_batch.to_string(),
}
defer.returnValue((200, response_content)) defer.returnValue((200, response_content))
def encode_events(self, events, filter, time_now):
return [self.encode_event(event, filter, time_now) for event in events]
@staticmethod
def encode_event(event, filter, time_now):
# TODO(mjark): Respect formatting requirements in the filter.
return serialize_event(event, time_now)
def encode_rooms(self, rooms, filter, time_now):
return [self.encode_room(room, filter, time_now) for room in rooms]
@staticmethod
def encode_room(room, filter, time_now):
event_map = {}
state_event_ids = []
recent_event_ids = []
for event in room.state:
# TODO(mjark): Respect formatting requirements in the filter.
event_map[event.event_id] = serialize_event(
event, time_now, strip_ids=True
)
state_event_ids.append(event.event_id)
for event in room.events:
# TODO(mjark): Respect formatting requirements in the filter.
event_map[event.event_id] = serialize_event(
event, time_now, strip_ids=True
)
recent_event_ids.append(event.event_id)
return {
"room_id": room.room_id,
"event_map": event_map,
"events": {
"batch": recent_event_ids,
"prev_batch": room.prev_batch.to_string(),
},
"state": state_event_ids,
"limited": room.limited,
"published": room.published,
}
def register_servlets(hs, http_server): def register_servlets(hs, http_server):
SyncRestServlet(hs).register(http_server) SyncRestServlet(hs).register(http_server)