#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright 2020 The Matrix.org Foundation C.I.C.
# Copyright 2014-2016 OpenMarket Ltd
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# Originally licensed under the Apache License, Version 2.0:
# <http://www.apache.org/licenses/LICENSE-2.0>.
#
# [This file includes modifications made by New Vector Limited]
#
#
import collections.abc
import logging
from typing import (
    TYPE_CHECKING,
    Any,
    Collection,
    Dict,
    FrozenSet,
    Iterable,
    List,
    Mapping,
    MutableMapping,
    Optional,
    Set,
    Tuple,
    TypeVar,
    Union,
    cast,
    overload,
)

import attr

from synapse.api.constants import EventContentFields, EventTypes, Membership
from synapse.api.errors import NotFoundError, UnsupportedRoomVersionError
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.logging.opentracing import trace
from synapse.replication.tcp.streams import UnPartialStatedEventStream
from synapse.replication.tcp.streams.partial_state import UnPartialStatedEventStreamRow
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import (
    DatabasePool,
    LoggingDatabaseConnection,
    LoggingTransaction,
    make_in_list_sql_clause,
)
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
from synapse.types import JsonDict, JsonMapping, StateKey, StateMap, StrCollection
from synapse.types.state import StateFilter
from synapse.util.caches import intern_string
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.cancellation import cancellable
from synapse.util.iterutils import batch_iter

if TYPE_CHECKING:
    from synapse.server import HomeServer

logger = logging.getLogger(__name__)

_T = TypeVar("_T")

MAX_STATE_DELTA_HOPS = 100
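# Editorial note (an assumption, not from the original module): this constant
# appears to cap how many state-delta "hops" may be chained together before a
# full state snapshot is stored instead; the limit itself is enforced where
# state groups are persisted rather than in this module.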


# Freeze so it's immutable and we can use it as a cache value
@attr.s(slots=True, frozen=True, auto_attribs=True)
class Sentinel:
    pass


ROOM_UNKNOWN_SENTINEL = Sentinel()


@attr.s(slots=True, frozen=True, auto_attribs=True)
class EventMetadata:
    """Returned by `get_metadata_for_events`"""

    room_id: str
    event_type: str
    state_key: Optional[str]
    rejection_reason: Optional[str]


def _retrieve_and_check_room_version(room_id: str, room_version_id: str) -> RoomVersion:
    v = KNOWN_ROOM_VERSIONS.get(room_version_id)
    if not v:
        raise UnsupportedRoomVersionError(
            "Room %s uses a room version %s which is no longer supported"
            % (room_id, room_version_id)
        )
    return v


# this inherits from EventsWorkerStore because it calls self.get_events
class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
    """The parts of StateGroupStore that can be called from workers."""

    def __init__(
        self,
        database: DatabasePool,
        db_conn: LoggingDatabaseConnection,
        hs: "HomeServer",
    ):
        super().__init__(database, db_conn, hs)
        self._instance_name: str = hs.get_instance_name()

    def process_replication_rows(
        self,
        stream_name: str,
        instance_name: str,
        token: int,
        rows: Iterable[Any],
    ) -> None:
        if stream_name == UnPartialStatedEventStream.NAME:
            for row in rows:
                assert isinstance(row, UnPartialStatedEventStreamRow)
                self._get_state_group_for_event.invalidate((row.event_id,))
                self.is_partial_state_event.invalidate((row.event_id,))

        super().process_replication_rows(stream_name, instance_name, token, rows)

    async def get_room_version(self, room_id: str) -> RoomVersion:
        """Get the room_version of a given room

        Raises:
            NotFoundError: if the room is unknown

            UnsupportedRoomVersionError: if the room uses an unknown room version.
                Typically this happens if support for the room's version has been
                removed from Synapse.
        """
        room_version_id = await self.get_room_version_id(room_id)
        return _retrieve_and_check_room_version(room_id, room_version_id)

    def get_room_version_txn(
        self, txn: LoggingTransaction, room_id: str
    ) -> RoomVersion:
        """Get the room_version of a given room

        Args:
            txn: Transaction object
            room_id: The room_id of the room you are trying to get the version for

        Raises:
            NotFoundError: if the room is unknown

            UnsupportedRoomVersionError: if the room uses an unknown room version.
                Typically this happens if support for the room's version has been
                removed from Synapse.
        """
        room_version_id = self.get_room_version_id_txn(txn, room_id)
        return _retrieve_and_check_room_version(room_id, room_version_id)

    @cached(max_entries=10000)
    async def get_room_version_id(self, room_id: str) -> str:
        """Get the room_version of a given room

        Raises:
            NotFoundError: if the room is unknown
        """
        return await self.db_pool.runInteraction(
            "get_room_version_id_txn",
            self.get_room_version_id_txn,
            room_id,
        )

    def get_room_version_id_txn(self, txn: LoggingTransaction, room_id: str) -> str:
        """Get the room_version of a given room

        Args:
            txn: Transaction object
            room_id: The room_id of the room you are trying to get the version for

        Raises:
            NotFoundError: if the room is unknown
        """

        # We really should have an entry in the rooms table for every room we
        # care about, but let's be a bit paranoid.
        room_version = self.db_pool.simple_select_one_onecol_txn(
            txn,
            table="rooms",
            keyvalues={"room_id": room_id},
            retcol="room_version",
            allow_none=True,
        )

        if room_version is None:
            raise NotFoundError("Could not find room_version for %s" % (room_id,))

        return room_version

    @trace
    async def get_metadata_for_events(
        self, event_ids: Collection[str]
    ) -> Dict[str, EventMetadata]:
        """Get some metadata (room_id, type, state_key) for the given events.

        This method is a faster alternative to fetching the full events from
        the DB, and should be used when the full event is not needed.

        Returns metadata for rejected and redacted events. Events that have not
        been persisted are omitted from the returned dict.
        """

        def get_metadata_for_events_txn(
            txn: LoggingTransaction,
            batch_ids: Collection[str],
        ) -> Dict[str, EventMetadata]:
            clause, args = make_in_list_sql_clause(
                self.database_engine, "e.event_id", batch_ids
            )

            sql = f"""
                SELECT e.event_id, e.room_id, e.type, se.state_key, r.reason
                FROM events AS e
                LEFT JOIN state_events se USING (event_id)
                LEFT JOIN rejections r USING (event_id)
                WHERE {clause}
            """

            txn.execute(sql, args)
            return {
                event_id: EventMetadata(
                    room_id=room_id,
                    event_type=event_type,
                    state_key=state_key,
                    rejection_reason=rejection_reason,
                )
                for event_id, room_id, event_type, state_key, rejection_reason in txn
            }
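
        # Chunk the lookups so the generated `IN (...)` clause (or its
        # PostgreSQL `= ANY(?)` equivalent) stays bounded: 1000 IDs per query.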
        result_map: Dict[str, EventMetadata] = {}
        for batch_ids in batch_iter(event_ids, 1000):
            result_map.update(
                await self.db_pool.runInteraction(
                    "get_metadata_for_events",
                    get_metadata_for_events_txn,
                    batch_ids=batch_ids,
                )
            )

        return result_map

    async def get_room_predecessor(self, room_id: str) -> Optional[JsonMapping]:
        """Get the predecessor of an upgraded room if it exists.
        Otherwise return None.

        Args:
            room_id: The room ID.

        Returns:
            A dictionary containing the structure of the predecessor
            field from the room's create event. The structure is determined by
            other servers, but it is expected to be:
                * room_id (str): The room ID of the predecessor room
                * event_id (str): The ID of the tombstone event in the predecessor room

            None if a predecessor key is not found, or is not a dictionary.

        Raises:
            NotFoundError if the given room is unknown
        """
        # Retrieve the room's create event
        create_event = await self.get_create_event_for_room(room_id)

        # Retrieve the predecessor key of the create event
        predecessor = create_event.content.get("predecessor", None)

        # Ensure the key is a dictionary
        if not isinstance(predecessor, collections.abc.Mapping):
            return None

        # The keys must be strings since the data is JSON.
        return predecessor

    async def get_create_event_for_room(self, room_id: str) -> EventBase:
        """Get the create state event for a room.

        Args:
            room_id: The room ID.

        Returns:
            The room creation event.

        Raises:
            NotFoundError if the room is unknown
        """
        state_ids = await self.get_partial_current_state_ids(room_id)

        if not state_ids:
            raise NotFoundError(f"Current state for room {room_id} is empty")

        create_id = state_ids.get((EventTypes.Create, ""))

        # If we can't find the create event, assume we've hit a dead end
        if not create_id:
            raise NotFoundError(f"No create event in current state for room {room_id}")

        # Retrieve the room's create event and return
        create_event = await self.get_event(create_id)
        return create_event

    @cached(max_entries=10000)
    async def get_room_type(self, room_id: str) -> Optional[str]:
        raise NotImplementedError()

    @cachedList(cached_method_name="get_room_type", list_name="room_ids")
    async def bulk_get_room_type(
        self, room_ids: Set[str]
    ) -> Mapping[str, Union[Optional[str], Sentinel]]:
        """
        Bulk fetch room types for the given rooms (via current state).

        Since this function is cached, any missing values would be cached as `None`.
        In order to distinguish between a room that genuinely has no type (`None`)
        and a room that is unknown to the server where we might want to omit the
        value (which would make it cached as `None`), instead we use the sentinel
        value `ROOM_UNKNOWN_SENTINEL`.

        Returns:
            A mapping from room ID to the room's type (`None` is a valid room type).
            Rooms unknown to this server will return `ROOM_UNKNOWN_SENTINEL`.
        """

        def txn(
            txn: LoggingTransaction,
        ) -> MutableMapping[str, Union[Optional[str], Sentinel]]:
            clause, args = make_in_list_sql_clause(
                txn.database_engine, "room_id", room_ids
            )
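
            # Illustrative only: on SQLite the helper above produces something
            # like `room_id IN (?, ?, ?)` with one bind argument per ID, while
            # on PostgreSQL it can instead produce `room_id = ANY(?)` with a
            # single array argument.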

            # We can't rely on `room_stats_state.room_type` if the server has left the
            # room because the `room_id` will still be in the table but everything will
            # be set to `None` but `None` is a valid room type value. We join against
            # the `room_stats_current` table which keeps track of the
            # `current_state_events` count (and a proxy value `local_users_in_room`
            # which can be used to assume the server is participating in the room and
            # has current state) to ensure that the data in `room_stats_state` is
            # up-to-date with the current state.
            #
            # FIXME: Use `room_stats_current.current_state_events` instead of
            # `room_stats_current.local_users_in_room` once
            # https://github.com/element-hq/synapse/issues/17457 is fixed.
            sql = f"""
                SELECT room_id, room_type
                FROM room_stats_state
                INNER JOIN room_stats_current USING (room_id)
                WHERE
                    {clause}
                    AND local_users_in_room > 0
            """

            txn.execute(sql, args)

            room_id_to_type_map = {}
            for row in txn:
                room_id_to_type_map[row[0]] = row[1]

            return room_id_to_type_map

        results = await self.db_pool.runInteraction(
            "bulk_get_room_type",
            txn,
        )

        # If we haven't updated `room_stats_state` with the room yet, query the
        # create events directly. This should happen only rarely so we don't
        # mind if we do this in a loop.
        for room_id in room_ids - results.keys():
            try:
                create_event = await self.get_create_event_for_room(room_id)
                room_type = create_event.content.get(EventContentFields.ROOM_TYPE)
                results[room_id] = room_type
            except NotFoundError:
                # We use the sentinel value to distinguish between `None` which is a
                # valid room type and a room that is unknown to the server so the value
                # is just unset.
                results[room_id] = ROOM_UNKNOWN_SENTINEL

        return results

    @cached(max_entries=10000)
    async def get_room_encryption(self, room_id: str) -> Optional[str]:
        raise NotImplementedError()

    @cachedList(cached_method_name="get_room_encryption", list_name="room_ids")
    async def bulk_get_room_encryption(
        self, room_ids: Set[str]
    ) -> Mapping[str, Union[Optional[str], Sentinel]]:
        """
        Bulk fetch room encryption for the given rooms (via current state).

        Since this function is cached, any missing values would be cached as `None`. In
        order to distinguish between an unencrypted room that has `None` encryption and
        a room that is unknown to the server where we might want to omit the value
        (which would make it cached as `None`), instead we use the sentinel value
        `ROOM_UNKNOWN_SENTINEL`.

        Returns:
            A mapping from room ID to the room's encryption algorithm if the room is
            encrypted, otherwise `None`. Rooms unknown to this server will return
            `ROOM_UNKNOWN_SENTINEL`.
        """

        def txn(
            txn: LoggingTransaction,
        ) -> MutableMapping[str, Union[Optional[str], Sentinel]]:
            clause, args = make_in_list_sql_clause(
                txn.database_engine, "room_id", room_ids
            )

            # We can't rely on `room_stats_state.encryption` if the server has left the
            # room because the `room_id` will still be in the table but everything will
            # be set to `None` but `None` is a valid encryption value. We join against
            # the `room_stats_current` table which keeps track of the
            # `current_state_events` count (and a proxy value `local_users_in_room`
            # which can be used to assume the server is participating in the room and
            # has current state) to ensure that the data in `room_stats_state` is
            # up-to-date with the current state.
            #
            # FIXME: Use `room_stats_current.current_state_events` instead of
            # `room_stats_current.local_users_in_room` once
            # https://github.com/element-hq/synapse/issues/17457 is fixed.
            sql = f"""
                SELECT room_id, encryption
                FROM room_stats_state
                INNER JOIN room_stats_current USING (room_id)
                WHERE
                    {clause}
                    AND local_users_in_room > 0
            """

            txn.execute(sql, args)

            room_id_to_encryption_map = {}
            for row in txn:
                room_id_to_encryption_map[row[0]] = row[1]

            return room_id_to_encryption_map

        results = await self.db_pool.runInteraction(
            "bulk_get_room_encryption",
            txn,
        )

        # If we haven't updated `room_stats_state` with the room yet, query the state
        # directly. This should happen only rarely so we don't mind if we do this in a
        # loop.
        encryption_event_ids: List[str] = []
        for room_id in room_ids - results.keys():
            state_map = await self.get_partial_filtered_current_state_ids(
                room_id,
                state_filter=StateFilter.from_types(
                    [
                        (EventTypes.Create, ""),
                        (EventTypes.RoomEncryption, ""),
                    ]
                ),
            )
            # We can use the create event as a canary to tell whether the server has
            # seen the room before
            create_event_id = state_map.get((EventTypes.Create, ""))
            encryption_event_id = state_map.get((EventTypes.RoomEncryption, ""))

            if create_event_id is None:
                # We use the sentinel value to distinguish between `None` which is a
                # valid encryption value and a room that is unknown to the server so
                # the value is just unset.
                results[room_id] = ROOM_UNKNOWN_SENTINEL
                continue

            if encryption_event_id is None:
                results[room_id] = None
            else:
                encryption_event_ids.append(encryption_event_id)
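
        # Collect the event IDs first so they can all be fetched in a single
        # `get_events` call below, rather than one database round-trip per room.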
        encryption_event_map = await self.get_events(encryption_event_ids)

        for encryption_event_id in encryption_event_ids:
            encryption_event = encryption_event_map.get(encryption_event_id)
            # If the current state says there is an encryption event, we should have it
            # in the database.
            assert encryption_event is not None

            results[encryption_event.room_id] = encryption_event.content.get(
                EventContentFields.ENCRYPTION_ALGORITHM
            )

        return results

    @cached(max_entries=100000, iterable=True)
    async def get_partial_current_state_ids(self, room_id: str) -> StateMap[str]:
        """Get the current state event ids for a room based on the
        current_state_events table.

        This may be the partial state if we're lazy joining the room.

        Args:
            room_id: The room to get the state IDs of.

        Returns:
            The current state of the room.
        """

        def _get_current_state_ids_txn(txn: LoggingTransaction) -> StateMap[str]:
            txn.execute(
                """SELECT type, state_key, event_id FROM current_state_events
                WHERE room_id = ?
                """,
                (room_id,),
            )

            return {(intern_string(r[0]), intern_string(r[1])): r[2] for r in txn}
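
        # `intern_string` deduplicates the heavily repeated type/state_key
        # strings, which keeps down the memory cost of the large
        # `@cached(iterable=True)` cache above.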
        return await self.db_pool.runInteraction(
            "get_partial_current_state_ids", _get_current_state_ids_txn
        )

    async def check_if_events_in_current_state(
        self, event_ids: StrCollection
    ) -> FrozenSet[str]:
        """Checks and returns which of the given events are part of the current state."""
        rows = await self.db_pool.simple_select_many_batch(
            table="current_state_events",
            column="event_id",
            iterable=event_ids,
            retcols=("event_id",),
            desc="check_if_events_in_current_state",
        )
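
        # Each row is a 1-tuple, hence the trailing comma in `event_id,` below,
        # which unpacks it.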
        return frozenset(event_id for event_id, in rows)

    # FIXME: how should this be cached?
    @cancellable
    async def get_partial_filtered_current_state_ids(
        self, room_id: str, state_filter: Optional[StateFilter] = None
    ) -> StateMap[str]:
        """Get the current state events of the given types for a room based on
        the current_state_events table. This may not be as up-to-date as the
        result of doing a fresh state resolution as per
        state_handler.get_current_state.

        This may be the partial state if we're lazy joining the room.

        Args:
            room_id
            state_filter: The state filter used to fetch state
                from the database.

        Returns:
            Map from type/state_key to event ID.
        """

        where_clause, where_args = (
            state_filter or StateFilter.all()
        ).make_sql_filter_clause()

        if not where_clause:
            # We delegate to the cached version
            return await self.get_partial_current_state_ids(room_id)

        def _get_filtered_current_state_ids_txn(
            txn: LoggingTransaction,
        ) -> StateMap[str]:
            results = StateMapWrapper(state_filter=state_filter or StateFilter.all())
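            # `StateMapWrapper` (defined at the bottom of this module) behaves
            # like a dict but raises if a caller later reads a key that the
            # state filter excluded, turning silent missing-state bugs into
            # loud ones.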

            sql = """
                SELECT type, state_key, event_id FROM current_state_events
                WHERE room_id = ?
            """

            if where_clause:
                sql += " AND (%s)" % (where_clause,)

            args = [room_id]
            args.extend(where_args)
            txn.execute(sql, args)
            for row in txn:
                typ, state_key, event_id = row
                key = (intern_string(typ), intern_string(state_key))
                results[key] = event_id

            return results

        return await self.db_pool.runInteraction(
            "get_filtered_current_state_ids", _get_filtered_current_state_ids_txn
        )

    @cached(max_entries=50000)
    async def _get_state_group_for_event(self, event_id: str) -> Optional[int]:
        return await self.db_pool.simple_select_one_onecol(
            table="event_to_state_groups",
            keyvalues={"event_id": event_id},
            retcol="state_group",
            allow_none=True,
            desc="_get_state_group_for_event",
        )

    @cachedList(
        cached_method_name="_get_state_group_for_event",
        list_name="event_ids",
        num_args=1,
    )
    async def _get_state_group_for_events(
        self, event_ids: Collection[str]
    ) -> Mapping[str, int]:
        """Returns mapping event_id -> state_group.

        Raises:
            RuntimeError if the state is unknown at any of the given events
        """
        rows = cast(
            List[Tuple[str, int]],
            await self.db_pool.simple_select_many_batch(
                table="event_to_state_groups",
                column="event_id",
                iterable=event_ids,
                keyvalues={},
                retcols=("event_id", "state_group"),
                desc="_get_state_group_for_events",
            ),
        )

        res = dict(rows)
        for e in event_ids:
            if e not in res:
                raise RuntimeError("No state group for unknown or outlier event %s" % e)
        return res

    async def get_referenced_state_groups(
        self, state_groups: Iterable[int]
    ) -> Set[int]:
        """Check if the state groups are referenced by events.

        Args:
            state_groups

        Returns:
            The subset of state groups that are referenced.
        """

        rows = cast(
            List[Tuple[int]],
            await self.db_pool.simple_select_many_batch(
                table="event_to_state_groups",
                column="state_group",
                iterable=state_groups,
                keyvalues={},
                retcols=("DISTINCT state_group",),
                desc="get_referenced_state_groups",
            ),
        )

        return {row[0] for row in rows}

    async def update_state_for_partial_state_event(
        self,
        event: EventBase,
        context: EventContext,
    ) -> None:
        """Update the state group for a partial state event"""
        async with self._un_partial_stated_events_stream_id_gen.get_next() as un_partial_state_event_stream_id:
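            # Allocating a stream ID here lets other workers learn, via the
            # `un_partial_stated_event_stream`, that this event's state has
            # been resolved and that their caches need invalidating (see
            # `process_replication_rows` above).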
            await self.db_pool.runInteraction(
                "update_state_for_partial_state_event",
                self._update_state_for_partial_state_event_txn,
                event,
                context,
                un_partial_state_event_stream_id,
            )

    def _update_state_for_partial_state_event_txn(
        self,
        txn: LoggingTransaction,
        event: EventBase,
        context: EventContext,
        un_partial_state_event_stream_id: int,
    ) -> None:
        # we shouldn't have any outliers here
        assert not event.internal_metadata.is_outlier()

        # anything that was rejected should have the same state as its
        # predecessor.
        if context.rejected:
            state_group = context.state_group_before_event
        else:
            state_group = context.state_group

        self.db_pool.simple_update_txn(
            txn,
            table="event_to_state_groups",
            keyvalues={"event_id": event.event_id},
            updatevalues={"state_group": state_group},
        )

        # the event may now be rejected where it was not before, or vice versa,
        # in which case we need to update the rejected flags.
        rejection_status_changed = bool(context.rejected) != (
            event.rejected_reason is not None
        )
        if rejection_status_changed:
            self.mark_event_rejected_txn(txn, event.event_id, context.rejected)

        self.db_pool.simple_delete_one_txn(
            txn,
            table="partial_state_events",
            keyvalues={"event_id": event.event_id},
        )

        txn.call_after(self.is_partial_state_event.invalidate, (event.event_id,))
        txn.call_after(
            self._get_state_group_for_event.prefill,
            (event.event_id,),
            state_group,
        )

        self.db_pool.simple_insert_txn(
            txn,
            "un_partial_stated_event_stream",
            {
                "stream_id": un_partial_state_event_stream_id,
                "instance_name": self._instance_name,
                "event_id": event.event_id,
                "rejection_status_changed": rejection_status_changed,
            },
        )
        txn.call_after(self.hs.get_notifier().on_new_replication_data)


class MainStateBackgroundUpdateStore(RoomMemberWorkerStore):
    CURRENT_STATE_INDEX_UPDATE_NAME = "current_state_members_idx"
    EVENT_STATE_GROUP_INDEX_UPDATE_NAME = "event_to_state_groups_sg_index"
    DELETE_CURRENT_STATE_UPDATE_NAME = "delete_old_current_state_events"

    def __init__(
        self,
        database: DatabasePool,
        db_conn: LoggingDatabaseConnection,
        hs: "HomeServer",
    ):
        super().__init__(database, db_conn, hs)

        self.server_name: str = hs.hostname

        self.db_pool.updates.register_background_index_update(
            self.CURRENT_STATE_INDEX_UPDATE_NAME,
            index_name="current_state_events_member_index",
            table="current_state_events",
            columns=["state_key"],
            where_clause="type='m.room.member'",
        )
        self.db_pool.updates.register_background_index_update(
            self.EVENT_STATE_GROUP_INDEX_UPDATE_NAME,
            index_name="event_to_state_groups_sg_index",
            table="event_to_state_groups",
            columns=["state_group"],
        )
        self.db_pool.updates.register_background_update_handler(
            self.DELETE_CURRENT_STATE_UPDATE_NAME,
            self._background_remove_left_rooms,
        )

    async def _background_remove_left_rooms(
        self, progress: JsonDict, batch_size: int
    ) -> int:
        """Background update to delete rows from `current_state_events` and
        `event_forward_extremities` tables of rooms that the server is no
        longer joined to.
        """

        last_room_id = progress.get("last_room_id", "")
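
        # The update walks rooms in room_id order (keyset pagination): each
        # batch handles the next `batch_size` room IDs after the last one
        # processed, and the position is persisted in the background-update
        # progress under `last_room_id`.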

        def _background_remove_left_rooms_txn(
            txn: LoggingTransaction,
        ) -> Tuple[bool, Set[str]]:
            # get a batch of room ids to consider
            sql = """
                SELECT DISTINCT room_id FROM current_state_events
                WHERE room_id > ? ORDER BY room_id LIMIT ?
            """

            txn.execute(sql, (last_room_id, batch_size))
            room_ids = [row[0] for row in txn]
            if not room_ids:
                return True, set()

            ###########################################################################
            #
            # exclude rooms where we have active members

            sql = """
                SELECT room_id
                FROM local_current_membership
                WHERE
                    room_id > ? AND room_id <= ?
                    AND membership = 'join'
                GROUP BY room_id
            """

            txn.execute(sql, (last_room_id, room_ids[-1]))
            joined_room_ids = {row[0] for row in txn}
            to_delete = set(room_ids) - joined_room_ids

            ###########################################################################
            #
            # exclude rooms which we are in the process of constructing; these otherwise
            # qualify as "rooms with no local users", and would have their
            # forward extremities cleaned up.

            # the following query will return a list of rooms which have forward
            # extremities that are *not* also the create event in the room - ie
            # those that are not being created currently.

            sql = """
                SELECT DISTINCT efe.room_id
                FROM event_forward_extremities efe
                LEFT JOIN current_state_events cse ON
                    cse.event_id = efe.event_id
                    AND cse.type = 'm.room.create'
                    AND cse.state_key = ''
                WHERE
                    cse.event_id IS NULL
                    AND efe.room_id > ? AND efe.room_id <= ?
            """

            txn.execute(sql, (last_room_id, room_ids[-1]))

            # build a set of those rooms within `to_delete` that do not appear in
            # the above, leaving us with the rooms in `to_delete` that *are* being
            # created.
            creating_rooms = to_delete.difference(row[0] for row in txn)
            logger.info("skipping rooms which are being created: %s", creating_rooms)

            # now remove the rooms being created from the list of those to delete.
            #
            # (we could have just taken the intersection of `to_delete` with the result
            # of the sql query, but it's useful to be able to log `creating_rooms`; and
            # having done so, it's quicker to remove the (few) creating rooms from
            # `to_delete` than it is to form the intersection with the (larger) list of
            # not-creating-rooms)

            to_delete -= creating_rooms

            ###########################################################################
            #
            # now clear the state for the rooms

            logger.info("Deleting current state left rooms: %r", to_delete)

            # First we get all users that we still think were joined to the
            # room. This is so that we can mark those device lists as
            # potentially stale, since there may have been a period where the
            # server didn't share a room with the remote user and therefore may
            # have missed any device updates.
            rows = cast(
                List[Tuple[str]],
                self.db_pool.simple_select_many_txn(
                    txn,
                    table="current_state_events",
                    column="room_id",
                    iterable=to_delete,
                    keyvalues={
                        "type": EventTypes.Member,
                        "membership": Membership.JOIN,
                    },
                    retcols=("state_key",),
                ),
            )

            potentially_left_users = {row[0] for row in rows}

            # Now let's actually delete the rooms from the DB.
            self.db_pool.simple_delete_many_txn(
                txn,
                table="current_state_events",
                column="room_id",
                values=to_delete,
                keyvalues={},
            )

            self.db_pool.simple_delete_many_txn(
                txn,
                table="event_forward_extremities",
                column="room_id",
                values=to_delete,
                keyvalues={},
            )

            self.db_pool.updates._background_update_progress_txn(
                txn,
                self.DELETE_CURRENT_STATE_UPDATE_NAME,
                {"last_room_id": room_ids[-1]},
            )

            return False, potentially_left_users

        finished, potentially_left_users = await self.db_pool.runInteraction(
            "_background_remove_left_rooms", _background_remove_left_rooms_txn
        )

        if finished:
            await self.db_pool.updates._end_background_update(
                self.DELETE_CURRENT_STATE_UPDATE_NAME
            )

        # Now go and check if we still share a room with the remote users in
        # the deleted rooms. If not mark their device lists as stale.
        joined_users = await self.get_users_server_still_shares_room_with(
            potentially_left_users
        )

        for user_id in potentially_left_users - joined_users:
            await self.mark_remote_user_device_list_as_unsubscribed(user_id)  # type: ignore[attr-defined]

        return batch_size


class StateStore(StateGroupWorkerStore, MainStateBackgroundUpdateStore):
    """Keeps track of the state at a given event.

    This is done by the concept of `state groups`. Every event is assigned
    a state group (identified by an arbitrary integer), which references a
    collection of state events. The current state of an event is then the
    collection of state events referenced by the event's state group.

    Hence, every change in the current state causes a new state group to be
    generated. However, if no change happens (e.g. if we get a message event
    with only one parent), the event inherits the state group from its parent.

    There are three tables:
    * `state_groups`: Stores the group ID, the first event in the group and
      the room ID.
    * `event_to_state_groups`: Maps events to state groups.
    * `state_groups_state`: Maps state group to state events.
    """

    def __init__(
        self,
        database: DatabasePool,
        db_conn: LoggingDatabaseConnection,
        hs: "HomeServer",
    ):
        super().__init__(database, db_conn, hs)


@attr.s(auto_attribs=True, slots=True)
class StateMapWrapper(Dict[StateKey, str]):
    """A wrapper around a StateMap[str] to ensure that we only query for items
    that were not filtered out.

    This is to help prevent bugs where we filter out state but other bits of the
    code expect the state to be there.
    """

    state_filter: StateFilter

    def __getitem__(self, key: StateKey) -> str:
        if key not in self.state_filter:
            raise Exception(
                "State map was filtered and doesn't include: %s" % (key,)
            )
        return super().__getitem__(key)

    @overload
    def get(self, key: Tuple[str, str]) -> Optional[str]: ...

    @overload
    def get(self, key: Tuple[str, str], default: Union[str, _T]) -> Union[str, _T]: ...

    def get(
        self, key: StateKey, default: Union[str, _T, None] = None
    ) -> Union[str, _T, None]:
        if key not in self.state_filter:
            raise Exception(
                "State map was filtered and doesn't include: %s" % (key,)
            )
        return super().get(key, default)

    def __contains__(self, key: Any) -> bool:
        if key not in self.state_filter:
            raise Exception(
                "State map was filtered and doesn't include: %s" % (key,)
            )

        return super().__contains__(key)
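

# Illustrative behaviour of `StateMapWrapper` (an editorial example, not part
# of the original module):
#
#     wrapped = StateMapWrapper(
#         state_filter=StateFilter.from_types([("m.room.create", "")])
#     )
#     wrapped[("m.room.create", "")] = "$create_event_id"
#
# Reading `wrapped[("m.room.create", "")]` behaves like a normal dict lookup.
# Touching any key outside the filter raises instead of returning a default
# (even a membership test such as `("m.room.member", "@u:hs") in wrapped`), so
# filtered-out state cannot be silently mistaken for absent state.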