Merge pull request #1121 from matrix-org/erikj/public_room_paginate

Add pagination support to publicRooms
This commit is contained in:
Erik Johnston 2016-09-15 13:27:09 +01:00 committed by GitHub
commit e457034e99
18 changed files with 589 additions and 133 deletions

View file

@ -113,6 +113,9 @@ class DataStore(RoomMemberStore, RoomStore,
self._device_inbox_id_gen = StreamIdGenerator(
db_conn, "device_max_stream_id", "stream_id"
)
self._public_room_id_gen = StreamIdGenerator(
db_conn, "public_room_list_stream", "stream_id"
)
self._transaction_id_gen = IdGenerator(db_conn, "sent_transactions", "id")
self._state_groups_id_gen = IdGenerator(db_conn, "state_groups", "id")

View file

@ -16,6 +16,7 @@
from twisted.internet import defer
from ._base import SQLBaseStore
from synapse.api.errors import StoreError
from synapse.util.caches.descriptors import cached
from unpaddedbase64 import encode_base64
@ -36,6 +37,13 @@ class EventFederationStore(SQLBaseStore):
and backfilling from another server respectively.
"""
def __init__(self, hs):
super(EventFederationStore, self).__init__(hs)
hs.get_clock().looping_call(
self._delete_old_forward_extrem_cache, 60 * 60 * 1000
)
def get_auth_chain(self, event_ids):
return self.get_auth_chain_ids(event_ids).addCallback(self._get_events)
@ -270,6 +278,37 @@ class EventFederationStore(SQLBaseStore):
]
)
# We now insert into stream_ordering_to_exterm a mapping from room_id,
# new stream_ordering to new forward extremeties in the room.
# This allows us to later efficiently look up the forward extremeties
# for a room before a given stream_ordering
max_stream_ord = max(
ev.internal_metadata.stream_ordering for ev in events
)
new_extrem = {}
for room_id in events_by_room:
event_ids = self._simple_select_onecol_txn(
txn,
table="event_forward_extremities",
keyvalues={"room_id": room_id},
retcol="event_id",
)
new_extrem[room_id] = event_ids
self._simple_insert_many_txn(
txn,
table="stream_ordering_to_exterm",
values=[
{
"room_id": room_id,
"event_id": event_id,
"stream_ordering": max_stream_ord,
}
for room_id, extrem_evs in new_extrem.items()
for event_id in extrem_evs
]
)
query = (
"INSERT INTO event_backward_extremities (event_id, room_id)"
" SELECT ?, ? WHERE NOT EXISTS ("
@ -305,6 +344,48 @@ class EventFederationStore(SQLBaseStore):
self.get_latest_event_ids_in_room.invalidate, (room_id,)
)
def get_forward_extremeties_for_room(self, room_id, stream_ordering):
"""For a given room_id and stream_ordering, return the forward
extremeties of the room at that point in "time".
Throws a StoreError if we have since purged the index for
stream_orderings from that point.
"""
if stream_ordering <= self.stream_ordering_month_ago:
raise StoreError(400, "stream_ordering too old")
sql = ("""
SELECT event_id FROM stream_ordering_to_exterm
INNER JOIN (
SELECT room_id, MAX(stream_ordering) AS stream_ordering
FROM stream_ordering_to_exterm
WHERE stream_ordering < ? GROUP BY room_id
) AS rms USING (room_id, stream_ordering)
WHERE room_id = ?
""")
def get_forward_extremeties_for_room_txn(txn):
txn.execute(sql, (stream_ordering, room_id))
rows = txn.fetchall()
return [event_id for event_id, in rows]
return self.runInteraction(
"get_forward_extremeties_for_room",
get_forward_extremeties_for_room_txn
)
def _delete_old_forward_extrem_cache(self):
def _delete_old_forward_extrem_cache_txn(txn):
txn.execute(
"DELETE FROM stream_ordering_to_exterm WHERE stream_ordering < ?",
(self.stream_ordering_month_ago,)
)
return self.runInteraction(
"_delete_old_forward_extrem_cache",
_delete_old_forward_extrem_cache_txn
)
def get_backfill_events(self, room_id, event_list, limit):
"""Get a list of Events for a given topic that occurred before (and
including) the events in event_list. Return a list of max size `limit`

View file

@ -48,15 +48,31 @@ class RoomStore(SQLBaseStore):
StoreError if the room could not be stored.
"""
try:
yield self._simple_insert(
"rooms",
{
"room_id": room_id,
"creator": room_creator_user_id,
"is_public": is_public,
},
desc="store_room",
)
def store_room_txn(txn, next_id):
self._simple_insert_txn(
txn,
"rooms",
{
"room_id": room_id,
"creator": room_creator_user_id,
"is_public": is_public,
},
)
if is_public:
self._simple_insert_txn(
txn,
table="public_room_list_stream",
values={
"stream_id": next_id,
"room_id": room_id,
"visibility": is_public,
}
)
with self._public_room_id_gen.get_next() as next_id:
yield self.runInteraction(
"store_room_txn",
store_room_txn, next_id,
)
except Exception as e:
logger.error("store_room with room_id=%s failed: %s", room_id, e)
raise StoreError(500, "Problem creating room.")
@ -77,13 +93,45 @@ class RoomStore(SQLBaseStore):
allow_none=True,
)
@defer.inlineCallbacks
def set_room_is_public(self, room_id, is_public):
return self._simple_update_one(
table="rooms",
keyvalues={"room_id": room_id},
updatevalues={"is_public": is_public},
desc="set_room_is_public",
)
def set_room_is_public_txn(txn, next_id):
self._simple_update_one_txn(
txn,
table="rooms",
keyvalues={"room_id": room_id},
updatevalues={"is_public": is_public},
)
entries = self._simple_select_list_txn(
txn,
table="public_room_list_stream",
keyvalues={"room_id": room_id},
retcols=("stream_id", "visibility"),
)
entries.sort(key=lambda r: r["stream_id"])
add_to_stream = True
if entries:
add_to_stream = bool(entries[-1]["visibility"]) != is_public
if add_to_stream:
self._simple_insert_txn(
txn,
table="public_room_list_stream",
values={
"stream_id": next_id,
"room_id": room_id,
"visibility": is_public,
}
)
with self._public_room_id_gen.get_next() as next_id:
yield self.runInteraction(
"set_room_is_public",
set_room_is_public_txn, next_id,
)
def get_public_room_ids(self):
return self._simple_select_onecol(
@ -207,3 +255,71 @@ class RoomStore(SQLBaseStore):
},
desc="add_event_report"
)
def get_current_public_room_stream_id(self):
return self._public_room_id_gen.get_current_token()
def get_public_room_ids_at_stream_id(self, stream_id):
return self.runInteraction(
"get_public_room_ids_at_stream_id",
self.get_public_room_ids_at_stream_id_txn, stream_id
)
def get_public_room_ids_at_stream_id_txn(self, txn, stream_id):
return {
rm
for rm, vis in self.get_published_at_stream_id_txn(txn, stream_id).items()
if vis
}
def get_published_at_stream_id_txn(self, txn, stream_id):
sql = ("""
SELECT room_id, visibility FROM public_room_list_stream
INNER JOIN (
SELECT room_id, max(stream_id) AS stream_id
FROM public_room_list_stream
WHERE stream_id <= ?
GROUP BY room_id
) grouped USING (room_id, stream_id)
""")
txn.execute(sql, (stream_id,))
return dict(txn.fetchall())
def get_public_room_changes(self, prev_stream_id, new_stream_id):
def get_public_room_changes_txn(txn):
then_rooms = self.get_public_room_ids_at_stream_id_txn(txn, prev_stream_id)
now_rooms_dict = self.get_published_at_stream_id_txn(txn, new_stream_id)
now_rooms_visible = set(
rm for rm, vis in now_rooms_dict.items() if vis
)
now_rooms_not_visible = set(
rm for rm, vis in now_rooms_dict.items() if not vis
)
newly_visible = now_rooms_visible - then_rooms
newly_unpublished = now_rooms_not_visible & then_rooms
return newly_visible, newly_unpublished
return self.runInteraction(
"get_public_room_changes", get_public_room_changes_txn
)
def get_all_new_public_rooms(self, prev_id, current_id, limit):
def get_all_new_public_rooms(txn):
sql = ("""
SELECT stream_id, room_id, visibility FROM public_room_list_stream
WHERE stream_id > ? AND stream_id <= ?
ORDER BY stream_id ASC
LIMIT ?
""")
txn.execute(sql, (prev_id, current_id, limit,))
return txn.fetchall()
return self.runInteraction(
"get_all_new_public_rooms", get_all_new_public_rooms
)

View file

@ -0,0 +1,33 @@
/* Copyright 2016 OpenMarket Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
CREATE TABLE public_room_list_stream (
stream_id BIGINT NOT NULL,
room_id TEXT NOT NULL,
visibility BOOLEAN NOT NULL
);
INSERT INTO public_room_list_stream (stream_id, room_id, visibility)
SELECT 1, room_id, is_public FROM rooms
WHERE is_public = CAST(1 AS BOOLEAN);
CREATE INDEX public_room_list_stream_idx on public_room_list_stream(
stream_id
);
CREATE INDEX public_room_list_stream_rm_idx on public_room_list_stream(
room_id, stream_id
);

View file

@ -0,0 +1,37 @@
/* Copyright 2016 OpenMarket Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
CREATE TABLE stream_ordering_to_exterm (
stream_ordering BIGINT NOT NULL,
room_id TEXT NOT NULL,
event_id TEXT NOT NULL
);
INSERT INTO stream_ordering_to_exterm (stream_ordering, room_id, event_id)
SELECT stream_ordering, room_id, event_id FROM event_forward_extremities
INNER JOIN (
SELECT room_id, max(stream_ordering) as stream_ordering FROM events
INNER JOIN event_forward_extremities USING (room_id, event_id)
GROUP BY room_id
) AS rms USING (room_id);
CREATE INDEX stream_ordering_to_exterm_idx on stream_ordering_to_exterm(
stream_ordering
);
CREATE INDEX stream_ordering_to_exterm_rm_idx on stream_ordering_to_exterm(
room_id, stream_ordering
);

View file

@ -531,6 +531,9 @@ class StreamStore(SQLBaseStore):
)
defer.returnValue("t%d-%d" % (topo, token))
def get_room_max_stream_ordering(self):
return self._stream_id_gen.get_current_token()
def get_stream_token_for_event(self, event_id):
"""The stream token for an event
Args: