mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-11-13 05:00:37 -05:00
Merge branch 'develop' of github.com:matrix-org/synapse into room_config
This commit is contained in:
commit
10efca1a74
62 changed files with 1418 additions and 904 deletions
|
|
@ -15,7 +15,7 @@
|
|||
|
||||
"""Contains the URL paths to prefix various aspects of the server with. """
|
||||
|
||||
CLIENT_PREFIX = "/matrix/client/api/v1"
|
||||
FEDERATION_PREFIX = "/matrix/federation/v1"
|
||||
WEB_CLIENT_PREFIX = "/matrix/client"
|
||||
CONTENT_REPO_PREFIX = "/matrix/content"
|
||||
CLIENT_PREFIX = "/_matrix/client/api/v1"
|
||||
FEDERATION_PREFIX = "/_matrix/federation/v1"
|
||||
WEB_CLIENT_PREFIX = "/_matrix/client"
|
||||
CONTENT_REPO_PREFIX = "/_matrix/content"
|
||||
|
|
@ -274,11 +274,11 @@ class MessageHandler(BaseRoomHandler):
|
|||
messages, token = yield self.store.get_recent_events_for_room(
|
||||
event.room_id,
|
||||
limit=limit,
|
||||
end_token=now_token.events_key,
|
||||
end_token=now_token.room_key,
|
||||
)
|
||||
|
||||
start_token = now_token.copy_and_replace("events_key", token[0])
|
||||
end_token = now_token.copy_and_replace("events_key", token[1])
|
||||
start_token = now_token.copy_and_replace("room_key", token[0])
|
||||
end_token = now_token.copy_and_replace("room_key", token[1])
|
||||
|
||||
d["messages"] = {
|
||||
"chunk": [m.get_dict() for m in messages],
|
||||
|
|
|
|||
|
|
@ -260,19 +260,18 @@ class PresenceHandler(BaseHandler):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def user_joined_room(self, user, room_id):
|
||||
|
||||
if user.is_mine:
|
||||
statuscache = self._get_or_make_usercache(user)
|
||||
|
||||
# No actual update but we need to bump the serial anyway for the
|
||||
# event source
|
||||
self._user_cachemap_latest_serial += 1
|
||||
statuscache.update({}, serial=self._user_cachemap_latest_serial)
|
||||
|
||||
self.push_update_to_local_and_remote(
|
||||
observed_user=user,
|
||||
room_ids=[room_id],
|
||||
statuscache=self._get_or_offline_usercache(user),
|
||||
)
|
||||
|
||||
else:
|
||||
self.push_update_to_clients(
|
||||
observed_user=user,
|
||||
room_ids=[room_id],
|
||||
statuscache=self._get_or_offline_usercache(user),
|
||||
statuscache=statuscache,
|
||||
)
|
||||
|
||||
# We also want to tell them about current presence of people.
|
||||
|
|
@ -722,6 +721,78 @@ class PresenceHandler(BaseHandler):
|
|||
)
|
||||
|
||||
|
||||
class PresenceEventSource(object):
|
||||
def __init__(self, hs):
|
||||
self.hs = hs
|
||||
self.clock = hs.get_clock()
|
||||
|
||||
def get_new_events_for_user(self, user, from_key, limit):
|
||||
from_key = int(from_key)
|
||||
|
||||
presence = self.hs.get_handlers().presence_handler
|
||||
cachemap = presence._user_cachemap
|
||||
|
||||
# TODO(paul): limit, and filter by visibility
|
||||
updates = [(k, cachemap[k]) for k in cachemap
|
||||
if from_key < cachemap[k].serial]
|
||||
|
||||
if updates:
|
||||
clock = self.clock
|
||||
|
||||
latest_serial = max([x[1].serial for x in updates])
|
||||
data = [x[1].make_event(user=x[0], clock=clock) for x in updates]
|
||||
|
||||
return ((data, latest_serial))
|
||||
else:
|
||||
return (([], presence._user_cachemap_latest_serial))
|
||||
|
||||
def get_current_key(self):
|
||||
presence = self.hs.get_handlers().presence_handler
|
||||
return presence._user_cachemap_latest_serial
|
||||
|
||||
def get_pagination_rows(self, user, pagination_config, key):
|
||||
# TODO (erikj): Does this make sense? Ordering?
|
||||
|
||||
from_token = pagination_config.from_token
|
||||
to_token = pagination_config.to_token
|
||||
|
||||
from_key = int(from_token.presence_key)
|
||||
|
||||
if to_token:
|
||||
to_key = int(to_token.presence_key)
|
||||
else:
|
||||
to_key = -1
|
||||
|
||||
presence = self.hs.get_handlers().presence_handler
|
||||
cachemap = presence._user_cachemap
|
||||
|
||||
# TODO(paul): limit, and filter by visibility
|
||||
updates = [(k, cachemap[k]) for k in cachemap
|
||||
if to_key < cachemap[k].serial < from_key]
|
||||
|
||||
if updates:
|
||||
clock = self.clock
|
||||
|
||||
earliest_serial = max([x[1].serial for x in updates])
|
||||
data = [x[1].make_event(user=x[0], clock=clock) for x in updates]
|
||||
|
||||
if to_token:
|
||||
next_token = to_token
|
||||
else:
|
||||
next_token = from_token
|
||||
|
||||
next_token = next_token.copy_and_replace(
|
||||
"presence_key", earliest_serial
|
||||
)
|
||||
return ((data, next_token))
|
||||
else:
|
||||
if not to_token:
|
||||
to_token = from_token.copy_and_replace(
|
||||
"presence_key", 0
|
||||
)
|
||||
return (([], to_token))
|
||||
|
||||
|
||||
class UserPresenceCache(object):
|
||||
"""Store an observed user's state and status message.
|
||||
|
||||
|
|
|
|||
|
|
@ -497,3 +497,49 @@ class RoomListHandler(BaseRoomHandler):
|
|||
chunk = yield self.store.get_rooms(is_public=True)
|
||||
# FIXME (erikj): START is no longer a valid value
|
||||
defer.returnValue({"start": "START", "end": "END", "chunk": chunk})
|
||||
|
||||
|
||||
class RoomEventSource(object):
|
||||
def __init__(self, hs):
|
||||
self.store = hs.get_datastore()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_new_events_for_user(self, user, from_key, limit):
|
||||
# We just ignore the key for now.
|
||||
|
||||
to_key = yield self.get_current_key()
|
||||
|
||||
events, end_key = yield self.store.get_room_events_stream(
|
||||
user_id=user.to_string(),
|
||||
from_key=from_key,
|
||||
to_key=to_key,
|
||||
room_id=None,
|
||||
limit=limit,
|
||||
)
|
||||
|
||||
defer.returnValue((events, end_key))
|
||||
|
||||
def get_current_key(self):
|
||||
return self.store.get_room_events_max_id()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_pagination_rows(self, user, pagination_config, key):
|
||||
from_token = pagination_config.from_token
|
||||
to_token = pagination_config.to_token
|
||||
limit = pagination_config.limit
|
||||
direction = pagination_config.direction
|
||||
|
||||
to_key = to_token.room_key if to_token else None
|
||||
|
||||
events, next_key = yield self.store.paginate_room_events(
|
||||
room_id=key,
|
||||
from_key=from_token.room_key,
|
||||
to_key=to_key,
|
||||
direction=direction,
|
||||
limit=limit,
|
||||
with_feedback=True
|
||||
)
|
||||
|
||||
next_token = from_token.copy_and_replace("room_key", next_key)
|
||||
|
||||
defer.returnValue((events, next_token))
|
||||
|
|
|
|||
|
|
@ -145,3 +145,17 @@ class TypingNotificationHandler(BaseHandler):
|
|||
typing):
|
||||
# TODO(paul) steal this from presence.py
|
||||
pass
|
||||
|
||||
|
||||
class TypingNotificationEventSource(object):
|
||||
def __init__(self, hs):
|
||||
self.hs = hs
|
||||
|
||||
def get_new_events_for_user(self, user, from_key, limit):
|
||||
return ([], from_key)
|
||||
|
||||
def get_current_key(self):
|
||||
return 0
|
||||
|
||||
def get_pagination_rows(self, user, pagination_config, key):
|
||||
return ([], pagination_config.from_token)
|
||||
|
|
|
|||
|
|
@ -325,7 +325,7 @@ class ContentRepoResource(resource.Resource):
|
|||
|
||||
# FIXME (erikj): These should use constants.
|
||||
file_name = os.path.basename(fname)
|
||||
url = "http://%s/matrix/content/%s" % (
|
||||
url = "http://%s/_matrix/content/%s" % (
|
||||
self.hs.domain_with_port, file_name
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -95,7 +95,7 @@ class Notifier(object):
|
|||
"""
|
||||
room_id = event.room_id
|
||||
|
||||
source = self.event_sources.sources["room"]
|
||||
room_source = self.event_sources.sources["room"]
|
||||
|
||||
listeners = self.rooms_to_listeners.get(room_id, set()).copy()
|
||||
|
||||
|
|
@ -107,13 +107,17 @@ class Notifier(object):
|
|||
# TODO (erikj): Can we make this more efficient by hitting the
|
||||
# db once?
|
||||
for listener in listeners:
|
||||
events, end_token = yield source.get_new_events_for_user(
|
||||
events, end_key = yield room_source.get_new_events_for_user(
|
||||
listener.user,
|
||||
listener.from_token,
|
||||
listener.from_token.room_key,
|
||||
listener.limit,
|
||||
)
|
||||
|
||||
if events:
|
||||
end_token = listener.from_token.copy_and_replace(
|
||||
"room_key", end_key
|
||||
)
|
||||
|
||||
listener.notify(
|
||||
self, events, listener.from_token, end_token
|
||||
)
|
||||
|
|
@ -126,7 +130,7 @@ class Notifier(object):
|
|||
|
||||
Will wake up all listeners for the given users and rooms.
|
||||
"""
|
||||
source = self.event_sources.sources["presence"]
|
||||
presence_source = self.event_sources.sources["presence"]
|
||||
|
||||
listeners = set()
|
||||
|
||||
|
|
@ -137,13 +141,17 @@ class Notifier(object):
|
|||
listeners |= self.rooms_to_listeners.get(room, set()).copy()
|
||||
|
||||
for listener in listeners:
|
||||
events, end_token = yield source.get_new_events_for_user(
|
||||
events, end_key = yield presence_source.get_new_events_for_user(
|
||||
listener.user,
|
||||
listener.from_token,
|
||||
listener.from_token.presence_key,
|
||||
listener.limit,
|
||||
)
|
||||
|
||||
if events:
|
||||
end_token = listener.from_token.copy_and_replace(
|
||||
"presence_key", end_key
|
||||
)
|
||||
|
||||
listener.notify(
|
||||
self, events, listener.from_token, end_token
|
||||
)
|
||||
|
|
@ -216,16 +224,18 @@ class Notifier(object):
|
|||
limit = listener.limit
|
||||
|
||||
# TODO (erikj): DeferredList?
|
||||
for source in self.event_sources.sources.values():
|
||||
stuff, new_token = yield source.get_new_events_for_user(
|
||||
for name, source in self.event_sources.sources.items():
|
||||
keyname = "%s_key" % name
|
||||
|
||||
stuff, new_key = yield source.get_new_events_for_user(
|
||||
listener.user,
|
||||
from_token,
|
||||
getattr(from_token, keyname),
|
||||
limit,
|
||||
)
|
||||
|
||||
events.extend(stuff)
|
||||
|
||||
from_token = new_token
|
||||
from_token = from_token.copy_and_replace(keyname, new_key)
|
||||
|
||||
end_token = from_token
|
||||
|
||||
|
|
|
|||
|
|
@ -17,6 +17,10 @@ from twisted.internet import defer
|
|||
|
||||
from synapse.types import StreamToken
|
||||
|
||||
from synapse.handlers.presence import PresenceEventSource
|
||||
from synapse.handlers.room import RoomEventSource
|
||||
from synapse.handlers.typing import TypingNotificationEventSource
|
||||
|
||||
|
||||
class NullSource(object):
|
||||
"""This event source never yields any events and its token remains at
|
||||
|
|
@ -24,146 +28,21 @@ class NullSource(object):
|
|||
def __init__(self, hs):
|
||||
pass
|
||||
|
||||
def get_new_events_for_user(self, user, from_token, limit):
|
||||
return defer.succeed(([], from_token))
|
||||
def get_new_events_for_user(self, user, from_key, limit):
|
||||
return defer.succeed(([], from_key))
|
||||
|
||||
def get_current_token_part(self):
|
||||
def get_current_key(self):
|
||||
return defer.succeed(0)
|
||||
|
||||
def get_pagination_rows(self, user, pagination_config, key):
|
||||
return defer.succeed(([], pagination_config.from_token))
|
||||
|
||||
|
||||
class RoomEventSource(object):
|
||||
def __init__(self, hs):
|
||||
self.store = hs.get_datastore()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_new_events_for_user(self, user, from_token, limit):
|
||||
# We just ignore the key for now.
|
||||
|
||||
to_key = yield self.get_current_token_part()
|
||||
|
||||
events, end_key = yield self.store.get_room_events_stream(
|
||||
user_id=user.to_string(),
|
||||
from_key=from_token.events_key,
|
||||
to_key=to_key,
|
||||
room_id=None,
|
||||
limit=limit,
|
||||
)
|
||||
|
||||
end_token = from_token.copy_and_replace("events_key", end_key)
|
||||
|
||||
defer.returnValue((events, end_token))
|
||||
|
||||
def get_current_token_part(self):
|
||||
return self.store.get_room_events_max_id()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_pagination_rows(self, user, pagination_config, key):
|
||||
from_token = pagination_config.from_token
|
||||
to_token = pagination_config.to_token
|
||||
limit = pagination_config.limit
|
||||
direction = pagination_config.direction
|
||||
|
||||
to_key = to_token.events_key if to_token else None
|
||||
|
||||
events, next_key = yield self.store.paginate_room_events(
|
||||
room_id=key,
|
||||
from_key=from_token.events_key,
|
||||
to_key=to_key,
|
||||
direction=direction,
|
||||
limit=limit,
|
||||
with_feedback=True
|
||||
)
|
||||
|
||||
next_token = from_token.copy_and_replace("events_key", next_key)
|
||||
|
||||
defer.returnValue((events, next_token))
|
||||
|
||||
|
||||
class PresenceSource(object):
|
||||
def __init__(self, hs):
|
||||
self.hs = hs
|
||||
self.clock = hs.get_clock()
|
||||
|
||||
def get_new_events_for_user(self, user, from_token, limit):
|
||||
from_key = int(from_token.presence_key)
|
||||
|
||||
presence = self.hs.get_handlers().presence_handler
|
||||
cachemap = presence._user_cachemap
|
||||
|
||||
# TODO(paul): limit, and filter by visibility
|
||||
updates = [(k, cachemap[k]) for k in cachemap
|
||||
if from_key < cachemap[k].serial]
|
||||
|
||||
if updates:
|
||||
clock = self.clock
|
||||
|
||||
latest_serial = max([x[1].serial for x in updates])
|
||||
data = [x[1].make_event(user=x[0], clock=clock) for x in updates]
|
||||
|
||||
end_token = from_token.copy_and_replace(
|
||||
"presence_key", latest_serial
|
||||
)
|
||||
return ((data, end_token))
|
||||
else:
|
||||
end_token = from_token.copy_and_replace(
|
||||
"presence_key", presence._user_cachemap_latest_serial
|
||||
)
|
||||
return (([], end_token))
|
||||
|
||||
def get_current_token_part(self):
|
||||
presence = self.hs.get_handlers().presence_handler
|
||||
return presence._user_cachemap_latest_serial
|
||||
|
||||
def get_pagination_rows(self, user, pagination_config, key):
|
||||
# TODO (erikj): Does this make sense? Ordering?
|
||||
|
||||
from_token = pagination_config.from_token
|
||||
to_token = pagination_config.to_token
|
||||
|
||||
from_key = int(from_token.presence_key)
|
||||
|
||||
if to_token:
|
||||
to_key = int(to_token.presence_key)
|
||||
else:
|
||||
to_key = -1
|
||||
|
||||
presence = self.hs.get_handlers().presence_handler
|
||||
cachemap = presence._user_cachemap
|
||||
|
||||
# TODO(paul): limit, and filter by visibility
|
||||
updates = [(k, cachemap[k]) for k in cachemap
|
||||
if to_key < cachemap[k].serial < from_key]
|
||||
|
||||
if updates:
|
||||
clock = self.clock
|
||||
|
||||
earliest_serial = max([x[1].serial for x in updates])
|
||||
data = [x[1].make_event(user=x[0], clock=clock) for x in updates]
|
||||
|
||||
if to_token:
|
||||
next_token = to_token
|
||||
else:
|
||||
next_token = from_token
|
||||
|
||||
next_token = next_token.copy_and_replace(
|
||||
"presence_key", earliest_serial
|
||||
)
|
||||
return ((data, next_token))
|
||||
else:
|
||||
if not to_token:
|
||||
to_token = from_token.copy_and_replace(
|
||||
"presence_key", 0
|
||||
)
|
||||
return (([], to_token))
|
||||
|
||||
|
||||
class EventSources(object):
|
||||
SOURCE_TYPES = {
|
||||
"room": RoomEventSource,
|
||||
"presence": PresenceSource,
|
||||
"presence": PresenceEventSource,
|
||||
"typing": TypingNotificationEventSource,
|
||||
}
|
||||
|
||||
def __init__(self, hs):
|
||||
|
|
@ -172,24 +51,29 @@ class EventSources(object):
|
|||
for name, cls in EventSources.SOURCE_TYPES.items()
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def create_token(events_key, presence_key):
|
||||
return StreamToken(events_key=events_key, presence_key=presence_key)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_current_token(self):
|
||||
events_key = yield self.sources["room"].get_current_token_part()
|
||||
presence_key = yield self.sources["presence"].get_current_token_part()
|
||||
token = EventSources.create_token(events_key, presence_key)
|
||||
token = StreamToken(
|
||||
room_key=(
|
||||
yield self.sources["room"].get_current_key()
|
||||
),
|
||||
presence_key=(
|
||||
yield self.sources["presence"].get_current_key()
|
||||
),
|
||||
typing_key=(
|
||||
yield self.sources["typing"].get_current_key()
|
||||
)
|
||||
)
|
||||
defer.returnValue(token)
|
||||
|
||||
|
||||
class StreamSource(object):
|
||||
def get_new_events_for_user(self, user, from_token, limit):
|
||||
def get_new_events_for_user(self, user, from_key, limit):
|
||||
"""from_key is the key within this event source."""
|
||||
raise NotImplementedError("get_new_events_for_user")
|
||||
|
||||
def get_current_token_part(self):
|
||||
raise NotImplementedError("get_current_token_part")
|
||||
def get_current_key(self):
|
||||
raise NotImplementedError("get_current_key")
|
||||
|
||||
def get_pagination_rows(self, user, pagination_config, key):
|
||||
raise NotImplementedError("get_rows")
|
||||
|
|
|
|||
|
|
@ -97,7 +97,7 @@ class RoomID(DomainSpecificString):
|
|||
class StreamToken(
|
||||
namedtuple(
|
||||
"Token",
|
||||
("events_key", "presence_key")
|
||||
("room_key", "presence_key", "typing_key")
|
||||
)
|
||||
):
|
||||
_SEPARATOR = "_"
|
||||
|
|
@ -105,21 +105,14 @@ class StreamToken(
|
|||
@classmethod
|
||||
def from_string(cls, string):
|
||||
try:
|
||||
events_key, presence_key = string.split(cls._SEPARATOR)
|
||||
keys = string.split(cls._SEPARATOR)
|
||||
|
||||
return cls(
|
||||
events_key=events_key,
|
||||
presence_key=presence_key,
|
||||
)
|
||||
return cls(*keys)
|
||||
except:
|
||||
raise SynapseError(400, "Invalid Token")
|
||||
|
||||
def to_string(self):
|
||||
return "".join([
|
||||
str(self.events_key),
|
||||
self._SEPARATOR,
|
||||
str(self.presence_key),
|
||||
])
|
||||
return self._SEPARATOR.join([str(k) for k in self])
|
||||
|
||||
def copy_and_replace(self, key, new_value):
|
||||
d = self._asdict()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue