mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-11-13 00:00:41 -05:00
Merge branch 'release-v1.41' into develop
This commit is contained in:
commit
78a70a2e0b
84 changed files with 807 additions and 951 deletions
|
|
@ -127,9 +127,6 @@ class DataStore(
|
|||
self._clock = hs.get_clock()
|
||||
self.database_engine = database.engine
|
||||
|
||||
self._public_room_id_gen = StreamIdGenerator(
|
||||
db_conn, "public_room_list_stream", "stream_id"
|
||||
)
|
||||
self._device_list_id_gen = StreamIdGenerator(
|
||||
db_conn,
|
||||
"device_lists_stream",
|
||||
|
|
@ -170,6 +167,7 @@ class DataStore(
|
|||
sequence_name="cache_invalidation_stream_seq",
|
||||
writers=[],
|
||||
)
|
||||
|
||||
else:
|
||||
self._cache_id_gen = None
|
||||
|
||||
|
|
|
|||
|
|
@ -73,6 +73,40 @@ class RoomWorkerStore(SQLBaseStore):
|
|||
|
||||
self.config = hs.config
|
||||
|
||||
async def store_room(
|
||||
self,
|
||||
room_id: str,
|
||||
room_creator_user_id: str,
|
||||
is_public: bool,
|
||||
room_version: RoomVersion,
|
||||
):
|
||||
"""Stores a room.
|
||||
|
||||
Args:
|
||||
room_id: The desired room ID, can be None.
|
||||
room_creator_user_id: The user ID of the room creator.
|
||||
is_public: True to indicate that this room should appear in
|
||||
public room lists.
|
||||
room_version: The version of the room
|
||||
Raises:
|
||||
StoreError if the room could not be stored.
|
||||
"""
|
||||
try:
|
||||
await self.db_pool.simple_insert(
|
||||
"rooms",
|
||||
{
|
||||
"room_id": room_id,
|
||||
"creator": room_creator_user_id,
|
||||
"is_public": is_public,
|
||||
"room_version": room_version.identifier,
|
||||
"has_auth_chain_index": True,
|
||||
},
|
||||
desc="store_room",
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error("store_room with room_id=%s failed: %s", room_id, e)
|
||||
raise StoreError(500, "Problem creating room.")
|
||||
|
||||
async def get_room(self, room_id: str) -> dict:
|
||||
"""Retrieve a room.
|
||||
|
||||
|
|
@ -890,55 +924,6 @@ class RoomWorkerStore(SQLBaseStore):
|
|||
|
||||
return total_media_quarantined
|
||||
|
||||
async def get_all_new_public_rooms(
|
||||
self, instance_name: str, last_id: int, current_id: int, limit: int
|
||||
) -> Tuple[List[Tuple[int, tuple]], int, bool]:
|
||||
"""Get updates for public rooms replication stream.
|
||||
|
||||
Args:
|
||||
instance_name: The writer we want to fetch updates from. Unused
|
||||
here since there is only ever one writer.
|
||||
last_id: The token to fetch updates from. Exclusive.
|
||||
current_id: The token to fetch updates up to. Inclusive.
|
||||
limit: The requested limit for the number of rows to return. The
|
||||
function may return more or fewer rows.
|
||||
|
||||
Returns:
|
||||
A tuple consisting of: the updates, a token to use to fetch
|
||||
subsequent updates, and whether we returned fewer rows than exists
|
||||
between the requested tokens due to the limit.
|
||||
|
||||
The token returned can be used in a subsequent call to this
|
||||
function to get further updatees.
|
||||
|
||||
The updates are a list of 2-tuples of stream ID and the row data
|
||||
"""
|
||||
if last_id == current_id:
|
||||
return [], current_id, False
|
||||
|
||||
def get_all_new_public_rooms(txn):
|
||||
sql = """
|
||||
SELECT stream_id, room_id, visibility, appservice_id, network_id
|
||||
FROM public_room_list_stream
|
||||
WHERE stream_id > ? AND stream_id <= ?
|
||||
ORDER BY stream_id ASC
|
||||
LIMIT ?
|
||||
"""
|
||||
|
||||
txn.execute(sql, (last_id, current_id, limit))
|
||||
updates = [(row[0], row[1:]) for row in txn]
|
||||
limited = False
|
||||
upto_token = current_id
|
||||
if len(updates) >= limit:
|
||||
upto_token = updates[-1][0]
|
||||
limited = True
|
||||
|
||||
return updates, upto_token, limited
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_all_new_public_rooms", get_all_new_public_rooms
|
||||
)
|
||||
|
||||
async def get_rooms_for_retention_period_in_range(
|
||||
self, min_ms: Optional[int], max_ms: Optional[int], include_null: bool = False
|
||||
) -> Dict[str, dict]:
|
||||
|
|
@ -1391,57 +1376,6 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
|
|||
lock=False,
|
||||
)
|
||||
|
||||
async def store_room(
|
||||
self,
|
||||
room_id: str,
|
||||
room_creator_user_id: str,
|
||||
is_public: bool,
|
||||
room_version: RoomVersion,
|
||||
):
|
||||
"""Stores a room.
|
||||
|
||||
Args:
|
||||
room_id: The desired room ID, can be None.
|
||||
room_creator_user_id: The user ID of the room creator.
|
||||
is_public: True to indicate that this room should appear in
|
||||
public room lists.
|
||||
room_version: The version of the room
|
||||
Raises:
|
||||
StoreError if the room could not be stored.
|
||||
"""
|
||||
try:
|
||||
|
||||
def store_room_txn(txn, next_id):
|
||||
self.db_pool.simple_insert_txn(
|
||||
txn,
|
||||
"rooms",
|
||||
{
|
||||
"room_id": room_id,
|
||||
"creator": room_creator_user_id,
|
||||
"is_public": is_public,
|
||||
"room_version": room_version.identifier,
|
||||
"has_auth_chain_index": True,
|
||||
},
|
||||
)
|
||||
if is_public:
|
||||
self.db_pool.simple_insert_txn(
|
||||
txn,
|
||||
table="public_room_list_stream",
|
||||
values={
|
||||
"stream_id": next_id,
|
||||
"room_id": room_id,
|
||||
"visibility": is_public,
|
||||
},
|
||||
)
|
||||
|
||||
async with self._public_room_id_gen.get_next() as next_id:
|
||||
await self.db_pool.runInteraction(
|
||||
"store_room_txn", store_room_txn, next_id
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error("store_room with room_id=%s failed: %s", room_id, e)
|
||||
raise StoreError(500, "Problem creating room.")
|
||||
|
||||
async def maybe_store_room_on_outlier_membership(
|
||||
self, room_id: str, room_version: RoomVersion
|
||||
):
|
||||
|
|
@ -1470,49 +1404,14 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
|
|||
lock=False,
|
||||
)
|
||||
|
||||
async def set_room_is_public(self, room_id, is_public):
|
||||
def set_room_is_public_txn(txn, next_id):
|
||||
self.db_pool.simple_update_one_txn(
|
||||
txn,
|
||||
table="rooms",
|
||||
keyvalues={"room_id": room_id},
|
||||
updatevalues={"is_public": is_public},
|
||||
)
|
||||
async def set_room_is_public(self, room_id: str, is_public: bool) -> None:
|
||||
await self.db_pool.simple_update_one(
|
||||
table="rooms",
|
||||
keyvalues={"room_id": room_id},
|
||||
updatevalues={"is_public": is_public},
|
||||
desc="set_room_is_public",
|
||||
)
|
||||
|
||||
entries = self.db_pool.simple_select_list_txn(
|
||||
txn,
|
||||
table="public_room_list_stream",
|
||||
keyvalues={
|
||||
"room_id": room_id,
|
||||
"appservice_id": None,
|
||||
"network_id": None,
|
||||
},
|
||||
retcols=("stream_id", "visibility"),
|
||||
)
|
||||
|
||||
entries.sort(key=lambda r: r["stream_id"])
|
||||
|
||||
add_to_stream = True
|
||||
if entries:
|
||||
add_to_stream = bool(entries[-1]["visibility"]) != is_public
|
||||
|
||||
if add_to_stream:
|
||||
self.db_pool.simple_insert_txn(
|
||||
txn,
|
||||
table="public_room_list_stream",
|
||||
values={
|
||||
"stream_id": next_id,
|
||||
"room_id": room_id,
|
||||
"visibility": is_public,
|
||||
"appservice_id": None,
|
||||
"network_id": None,
|
||||
},
|
||||
)
|
||||
|
||||
async with self._public_room_id_gen.get_next() as next_id:
|
||||
await self.db_pool.runInteraction(
|
||||
"set_room_is_public", set_room_is_public_txn, next_id
|
||||
)
|
||||
self.hs.get_notifier().on_new_replication_data()
|
||||
|
||||
async def set_room_is_public_appservice(
|
||||
|
|
@ -1533,68 +1432,33 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
|
|||
list.
|
||||
"""
|
||||
|
||||
def set_room_is_public_appservice_txn(txn, next_id):
|
||||
if is_public:
|
||||
try:
|
||||
self.db_pool.simple_insert_txn(
|
||||
txn,
|
||||
table="appservice_room_list",
|
||||
values={
|
||||
"appservice_id": appservice_id,
|
||||
"network_id": network_id,
|
||||
"room_id": room_id,
|
||||
},
|
||||
)
|
||||
except self.database_engine.module.IntegrityError:
|
||||
# We've already inserted, nothing to do.
|
||||
return
|
||||
else:
|
||||
self.db_pool.simple_delete_txn(
|
||||
txn,
|
||||
table="appservice_room_list",
|
||||
keyvalues={
|
||||
"appservice_id": appservice_id,
|
||||
"network_id": network_id,
|
||||
"room_id": room_id,
|
||||
},
|
||||
)
|
||||
|
||||
entries = self.db_pool.simple_select_list_txn(
|
||||
txn,
|
||||
table="public_room_list_stream",
|
||||
if is_public:
|
||||
await self.db_pool.simple_upsert(
|
||||
table="appservice_room_list",
|
||||
keyvalues={
|
||||
"room_id": room_id,
|
||||
"appservice_id": appservice_id,
|
||||
"network_id": network_id,
|
||||
"room_id": room_id,
|
||||
},
|
||||
retcols=("stream_id", "visibility"),
|
||||
values={},
|
||||
insertion_values={
|
||||
"appservice_id": appservice_id,
|
||||
"network_id": network_id,
|
||||
"room_id": room_id,
|
||||
},
|
||||
desc="set_room_is_public_appservice_true",
|
||||
)
|
||||
else:
|
||||
await self.db_pool.simple_delete(
|
||||
table="appservice_room_list",
|
||||
keyvalues={
|
||||
"appservice_id": appservice_id,
|
||||
"network_id": network_id,
|
||||
"room_id": room_id,
|
||||
},
|
||||
desc="set_room_is_public_appservice_false",
|
||||
)
|
||||
|
||||
entries.sort(key=lambda r: r["stream_id"])
|
||||
|
||||
add_to_stream = True
|
||||
if entries:
|
||||
add_to_stream = bool(entries[-1]["visibility"]) != is_public
|
||||
|
||||
if add_to_stream:
|
||||
self.db_pool.simple_insert_txn(
|
||||
txn,
|
||||
table="public_room_list_stream",
|
||||
values={
|
||||
"stream_id": next_id,
|
||||
"room_id": room_id,
|
||||
"visibility": is_public,
|
||||
"appservice_id": appservice_id,
|
||||
"network_id": network_id,
|
||||
},
|
||||
)
|
||||
|
||||
async with self._public_room_id_gen.get_next() as next_id:
|
||||
await self.db_pool.runInteraction(
|
||||
"set_room_is_public_appservice",
|
||||
set_room_is_public_appservice_txn,
|
||||
next_id,
|
||||
)
|
||||
self.hs.get_notifier().on_new_replication_data()
|
||||
|
||||
async def add_event_report(
|
||||
|
|
@ -1787,9 +1651,6 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
|
|||
"get_event_reports_paginate", _get_event_reports_paginate_txn
|
||||
)
|
||||
|
||||
def get_current_public_room_stream_id(self):
|
||||
return self._public_room_id_gen.get_current_token()
|
||||
|
||||
async def block_room(self, room_id: str, user_id: str) -> None:
|
||||
"""Marks the room as blocked. Can be called multiple times.
|
||||
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
SCHEMA_VERSION = 62
|
||||
SCHEMA_VERSION = 63
|
||||
"""Represents the expectations made by the codebase about the database schema
|
||||
|
||||
This should be incremented whenever the codebase changes its requirements on the
|
||||
|
|
@ -25,6 +25,11 @@ for more information on how this works.
|
|||
Changes in SCHEMA_VERSION = 61:
|
||||
- The `user_stats_historical` and `room_stats_historical` tables are not written and
|
||||
are not read (previously, they were written but not read).
|
||||
|
||||
Changes in SCHEMA_VERSION = 63:
|
||||
- The `public_room_list_stream` table is not written nor read to
|
||||
(previously, it was written and read to, but not for any significant purpose).
|
||||
https://github.com/matrix-org/synapse/pull/10565
|
||||
"""
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue