From 3a2a5b959cb1f56b26af32e1ad4c1db424279eb7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 26 Aug 2014 18:57:46 +0100 Subject: [PATCH] WIP: Completely change how event streaming and pagination work. This reflects the change in the underlying storage model. --- .gitignore | 7 + synapse/api/notifier.py | 196 ------------------ synapse/api/streams/event.py | 194 ----------------- synapse/handlers/events.py | 116 ++--------- synapse/handlers/presence.py | 32 --- synapse/handlers/room.py | 46 ++-- synapse/notifier.py | 184 ++++++++++++++++ synapse/rest/events.py | 4 +- synapse/rest/initial_sync.py | 2 +- synapse/rest/room.py | 2 +- synapse/server.py | 7 +- synapse/storage/stream.py | 4 +- synapse/streams/__init__.py | 14 ++ .../streams/__init__.py => streams/config.py} | 59 ++---- synapse/streams/events.py | 149 +++++++++++++ synapse/types.py | 66 ++---- 16 files changed, 432 insertions(+), 650 deletions(-) delete mode 100644 synapse/api/notifier.py delete mode 100644 synapse/api/streams/event.py create mode 100644 synapse/notifier.py create mode 100644 synapse/streams/__init__.py rename synapse/{api/streams/__init__.py => streams/config.py} (58%) create mode 100644 synapse/streams/events.py diff --git a/.gitignore b/.gitignore index 14bd9078f..ab1a3e1aa 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,10 @@ htmlcov demo/*.db demo/*.log +demo/*.pid + +graph/*.svg +graph/*.png +graph/*.dot + +uploads diff --git a/synapse/api/notifier.py b/synapse/api/notifier.py deleted file mode 100644 index ec9c4e513..000000000 --- a/synapse/api/notifier.py +++ /dev/null @@ -1,196 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014 matrix.org -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from synapse.api.constants import Membership -from synapse.api.events.room import RoomMemberEvent -from synapse.api.streams.event import EventsStreamData - -from twisted.internet import defer -from twisted.internet import reactor - -import logging - -logger = logging.getLogger(__name__) - - -class Notifier(object): - - def __init__(self, hs): - self.store = hs.get_datastore() - self.hs = hs - self.stored_event_listeners = {} - - @defer.inlineCallbacks - def on_new_room_event(self, event, store_id): - """Called when there is a new room event which may potentially be sent - down listening users' event streams. - - This function looks for interested *users* who may want to be notified - for this event. This is different to users requesting from the event - stream which looks for interested *events* for this user. - - Args: - event (SynapseEvent): The new event, which must have a room_id - store_id (int): The ID of this event after it was stored with the - data store. - '""" - member_list = yield self.store.get_room_members(room_id=event.room_id, - membership="join") - if not member_list: - member_list = [] - - member_list = [u.user_id for u in member_list] - - # invites MUST prod the person being invited, who won't be in the room. - if (event.type == RoomMemberEvent.TYPE and - event.content["membership"] == Membership.INVITE): - member_list.append(event.state_key) - # similarly, LEAVEs must be sent to the person leaving - if (event.type == RoomMemberEvent.TYPE and - event.content["membership"] == Membership.LEAVE): - member_list.append(event.state_key) - - for user_id in member_list: - if user_id in self.stored_event_listeners: - self._notify_and_callback( - user_id=user_id, - event_data=event.get_dict(), - stream_type=EventsStreamData.EVENT_TYPE, - store_id=store_id) - - def on_new_user_event(self, user_id, event_data, stream_type, store_id): - if user_id in self.stored_event_listeners: - self._notify_and_callback( - user_id=user_id, - event_data=event_data, - stream_type=stream_type, - store_id=store_id - ) - - def _notify_and_callback(self, user_id, event_data, stream_type, store_id): - logger.debug( - "Notifying %s of a new event.", - user_id - ) - - stream_ids = list(self.stored_event_listeners[user_id]) - for stream_id in stream_ids: - self._notify_and_callback_stream(user_id, stream_id, event_data, - stream_type, store_id) - - if not self.stored_event_listeners[user_id]: - del self.stored_event_listeners[user_id] - - def _notify_and_callback_stream(self, user_id, stream_id, event_data, - stream_type, store_id): - - event_listener = self.stored_event_listeners[user_id].pop(stream_id) - return_event_object = { - k: event_listener[k] for k in ["start", "chunk", "end"] - } - - # work out the new end token - token = event_listener["start"] - end = self._next_token(stream_type, store_id, token) - return_event_object["end"] = end - - # add the event to the chunk - chunk = event_listener["chunk"] - chunk.append(event_data) - - # callback the defer. We know this can't have been resolved before as - # we always remove the event_listener from the map before resolving. - event_listener["defer"].callback(return_event_object) - - def _next_token(self, stream_type, store_id, current_token): - stream_handler = self.hs.get_handlers().event_stream_handler - return stream_handler.get_event_stream_token( - stream_type, - store_id, - current_token - ) - - def store_events_for(self, user_id=None, stream_id=None, from_tok=None): - """Store all incoming events for this user. This should be paired with - get_events_for to return chunked data. - - Args: - user_id (str): The user to monitor incoming events for. - stream (object): The stream that is receiving events - from_tok (str): The token to monitor incoming events from. - """ - event_listener = { - "start": from_tok, - "chunk": [], - "end": from_tok, - "defer": defer.Deferred(), - } - - if user_id not in self.stored_event_listeners: - self.stored_event_listeners[user_id] = {stream_id: event_listener} - else: - self.stored_event_listeners[user_id][stream_id] = event_listener - - def purge_events_for(self, user_id=None, stream_id=None): - """Purges any stored events for this user. - - Args: - user_id (str): The user to purge stored events for. - """ - try: - del self.stored_event_listeners[user_id][stream_id] - if not self.stored_event_listeners[user_id]: - del self.stored_event_listeners[user_id] - except KeyError: - pass - - def get_events_for(self, user_id=None, stream_id=None, timeout=0): - """Retrieve stored events for this user, waiting if necessary. - - It is advisable to wrap this call in a maybeDeferred. - - Args: - user_id (str): The user to get events for. - timeout (int): The time in seconds to wait before giving up. - Returns: - A Deferred or a dict containing the chunk data, depending on if - there was data to return yet. The Deferred callback may be None if - there were no events before the timeout expired. - """ - logger.debug("%s is listening for events.", user_id) - - try: - streams = self.stored_event_listeners[user_id][stream_id]["chunk"] - if streams: - logger.debug("%s returning existing chunk.", user_id) - return streams - except KeyError: - return None - - reactor.callLater( - (timeout / 1000.0), self._timeout, user_id, stream_id - ) - return self.stored_event_listeners[user_id][stream_id]["defer"] - - def _timeout(self, user_id, stream_id): - try: - # We remove the event_listener from the map so that we can't - # resolve the deferred twice. - event_listeners = self.stored_event_listeners[user_id] - event_listener = event_listeners.pop(stream_id) - event_listener["defer"].callback(None) - logger.debug("%s event listening timed out.", user_id) - except KeyError: - pass diff --git a/synapse/api/streams/event.py b/synapse/api/streams/event.py deleted file mode 100644 index fe44a488b..000000000 --- a/synapse/api/streams/event.py +++ /dev/null @@ -1,194 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014 matrix.org -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""This module contains classes for streaming from the event stream: /events. -""" -from twisted.internet import defer - -from synapse.api.errors import EventStreamError -from synapse.api.events import SynapseEvent -from synapse.api.streams import PaginationStream, StreamData - -import logging - -logger = logging.getLogger(__name__) - - -class EventsStreamData(StreamData): - EVENT_TYPE = "EventsStream" - - def __init__(self, hs, room_id=None, feedback=False): - super(EventsStreamData, self).__init__(hs) - self.room_id = room_id - self.with_feedback = feedback - - @defer.inlineCallbacks - def get_rows(self, user_id, from_key, to_key, limit, direction): - data, latest_ver = yield self.store.get_room_events( - user_id=user_id, - from_key=from_key, - to_key=to_key, - limit=limit, - room_id=self.room_id, - with_feedback=self.with_feedback - ) - defer.returnValue((data, latest_ver)) - - @defer.inlineCallbacks - def max_token(self): - val = yield self.store.get_room_events_max_id() - defer.returnValue(val) - - -class EventStream(PaginationStream): - - SEPARATOR = '_' - - def __init__(self, user_id, stream_data_list): - super(EventStream, self).__init__() - self.user_id = user_id - self.stream_data = stream_data_list - - @defer.inlineCallbacks - def fix_tokens(self, pagination_config): - pagination_config.from_tok = yield self.fix_token( - pagination_config.from_tok) - pagination_config.to_tok = yield self.fix_token( - pagination_config.to_tok) - - if ( - not pagination_config.to_tok - and pagination_config.direction == 'f' - ): - pagination_config.to_tok = yield self.get_current_max_token() - - logger.debug("pagination_config: %s", pagination_config) - - defer.returnValue(pagination_config) - - @defer.inlineCallbacks - def fix_token(self, token): - """Fixes unknown values in a token to known values. - - Args: - token (str): The token to fix up. - Returns: - The fixed-up token, which may == token. - """ - if token == PaginationStream.TOK_END: - new_token = yield self.get_current_max_token() - - logger.debug("fix_token: From %s to %s", token, new_token) - - token = new_token - - defer.returnValue(token) - - @defer.inlineCallbacks - def get_current_max_token(self): - new_token_parts = [] - for s in self.stream_data: - mx = yield s.max_token() - new_token_parts.append(str(mx)) - - new_token = EventStream.SEPARATOR.join(new_token_parts) - - logger.debug("get_current_max_token: %s", new_token) - - defer.returnValue(new_token) - - @defer.inlineCallbacks - def get_chunk(self, config): - # no support for limit on >1 streams, makes no sense. - if config.limit and len(self.stream_data) > 1: - raise EventStreamError( - 400, "Limit not supported on multiplexed streams." - ) - - chunk_data, next_tok = yield self._get_chunk_data( - config.from_tok, - config.to_tok, - config.limit, - config.direction, - ) - - defer.returnValue({ - "chunk": chunk_data, - "start": config.from_tok, - "end": next_tok - }) - - @defer.inlineCallbacks - def _get_chunk_data(self, from_tok, to_tok, limit, direction): - """ Get event data between the two tokens. - - Tokens are SEPARATOR separated values representing pkey values of - certain tables, and the position determines the StreamData invoked - according to the STREAM_DATA list. - - The magic value '-1' can be used to get the latest value. - - Args: - from_tok - The token to start from. - to_tok - The token to end at. Must have values > from_tok or be -1. - Returns: - A list of event data. - Raises: - EventStreamError if something went wrong. - """ - # sanity check - if to_tok is not None: - if (from_tok.count(EventStream.SEPARATOR) != - to_tok.count(EventStream.SEPARATOR) or - (from_tok.count(EventStream.SEPARATOR) + 1) != - len(self.stream_data)): - raise EventStreamError(400, "Token lengths don't match.") - - chunk = [] - next_ver = [] - for i, (from_pkey, to_pkey) in enumerate(zip( - self._split_token(from_tok), - self._split_token(to_tok) - )): - if from_pkey == to_pkey: - # tokens are the same, we have nothing to do. - next_ver.append(str(to_pkey)) - continue - - (event_chunk, max_pkey) = yield self.stream_data[i].get_rows( - self.user_id, from_pkey, to_pkey, limit, direction, - ) - - chunk.extend([ - e.get_dict() if isinstance(e, SynapseEvent) else e - for e in event_chunk - ]) - next_ver.append(str(max_pkey)) - - defer.returnValue((chunk, EventStream.SEPARATOR.join(next_ver))) - - def _split_token(self, token): - """Splits the given token into a list of pkeys. - - Args: - token (str): The token with SEPARATOR values. - Returns: - A list of ints. - """ - if token: - segments = token.split(EventStream.SEPARATOR) - else: - segments = [None] * len(self.stream_data) - return segments diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 6bb797caf..2d7bd5083 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -16,19 +16,15 @@ from twisted.internet import defer from ._base import BaseHandler -from synapse.api.streams.event import ( - EventStream, EventsStreamData -) -from synapse.handlers.presence import PresenceStreamData + +import logging + + +logger = logging.getLogger(__name__) class EventStreamHandler(BaseHandler): - stream_data_classes = [ - EventsStreamData, - PresenceStreamData, - ] - def __init__(self, hs): super(EventStreamHandler, self).__init__(hs) @@ -43,104 +39,22 @@ class EventStreamHandler(BaseHandler): self.clock = hs.get_clock() - def get_event_stream_token(self, stream_type, store_id, start_token): - """Return the next token after this event. - - Args: - stream_type (str): The StreamData.EVENT_TYPE - store_id (int): The new storage ID assigned from the data store. - start_token (str): The token the user started with. - Returns: - str: The end token. - """ - for i, stream_cls in enumerate(EventStreamHandler.stream_data_classes): - if stream_cls.EVENT_TYPE == stream_type: - # this is the stream for this event, so replace this part of - # the token - store_ids = start_token.split(EventStream.SEPARATOR) - store_ids[i] = str(store_id) - return EventStream.SEPARATOR.join(store_ids) - raise RuntimeError("Didn't find a stream type %s" % stream_type) + self.notifier = hs.get_notifier() @defer.inlineCallbacks def get_stream(self, auth_user_id, pagin_config, timeout=0): - """Gets events as an event stream for this user. - - This function looks for interesting *events* for this user. This is - different from the notifier, which looks for interested *users* who may - want to know about a single event. - - Args: - auth_user_id (str): The user requesting their event stream. - pagin_config (synapse.api.streams.PaginationConfig): The config to - use when obtaining the stream. - timeout (int): The max time to wait for an incoming event in ms. - Returns: - A pagination stream API dict - """ auth_user = self.hs.parse_userid(auth_user_id) - stream_id = object() + if pagin_config.from_token is None: + pagin_config.from_token = None - try: - if auth_user not in self._streams_per_user: - self._streams_per_user[auth_user] = 0 - if auth_user in self._stop_timer_per_user: - self.clock.cancel_call_later( - self._stop_timer_per_user.pop(auth_user)) - else: - self.distributor.fire( - "started_user_eventstream", auth_user - ) - self._streams_per_user[auth_user] += 1 + events, tokens = yield self.notifier.get_events_for(auth_user, pagin_config, timeout) - # construct an event stream with the correct data ordering - stream_data_list = [] - for stream_class in EventStreamHandler.stream_data_classes: - stream_data_list.append(stream_class(self.hs)) - event_stream = EventStream(auth_user_id, stream_data_list) + chunk = { + "chunk": [e.get_dict() for e in events], + "start_token": tokens[0].to_string(), + "end_token": tokens[1].to_string(), + } - # fix unknown tokens to known tokens - pagin_config = yield event_stream.fix_tokens(pagin_config) + defer.returnValue(chunk) - # register interest in receiving new events - self.notifier.store_events_for(user_id=auth_user_id, - stream_id=stream_id, - from_tok=pagin_config.from_tok) - - # see if we can grab a chunk now - data_chunk = yield event_stream.get_chunk(config=pagin_config) - - # if there are previous events, return those. If not, wait on the - # new events for 'timeout' seconds. - if len(data_chunk["chunk"]) == 0 and timeout != 0: - results = yield defer.maybeDeferred( - self.notifier.get_events_for, - user_id=auth_user_id, - stream_id=stream_id, - timeout=timeout - ) - if results: - defer.returnValue(results) - - defer.returnValue(data_chunk) - finally: - # cleanup - self.notifier.purge_events_for(user_id=auth_user_id, - stream_id=stream_id) - - self._streams_per_user[auth_user] -= 1 - if not self._streams_per_user[auth_user]: - del self._streams_per_user[auth_user] - - # 10 seconds of grace to allow the client to reconnect again - # before we think they're gone - def _later(): - self.distributor.fire( - "stopped_user_eventstream", auth_user - ) - del self._stop_timer_per_user[auth_user] - - self._stop_timer_per_user[auth_user] = ( - self.clock.call_later(5, _later) - ) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index be10162db..30d6269e2 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -17,7 +17,6 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError from synapse.api.constants import PresenceState -from synapse.api.streams import StreamData from ._base import BaseHandler @@ -682,41 +681,10 @@ class PresenceHandler(BaseHandler): user=observed_user, clock=self.clock ), - stream_type=PresenceStreamData, store_id=statuscache.serial ) -class PresenceStreamData(StreamData): - def __init__(self, hs): - super(PresenceStreamData, self).__init__(hs) - self.presence = hs.get_handlers().presence_handler - - def get_rows(self, user_id, from_key, to_key, limit, direction): - from_key = int(from_key) - to_key = int(to_key) - - cachemap = self.presence._user_cachemap - - # TODO(paul): limit, and filter by visibility - updates = [(k, cachemap[k]) for k in cachemap - if from_key < cachemap[k].serial <= to_key] - - if updates: - clock = self.presence.clock - - latest_serial = max([x[1].serial for x in updates]) - data = [x[1].make_event(user=x[0], clock=clock) for x in updates] - return ((data, latest_serial)) - else: - return (([], self.presence._user_cachemap_latest_serial)) - - def max_token(self): - return self.presence._user_cachemap_latest_serial - -PresenceStreamData.EVENT_TYPE = PresenceStreamData - - class UserPresenceCache(object): """Store an observed user's state and status message. diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 5a4569ac9..20b4bbb66 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -22,8 +22,6 @@ from synapse.api.errors import RoomError, StoreError, SynapseError from synapse.api.events.room import ( RoomTopicEvent, RoomMemberEvent, RoomConfigEvent ) -from synapse.api.streams.event import EventStream, EventsStreamData -from synapse.handlers.presence import PresenceStreamData from synapse.util import stringutils from ._base import BaseHandler @@ -115,13 +113,24 @@ class MessageHandler(BaseHandler): """ yield self.auth.check_joined_room(room_id, user_id) - data_source = [ - EventsStreamData(self.hs, room_id=room_id, feedback=feedback) - ] - event_stream = EventStream(user_id, data_source) - pagin_config = yield event_stream.fix_tokens(pagin_config) - data_chunk = yield event_stream.get_chunk(config=pagin_config) - defer.returnValue(data_chunk) + data_source = self.hs.get_event_sources().sources[0] + + if pagin_config.from_token: + from_token = pagin_config.from_token + else: + from_token = yield self.hs.get_event_sources().get_current_token() + + events, next_token = yield data_source.get_pagination_rows( + from_token, pagin_config.to_token, pagin_config.limit, room_id + ) + + chunk = { + "chunk": [e.get_dict() for e in events], + "start_token": from_token.to_string(), + "end_token": next_token.to_string(), + } + + defer.returnValue(chunk) @defer.inlineCallbacks def store_room_data(self, event=None, stamp_event=True): @@ -258,18 +267,15 @@ class MessageHandler(BaseHandler): rooms_ret = [] - now_rooms_token = yield self.store.get_room_events_max_id() + # FIXME (erikj): We need to not generate this token, + now_token = yield self.hs.get_event_sources().get_current_token() # FIXME (erikj): Fix this. - presence_stream = PresenceStreamData(self.hs) - now_presence_token = yield presence_stream.max_token() - presence = yield presence_stream.get_rows( - user_id, 0, now_presence_token, None, None + presence_stream = self.hs.get_event_sources().sources[1] + presence = yield presence_stream.get_new_events_for_user( + user_id, now_token, None, None ) - # FIXME (erikj): We need to not generate this token, - now_token = "%s_%s" % (now_rooms_token, now_presence_token) - limit = pagin_config.limit if not limit: limit = 10 @@ -291,7 +297,7 @@ class MessageHandler(BaseHandler): messages, token = yield self.store.get_recent_events_for_room( event.room_id, limit=limit, - end_token=now_rooms_token, + end_token=now_token.events_key.to_string(), ) d["messages"] = { @@ -305,9 +311,7 @@ class MessageHandler(BaseHandler): except: logger.exception("Failed to get snapshot") - ret = {"rooms": rooms_ret, "presence": presence[0], "end": now_token} - - # logger.debug("snapshot_all_rooms returning: %s", ret) + ret = {"rooms": rooms_ret, "presence": presence[0], "end": now_token.to_string()} defer.returnValue(ret) diff --git a/synapse/notifier.py b/synapse/notifier.py new file mode 100644 index 000000000..1911fd20a --- /dev/null +++ b/synapse/notifier.py @@ -0,0 +1,184 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer, reactor + +from synapse.util.logutils import log_function + +import logging + + +logger = logging.getLogger(__name__) + + +class _NotificationListener(object): + def __init__(self, user, from_token, limit, timeout, deferred): + self.user = user + self.from_token = from_token + self.limit = limit + self.timeout = timeout + self.deferred = deferred + + self.signal_key_list = [] + + self.pending_notifications = [] + + def notify(self, notifier, events, start_token, end_token): + result = (events, (start_token, end_token)) + + try: + self.deferred.callback(result) + except defer.AlreadyCalledError: + pass + + for signal, key in self.signal_key_list: + lst = notifier.signal_keys_to_users.get((signal, key), []) + + try: + lst.remove(self) + except: + pass + +class Notifier(object): + + def __init__(self, hs): + self.hs = hs + + self.signal_keys_to_users = {} + + self.event_sources = hs.get_event_sources() + + @log_function + @defer.inlineCallbacks + def on_new_room_event(self, event, store_id): + room_id = event.room_id + + source = self.event_sources.sources[0] + + listeners = self.signal_keys_to_users.get( + (source.SIGNAL_NAME, room_id), + [] + ) + + logger.debug("on_new_room_event self.signal_keys_to_users %s", listeners) + logger.debug("on_new_room_event listeners %s", listeners) + + # TODO (erikj): Can we make this more efficient by hitting the + # db once? + for listener in listeners: + events, end_token = yield source.get_new_events_for_user( + listener.user, + listener.from_token, + listener.limit, + key=room_id, + ) + + if events: + listener.notify( + self, events, listener.from_token, end_token + ) + + def on_new_user_event(self, *args, **kwargs): + pass + + def get_events_for(self, user, pagination_config, timeout): + deferred = defer.Deferred() + + self._get_events( + deferred, user, pagination_config.from_token, + pagination_config.limit, timeout + ).addErrback(deferred.errback) + + return deferred + + @defer.inlineCallbacks + def _get_events(self, deferred, user, from_token, limit, timeout): + if not from_token: + from_token = yield self.event_sources.get_current_token() + + listener = _NotificationListener( + user, + from_token, + limit, + timeout, + deferred, + ) + + if timeout: + reactor.callLater(timeout/1000, self._timeout_listener, listener) + + yield self._register_with_keys(listener) + yield self._check_for_updates(listener) + + return + + def _timeout_listener(self, listener): + # TODO (erikj): We should probably set to_token to the current max + # rather than reusing from_token. + listener.notify( + self, + [], + listener.from_token, + listener.from_token, + ) + + @defer.inlineCallbacks + @log_function + def _register_with_keys(self, listener): + signals_keys = {} + + # TODO (erikj): This can probably be replaced by a DeferredList + for source in self.event_sources.sources: + keys = yield source.get_keys_for_user(listener.user) + signals_keys.setdefault(source.SIGNAL_NAME, []).extend(keys) + + for signal, keys in signals_keys.items(): + for key in keys: + s = self.signal_keys_to_users.setdefault((signal, key), []) + s.append(listener) + listener.signal_key_list.append((signal, key)) + + logger.debug("New signal_keys_to_users: %s", self.signal_keys_to_users) + + defer.returnValue(listener) + + @defer.inlineCallbacks + @log_function + def _check_for_updates(self, listener): + # TODO (erikj): We need to think about limits across multiple sources + events = [] + + from_token = listener.from_token + limit = listener.limit + + # TODO (erikj): DeferredList? + for source in self.event_sources.sources: + stuff, new_token = yield source.get_new_events_for_user( + listener.user, + from_token, + limit, + ) + + events.extend(stuff) + + from_token = new_token + + end_token = from_token + + + if events: + listener.notify(self, events, listener.from_token, end_token) + + defer.returnValue(listener) diff --git a/synapse/rest/events.py b/synapse/rest/events.py index ca2f6978e..28da41849 100644 --- a/synapse/rest/events.py +++ b/synapse/rest/events.py @@ -17,7 +17,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError -from synapse.api.streams import PaginationConfig +from synapse.streams.config import PaginationConfig from synapse.rest.base import RestServlet, client_path_pattern @@ -41,11 +41,13 @@ class EventStreamRestServlet(RestServlet): chunk = yield handler.get_stream(auth_user.to_string(), pagin_config, timeout=timeout) + defer.returnValue((200, chunk)) def on_OPTIONS(self, request): return (200, {}) + def register_servlets(hs, http_server): EventStreamRestServlet(hs).register(http_server) diff --git a/synapse/rest/initial_sync.py b/synapse/rest/initial_sync.py index fab748f56..e5059ff78 100644 --- a/synapse/rest/initial_sync.py +++ b/synapse/rest/initial_sync.py @@ -15,7 +15,7 @@ from twisted.internet import defer -from synapse.api.streams import PaginationConfig +from synapse.streams.config import PaginationConfig from base import RestServlet, client_path_pattern diff --git a/synapse/rest/room.py b/synapse/rest/room.py index b031ad1bc..ab05a7438 100644 --- a/synapse/rest/room.py +++ b/synapse/rest/room.py @@ -22,7 +22,7 @@ from synapse.api.events.room import ( MessageEvent, RoomMemberEvent, FeedbackEvent ) from synapse.api.constants import Feedback -from synapse.api.streams import PaginationConfig +from synapse.streams.config import PaginationConfig import json import logging diff --git a/synapse/server.py b/synapse/server.py index c5b0a3275..5e9b76603 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -22,7 +22,7 @@ from synapse.federation import initialize_http_replication from synapse.federation.handler import FederationEventHandler from synapse.api.events.factory import EventFactory -from synapse.api.notifier import Notifier +from synapse.notifier import Notifier from synapse.api.auth import Auth from synapse.handlers import Handlers from synapse.rest import RestServletFactory @@ -32,6 +32,7 @@ from synapse.types import UserID, RoomAlias from synapse.util import Clock from synapse.util.distributor import Distributor from synapse.util.lockutils import LockManager +from synapse.streams.events import EventSources class BaseHomeServer(object): @@ -73,6 +74,7 @@ class BaseHomeServer(object): 'resource_for_federation', 'resource_for_web_client', 'resource_for_content_repo', + 'event_sources', ] def __init__(self, hostname, **kwargs): @@ -182,6 +184,9 @@ class HomeServer(BaseHomeServer): def build_distributor(self): return Distributor() + def build_event_sources(self): + return EventSources(self) + def register_servlets(self): """ Register all servlets associated with this HomeServer. """ diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index cae80563b..6a22d5aea 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -174,7 +174,7 @@ class StreamStore(SQLBaseStore): "SELECT * FROM events as e WHERE " "((room_id IN (%(current)s)) OR " "(event_id IN (%(invites)s))) " - "AND e.stream_ordering > ? AND e.stream_ordering < ? " + "AND e.stream_ordering > ? AND e.stream_ordering <= ? " "AND e.outlier = 0 " "ORDER BY stream_ordering ASC LIMIT %(limit)d " ) % { @@ -293,5 +293,5 @@ class StreamStore(SQLBaseStore): defer.returnValue("s1") return - key = res[0]["m"] + 1 + key = res[0]["m"] defer.returnValue("s%d" % (key,)) diff --git a/synapse/streams/__init__.py b/synapse/streams/__init__.py new file mode 100644 index 000000000..fe8a073cd --- /dev/null +++ b/synapse/streams/__init__.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/synapse/api/streams/__init__.py b/synapse/streams/config.py similarity index 58% rename from synapse/api/streams/__init__.py rename to synapse/streams/config.py index 0ba4783ea..b6ffbab1e 100644 --- a/synapse/api/streams/__init__.py +++ b/synapse/streams/config.py @@ -16,21 +16,25 @@ from synapse.api.errors import SynapseError from synapse.types import StreamToken +import logging + + +logger = logging.getLogger(__name__) + class PaginationConfig(object): """A configuration object which stores pagination parameters.""" def __init__(self, from_tok=None, to_tok=None, direction='f', limit=0): - self.from_tok = StreamToken(from_tok) if from_tok else None - self.to_tok = StreamToken(to_tok) if to_tok else None + self.from_token = StreamToken.from_string(from_tok) if from_tok else None + self.to_token = StreamToken.from_string(to_tok) if to_tok else None self.direction = 'f' if direction == 'f' else 'b' self.limit = int(limit) @classmethod def from_request(cls, request, raise_invalid_params=True): params = { - "from_tok": "END", "direction": 'f', } @@ -48,9 +52,14 @@ class PaginationConfig(object): elif raise_invalid_params: raise SynapseError(400, "%s parameter is invalid." % qp) + if "from_tok" in params and params["from_tok"] == "END": + # TODO (erikj): This is for compatibility only. + del params["from_tok"] + try: return PaginationConfig(**params) except: + logger.exception("Failed to create pagination config") raise SynapseError(400, "Invalid request.") def __str__(self): @@ -60,48 +69,4 @@ class PaginationConfig(object): ) % (self.from_tok, self.to_tok, self.direction, self.limit) -class PaginationStream(object): - """ An interface for streaming data as chunks. """ - - TOK_END = "END" - - def get_chunk(self, config=None): - """ Return the next chunk in the stream. - - Args: - config (PaginationConfig): The config to aid which chunk to get. - Returns: - A dict containing the new start token "start", the new end token - "end" and the data "chunk" as a list. - """ - raise NotImplementedError() - - -class StreamData(object): - - """ An interface for obtaining streaming data from a table. """ - - def __init__(self, hs): - self.hs = hs - self.store = hs.get_datastore() - - def get_rows(self, user_id, from_pkey, to_pkey, limit, direction): - """ Get event stream data between the specified pkeys. - - Args: - user_id : The user's ID - from_pkey : The starting pkey. - to_pkey : The end pkey. May be -1 to mean "latest". - limit: The max number of results to return. - Returns: - A tuple containing the list of event stream data and the last pkey. - """ - raise NotImplementedError() - - def max_token(self): - """ Get the latest currently-valid token. - - Returns: - The latest token.""" - raise NotImplementedError() diff --git a/synapse/streams/events.py b/synapse/streams/events.py new file mode 100644 index 000000000..1d4467af4 --- /dev/null +++ b/synapse/streams/events.py @@ -0,0 +1,149 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from synapse.api.constants import Membership +from synapse.types import StreamToken + + +class RoomEventSource(object): + SIGNAL_NAME = "EventStreamSourceSignal" + + def __init__(self, hs): + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def get_keys_for_user(self, user): + events = yield self.store.get_rooms_for_user_where_membership_is( + user.to_string(), + (Membership.JOIN,), + ) + + defer.returnValue(set([e.room_id for e in events])) + + @defer.inlineCallbacks + def get_new_events_for_user(self, user, from_token, limit, key=None): + # We just ignore the key for now. + + to_key = yield self.get_current_token_part() + + events, end_key = yield self.store.get_room_events_stream( + user_id=user.to_string(), + from_key=from_token.events_key, + to_key=to_key, + room_id=None, + limit=limit, + ) + + end_token = from_token.copy_and_replace("events_key", end_key) + + defer.returnValue((events, end_token)) + + def get_current_token_part(self): + return self.store.get_room_events_max_id() + + @defer.inlineCallbacks + def get_pagination_rows(self, from_token, to_token, limit, key): + to_key = to_token.events_key if to_token else None + + events, next_key = yield self.store.paginate_room_events( + room_id=key, + from_key=from_token.events_key, + to_key=to_key, + direction='b', + limit=limit, + with_feedback=True + ) + + next_token = from_token.copy_and_replace("events_key", next_key) + + defer.returnValue((events, next_token)) + + +class PresenceStreamSource(object): + def __init__(self, hs): + self.hs = hs + self.clock = hs.get_clock() + + def get_new_events_for_user(self, user, from_token, limit, key=None): + from_key = int(from_token.presence_key) + + presence = self.hs.get_handlers().presence_handler + cachemap = presence._user_cachemap + + # TODO(paul): limit, and filter by visibility + updates = [(k, cachemap[k]) for k in cachemap + if from_key < cachemap[k].serial] + + if updates: + clock = self.clock + + latest_serial = max([x[1].serial for x in updates]) + data = [x[1].make_event(user=x[0], clock=clock) for x in updates] + + end_token = from_token.copy_and_replace( + "presence_key", latest_serial + ) + return ((data, end_token)) + else: + end_token = from_token.copy_and_replace( + "presence_key", presence._user_cachemap_latest_serial + ) + return (([], end_token)) + + def get_keys_for_user(self, user): + raise NotImplementedError("get_keys_for_user") + + def get_current_token_part(self): + presence = self.hs.get_handlers().presence_handler + return presence._user_cachemap_latest_serial + + +class EventSources(object): + SOURCE_TYPES = [ + RoomEventSource, + PresenceStreamSource, + ] + + def __init__(self, hs): + self.sources = [t(hs) for t in EventSources.SOURCE_TYPES] + + @staticmethod + def create_token(events_key, presence_key): + return StreamToken(events_key=events_key, presence_key=presence_key) + + @defer.inlineCallbacks + def get_current_token(self): + events_key = yield self.sources[0].get_current_token_part() + token = EventSources.create_token(events_key, "0") + defer.returnValue(token) + + +class StreamSource(object): + def get_keys_for_user(self, user): + raise NotImplementedError("get_keys_for_user") + + def get_new_events_for_user(self, user, from_token, limit, key=None): + raise NotImplementedError("get_new_events_for_user") + + def get_current_token_part(self): + raise NotImplementedError("get_current_token_part") + + +class PaginationSource(object): + def get_pagination_rows(self, from_token, to_token, limit, key): + raise NotImplementedError("get_rows") + diff --git a/synapse/types.py b/synapse/types.py index baec8a600..c8936b575 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -97,72 +97,32 @@ class RoomID(DomainSpecificString): class StreamToken( namedtuple( "Token", - ("events_type", "topological_key", "stream_key", "presence_key") + ("events_key", "presence_key") ) ): _SEPARATOR = "_" - _TOPOLOGICAL_PREFIX = "t" - _STREAM_PREFIX = "s" - - _TOPOLOGICAL_SEPERATOR = "-" - - TOPOLOGICAL_TYPE = "topo" - STREAM_TYPE = "stream" - @classmethod def from_string(cls, string): try: - events_part, presence_part = string.split(cls._SEPARATOR) - - presence_key = int(presence_part) - - topo_length = len(cls._TOPOLOGICAL_PREFIX) - stream_length = len(cls._STREAM_PREFIX) - if events_part[:topo_length] == cls._TOPOLOGICAL_PREFIX: - # topological event token - topo_tok = events_part[topo_length:] - topo_key, stream_key = topo_tok.split( - cls._TOPOLOGICAL_SEPERATOR, 1 - ) - - topo_key = int(topo_key) - stream_key = int(stream_key) - - events_type = cls.TOPOLOGICAL_TYPE - elif events_part[:stream_length] == cls._STREAM_PREFIX: - topo_key = None - stream_key = int(events_part[stream_length:]) - - events_type = cls.STREAM_TYPE - else: - raise + events_key, presence_key = string.split(cls._SEPARATOR) return cls( - events_type=events_type, - topological_key=topo_key, - stream_key=stream_key, + events_key=events_key, presence_key=presence_key, ) except: raise SynapseError(400, "Invalid Token") def to_string(self): - if self.events_type == self.TOPOLOGICAL_TYPE: - return "".join([ - self._TOPOLOGICAL_PREFIX, - str(self.topological_key), - self._TOPOLOGICAL_SEPERATOR, - str(self.stream_key), - self._SEPARATOR, - str(self.presence_key), - ]) - elif self.events_type == self.STREAM_TYPE: - return "".join([ - self._STREAM_PREFIX, - str(self.stream_key), - self._SEPARATOR, - str(self.presence_key), - ]) + return "".join([ + str(self.events_key), + self._SEPARATOR, + str(self.presence_key), + ]) - raise RuntimeError("Unrecognized event type: %s", self.events_type) + + def copy_and_replace(self, key, new_value): + d = self._asdict() + d[key] = new_value + return StreamToken(**d)