Have EventSource's get_new_events_for_user() API work only on keys within that source, not overall eventstream tokens

This commit is contained in:
Paul "LeoNerd" Evans 2014-08-29 19:13:55 +01:00
parent 56424eca5c
commit eec67a675f
5 changed files with 33 additions and 30 deletions

View File

@ -727,8 +727,8 @@ class PresenceEventSource(object):
self.hs = hs self.hs = hs
self.clock = hs.get_clock() self.clock = hs.get_clock()
def get_new_events_for_user(self, user, from_token, limit): def get_new_events_for_user(self, user, from_key, limit):
from_key = int(from_token.presence_key) from_key = int(from_key)
presence = self.hs.get_handlers().presence_handler presence = self.hs.get_handlers().presence_handler
cachemap = presence._user_cachemap cachemap = presence._user_cachemap
@ -743,15 +743,9 @@ class PresenceEventSource(object):
latest_serial = max([x[1].serial for x in updates]) latest_serial = max([x[1].serial for x in updates])
data = [x[1].make_event(user=x[0], clock=clock) for x in updates] data = [x[1].make_event(user=x[0], clock=clock) for x in updates]
end_token = from_token.copy_and_replace( return ((data, latest_serial))
"presence_key", latest_serial
)
return ((data, end_token))
else: else:
end_token = from_token.copy_and_replace( return (([], presence._user_cachemap_latest_serial))
"presence_key", presence._user_cachemap_latest_serial
)
return (([], end_token))
def get_current_token_part(self): def get_current_token_part(self):
presence = self.hs.get_handlers().presence_handler presence = self.hs.get_handlers().presence_handler

View File

@ -469,22 +469,20 @@ class RoomEventSource(object):
self.store = hs.get_datastore() self.store = hs.get_datastore()
@defer.inlineCallbacks @defer.inlineCallbacks
def get_new_events_for_user(self, user, from_token, limit): def get_new_events_for_user(self, user, from_key, limit):
# We just ignore the key for now. # We just ignore the key for now.
to_key = yield self.get_current_token_part() to_key = yield self.get_current_token_part()
events, end_key = yield self.store.get_room_events_stream( events, end_key = yield self.store.get_room_events_stream(
user_id=user.to_string(), user_id=user.to_string(),
from_key=from_token.room_key, from_key=from_key,
to_key=to_key, to_key=to_key,
room_id=None, room_id=None,
limit=limit, limit=limit,
) )
end_token = from_token.copy_and_replace("room_key", end_key) defer.returnValue((events, end_key))
defer.returnValue((events, end_token))
def get_current_token_part(self): def get_current_token_part(self):
return self.store.get_room_events_max_id() return self.store.get_room_events_max_id()

View File

@ -151,8 +151,8 @@ class TypingNotificationEventSource(object):
def __init__(self, hs): def __init__(self, hs):
self.hs = hs self.hs = hs
def get_new_events_for_user(self, user, from_token, limit): def get_new_events_for_user(self, user, from_key, limit):
return ([], from_token) return ([], from_key)
def get_current_token_part(self): def get_current_token_part(self):
return 0 return 0

View File

@ -95,7 +95,7 @@ class Notifier(object):
""" """
room_id = event.room_id room_id = event.room_id
source = self.event_sources.sources["room"] room_source = self.event_sources.sources["room"]
listeners = self.rooms_to_listeners.get(room_id, set()).copy() listeners = self.rooms_to_listeners.get(room_id, set()).copy()
@ -107,13 +107,17 @@ class Notifier(object):
# TODO (erikj): Can we make this more efficient by hitting the # TODO (erikj): Can we make this more efficient by hitting the
# db once? # db once?
for listener in listeners: for listener in listeners:
events, end_token = yield source.get_new_events_for_user( events, end_key = yield room_source.get_new_events_for_user(
listener.user, listener.user,
listener.from_token, listener.from_token.room_key,
listener.limit, listener.limit,
) )
if events: if events:
end_token = listener.from_token.copy_and_replace(
"room_key", end_key
)
listener.notify( listener.notify(
self, events, listener.from_token, end_token self, events, listener.from_token, end_token
) )
@ -126,7 +130,7 @@ class Notifier(object):
Will wake up all listeners for the given users and rooms. Will wake up all listeners for the given users and rooms.
""" """
source = self.event_sources.sources["presence"] presence_source = self.event_sources.sources["presence"]
listeners = set() listeners = set()
@ -137,13 +141,17 @@ class Notifier(object):
listeners |= self.rooms_to_listeners.get(room, set()).copy() listeners |= self.rooms_to_listeners.get(room, set()).copy()
for listener in listeners: for listener in listeners:
events, end_token = yield source.get_new_events_for_user( events, end_key = yield presence_source.get_new_events_for_user(
listener.user, listener.user,
listener.from_token, listener.from_token.presence_key,
listener.limit, listener.limit,
) )
if events: if events:
end_token = listener.from_token.copy_and_replace(
"presence_key", end_key
)
listener.notify( listener.notify(
self, events, listener.from_token, end_token self, events, listener.from_token, end_token
) )
@ -216,16 +224,18 @@ class Notifier(object):
limit = listener.limit limit = listener.limit
# TODO (erikj): DeferredList? # TODO (erikj): DeferredList?
for source in self.event_sources.sources.values(): for name, source in self.event_sources.sources.items():
stuff, new_token = yield source.get_new_events_for_user( keyname = "%s_key" % name
stuff, new_key = yield source.get_new_events_for_user(
listener.user, listener.user,
from_token, getattr(from_token, keyname),
limit, limit,
) )
events.extend(stuff) events.extend(stuff)
from_token = new_token from_token = from_token.copy_and_replace(keyname, new_key)
end_token = from_token end_token = from_token

View File

@ -28,8 +28,8 @@ class NullSource(object):
def __init__(self, hs): def __init__(self, hs):
pass pass
def get_new_events_for_user(self, user, from_token, limit): def get_new_events_for_user(self, user, from_key, limit):
return defer.succeed(([], from_token)) return defer.succeed(([], from_key))
def get_current_token_part(self): def get_current_token_part(self):
return defer.succeed(0) return defer.succeed(0)
@ -68,7 +68,8 @@ class EventSources(object):
class StreamSource(object): class StreamSource(object):
def get_new_events_for_user(self, user, from_token, limit): def get_new_events_for_user(self, user, from_key, limit):
"""from_key is the key within this event source."""
raise NotImplementedError("get_new_events_for_user") raise NotImplementedError("get_new_events_for_user")
def get_current_token_part(self): def get_current_token_part(self):