Compare commits

...

4 Commits

Author SHA1 Message Date
Tulir Asokan
467976a74a Merge remote-tracking branch 'upstream/release-v1.115' 2024-09-12 14:41:46 +03:00
Andrew Morgan
4c66a7cbed 1.115.0rc2 2024-09-12 11:10:31 +01:00
Erik Johnston
76f7c91e44 Sliding sync: don't fetch room summary for named rooms. (#17683)
For rooms with a name we can skip fetching a full room summary, as we
don't need to calculate heroes, and instead just fetch the room counts
directly.

This also changes things to not return counts and heroes for non-joined
rooms. For left/banned rooms we were returning zero values anyway, and
for invite/knock rooms we don't really want to leak such information
(even if some of it is included in the stripped state).
2024-09-11 16:42:50 +01:00
Erik Johnston
b732d13d4c Sliding sync: various fixups to the background update (#17652) 2024-09-11 16:42:15 +01:00
12 changed files with 322 additions and 257 deletions

View File

@ -1,3 +1,13 @@
# Synapse 1.115.0rc2 (2024-09-12)
### Internal Changes
- Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting. ([\#17652](https://github.com/element-hq/synapse/issues/17652))
- Speed up sliding sync by reducing amount of data pulled out of the database for large rooms. ([\#17683](https://github.com/element-hq/synapse/issues/17683))
# Synapse 1.115.0rc1 (2024-09-10) # Synapse 1.115.0rc1 (2024-09-10)
### Features ### Features

6
debian/changelog vendored
View File

@ -1,3 +1,9 @@
matrix-synapse-py3 (1.115.0~rc2) stable; urgency=medium
* New Synapse release 1.115.0rc2.
-- Synapse Packaging team <packages@matrix.org> Thu, 12 Sep 2024 11:10:15 +0100
matrix-synapse-py3 (1.115.0~rc1) stable; urgency=medium matrix-synapse-py3 (1.115.0~rc1) stable; urgency=medium
* New Synapse release 1.115.0rc1. * New Synapse release 1.115.0rc1.

View File

@ -97,7 +97,7 @@ module-name = "synapse.synapse_rust"
[tool.poetry] [tool.poetry]
name = "matrix-synapse" name = "matrix-synapse"
version = "1.115.0rc1" version = "1.115.0rc2"
description = "Homeserver for the Matrix decentralised comms protocol" description = "Homeserver for the Matrix decentralised comms protocol"
authors = ["Matrix.org Team and Contributors <packages@matrix.org>"] authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
license = "AGPL-3.0-or-later" license = "AGPL-3.0-or-later"

View File

@ -784,32 +784,10 @@ class SlidingSyncHandler:
): ):
avatar_changed = True avatar_changed = True
# We only need the room summary for calculating heroes, however if we do
# fetch it then we can use it to calculate `joined_count` and
# `invited_count`.
room_membership_summary: Optional[Mapping[str, MemberSummary]] = None room_membership_summary: Optional[Mapping[str, MemberSummary]] = None
empty_membership_summary = MemberSummary([], 0)
# We need the room summary for:
# - Always for initial syncs (or the first time we send down the room)
# - When the room has no name, we need `heroes`
# - When the membership has changed so we need to give updated `heroes` and
# `joined_count`/`invited_count`.
#
# Ideally, instead of just looking at `name_changed`, we'd check if the room
# name is not set but this is a good enough approximation that saves us from
# having to pull out the full event. This just means, we're generating the
# summary whenever the room name changes instead of only when it changes to
# `None`.
if initial or name_changed or membership_changed:
# We can't trace the function directly because it's cached and the `@cached`
# decorator doesn't mix with `@trace` yet.
with start_active_span("get_room_summary"):
if room_membership_for_user_at_to_token.membership in (
Membership.LEAVE,
Membership.BAN,
):
# TODO: Figure out how to get the membership summary for left/banned rooms
room_membership_summary = {}
else:
room_membership_summary = await self.store.get_room_summary(room_id)
# TODO: Reverse/rewind back to the `to_token`
# `heroes` are required if the room name is not set. # `heroes` are required if the room name is not set.
# #
@ -828,11 +806,45 @@ class SlidingSyncHandler:
# get them on initial syncs (or the first time we send down the room) or if the # get them on initial syncs (or the first time we send down the room) or if the
# membership has changed which may change the heroes. # membership has changed which may change the heroes.
if name_event_id is None and (initial or (not initial and membership_changed)): if name_event_id is None and (initial or (not initial and membership_changed)):
assert room_membership_summary is not None # We need the room summary to extract the heroes from
if room_membership_for_user_at_to_token.membership != Membership.JOIN:
# TODO: Figure out how to get the membership summary for left/banned rooms
# For invite/knock rooms we don't include the information.
room_membership_summary = {}
else:
room_membership_summary = await self.store.get_room_summary(room_id)
# TODO: Reverse/rewind back to the `to_token`
hero_user_ids = extract_heroes_from_room_summary( hero_user_ids = extract_heroes_from_room_summary(
room_membership_summary, me=user.to_string() room_membership_summary, me=user.to_string()
) )
# Fetch the membership counts for rooms we're joined to.
#
# Similarly to other metadata, we only need to calculate the member
# counts if this is an initial sync or the memberships have changed.
joined_count: Optional[int] = None
invited_count: Optional[int] = None
if (
initial or membership_changed
) and room_membership_for_user_at_to_token.membership == Membership.JOIN:
# If we have the room summary (because we calculated heroes above)
# then we can simply pull the counts from there.
if room_membership_summary is not None:
empty_membership_summary = MemberSummary([], 0)
joined_count = room_membership_summary.get(
Membership.JOIN, empty_membership_summary
).count
invited_count = room_membership_summary.get(
Membership.INVITE, empty_membership_summary
).count
else:
member_counts = await self.store.get_member_counts(room_id)
joined_count = member_counts.get(Membership.JOIN, 0)
invited_count = member_counts.get(Membership.INVITE, 0)
# Fetch the `required_state` for the room # Fetch the `required_state` for the room
# #
# No `required_state` for invite/knock rooms (just `stripped_state`) # No `required_state` for invite/knock rooms (just `stripped_state`)
@ -1090,20 +1102,6 @@ class SlidingSyncHandler:
set_tag(SynapseTags.RESULT_PREFIX + "initial", initial) set_tag(SynapseTags.RESULT_PREFIX + "initial", initial)
joined_count: Optional[int] = None
if initial or membership_changed:
assert room_membership_summary is not None
joined_count = room_membership_summary.get(
Membership.JOIN, empty_membership_summary
).count
invited_count: Optional[int] = None
if initial or membership_changed:
assert room_membership_summary is not None
invited_count = room_membership_summary.get(
Membership.INVITE, empty_membership_summary
).count
return SlidingSyncResult.RoomResult( return SlidingSyncResult.RoomResult(
name=room_name, name=room_name,
avatar=room_avatar, avatar=room_avatar,

View File

@ -112,6 +112,7 @@ class SQLBaseStore(metaclass=ABCMeta):
self._attempt_to_invalidate_cache( self._attempt_to_invalidate_cache(
"get_number_joined_users_in_room", (room_id,) "get_number_joined_users_in_room", (room_id,)
) )
self._attempt_to_invalidate_cache("get_member_counts", (room_id,))
self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,)) self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,))
# There's no easy way of invalidating this cache for just the users # There's no easy way of invalidating this cache for just the users
@ -153,6 +154,7 @@ class SQLBaseStore(metaclass=ABCMeta):
self._attempt_to_invalidate_cache("get_current_hosts_in_room", (room_id,)) self._attempt_to_invalidate_cache("get_current_hosts_in_room", (room_id,))
self._attempt_to_invalidate_cache("get_users_in_room_with_profiles", (room_id,)) self._attempt_to_invalidate_cache("get_users_in_room_with_profiles", (room_id,))
self._attempt_to_invalidate_cache("get_number_joined_users_in_room", (room_id,)) self._attempt_to_invalidate_cache("get_number_joined_users_in_room", (room_id,))
self._attempt_to_invalidate_cache("get_member_counts", (room_id,))
self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,)) self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,))
self._attempt_to_invalidate_cache("does_pair_of_users_share_a_room", None) self._attempt_to_invalidate_cache("does_pair_of_users_share_a_room", None)
self._attempt_to_invalidate_cache("get_user_in_room_with_profile", None) self._attempt_to_invalidate_cache("get_user_in_room_with_profile", None)

View File

@ -1980,7 +1980,12 @@ class PersistEventsStore:
if state_key == (EventTypes.Create, ""): if state_key == (EventTypes.Create, ""):
room_type = event.content.get(EventContentFields.ROOM_TYPE) room_type = event.content.get(EventContentFields.ROOM_TYPE)
# Scrutinize JSON values # Scrutinize JSON values
if room_type is None or isinstance(room_type, str): if room_type is None or (
isinstance(room_type, str)
# We ignore values with null bytes as Postgres doesn't allow them in
# text columns.
and "\0" not in room_type
):
sliding_sync_insert_map["room_type"] = room_type sliding_sync_insert_map["room_type"] = room_type
elif state_key == (EventTypes.RoomEncryption, ""): elif state_key == (EventTypes.RoomEncryption, ""):
encryption_algorithm = event.content.get( encryption_algorithm = event.content.get(
@ -1990,15 +1995,26 @@ class PersistEventsStore:
sliding_sync_insert_map["is_encrypted"] = is_encrypted sliding_sync_insert_map["is_encrypted"] = is_encrypted
elif state_key == (EventTypes.Name, ""): elif state_key == (EventTypes.Name, ""):
room_name = event.content.get(EventContentFields.ROOM_NAME) room_name = event.content.get(EventContentFields.ROOM_NAME)
# Scrutinize JSON values # Scrutinize JSON values. We ignore values with nulls as
if room_name is None or isinstance(room_name, str): # postgres doesn't allow null bytes in text columns.
if room_name is None or (
isinstance(room_name, str)
# We ignore values with null bytes as Postgres doesn't allow them in
# text columns.
and "\0" not in room_name
):
sliding_sync_insert_map["room_name"] = room_name sliding_sync_insert_map["room_name"] = room_name
elif state_key == (EventTypes.Tombstone, ""): elif state_key == (EventTypes.Tombstone, ""):
successor_room_id = event.content.get( successor_room_id = event.content.get(
EventContentFields.TOMBSTONE_SUCCESSOR_ROOM EventContentFields.TOMBSTONE_SUCCESSOR_ROOM
) )
# Scrutinize JSON values # Scrutinize JSON values
if successor_room_id is None or isinstance(successor_room_id, str): if successor_room_id is None or (
isinstance(successor_room_id, str)
# We ignore values with null bytes as Postgres doesn't allow them in
# text columns.
and "\0" not in successor_room_id
):
sliding_sync_insert_map["tombstone_successor_room_id"] = ( sliding_sync_insert_map["tombstone_successor_room_id"] = (
successor_room_id successor_room_id
) )
@ -2081,6 +2097,21 @@ class PersistEventsStore:
else None else None
) )
# Check for null bytes in the room name and type. We have to
# ignore values with null bytes as Postgres doesn't allow them
# in text columns.
if (
sliding_sync_insert_map["room_name"] is not None
and "\0" in sliding_sync_insert_map["room_name"]
):
sliding_sync_insert_map.pop("room_name")
if (
sliding_sync_insert_map["room_type"] is not None
and "\0" in sliding_sync_insert_map["room_type"]
):
sliding_sync_insert_map.pop("room_type")
# Find the tombstone_successor_room_id # Find the tombstone_successor_room_id
# Note: This isn't one of the stripped state events according to the spec # Note: This isn't one of the stripped state events according to the spec
# but seems like there is no reason not to support this kind of thing. # but seems like there is no reason not to support this kind of thing.
@ -2095,6 +2126,12 @@ class PersistEventsStore:
else None else None
) )
if (
sliding_sync_insert_map["tombstone_successor_room_id"] is not None
and "\0" in sliding_sync_insert_map["tombstone_successor_room_id"]
):
sliding_sync_insert_map.pop("tombstone_successor_room_id")
else: else:
# No stripped state provided # No stripped state provided
sliding_sync_insert_map["has_known_state"] = False sliding_sync_insert_map["has_known_state"] = False

View File

@ -47,6 +47,7 @@ from synapse.storage.databases.main.events_worker import (
) )
from synapse.storage.databases.main.state_deltas import StateDeltasStore from synapse.storage.databases.main.state_deltas import StateDeltasStore
from synapse.storage.databases.main.stream import StreamWorkerStore from synapse.storage.databases.main.stream import StreamWorkerStore
from synapse.storage.engines import PostgresEngine
from synapse.storage.types import Cursor from synapse.storage.types import Cursor
from synapse.types import JsonDict, RoomStreamToken, StateMap, StrCollection from synapse.types import JsonDict, RoomStreamToken, StateMap, StrCollection
from synapse.types.handlers import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES from synapse.types.handlers import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES
@ -1877,9 +1878,29 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
def _find_memberships_to_update_txn( def _find_memberships_to_update_txn(
txn: LoggingTransaction, txn: LoggingTransaction,
) -> List[ ) -> List[
Tuple[str, Optional[str], str, str, str, str, int, Optional[str], bool] Tuple[
str,
Optional[str],
Optional[str],
str,
str,
str,
str,
int,
Optional[str],
bool,
]
]: ]:
# Fetch the set of event IDs that we want to update # Fetch the set of event IDs that we want to update
#
# We skip over rows which we've already handled, i.e. have a
# matching row in `sliding_sync_membership_snapshots` with the same
# room, user and event ID.
#
# We also ignore rooms that the user has left themselves (i.e. not
# kicked). This is to avoid having to port lots of old rooms that we
# will never send down sliding sync (as we exclude such rooms from
# initial syncs).
if initial_phase: if initial_phase:
# There are some old out-of-band memberships (before # There are some old out-of-band memberships (before
@ -1892,6 +1913,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
SELECT SELECT
c.room_id, c.room_id,
r.room_id, r.room_id,
r.room_version,
c.user_id, c.user_id,
e.sender, e.sender,
c.event_id, c.event_id,
@ -1900,9 +1922,11 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
e.instance_name, e.instance_name,
e.outlier e.outlier
FROM local_current_membership AS c FROM local_current_membership AS c
LEFT JOIN sliding_sync_membership_snapshots AS m USING (room_id, user_id)
INNER JOIN events AS e USING (event_id) INNER JOIN events AS e USING (event_id)
LEFT JOIN rooms AS r ON (c.room_id = r.room_id) LEFT JOIN rooms AS r ON (c.room_id = r.room_id)
WHERE (c.room_id, c.user_id) > (?, ?) WHERE (c.room_id, c.user_id) > (?, ?)
AND (m.user_id IS NULL OR c.event_id != m.membership_event_id)
ORDER BY c.room_id ASC, c.user_id ASC ORDER BY c.room_id ASC, c.user_id ASC
LIMIT ? LIMIT ?
""", """,
@ -1922,7 +1946,8 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
""" """
SELECT SELECT
c.room_id, c.room_id,
c.room_id, r.room_id,
r.room_version,
c.user_id, c.user_id,
e.sender, e.sender,
c.event_id, c.event_id,
@ -1931,9 +1956,12 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
e.instance_name, e.instance_name,
e.outlier e.outlier
FROM local_current_membership AS c FROM local_current_membership AS c
LEFT JOIN sliding_sync_membership_snapshots AS m USING (room_id, user_id)
INNER JOIN events AS e USING (event_id) INNER JOIN events AS e USING (event_id)
WHERE event_stream_ordering > ? LEFT JOIN rooms AS r ON (c.room_id = r.room_id)
ORDER BY event_stream_ordering ASC WHERE c.event_stream_ordering > ?
AND (m.user_id IS NULL OR c.event_id != m.membership_event_id)
ORDER BY c.event_stream_ordering ASC
LIMIT ? LIMIT ?
""", """,
(last_event_stream_ordering, batch_size), (last_event_stream_ordering, batch_size),
@ -1944,7 +1972,16 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
memberships_to_update_rows = cast( memberships_to_update_rows = cast(
List[ List[
Tuple[ Tuple[
str, Optional[str], str, str, str, str, int, Optional[str], bool str,
Optional[str],
Optional[str],
str,
str,
str,
str,
int,
Optional[str],
bool,
] ]
], ],
txn.fetchall(), txn.fetchall(),
@ -1977,7 +2014,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
def _find_previous_invite_or_knock_membership_txn( def _find_previous_invite_or_knock_membership_txn(
txn: LoggingTransaction, room_id: str, user_id: str, event_id: str txn: LoggingTransaction, room_id: str, user_id: str, event_id: str
) -> Tuple[str, str]: ) -> Optional[Tuple[str, str]]:
# Find the previous invite/knock event before the leave event # Find the previous invite/knock event before the leave event
# #
# Here are some notes on how we landed on this query: # Here are some notes on how we landed on this query:
@ -2027,8 +2064,13 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
) )
row = txn.fetchone() row = txn.fetchone()
# We should see a corresponding previous invite/knock event if row is None:
assert row is not None # Generally we should have an invite or knock event for leaves
# that are outliers, however this may not always be the case
# (e.g. a local user got kicked but the kick event got pulled in
# as an outlier).
return None
event_id, membership = row event_id, membership = row
return event_id, membership return event_id, membership
@ -2043,6 +2085,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
for ( for (
room_id, room_id,
room_id_from_rooms_table, room_id_from_rooms_table,
room_version_id,
user_id, user_id,
sender, sender,
membership_event_id, membership_event_id,
@ -2061,6 +2104,14 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
Membership.BAN, Membership.BAN,
) )
if (
room_version_id is not None
and room_version_id not in KNOWN_ROOM_VERSIONS
):
# Ignore rooms with unknown room versions (these were
# experimental rooms, that we no longer support).
continue
# There are some old out-of-band memberships (before # There are some old out-of-band memberships (before
# https://github.com/matrix-org/synapse/issues/6983) where we don't have the # https://github.com/matrix-org/synapse/issues/6983) where we don't have the
# corresponding room stored in the `rooms` table`. We have a `FOREIGN KEY` # corresponding room stored in the `rooms` table`. We have a `FOREIGN KEY`
@ -2148,14 +2199,17 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
# in the events table though. We'll just say that we don't # in the events table though. We'll just say that we don't
# know the state for these rooms and continue on with our # know the state for these rooms and continue on with our
# day. # day.
sliding_sync_membership_snapshots_insert_map["has_known_state"] = ( sliding_sync_membership_snapshots_insert_map = {
False "has_known_state": False,
) "room_type": None,
"room_name": None,
"is_encrypted": False,
}
elif membership in (Membership.INVITE, Membership.KNOCK) or ( elif membership in (Membership.INVITE, Membership.KNOCK) or (
membership in (Membership.LEAVE, Membership.BAN) and is_outlier membership in (Membership.LEAVE, Membership.BAN) and is_outlier
): ):
invite_or_knock_event_id = membership_event_id invite_or_knock_event_id = None
invite_or_knock_membership = membership invite_or_knock_membership = None
# If the event is an `out_of_band_membership` (special case of # If the event is an `out_of_band_membership` (special case of
# `outlier`), we never had historical state so we have to pull from # `outlier`), we never had historical state so we have to pull from
@ -2164,35 +2218,55 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
# membership (i.e. the room shouldn't disappear if you're using the # membership (i.e. the room shouldn't disappear if you're using the
# `is_encrypted` filter and you leave). # `is_encrypted` filter and you leave).
if membership in (Membership.LEAVE, Membership.BAN) and is_outlier: if membership in (Membership.LEAVE, Membership.BAN) and is_outlier:
( previous_membership = await self.db_pool.runInteraction(
invite_or_knock_event_id,
invite_or_knock_membership,
) = await self.db_pool.runInteraction(
"sliding_sync_membership_snapshots_bg_update._find_previous_invite_or_knock_membership_txn", "sliding_sync_membership_snapshots_bg_update._find_previous_invite_or_knock_membership_txn",
_find_previous_invite_or_knock_membership_txn, _find_previous_invite_or_knock_membership_txn,
room_id, room_id,
user_id, user_id,
membership_event_id, membership_event_id,
) )
if previous_membership is not None:
(
invite_or_knock_event_id,
invite_or_knock_membership,
) = previous_membership
else:
invite_or_knock_event_id = membership_event_id
invite_or_knock_membership = membership
# Pull from the stripped state on the invite/knock event if (
invite_or_knock_event = await self.get_event(invite_or_knock_event_id) invite_or_knock_event_id is not None
and invite_or_knock_membership is not None
raw_stripped_state_events = None ):
if invite_or_knock_membership == Membership.INVITE: # Pull from the stripped state on the invite/knock event
invite_room_state = invite_or_knock_event.unsigned.get( invite_or_knock_event = await self.get_event(
"invite_room_state" invite_or_knock_event_id
) )
raw_stripped_state_events = invite_room_state
elif invite_or_knock_membership == Membership.KNOCK:
knock_room_state = invite_or_knock_event.unsigned.get(
"knock_room_state"
)
raw_stripped_state_events = knock_room_state
sliding_sync_membership_snapshots_insert_map = PersistEventsStore._get_sliding_sync_insert_values_from_stripped_state( raw_stripped_state_events = None
raw_stripped_state_events if invite_or_knock_membership == Membership.INVITE:
) invite_room_state = invite_or_knock_event.unsigned.get(
"invite_room_state"
)
raw_stripped_state_events = invite_room_state
elif invite_or_knock_membership == Membership.KNOCK:
knock_room_state = invite_or_knock_event.unsigned.get(
"knock_room_state"
)
raw_stripped_state_events = knock_room_state
sliding_sync_membership_snapshots_insert_map = PersistEventsStore._get_sliding_sync_insert_values_from_stripped_state(
raw_stripped_state_events
)
else:
# We couldn't find any state for the membership, so we just have to
# leave it as empty.
sliding_sync_membership_snapshots_insert_map = {
"has_known_state": False,
"room_type": None,
"room_name": None,
"is_encrypted": False,
}
# We should have some insert values for each room, even if no # We should have some insert values for each room, even if no
# stripped state is on the event because we still want to record # stripped state is on the event because we still want to record
@ -2311,19 +2385,42 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
) )
# We need to find the `forgotten` value during the transaction because # We need to find the `forgotten` value during the transaction because
# we can't risk inserting stale data. # we can't risk inserting stale data.
txn.execute( if isinstance(txn.database_engine, PostgresEngine):
""" txn.execute(
UPDATE sliding_sync_membership_snapshots """
SET UPDATE sliding_sync_membership_snapshots
forgotten = (SELECT forgotten FROM room_memberships WHERE event_id = ?) SET
WHERE room_id = ? and user_id = ? forgotten = m.forgotten
""", FROM room_memberships AS m
( WHERE sliding_sync_membership_snapshots.room_id = ?
membership_event_id, AND sliding_sync_membership_snapshots.user_id = ?
room_id, AND membership_event_id = ?
user_id, AND membership_event_id = m.event_id
), AND m.event_id IS NOT NULL
) """,
(
room_id,
user_id,
membership_event_id,
),
)
else:
# SQLite doesn't support UPDATE FROM before 3.33.0, so we do
# this via sub-selects.
txn.execute(
"""
UPDATE sliding_sync_membership_snapshots
SET
forgotten = (SELECT forgotten FROM room_memberships WHERE event_id = ?)
WHERE room_id = ? and user_id = ? AND membership_event_id = ?
""",
(
membership_event_id,
room_id,
user_id,
membership_event_id,
),
)
await self.db_pool.runInteraction( await self.db_pool.runInteraction(
"sliding_sync_membership_snapshots_bg_update", _fill_table_txn "sliding_sync_membership_snapshots_bg_update", _fill_table_txn
@ -2333,6 +2430,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
( (
room_id, room_id,
_room_id_from_rooms_table, _room_id_from_rooms_table,
_room_version_id,
user_id, user_id,
_sender, _sender,
_membership_event_id, _membership_event_id,

View File

@ -312,18 +312,10 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
# We do this all in one transaction to keep the cache small. # We do this all in one transaction to keep the cache small.
# FIXME: get rid of this when we have room_stats # FIXME: get rid of this when we have room_stats
# Note, rejected events will have a null membership field, so counts = self._get_member_counts_txn(txn, room_id)
# we we manually filter them out.
sql = """
SELECT count(*), membership FROM current_state_events
WHERE type = 'm.room.member' AND room_id = ?
AND membership IS NOT NULL
GROUP BY membership
"""
txn.execute(sql, (room_id,))
res: Dict[str, MemberSummary] = {} res: Dict[str, MemberSummary] = {}
for count, membership in txn: for membership, count in counts.items():
res.setdefault(membership, MemberSummary([], count)) res.setdefault(membership, MemberSummary([], count))
# Order by membership (joins -> invites -> leave (former insiders) -> # Order by membership (joins -> invites -> leave (former insiders) ->
@ -369,6 +361,31 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
"get_room_summary", _get_room_summary_txn "get_room_summary", _get_room_summary_txn
) )
@cached()
async def get_member_counts(self, room_id: str) -> Mapping[str, int]:
    """Count the members of a room, grouped by membership.

    Args:
        room_id: The room whose memberships should be counted.

    Returns:
        A mapping from membership value to the number of users that have
        it, as produced by `_get_member_counts_txn`.
    """
    # Run the counting query inside a single database interaction.
    counts_by_membership = await self.db_pool.runInteraction(
        "get_member_counts",
        self._get_member_counts_txn,
        room_id,
    )
    return counts_by_membership
def _get_member_counts_txn(
    self, txn: LoggingTransaction, room_id: str
) -> Dict[str, int]:
    """Count the members of a room, grouped by membership.

    Args:
        txn: The transaction to run the query on.
        room_id: The room whose memberships should be counted.

    Returns:
        A dict mapping each membership value returned by the query to the
        number of `m.room.member` rows in `current_state_events` with it.
    """
    # Rejected events have a null membership field, so the query filters
    # them out explicitly. (The SQL text is kept exactly as it was.)
    member_count_query = """
SELECT count(*), membership FROM current_state_events
WHERE type = 'm.room.member' AND room_id = ?
AND membership IS NOT NULL
GROUP BY membership
"""
    txn.execute(member_count_query, (room_id,))

    # Each row is `(count, membership)`; flip it into `membership -> count`.
    counts_by_membership: Dict[str, int] = {}
    for member_count, membership in txn:
        counts_by_membership[membership] = member_count
    return counts_by_membership
@cached() @cached()
async def get_number_joined_users_in_room(self, room_id: str) -> int: async def get_number_joined_users_in_room(self, room_id: str) -> int:
return await self.db_pool.simple_select_one_onecol( return await self.db_pool.simple_select_one_onecol(

View File

@ -736,6 +736,7 @@ class MainStateBackgroundUpdateStore(RoomMemberWorkerStore):
CURRENT_STATE_INDEX_UPDATE_NAME = "current_state_members_idx" CURRENT_STATE_INDEX_UPDATE_NAME = "current_state_members_idx"
EVENT_STATE_GROUP_INDEX_UPDATE_NAME = "event_to_state_groups_sg_index" EVENT_STATE_GROUP_INDEX_UPDATE_NAME = "event_to_state_groups_sg_index"
DELETE_CURRENT_STATE_UPDATE_NAME = "delete_old_current_state_events" DELETE_CURRENT_STATE_UPDATE_NAME = "delete_old_current_state_events"
MEMBERS_CURRENT_STATE_UPDATE_NAME = "current_state_events_members_room_index"
def __init__( def __init__(
self, self,
@ -764,6 +765,13 @@ class MainStateBackgroundUpdateStore(RoomMemberWorkerStore):
self.DELETE_CURRENT_STATE_UPDATE_NAME, self.DELETE_CURRENT_STATE_UPDATE_NAME,
self._background_remove_left_rooms, self._background_remove_left_rooms,
) )
self.db_pool.updates.register_background_index_update(
self.MEMBERS_CURRENT_STATE_UPDATE_NAME,
index_name="current_state_events_members_room_index",
table="current_state_events",
columns=["room_id", "membership"],
where_clause="type='m.room.member'",
)
async def _background_remove_left_rooms( async def _background_remove_left_rooms(
self, progress: JsonDict, batch_size: int self, progress: JsonDict, batch_size: int

View File

@ -0,0 +1,19 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2024 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
-- Add a background update to add a new index:
-- `current_state_events(room_id, membership) WHERE type = 'm.room.member'`
-- This makes counting membership in rooms (for syncs) much faster
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(8701, 'current_state_events_members_room_index', '{}');

View File

@ -371,14 +371,17 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
"mxc://UPDATED_DUMMY_MEDIA_ID", "mxc://UPDATED_DUMMY_MEDIA_ID",
response_body["rooms"][room_id1], response_body["rooms"][room_id1],
) )
self.assertEqual(
response_body["rooms"][room_id1]["joined_count"], # We don't give extra room information to invitees
1, self.assertNotIn(
"joined_count",
response_body["rooms"][room_id1],
) )
self.assertEqual( self.assertNotIn(
response_body["rooms"][room_id1]["invited_count"], "invited_count",
1, response_body["rooms"][room_id1],
) )
self.assertIsNone( self.assertIsNone(
response_body["rooms"][room_id1].get("is_dm"), response_body["rooms"][room_id1].get("is_dm"),
) )
@ -450,15 +453,16 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
"mxc://DUMMY_MEDIA_ID", "mxc://DUMMY_MEDIA_ID",
response_body["rooms"][room_id1], response_body["rooms"][room_id1],
) )
self.assertEqual(
response_body["rooms"][room_id1]["joined_count"], # FIXME: We possibly want to return joined and invited counts for rooms
# FIXME: The actual number should be "1" (user2) but we currently don't # you're banned from
# support this for rooms where the user has left/been banned. self.assertNotIn(
0, "joined_count",
response_body["rooms"][room_id1],
) )
self.assertEqual( self.assertNotIn(
response_body["rooms"][room_id1]["invited_count"], "invited_count",
0, response_body["rooms"][room_id1],
) )
self.assertIsNone( self.assertIsNone(
response_body["rooms"][room_id1].get("is_dm"), response_body["rooms"][room_id1].get("is_dm"),
@ -692,19 +696,15 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
[], [],
) )
self.assertEqual( # FIXME: We possibly want to return joined and invited counts for rooms
response_body["rooms"][room_id1]["joined_count"], # you're banned from
# FIXME: The actual number should be "1" (user2) but we currently don't self.assertNotIn(
# support this for rooms where the user has left/been banned. "joined_count",
0, response_body["rooms"][room_id1],
) )
self.assertEqual( self.assertNotIn(
response_body["rooms"][room_id1]["invited_count"], "invited_count",
# We shouldn't see user5 since they were invited after user1 was banned. response_body["rooms"][room_id1],
#
# FIXME: The actual number should be "1" (user3) but we currently don't
# support this for rooms where the user has left/been banned.
0,
) )
def test_rooms_meta_heroes_incremental_sync_no_change(self) -> None: def test_rooms_meta_heroes_incremental_sync_no_change(self) -> None:

View File

@ -4416,136 +4416,6 @@ class SlidingSyncTablesBackgroundUpdatesTestCase(SlidingSyncTablesTestCaseBase):
), ),
) )
def test_membership_snapshots_background_update_forgotten_partial(self) -> None:
"""
Test an existing `sliding_sync_membership_snapshots` row is updated with the
latest `forgotten` status after the background update passes over it.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
room_id = self.helper.create_room_as(user2_id, tok=user2_tok)
# User1 joins the room
self.helper.join(room_id, user1_id, tok=user1_tok)
# User1 leaves the room (we have to leave in order to forget the room)
self.helper.leave(room_id, user1_id, tok=user1_tok)
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id)
)
# Forget the room
channel = self.make_request(
"POST",
f"/_matrix/client/r0/rooms/{room_id}/forget",
content={},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.result)
# Clean-up the `sliding_sync_membership_snapshots` table as if the forgotten status
# never made it into the table.
self.get_success(
self.store.db_pool.simple_update(
table="sliding_sync_membership_snapshots",
keyvalues={"room_id": room_id},
updatevalues={"forgotten": 0},
desc="sliding_sync_membership_snapshots.test_membership_snapshots_background_update_forgotten_partial",
)
)
# We should see the partial row that we made in preparation for the test.
sliding_sync_membership_snapshots_results = (
self._get_sliding_sync_membership_snapshots()
)
self.assertIncludes(
set(sliding_sync_membership_snapshots_results.keys()),
{
(room_id, user1_id),
(room_id, user2_id),
},
exact=True,
)
user1_snapshot = _SlidingSyncMembershipSnapshotResult(
room_id=room_id,
user_id=user1_id,
sender=user1_id,
membership_event_id=state_map[(EventTypes.Member, user1_id)].event_id,
membership=Membership.LEAVE,
event_stream_ordering=state_map[
(EventTypes.Member, user1_id)
].internal_metadata.stream_ordering,
has_known_state=True,
room_type=None,
room_name=None,
is_encrypted=False,
tombstone_successor_room_id=None,
# Room is *not* forgotten because of our test preparation
forgotten=False,
)
self.assertEqual(
sliding_sync_membership_snapshots_results.get((room_id, user1_id)),
user1_snapshot,
)
user2_snapshot = _SlidingSyncMembershipSnapshotResult(
room_id=room_id,
user_id=user2_id,
sender=user2_id,
membership_event_id=state_map[(EventTypes.Member, user2_id)].event_id,
membership=Membership.JOIN,
event_stream_ordering=state_map[
(EventTypes.Member, user2_id)
].internal_metadata.stream_ordering,
has_known_state=True,
room_type=None,
room_name=None,
is_encrypted=False,
tombstone_successor_room_id=None,
)
self.assertEqual(
sliding_sync_membership_snapshots_results.get((room_id, user2_id)),
user2_snapshot,
)
# Insert and run the background update.
self.get_success(
self.store.db_pool.simple_insert(
"background_updates",
{
"update_name": _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE,
"progress_json": "{}",
},
)
)
self.store.db_pool.updates._all_done = False
self.wait_for_background_updates()
# Make sure the table is populated
sliding_sync_membership_snapshots_results = (
self._get_sliding_sync_membership_snapshots()
)
self.assertIncludes(
set(sliding_sync_membership_snapshots_results.keys()),
{
(room_id, user1_id),
(room_id, user2_id),
},
exact=True,
)
# Forgotten status is now updated
self.assertEqual(
sliding_sync_membership_snapshots_results.get((room_id, user1_id)),
attr.evolve(user1_snapshot, forgotten=True),
)
# Holds the info according to the current state when the user joined
self.assertEqual(
sliding_sync_membership_snapshots_results.get((room_id, user2_id)),
user2_snapshot,
)
class SlidingSyncTablesCatchUpBackgroundUpdatesTestCase(SlidingSyncTablesTestCaseBase): class SlidingSyncTablesCatchUpBackgroundUpdatesTestCase(SlidingSyncTablesTestCaseBase):
""" """