2014-08-12 15:10:52 +01:00

150 lines
5.8 KiB
Python

# -*- coding: utf-8 -*-
# Copyright 2014 matrix.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer
from ._base import BaseHandler
from synapse.api.streams.event import (
EventStream, MessagesStreamData, RoomMemberStreamData, FeedbackStreamData,
RoomDataStreamData
)
from synapse.handlers.presence import PresenceStreamData
class EventStreamHandler(BaseHandler):
    """Serves event streams to users and tracks who is currently streaming.

    Besides building the stream response, this handler keeps a count of the
    concurrently open streams per user and uses it to fire the
    "started_user_eventstream" / "stopped_user_eventstream" distributor
    signals. The "stopped" signal is delayed by a grace timer so that a
    client which immediately reconnects does not produce a stop/start pair.

    NOTE(review): ``self.hs`` and ``self.notifier`` are not assigned in this
    class; presumably BaseHandler.__init__ provides them - confirm.
    """

    # Order matters: element i of a stream token (split on
    # EventStream.SEPARATOR) belongs to the stream class at index i.
    stream_data_classes = [
        MessagesStreamData,
        RoomMemberStreamData,
        FeedbackStreamData,
        RoomDataStreamData,
        PresenceStreamData,
    ]

    # Delay handed to clock.call_later before "stopped_user_eventstream" is
    # fired. The original comment described the grace period in seconds (and
    # wrongly as 10) - TODO confirm the unit against the clock implementation.
    _STOP_GRACE_PERIOD = 5

    def __init__(self, hs):
        """Set up the per-user bookkeeping and declare the signals we fire.

        Args:
            hs: The homeserver object; it must offer get_distributor() and
                get_clock().
        """
        super(EventStreamHandler, self).__init__(hs)

        # Count of active streams per user
        self._streams_per_user = {}
        # Grace timers per user to delay the "stopped" signal
        self._stop_timer_per_user = {}

        self.distributor = hs.get_distributor()
        self.distributor.declare("started_user_eventstream")
        self.distributor.declare("stopped_user_eventstream")

        self.clock = hs.get_clock()

    def get_event_stream_token(self, stream_type, store_id, start_token):
        """Return the next token after this event.

        Args:
            stream_type (str): The StreamData.EVENT_TYPE
            store_id (int): The new storage ID assigned from the data store.
            start_token (str): The token the user started with.
        Returns:
            str: The end token.
        Raises:
            RuntimeError: If no class in stream_data_classes has an
                EVENT_TYPE equal to stream_type.
        """
        for i, stream_cls in enumerate(EventStreamHandler.stream_data_classes):
            if stream_cls.EVENT_TYPE == stream_type:
                # this is the stream for this event, so replace this part of
                # the token
                store_ids = start_token.split(EventStream.SEPARATOR)
                store_ids[i] = str(store_id)
                return EventStream.SEPARATOR.join(store_ids)
        raise RuntimeError("Didn't find a stream type %s" % stream_type)

    def _stream_started(self, auth_user):
        """Record one more open stream for auth_user.

        When this is the user's first open stream, either cancel the pending
        "stopped" grace timer (the user reconnected in time) or fire
        "started_user_eventstream". The counter entry is only created once
        that step has succeeded, so a failure here leaves no half-registered
        stream behind.
        """
        if auth_user not in self._streams_per_user:
            if auth_user in self._stop_timer_per_user:
                self.clock.cancel_call_later(
                    self._stop_timer_per_user.pop(auth_user))
            else:
                self.distributor.fire(
                    "started_user_eventstream", auth_user
                )
        self._streams_per_user[auth_user] = (
            self._streams_per_user.get(auth_user, 0) + 1
        )

    def _stream_stopped(self, auth_user):
        """Record one fewer open stream for auth_user.

        When the last stream closes, start the grace timer that will fire
        "stopped_user_eventstream" unless the user reconnects first.
        """
        self._streams_per_user[auth_user] -= 1
        if self._streams_per_user[auth_user]:
            return
        del self._streams_per_user[auth_user]

        # Give the client a grace period to reconnect before we think
        # they're gone.
        def _later():
            self.distributor.fire(
                "stopped_user_eventstream", auth_user
            )
            del self._stop_timer_per_user[auth_user]

        self._stop_timer_per_user[auth_user] = self.clock.call_later(
            self._STOP_GRACE_PERIOD, _later
        )

    @defer.inlineCallbacks
    def get_stream(self, auth_user_id, pagin_config, timeout=0):
        """Gets events as an event stream for this user.

        This function looks for interesting *events* for this user. This is
        different from the notifier, which looks for interested *users* who may
        want to know about a single event.

        Args:
            auth_user_id (str): The user requesting their event stream.
            pagin_config (synapse.api.streams.PaginationConfig): The config to
                use when obtaining the stream.
            timeout (int): The max time to wait for an incoming event; 0 means
                do not wait. The value is passed unchanged to
                notifier.get_events_for. NOTE(review): the original docs said
                both "ms" and "seconds" - confirm the unit.
        Returns:
            A pagination stream API dict
        """
        auth_user = self.hs.parse_userid(auth_user_id)

        # A fresh object identifies this particular call to the notifier.
        stream_id = object()

        # Do the accounting *before* entering the try block: the cleanup in
        # ``finally`` must only undo bookkeeping that really happened,
        # otherwise a failure during set-up would drive the counter negative.
        self._stream_started(auth_user)
        try:
            # construct an event stream with the correct data ordering
            stream_data_list = [
                stream_class(self.hs)
                for stream_class in EventStreamHandler.stream_data_classes
            ]
            event_stream = EventStream(auth_user_id, stream_data_list)

            # fix unknown tokens to known tokens
            pagin_config = yield event_stream.fix_tokens(pagin_config)

            # register interest in receiving new events
            self.notifier.store_events_for(user_id=auth_user_id,
                                           stream_id=stream_id,
                                           from_tok=pagin_config.from_tok)

            # see if we can grab a chunk now
            data_chunk = yield event_stream.get_chunk(config=pagin_config)

            # if there are previous events, return those. If not, wait on the
            # new events for up to 'timeout'.
            if not data_chunk["chunk"] and timeout != 0:
                results = yield defer.maybeDeferred(
                    self.notifier.get_events_for,
                    user_id=auth_user_id,
                    stream_id=stream_id,
                    timeout=timeout
                )
                if results:
                    defer.returnValue(results)

            defer.returnValue(data_chunk)
        finally:
            # cleanup; the nested finally guarantees the stream count is
            # released even if purging the notifier state fails.
            try:
                self.notifier.purge_events_for(user_id=auth_user_id,
                                               stream_id=stream_id)
            finally:
                self._stream_stopped(auth_user)