mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-05-02 11:06:07 -04:00
Add cache to room stream
This commit is contained in:
parent
f93ecf8783
commit
b97f6626b6
5 changed files with 254 additions and 74 deletions
|
@ -37,6 +37,7 @@ from twisted.internet import defer
|
|||
|
||||
from ._base import SQLBaseStore
|
||||
from synapse.util.caches.descriptors import cachedInlineCallbacks
|
||||
from synapse.util.caches.room_change_cache import RoomStreamChangeCache
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.types import RoomStreamToken
|
||||
from synapse.util.logutils import log_function
|
||||
|
@ -77,6 +78,12 @@ def upper_bound(token):
|
|||
|
||||
|
||||
class StreamStore(SQLBaseStore):
|
||||
def __init__(self, hs):
|
||||
super(StreamStore, self).__init__(hs)
|
||||
|
||||
self._events_stream_cache = RoomStreamChangeCache(
|
||||
"EventsRoomStreamChangeCache", self._stream_id_gen.get_max_token(None)
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_appservice_room_stream(self, service, from_key, to_key, limit=0):
|
||||
|
@ -157,6 +164,132 @@ class StreamStore(SQLBaseStore):
|
|||
results = yield self.runInteraction("get_appservice_room_stream", f)
|
||||
defer.returnValue(results)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0):
|
||||
from_id = RoomStreamToken.parse_stream_token(from_key).stream
|
||||
|
||||
room_ids = yield self._events_stream_cache.get_rooms_changed(
|
||||
self, room_ids, from_id
|
||||
)
|
||||
|
||||
if not room_ids:
|
||||
defer.returnValue({})
|
||||
|
||||
results = {}
|
||||
room_ids = list(room_ids)
|
||||
for rm_ids in (room_ids[i:i+20] for i in xrange(0, len(room_ids), 20)):
|
||||
res = yield defer.gatherResults([
|
||||
self.get_recent_room_events_stream_for_room(
|
||||
room_id, from_key, to_key, limit
|
||||
).addCallback(lambda r, rm: (rm, r), room_id)
|
||||
for room_id in room_ids
|
||||
])
|
||||
results.update(dict(res))
|
||||
|
||||
defer.returnValue(results)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_recent_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0):
|
||||
if from_key is not None:
|
||||
from_id = RoomStreamToken.parse_stream_token(from_key).stream
|
||||
else:
|
||||
from_id = None
|
||||
to_id = RoomStreamToken.parse_stream_token(to_key).stream
|
||||
|
||||
if from_key == to_key:
|
||||
defer.returnValue(([], from_key))
|
||||
|
||||
has_changed = yield self._events_stream_cache.get_room_has_changed(
|
||||
room_id, from_id
|
||||
)
|
||||
|
||||
if not has_changed:
|
||||
defer.returnValue(([], from_key))
|
||||
|
||||
def f(txn):
|
||||
if from_id is not None:
|
||||
sql = (
|
||||
"SELECT event_id, stream_ordering FROM events WHERE"
|
||||
" room_id = ?"
|
||||
" AND not outlier"
|
||||
" AND stream_ordering > ? AND stream_ordering <= ?"
|
||||
" ORDER BY stream_ordering DESC LIMIT ?"
|
||||
)
|
||||
txn.execute(sql, (room_id, from_id, to_id, limit))
|
||||
else:
|
||||
sql = (
|
||||
"SELECT event_id, stream_ordering FROM events WHERE"
|
||||
" room_id = ?"
|
||||
" AND not outlier"
|
||||
" AND stream_ordering <= ?"
|
||||
" ORDER BY stream_ordering DESC LIMIT ?"
|
||||
)
|
||||
txn.execute(sql, (room_id, to_id, limit))
|
||||
|
||||
rows = self.cursor_to_dict(txn)
|
||||
|
||||
ret = self._get_events_txn(
|
||||
txn,
|
||||
[r["event_id"] for r in rows],
|
||||
get_prev_content=True
|
||||
)
|
||||
|
||||
ret.reverse()
|
||||
|
||||
self._set_before_and_after(ret, rows)
|
||||
|
||||
if rows:
|
||||
key = "s%d" % min(r["stream_ordering"] for r in rows)
|
||||
else:
|
||||
# Assume we didn't get anything because there was nothing to
|
||||
# get.
|
||||
key = from_key
|
||||
|
||||
return ret, key
|
||||
res = yield self.runInteraction("get_recent_room_events_stream_for_room", f)
|
||||
defer.returnValue(res)
|
||||
|
||||
def get_room_changes_for_user(self, user_id, from_key, to_key):
|
||||
if from_key is not None:
|
||||
from_id = RoomStreamToken.parse_stream_token(from_key).stream
|
||||
else:
|
||||
from_id = None
|
||||
to_id = RoomStreamToken.parse_stream_token(to_key).stream
|
||||
|
||||
if from_key == to_key:
|
||||
return defer.succeed([])
|
||||
|
||||
def f(txn):
|
||||
if from_id is not None:
|
||||
sql = (
|
||||
"SELECT m.event_id, stream_ordering FROM events AS e, room_memberships AS m"
|
||||
" WHERE e.event_id = m.event_id"
|
||||
" AND m.user_id = ?"
|
||||
" AND e.stream_ordering > ? AND e.stream_ordering <= ?"
|
||||
" ORDER BY e.stream_ordering ASC"
|
||||
)
|
||||
txn.execute(sql, (user_id, from_id, to_id,))
|
||||
else:
|
||||
sql = (
|
||||
"SELECT m.event_id, stream_ordering FROM events AS e, room_memberships AS m"
|
||||
" WHERE e.event_id = m.event_id"
|
||||
" AND m.user_id = ?"
|
||||
" AND stream_ordering <= ?"
|
||||
" ORDER BY stream_ordering ASC"
|
||||
)
|
||||
txn.execute(sql, (user_id, to_id,))
|
||||
rows = self.cursor_to_dict(txn)
|
||||
|
||||
ret = self._get_events_txn(
|
||||
txn,
|
||||
[r["event_id"] for r in rows],
|
||||
get_prev_content=True
|
||||
)
|
||||
|
||||
return ret
|
||||
|
||||
return self.runInteraction("get_room_changes_for_user", f)
|
||||
|
||||
@log_function
|
||||
def get_room_events_stream(
|
||||
self,
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue