Merge branch 'stream_refactor' into develop

This commit is contained in:
Erik Johnston 2014-08-27 16:11:43 +01:00
commit 89c044c2a0
17 changed files with 595 additions and 632 deletions

7
.gitignore vendored
View File

@ -17,3 +17,10 @@ htmlcov
demo/*.db demo/*.db
demo/*.log demo/*.log
demo/*.pid
graph/*.svg
graph/*.png
graph/*.dot
uploads

View File

@ -1,196 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2014 matrix.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.api.constants import Membership
from synapse.api.events.room import RoomMemberEvent
from synapse.api.streams.event import EventsStreamData
from twisted.internet import defer
from twisted.internet import reactor
import logging
logger = logging.getLogger(__name__)
class Notifier(object):
def __init__(self, hs):
self.store = hs.get_datastore()
self.hs = hs
self.stored_event_listeners = {}
@defer.inlineCallbacks
def on_new_room_event(self, event, store_id):
"""Called when there is a new room event which may potentially be sent
down listening users' event streams.
This function looks for interested *users* who may want to be notified
for this event. This is different to users requesting from the event
stream which looks for interested *events* for this user.
Args:
event (SynapseEvent): The new event, which must have a room_id
store_id (int): The ID of this event after it was stored with the
data store.
'"""
member_list = yield self.store.get_room_members(room_id=event.room_id,
membership="join")
if not member_list:
member_list = []
member_list = [u.user_id for u in member_list]
# invites MUST prod the person being invited, who won't be in the room.
if (event.type == RoomMemberEvent.TYPE and
event.content["membership"] == Membership.INVITE):
member_list.append(event.state_key)
# similarly, LEAVEs must be sent to the person leaving
if (event.type == RoomMemberEvent.TYPE and
event.content["membership"] == Membership.LEAVE):
member_list.append(event.state_key)
for user_id in member_list:
if user_id in self.stored_event_listeners:
self._notify_and_callback(
user_id=user_id,
event_data=event.get_dict(),
stream_type=EventsStreamData.EVENT_TYPE,
store_id=store_id)
def on_new_user_event(self, user_id, event_data, stream_type, store_id):
if user_id in self.stored_event_listeners:
self._notify_and_callback(
user_id=user_id,
event_data=event_data,
stream_type=stream_type,
store_id=store_id
)
def _notify_and_callback(self, user_id, event_data, stream_type, store_id):
logger.debug(
"Notifying %s of a new event.",
user_id
)
stream_ids = list(self.stored_event_listeners[user_id])
for stream_id in stream_ids:
self._notify_and_callback_stream(user_id, stream_id, event_data,
stream_type, store_id)
if not self.stored_event_listeners[user_id]:
del self.stored_event_listeners[user_id]
def _notify_and_callback_stream(self, user_id, stream_id, event_data,
stream_type, store_id):
event_listener = self.stored_event_listeners[user_id].pop(stream_id)
return_event_object = {
k: event_listener[k] for k in ["start", "chunk", "end"]
}
# work out the new end token
token = event_listener["start"]
end = self._next_token(stream_type, store_id, token)
return_event_object["end"] = end
# add the event to the chunk
chunk = event_listener["chunk"]
chunk.append(event_data)
# callback the defer. We know this can't have been resolved before as
# we always remove the event_listener from the map before resolving.
event_listener["defer"].callback(return_event_object)
def _next_token(self, stream_type, store_id, current_token):
stream_handler = self.hs.get_handlers().event_stream_handler
return stream_handler.get_event_stream_token(
stream_type,
store_id,
current_token
)
def store_events_for(self, user_id=None, stream_id=None, from_tok=None):
"""Store all incoming events for this user. This should be paired with
get_events_for to return chunked data.
Args:
user_id (str): The user to monitor incoming events for.
stream (object): The stream that is receiving events
from_tok (str): The token to monitor incoming events from.
"""
event_listener = {
"start": from_tok,
"chunk": [],
"end": from_tok,
"defer": defer.Deferred(),
}
if user_id not in self.stored_event_listeners:
self.stored_event_listeners[user_id] = {stream_id: event_listener}
else:
self.stored_event_listeners[user_id][stream_id] = event_listener
def purge_events_for(self, user_id=None, stream_id=None):
"""Purges any stored events for this user.
Args:
user_id (str): The user to purge stored events for.
"""
try:
del self.stored_event_listeners[user_id][stream_id]
if not self.stored_event_listeners[user_id]:
del self.stored_event_listeners[user_id]
except KeyError:
pass
def get_events_for(self, user_id=None, stream_id=None, timeout=0):
"""Retrieve stored events for this user, waiting if necessary.
It is advisable to wrap this call in a maybeDeferred.
Args:
user_id (str): The user to get events for.
timeout (int): The time in seconds to wait before giving up.
Returns:
A Deferred or a dict containing the chunk data, depending on if
there was data to return yet. The Deferred callback may be None if
there were no events before the timeout expired.
"""
logger.debug("%s is listening for events.", user_id)
try:
streams = self.stored_event_listeners[user_id][stream_id]["chunk"]
if streams:
logger.debug("%s returning existing chunk.", user_id)
return streams
except KeyError:
return None
reactor.callLater(
(timeout / 1000.0), self._timeout, user_id, stream_id
)
return self.stored_event_listeners[user_id][stream_id]["defer"]
def _timeout(self, user_id, stream_id):
try:
# We remove the event_listener from the map so that we can't
# resolve the deferred twice.
event_listeners = self.stored_event_listeners[user_id]
event_listener = event_listeners.pop(stream_id)
event_listener["defer"].callback(None)
logger.debug("%s event listening timed out.", user_id)
except KeyError:
pass

View File

@ -1,103 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2014 matrix.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.api.errors import SynapseError
class PaginationConfig(object):
"""A configuration object which stores pagination parameters."""
def __init__(self, from_tok=None, to_tok=None, direction='f', limit=0):
self.from_tok = from_tok
self.to_tok = to_tok
self.direction = direction
self.limit = limit
@classmethod
def from_request(cls, request, raise_invalid_params=True):
params = {
"from_tok": "END",
"direction": 'f',
}
query_param_mappings = [ # 3-tuple of qp_key, attribute, rules
("from", "from_tok", lambda x: type(x) == str),
("to", "to_tok", lambda x: type(x) == str),
("limit", "limit", lambda x: x.isdigit()),
("dir", "direction", lambda x: x == 'f' or x == 'b'),
]
for qp, attr, is_valid in query_param_mappings:
if qp in request.args:
if is_valid(request.args[qp][0]):
params[attr] = request.args[qp][0]
elif raise_invalid_params:
raise SynapseError(400, "%s parameter is invalid." % qp)
return PaginationConfig(**params)
def __str__(self):
return (
"<PaginationConfig from_tok=%s, to_tok=%s, "
"direction=%s, limit=%s>"
) % (self.from_tok, self.to_tok, self.direction, self.limit)
class PaginationStream(object):
""" An interface for streaming data as chunks. """
TOK_END = "END"
def get_chunk(self, config=None):
""" Return the next chunk in the stream.
Args:
config (PaginationConfig): The config to aid which chunk to get.
Returns:
A dict containing the new start token "start", the new end token
"end" and the data "chunk" as a list.
"""
raise NotImplementedError()
class StreamData(object):
""" An interface for obtaining streaming data from a table. """
def __init__(self, hs):
self.hs = hs
self.store = hs.get_datastore()
def get_rows(self, user_id, from_pkey, to_pkey, limit, direction):
""" Get event stream data between the specified pkeys.
Args:
user_id : The user's ID
from_pkey : The starting pkey.
to_pkey : The end pkey. May be -1 to mean "latest".
limit: The max number of results to return.
Returns:
A tuple containing the list of event stream data and the last pkey.
"""
raise NotImplementedError()
def max_token(self):
""" Get the latest currently-valid token.
Returns:
The latest token."""
raise NotImplementedError()

View File

@ -1,194 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2014 matrix.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This module contains classes for streaming from the event stream: /events.
"""
from twisted.internet import defer
from synapse.api.errors import EventStreamError
from synapse.api.events import SynapseEvent
from synapse.api.streams import PaginationStream, StreamData
import logging
logger = logging.getLogger(__name__)
class EventsStreamData(StreamData):
EVENT_TYPE = "EventsStream"
def __init__(self, hs, room_id=None, feedback=False):
super(EventsStreamData, self).__init__(hs)
self.room_id = room_id
self.with_feedback = feedback
@defer.inlineCallbacks
def get_rows(self, user_id, from_key, to_key, limit, direction):
data, latest_ver = yield self.store.get_room_events(
user_id=user_id,
from_key=from_key,
to_key=to_key,
limit=limit,
room_id=self.room_id,
with_feedback=self.with_feedback
)
defer.returnValue((data, latest_ver))
@defer.inlineCallbacks
def max_token(self):
val = yield self.store.get_room_events_max_id()
defer.returnValue(val)
class EventStream(PaginationStream):
SEPARATOR = '_'
def __init__(self, user_id, stream_data_list):
super(EventStream, self).__init__()
self.user_id = user_id
self.stream_data = stream_data_list
@defer.inlineCallbacks
def fix_tokens(self, pagination_config):
pagination_config.from_tok = yield self.fix_token(
pagination_config.from_tok)
pagination_config.to_tok = yield self.fix_token(
pagination_config.to_tok)
if (
not pagination_config.to_tok
and pagination_config.direction == 'f'
):
pagination_config.to_tok = yield self.get_current_max_token()
logger.debug("pagination_config: %s", pagination_config)
defer.returnValue(pagination_config)
@defer.inlineCallbacks
def fix_token(self, token):
"""Fixes unknown values in a token to known values.
Args:
token (str): The token to fix up.
Returns:
The fixed-up token, which may == token.
"""
if token == PaginationStream.TOK_END:
new_token = yield self.get_current_max_token()
logger.debug("fix_token: From %s to %s", token, new_token)
token = new_token
defer.returnValue(token)
@defer.inlineCallbacks
def get_current_max_token(self):
new_token_parts = []
for s in self.stream_data:
mx = yield s.max_token()
new_token_parts.append(str(mx))
new_token = EventStream.SEPARATOR.join(new_token_parts)
logger.debug("get_current_max_token: %s", new_token)
defer.returnValue(new_token)
@defer.inlineCallbacks
def get_chunk(self, config):
# no support for limit on >1 streams, makes no sense.
if config.limit and len(self.stream_data) > 1:
raise EventStreamError(
400, "Limit not supported on multiplexed streams."
)
chunk_data, next_tok = yield self._get_chunk_data(
config.from_tok,
config.to_tok,
config.limit,
config.direction,
)
defer.returnValue({
"chunk": chunk_data,
"start": config.from_tok,
"end": next_tok
})
@defer.inlineCallbacks
def _get_chunk_data(self, from_tok, to_tok, limit, direction):
""" Get event data between the two tokens.
Tokens are SEPARATOR separated values representing pkey values of
certain tables, and the position determines the StreamData invoked
according to the STREAM_DATA list.
The magic value '-1' can be used to get the latest value.
Args:
from_tok - The token to start from.
to_tok - The token to end at. Must have values > from_tok or be -1.
Returns:
A list of event data.
Raises:
EventStreamError if something went wrong.
"""
# sanity check
if to_tok is not None:
if (from_tok.count(EventStream.SEPARATOR) !=
to_tok.count(EventStream.SEPARATOR) or
(from_tok.count(EventStream.SEPARATOR) + 1) !=
len(self.stream_data)):
raise EventStreamError(400, "Token lengths don't match.")
chunk = []
next_ver = []
for i, (from_pkey, to_pkey) in enumerate(zip(
self._split_token(from_tok),
self._split_token(to_tok)
)):
if from_pkey == to_pkey:
# tokens are the same, we have nothing to do.
next_ver.append(str(to_pkey))
continue
(event_chunk, max_pkey) = yield self.stream_data[i].get_rows(
self.user_id, from_pkey, to_pkey, limit, direction,
)
chunk.extend([
e.get_dict() if isinstance(e, SynapseEvent) else e
for e in event_chunk
])
next_ver.append(str(max_pkey))
defer.returnValue((chunk, EventStream.SEPARATOR.join(next_ver)))
def _split_token(self, token):
"""Splits the given token into a list of pkeys.
Args:
token (str): The token with SEPARATOR values.
Returns:
A list of ints.
"""
if token:
segments = token.split(EventStream.SEPARATOR)
else:
segments = [None] * len(self.stream_data)
return segments

View File

@ -15,20 +15,18 @@
from twisted.internet import defer from twisted.internet import defer
from synapse.api.events import SynapseEvent
from ._base import BaseHandler from ._base import BaseHandler
from synapse.api.streams.event import (
EventStream, EventsStreamData import logging
)
from synapse.handlers.presence import PresenceStreamData
logger = logging.getLogger(__name__)
class EventStreamHandler(BaseHandler): class EventStreamHandler(BaseHandler):
stream_data_classes = [
EventsStreamData,
PresenceStreamData,
]
def __init__(self, hs): def __init__(self, hs):
super(EventStreamHandler, self).__init__(hs) super(EventStreamHandler, self).__init__(hs)
@ -43,45 +41,12 @@ class EventStreamHandler(BaseHandler):
self.clock = hs.get_clock() self.clock = hs.get_clock()
def get_event_stream_token(self, stream_type, store_id, start_token): self.notifier = hs.get_notifier()
"""Return the next token after this event.
Args:
stream_type (str): The StreamData.EVENT_TYPE
store_id (int): The new storage ID assigned from the data store.
start_token (str): The token the user started with.
Returns:
str: The end token.
"""
for i, stream_cls in enumerate(EventStreamHandler.stream_data_classes):
if stream_cls.EVENT_TYPE == stream_type:
# this is the stream for this event, so replace this part of
# the token
store_ids = start_token.split(EventStream.SEPARATOR)
store_ids[i] = str(store_id)
return EventStream.SEPARATOR.join(store_ids)
raise RuntimeError("Didn't find a stream type %s" % stream_type)
@defer.inlineCallbacks @defer.inlineCallbacks
def get_stream(self, auth_user_id, pagin_config, timeout=0): def get_stream(self, auth_user_id, pagin_config, timeout=0):
"""Gets events as an event stream for this user.
This function looks for interesting *events* for this user. This is
different from the notifier, which looks for interested *users* who may
want to know about a single event.
Args:
auth_user_id (str): The user requesting their event stream.
pagin_config (synapse.api.streams.PaginationConfig): The config to
use when obtaining the stream.
timeout (int): The max time to wait for an incoming event in ms.
Returns:
A pagination stream API dict
"""
auth_user = self.hs.parse_userid(auth_user_id) auth_user = self.hs.parse_userid(auth_user_id)
stream_id = object()
try: try:
if auth_user not in self._streams_per_user: if auth_user not in self._streams_per_user:
self._streams_per_user[auth_user] = 0 self._streams_per_user[auth_user] = 0
@ -94,41 +59,30 @@ class EventStreamHandler(BaseHandler):
) )
self._streams_per_user[auth_user] += 1 self._streams_per_user[auth_user] += 1
# construct an event stream with the correct data ordering if pagin_config.from_token is None:
stream_data_list = [] pagin_config.from_token = None
for stream_class in EventStreamHandler.stream_data_classes:
stream_data_list.append(stream_class(self.hs))
event_stream = EventStream(auth_user_id, stream_data_list)
# fix unknown tokens to known tokens rm_handler = self.hs.get_handlers().room_member_handler
pagin_config = yield event_stream.fix_tokens(pagin_config) room_ids = yield rm_handler.get_rooms_for_user(auth_user)
# register interest in receiving new events events, tokens = yield self.notifier.get_events_for(
self.notifier.store_events_for(user_id=auth_user_id, auth_user, room_ids, pagin_config, timeout
stream_id=stream_id,
from_tok=pagin_config.from_tok)
# see if we can grab a chunk now
data_chunk = yield event_stream.get_chunk(config=pagin_config)
# if there are previous events, return those. If not, wait on the
# new events for 'timeout' seconds.
if len(data_chunk["chunk"]) == 0 and timeout != 0:
results = yield defer.maybeDeferred(
self.notifier.get_events_for,
user_id=auth_user_id,
stream_id=stream_id,
timeout=timeout
) )
if results:
defer.returnValue(results)
defer.returnValue(data_chunk) chunks = [
e.get_dict() if isinstance(e, SynapseEvent) else e
for e in events
]
chunk = {
"chunk": chunks,
"start": tokens[0].to_string(),
"end": tokens[1].to_string(),
}
defer.returnValue(chunk)
finally: finally:
# cleanup
self.notifier.purge_events_for(user_id=auth_user_id,
stream_id=stream_id)
self._streams_per_user[auth_user] -= 1 self._streams_per_user[auth_user] -= 1
if not self._streams_per_user[auth_user]: if not self._streams_per_user[auth_user]:
del self._streams_per_user[auth_user] del self._streams_per_user[auth_user]

View File

@ -17,7 +17,6 @@ from twisted.internet import defer
from synapse.api.errors import SynapseError, AuthError from synapse.api.errors import SynapseError, AuthError
from synapse.api.constants import PresenceState from synapse.api.constants import PresenceState
from synapse.api.streams import StreamData
from ._base import BaseHandler from ._base import BaseHandler
@ -677,46 +676,10 @@ class PresenceHandler(BaseHandler):
statuscache.make_event(user=observed_user, clock=self.clock) statuscache.make_event(user=observed_user, clock=self.clock)
self.notifier.on_new_user_event( self.notifier.on_new_user_event(
observer_user.to_string(), [observer_user],
event_data=statuscache.make_event(
user=observed_user,
clock=self.clock
),
stream_type=PresenceStreamData,
store_id=statuscache.serial
) )
class PresenceStreamData(StreamData):
def __init__(self, hs):
super(PresenceStreamData, self).__init__(hs)
self.presence = hs.get_handlers().presence_handler
def get_rows(self, user_id, from_key, to_key, limit, direction):
from_key = int(from_key)
to_key = int(to_key)
cachemap = self.presence._user_cachemap
# TODO(paul): limit, and filter by visibility
updates = [(k, cachemap[k]) for k in cachemap
if from_key < cachemap[k].serial <= to_key]
if updates:
clock = self.presence.clock
latest_serial = max([x[1].serial for x in updates])
data = [x[1].make_event(user=x[0], clock=clock) for x in updates]
return ((data, latest_serial))
else:
return (([], self.presence._user_cachemap_latest_serial))
def max_token(self):
return self.presence._user_cachemap_latest_serial
PresenceStreamData.EVENT_TYPE = PresenceStreamData
class UserPresenceCache(object): class UserPresenceCache(object):
"""Store an observed user's state and status message. """Store an observed user's state and status message.

View File

@ -22,8 +22,7 @@ from synapse.api.errors import RoomError, StoreError, SynapseError
from synapse.api.events.room import ( from synapse.api.events.room import (
RoomTopicEvent, RoomMemberEvent, RoomConfigEvent RoomTopicEvent, RoomMemberEvent, RoomConfigEvent
) )
from synapse.api.streams.event import EventStream, EventsStreamData from synapse.streams.config import PaginationConfig
from synapse.handlers.presence import PresenceStreamData
from synapse.util import stringutils from synapse.util import stringutils
from ._base import BaseHandler from ._base import BaseHandler
@ -115,13 +114,24 @@ class MessageHandler(BaseHandler):
""" """
yield self.auth.check_joined_room(room_id, user_id) yield self.auth.check_joined_room(room_id, user_id)
data_source = [ data_source = self.hs.get_event_sources().sources["room"]
EventsStreamData(self.hs, room_id=room_id, feedback=feedback)
] if not pagin_config.from_token:
event_stream = EventStream(user_id, data_source) pagin_config.from_token = yield self.hs.get_event_sources().get_current_token()
pagin_config = yield event_stream.fix_tokens(pagin_config)
data_chunk = yield event_stream.get_chunk(config=pagin_config) user = self.hs.parse_userid(user_id)
defer.returnValue(data_chunk)
events, next_token = yield data_source.get_pagination_rows(
user, pagin_config, room_id
)
chunk = {
"chunk": [e.get_dict() for e in events],
"start": pagin_config.from_token.to_string(),
"end": next_token.to_string(),
}
defer.returnValue(chunk)
@defer.inlineCallbacks @defer.inlineCallbacks
def store_room_data(self, event=None, stamp_event=True): def store_room_data(self, event=None, stamp_event=True):
@ -256,20 +266,20 @@ class MessageHandler(BaseHandler):
membership_list=[Membership.INVITE, Membership.JOIN] membership_list=[Membership.INVITE, Membership.JOIN]
) )
user = self.hs.parse_userid(user_id)
rooms_ret = [] rooms_ret = []
now_rooms_token = yield self.store.get_room_events_max_id() # FIXME (erikj): We need to not generate this token,
now_token = yield self.hs.get_event_sources().get_current_token()
# FIXME (erikj): Fix this. # FIXME (erikj): Fix this.
presence_stream = PresenceStreamData(self.hs) presence_stream = self.hs.get_event_sources().sources["presence"]
now_presence_token = yield presence_stream.max_token() pagination_config = PaginationConfig(from_token=now_token)
presence = yield presence_stream.get_rows( presence, _ = yield presence_stream.get_pagination_rows(
user_id, 0, now_presence_token, None, None user, pagination_config, None
) )
# FIXME (erikj): We need to not generate this token,
now_token = "%s_%s" % (now_rooms_token, now_presence_token)
limit = pagin_config.limit limit = pagin_config.limit
if not limit: if not limit:
limit = 10 limit = 10
@ -291,7 +301,7 @@ class MessageHandler(BaseHandler):
messages, token = yield self.store.get_recent_events_for_room( messages, token = yield self.store.get_recent_events_for_room(
event.room_id, event.room_id,
limit=limit, limit=limit,
end_token=now_rooms_token, end_token=now_token.events_key,
) )
d["messages"] = { d["messages"] = {
@ -300,14 +310,18 @@ class MessageHandler(BaseHandler):
"end": token[1], "end": token[1],
} }
current_state = yield self.store.get_current_state(event.room_id) current_state = yield self.store.get_current_state(
event.room_id
)
d["state"] = [c.get_dict() for c in current_state] d["state"] = [c.get_dict() for c in current_state]
except: except:
logger.exception("Failed to get snapshot") logger.exception("Failed to get snapshot")
ret = {"rooms": rooms_ret, "presence": presence[0], "end": now_token} ret = {
"rooms": rooms_ret,
# logger.debug("snapshot_all_rooms returning: %s", ret) "presence": presence,
"end": now_token.to_string()
}
defer.returnValue(ret) defer.returnValue(ret)
@ -490,7 +504,7 @@ class RoomMemberHandler(BaseHandler):
for entry in member_list for entry in member_list
] ]
chunk_data = { chunk_data = {
"start": "START", # FIXME (erikj): START is no longer a valid value "start": "START", # FIXME (erikj): START is no longer valid
"end": "END", "end": "END",
"chunk": event_list "chunk": event_list
} }

201
synapse/notifier.py Normal file
View File

@ -0,0 +1,201 @@
# -*- coding: utf-8 -*-
# Copyright 2014 matrix.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer, reactor
from synapse.util.logutils import log_function
import logging
logger = logging.getLogger(__name__)
class _NotificationListener(object):
def __init__(self, user, rooms, from_token, limit, timeout, deferred):
self.user = user
self.from_token = from_token
self.limit = limit
self.timeout = timeout
self.deferred = deferred
self.rooms = rooms
self.pending_notifications = []
def notify(self, notifier, events, start_token, end_token):
result = (events, (start_token, end_token))
try:
self.deferred.callback(result)
except defer.AlreadyCalledError:
pass
for room in self.rooms:
lst = notifier.rooms_to_listeners.get(room, set())
lst.discard(self)
notifier.user_to_listeners.get(self.user, set()).discard(self)
class Notifier(object):
def __init__(self, hs):
self.hs = hs
self.rooms_to_listeners = {}
self.user_to_listeners = {}
self.event_sources = hs.get_event_sources()
hs.get_distributor().observe(
"user_joined_room", self._user_joined_room
)
@log_function
@defer.inlineCallbacks
def on_new_room_event(self, event, extra_users=[]):
room_id = event.room_id
source = self.event_sources.sources["room"]
listeners = self.rooms_to_listeners.get(room_id, set()).copy()
for user in extra_users:
listeners |= self.user_to_listeners.get(user, set()).copy()
logger.debug("on_new_room_event listeners %s", listeners)
# TODO (erikj): Can we make this more efficient by hitting the
# db once?
for listener in listeners:
events, end_token = yield source.get_new_events_for_user(
listener.user,
listener.from_token,
listener.limit,
)
if events:
listener.notify(
self, events, listener.from_token, end_token
)
@defer.inlineCallbacks
def on_new_user_event(self, users=[], rooms=[]):
source = self.event_sources.sources["presence"]
listeners = set()
for user in users:
listeners |= self.user_to_listeners.get(user, set()).copy()
for room in rooms:
listeners |= self.rooms_to_listeners.get(room, set()).copy()
for listener in listeners:
events, end_token = yield source.get_new_events_for_user(
listener.user,
listener.from_token,
listener.limit,
)
if events:
listener.notify(
self, events, listener.from_token, end_token
)
def get_events_for(self, user, rooms, pagination_config, timeout):
deferred = defer.Deferred()
self._get_events(
deferred, user, rooms, pagination_config.from_token,
pagination_config.limit, timeout
).addErrback(deferred.errback)
return deferred
@defer.inlineCallbacks
def _get_events(self, deferred, user, rooms, from_token, limit, timeout):
if not from_token:
from_token = yield self.event_sources.get_current_token()
listener = _NotificationListener(
user,
rooms,
from_token,
limit,
timeout,
deferred,
)
if timeout:
reactor.callLater(timeout/1000, self._timeout_listener, listener)
self._register_with_keys(listener)
yield self._check_for_updates(listener)
return
def _timeout_listener(self, listener):
# TODO (erikj): We should probably set to_token to the current max
# rather than reusing from_token.
listener.notify(
self,
[],
listener.from_token,
listener.from_token,
)
@log_function
def _register_with_keys(self, listener):
for room in listener.rooms:
s = self.rooms_to_listeners.setdefault(room, set())
s.add(listener)
self.user_to_listeners.setdefault(listener.user, set()).add(listener)
@defer.inlineCallbacks
@log_function
def _check_for_updates(self, listener):
# TODO (erikj): We need to think about limits across multiple sources
events = []
from_token = listener.from_token
limit = listener.limit
# TODO (erikj): DeferredList?
for source in self.event_sources.sources.values():
stuff, new_token = yield source.get_new_events_for_user(
listener.user,
from_token,
limit,
)
events.extend(stuff)
from_token = new_token
end_token = from_token
if events:
listener.notify(self, events, listener.from_token, end_token)
defer.returnValue(listener)
def _user_joined_room(self, user, room_id):
new_listeners = self.user_to_listeners.get(user, set())
listeners = self.rooms_to_listeners.setdefault(room_id, set())
listeners |= new_listeners

View File

@ -17,7 +17,7 @@
from twisted.internet import defer from twisted.internet import defer
from synapse.api.errors import SynapseError from synapse.api.errors import SynapseError
from synapse.api.streams import PaginationConfig from synapse.streams.config import PaginationConfig
from synapse.rest.base import RestServlet, client_path_pattern from synapse.rest.base import RestServlet, client_path_pattern
@ -41,6 +41,7 @@ class EventStreamRestServlet(RestServlet):
chunk = yield handler.get_stream(auth_user.to_string(), pagin_config, chunk = yield handler.get_stream(auth_user.to_string(), pagin_config,
timeout=timeout) timeout=timeout)
defer.returnValue((200, chunk)) defer.returnValue((200, chunk))
def on_OPTIONS(self, request): def on_OPTIONS(self, request):

View File

@ -15,7 +15,7 @@
from twisted.internet import defer from twisted.internet import defer
from synapse.api.streams import PaginationConfig from synapse.streams.config import PaginationConfig
from base import RestServlet, client_path_pattern from base import RestServlet, client_path_pattern

View File

@ -18,9 +18,9 @@ from twisted.internet import defer
from base import RestServlet, client_path_pattern from base import RestServlet, client_path_pattern
from synapse.api.errors import SynapseError, Codes from synapse.api.errors import SynapseError, Codes
from synapse.streams.config import PaginationConfig
from synapse.api.events.room import RoomMemberEvent from synapse.api.events.room import RoomMemberEvent
from synapse.api.constants import Membership from synapse.api.constants import Membership
from synapse.api.streams import PaginationConfig
import json import json
import logging import logging

View File

@ -22,7 +22,7 @@
from synapse.federation import initialize_http_replication from synapse.federation import initialize_http_replication
from synapse.federation.handler import FederationEventHandler from synapse.federation.handler import FederationEventHandler
from synapse.api.events.factory import EventFactory from synapse.api.events.factory import EventFactory
from synapse.api.notifier import Notifier from synapse.notifier import Notifier
from synapse.api.auth import Auth from synapse.api.auth import Auth
from synapse.handlers import Handlers from synapse.handlers import Handlers
from synapse.rest import RestServletFactory from synapse.rest import RestServletFactory
@ -32,6 +32,7 @@ from synapse.types import UserID, RoomAlias, RoomID
from synapse.util import Clock from synapse.util import Clock
from synapse.util.distributor import Distributor from synapse.util.distributor import Distributor
from synapse.util.lockutils import LockManager from synapse.util.lockutils import LockManager
from synapse.streams.events import EventSources
class BaseHomeServer(object): class BaseHomeServer(object):
@ -73,6 +74,7 @@ class BaseHomeServer(object):
'resource_for_federation', 'resource_for_federation',
'resource_for_web_client', 'resource_for_web_client',
'resource_for_content_repo', 'resource_for_content_repo',
'event_sources',
] ]
def __init__(self, hostname, **kwargs): def __init__(self, hostname, **kwargs):
@ -195,6 +197,9 @@ class HomeServer(BaseHomeServer):
def build_distributor(self): def build_distributor(self):
return Distributor() return Distributor()
def build_event_sources(self):
return EventSources(self)
def register_servlets(self): def register_servlets(self):
""" Register all servlets associated with this HomeServer. """ Register all servlets associated with this HomeServer.
""" """

View File

@ -174,7 +174,7 @@ class StreamStore(SQLBaseStore):
"SELECT * FROM events as e WHERE " "SELECT * FROM events as e WHERE "
"((room_id IN (%(current)s)) OR " "((room_id IN (%(current)s)) OR "
"(event_id IN (%(invites)s))) " "(event_id IN (%(invites)s))) "
"AND e.stream_ordering > ? AND e.stream_ordering < ? " "AND e.stream_ordering > ? AND e.stream_ordering <= ? "
"AND e.outlier = 0 " "AND e.outlier = 0 "
"ORDER BY stream_ordering ASC LIMIT %(limit)d " "ORDER BY stream_ordering ASC LIMIT %(limit)d "
) % { ) % {
@ -293,5 +293,5 @@ class StreamStore(SQLBaseStore):
defer.returnValue("s1") defer.returnValue("s1")
return return
key = res[0]["m"] + 1 key = res[0]["m"]
defer.returnValue("s%d" % (key,)) defer.returnValue("s%d" % (key,))

View File

@ -0,0 +1,14 @@
# -*- coding: utf-8 -*-
# Copyright 2014 matrix.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

84
synapse/streams/config.py Normal file
View File

@ -0,0 +1,84 @@
# -*- coding: utf-8 -*-
# Copyright 2014 matrix.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.api.errors import SynapseError
from synapse.types import StreamToken
import logging
logger = logging.getLogger(__name__)
class PaginationConfig(object):
"""A configuration object which stores pagination parameters."""
def __init__(self, from_token=None, to_token=None, direction='f',
limit=0):
self.from_token = from_token
self.to_token = to_token
self.direction = 'f' if direction == 'f' else 'b'
self.limit = int(limit)
@classmethod
def from_request(cls, request, raise_invalid_params=True):
def get_param(name, default=None):
lst = request.args.get(name, [])
if len(lst) > 1:
raise SynapseError(
400, "%s must be specified only once" % (name,)
)
elif len(lst) == 1:
return lst[0]
else:
return default
direction = get_param("dir", 'f')
if direction not in ['f', 'b']:
raise SynapseError(400, "'dir' parameter is invalid.")
from_tok = get_param("from")
to_tok = get_param("to")
try:
if from_tok == "END":
from_tok = None # For backwards compat.
elif from_tok:
from_tok = StreamToken.from_string(from_tok)
except:
raise SynapseError(400, "'from' paramater is invalid")
try:
if to_tok:
to_tok = StreamToken.from_string(to_tok)
except:
raise SynapseError(400, "'to' paramater is invalid")
limit = get_param("limit", "0")
if not limit.isdigit():
raise SynapseError(400, "'limit' parameter must be an integer.")
try:
return PaginationConfig(from_tok, to_tok, direction, limit)
except:
logger.exception("Failed to create pagination config")
raise SynapseError(400, "Invalid request.")
def __str__(self):
return (
"<PaginationConfig from_tok=%s, to_tok=%s, "
"direction=%s, limit=%s>"
) % (self.from_tok, self.to_tok, self.direction, self.limit)

180
synapse/streams/events.py Normal file
View File

@ -0,0 +1,180 @@
# -*- coding: utf-8 -*-
# Copyright 2014 matrix.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer
from synapse.api.constants import Membership
from synapse.types import StreamToken
class RoomEventSource(object):
def __init__(self, hs):
self.store = hs.get_datastore()
@defer.inlineCallbacks
def get_new_events_for_user(self, user, from_token, limit):
# We just ignore the key for now.
to_key = yield self.get_current_token_part()
events, end_key = yield self.store.get_room_events_stream(
user_id=user.to_string(),
from_key=from_token.events_key,
to_key=to_key,
room_id=None,
limit=limit,
)
end_token = from_token.copy_and_replace("events_key", end_key)
defer.returnValue((events, end_token))
def get_current_token_part(self):
return self.store.get_room_events_max_id()
@defer.inlineCallbacks
def get_pagination_rows(self, user, pagination_config, key):
from_token = pagination_config.from_token
to_token = pagination_config.to_token
limit = pagination_config.limit
direction = pagination_config.direction
to_key = to_token.events_key if to_token else None
events, next_key = yield self.store.paginate_room_events(
room_id=key,
from_key=from_token.events_key,
to_key=to_key,
direction=direction,
limit=limit,
with_feedback=True
)
next_token = from_token.copy_and_replace("events_key", next_key)
defer.returnValue((events, next_token))
class PresenceSource(object):
def __init__(self, hs):
self.hs = hs
self.clock = hs.get_clock()
def get_new_events_for_user(self, user, from_token, limit):
from_key = int(from_token.presence_key)
presence = self.hs.get_handlers().presence_handler
cachemap = presence._user_cachemap
# TODO(paul): limit, and filter by visibility
updates = [(k, cachemap[k]) for k in cachemap
if from_key < cachemap[k].serial]
if updates:
clock = self.clock
latest_serial = max([x[1].serial for x in updates])
data = [x[1].make_event(user=x[0], clock=clock) for x in updates]
end_token = from_token.copy_and_replace(
"presence_key", latest_serial
)
return ((data, end_token))
else:
end_token = from_token.copy_and_replace(
"presence_key", presence._user_cachemap_latest_serial
)
return (([], end_token))
def get_current_token_part(self):
presence = self.hs.get_handlers().presence_handler
return presence._user_cachemap_latest_serial
def get_pagination_rows(self, user, pagination_config, key):
# TODO (erikj): Does this make sense? Ordering?
from_token = pagination_config.from_token
to_token = pagination_config.to_token
from_key = int(from_token.presence_key)
if to_token:
to_key = int(to_token.presence_key)
else:
to_key = -1
presence = self.hs.get_handlers().presence_handler
cachemap = presence._user_cachemap
# TODO(paul): limit, and filter by visibility
updates = [(k, cachemap[k]) for k in cachemap
if to_key < cachemap[k].serial < from_key]
if updates:
clock = self.clock
earliest_serial = max([x[1].serial for x in updates])
data = [x[1].make_event(user=x[0], clock=clock) for x in updates]
if to_token:
next_token = to_token
else:
next_token = from_token
next_token = next_token.copy_and_replace(
"presence_key", earliest_serial
)
return ((data, next_token))
else:
if not to_token:
to_token = from_token.copy_and_replace(
"presence_key", 0
)
return (([], to_token))
class EventSources(object):
SOURCE_TYPES = {
"room": RoomEventSource,
"presence": PresenceSource,
}
def __init__(self, hs):
self.sources = {
name: cls(hs)
for name, cls in EventSources.SOURCE_TYPES.items()
}
@staticmethod
def create_token(events_key, presence_key):
return StreamToken(events_key=events_key, presence_key=presence_key)
@defer.inlineCallbacks
def get_current_token(self):
events_key = yield self.sources["room"].get_current_token_part()
presence_key = yield self.sources["presence"].get_current_token_part()
token = EventSources.create_token(events_key, presence_key)
defer.returnValue(token)
class StreamSource(object):
def get_new_events_for_user(self, user, from_token, limit):
raise NotImplementedError("get_new_events_for_user")
def get_current_token_part(self):
raise NotImplementedError("get_current_token_part")
def get_pagination_rows(self, user, pagination_config, key):
raise NotImplementedError("get_rows")

View File

@ -92,3 +92,36 @@ class RoomAlias(DomainSpecificString):
class RoomID(DomainSpecificString): class RoomID(DomainSpecificString):
"""Structure representing a room id. """ """Structure representing a room id. """
SIGIL = "!" SIGIL = "!"
class StreamToken(
namedtuple(
"Token",
("events_key", "presence_key")
)
):
_SEPARATOR = "_"
@classmethod
def from_string(cls, string):
try:
events_key, presence_key = string.split(cls._SEPARATOR)
return cls(
events_key=events_key,
presence_key=presence_key,
)
except:
raise SynapseError(400, "Invalid Token")
def to_string(self):
return "".join([
str(self.events_key),
self._SEPARATOR,
str(self.presence_key),
])
def copy_and_replace(self, key, new_value):
d = self._asdict()
d[key] = new_value
return StreamToken(**d)