mirror of
https://mau.dev/maunium/synapse.git
synced 2024-10-01 01:36:05 -04:00
Sliding Sync: Fix outlier re-persisting causing problems with sliding sync tables (#17635)
Fix outlier re-persisting causing problems with sliding sync tables Follow-up to https://github.com/element-hq/synapse/pull/17512 When running on `matrix.org`, we discovered that a remote invite is first persisted as an `outlier` and then re-persisted again where it is de-outliered. The first the time, the `outlier` is persisted with one `stream_ordering` but when persisted again and de-outliered, it is assigned a different `stream_ordering` that won't end up being used. Since we call `_calculate_sliding_sync_table_changes()` before `_update_outliers_txn()` which fixes this discrepancy (always use the `stream_ordering` from the first time it was persisted), we're working with an unreliable `stream_ordering` value that will possibly be unused and not make it into the `events` table.
This commit is contained in:
parent
d844afdc29
commit
26f81fb5be
1
changelog.d/17635.misc
Normal file
1
changelog.d/17635.misc
Normal file
@ -0,0 +1 @@
|
|||||||
|
Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting.
|
@ -230,6 +230,8 @@ class EventContentFields:
|
|||||||
|
|
||||||
ROOM_NAME: Final = "name"
|
ROOM_NAME: Final = "name"
|
||||||
|
|
||||||
|
MEMBERSHIP: Final = "membership"
|
||||||
|
|
||||||
# Used in m.room.guest_access events.
|
# Used in m.room.guest_access events.
|
||||||
GUEST_ACCESS: Final = "guest_access"
|
GUEST_ACCESS: Final = "guest_access"
|
||||||
|
|
||||||
|
@ -163,6 +163,15 @@ class SlidingSyncMembershipInfo:
|
|||||||
sender: str
|
sender: str
|
||||||
membership_event_id: str
|
membership_event_id: str
|
||||||
membership: str
|
membership: str
|
||||||
|
|
||||||
|
|
||||||
|
@attr.s(slots=True, auto_attribs=True)
|
||||||
|
class SlidingSyncMembershipInfoWithEventPos(SlidingSyncMembershipInfo):
|
||||||
|
"""
|
||||||
|
SlidingSyncMembershipInfo + `stream_ordering`/`instance_name` of the membership
|
||||||
|
event
|
||||||
|
"""
|
||||||
|
|
||||||
membership_event_stream_ordering: int
|
membership_event_stream_ordering: int
|
||||||
membership_event_instance_name: str
|
membership_event_instance_name: str
|
||||||
|
|
||||||
@ -170,17 +179,6 @@ class SlidingSyncMembershipInfo:
|
|||||||
@attr.s(slots=True, auto_attribs=True)
|
@attr.s(slots=True, auto_attribs=True)
|
||||||
class SlidingSyncTableChanges:
|
class SlidingSyncTableChanges:
|
||||||
room_id: str
|
room_id: str
|
||||||
# `stream_ordering` of the most recent event being persisted in the room. This doesn't
|
|
||||||
# need to be perfect, we just need *some* answer that points to a real event in the
|
|
||||||
# room in case we are the first ones inserting into the `sliding_sync_joined_rooms`
|
|
||||||
# table because of the `NON NULL` constraint on `event_stream_ordering`. In reality,
|
|
||||||
# `_update_sliding_sync_tables_with_new_persisted_events_txn()` is run after
|
|
||||||
# `_update_current_state_txn()` whenever a new event is persisted to update it to the
|
|
||||||
# correct latest value.
|
|
||||||
#
|
|
||||||
# This should be *some* value that points to a real event in the room if we are
|
|
||||||
# still joined to the room and some state is changing (`to_insert` or `to_delete`).
|
|
||||||
joined_room_best_effort_most_recent_stream_ordering: Optional[int]
|
|
||||||
# If the row doesn't exist in the `sliding_sync_joined_rooms` table, we need to
|
# If the row doesn't exist in the `sliding_sync_joined_rooms` table, we need to
|
||||||
# fully-insert it which means we also need to include a `bump_stamp` value to use
|
# fully-insert it which means we also need to include a `bump_stamp` value to use
|
||||||
# for the row. This should only be populated when we're trying to fully-insert a
|
# for the row. This should only be populated when we're trying to fully-insert a
|
||||||
@ -401,6 +399,9 @@ class PersistEventsStore:
|
|||||||
`stream_ordering`).
|
`stream_ordering`).
|
||||||
delta_state: Deltas that are going to be used to update the
|
delta_state: Deltas that are going to be used to update the
|
||||||
`current_state_events` table. Changes to the current state of the room.
|
`current_state_events` table. Changes to the current state of the room.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
SlidingSyncTableChanges
|
||||||
"""
|
"""
|
||||||
to_insert = delta_state.to_insert
|
to_insert = delta_state.to_insert
|
||||||
to_delete = delta_state.to_delete
|
to_delete = delta_state.to_delete
|
||||||
@ -410,7 +411,6 @@ class PersistEventsStore:
|
|||||||
if not to_insert and not to_delete:
|
if not to_insert and not to_delete:
|
||||||
return SlidingSyncTableChanges(
|
return SlidingSyncTableChanges(
|
||||||
room_id=room_id,
|
room_id=room_id,
|
||||||
joined_room_best_effort_most_recent_stream_ordering=None,
|
|
||||||
joined_room_bump_stamp_to_fully_insert=None,
|
joined_room_bump_stamp_to_fully_insert=None,
|
||||||
joined_room_updates={},
|
joined_room_updates={},
|
||||||
membership_snapshot_shared_insert_values={},
|
membership_snapshot_shared_insert_values={},
|
||||||
@ -469,24 +469,24 @@ class PersistEventsStore:
|
|||||||
membership_event_id,
|
membership_event_id,
|
||||||
user_id,
|
user_id,
|
||||||
) in membership_event_id_to_user_id_map.items():
|
) in membership_event_id_to_user_id_map.items():
|
||||||
# We should only be seeing events with `stream_ordering`/`instance_name` assigned by this point
|
|
||||||
membership_event_stream_ordering = membership_event_map[
|
|
||||||
membership_event_id
|
|
||||||
].internal_metadata.stream_ordering
|
|
||||||
assert membership_event_stream_ordering is not None
|
|
||||||
membership_event_instance_name = membership_event_map[
|
|
||||||
membership_event_id
|
|
||||||
].internal_metadata.instance_name
|
|
||||||
assert membership_event_instance_name is not None
|
|
||||||
|
|
||||||
membership_infos_to_insert_membership_snapshots.append(
|
membership_infos_to_insert_membership_snapshots.append(
|
||||||
|
# XXX: We don't use `SlidingSyncMembershipInfoWithEventPos` here
|
||||||
|
# because we're sourcing the event from `events_and_contexts`, we
|
||||||
|
# can't rely on `stream_ordering`/`instance_name` being correct. We
|
||||||
|
# could be working with events that were previously persisted as an
|
||||||
|
# `outlier` with one `stream_ordering` but are now being persisted
|
||||||
|
# again and de-outliered and assigned a different `stream_ordering`
|
||||||
|
# that won't end up being used. Since we call
|
||||||
|
# `_calculate_sliding_sync_table_changes()` before
|
||||||
|
# `_update_outliers_txn()` which fixes this discrepancy (always use
|
||||||
|
# the `stream_ordering` from the first time it was persisted), we're
|
||||||
|
# working with an unreliable `stream_ordering` value that will
|
||||||
|
# possibly be unused and not make it into the `events` table.
|
||||||
SlidingSyncMembershipInfo(
|
SlidingSyncMembershipInfo(
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
sender=membership_event_map[membership_event_id].sender,
|
sender=membership_event_map[membership_event_id].sender,
|
||||||
membership_event_id=membership_event_id,
|
membership_event_id=membership_event_id,
|
||||||
membership=membership_event_map[membership_event_id].membership,
|
membership=membership_event_map[membership_event_id].membership,
|
||||||
membership_event_stream_ordering=membership_event_stream_ordering,
|
|
||||||
membership_event_instance_name=membership_event_instance_name,
|
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -568,7 +568,6 @@ class PersistEventsStore:
|
|||||||
# `_update_sliding_sync_tables_with_new_persisted_events_txn()`)
|
# `_update_sliding_sync_tables_with_new_persisted_events_txn()`)
|
||||||
#
|
#
|
||||||
joined_room_updates: SlidingSyncStateInsertValues = {}
|
joined_room_updates: SlidingSyncStateInsertValues = {}
|
||||||
best_effort_most_recent_stream_ordering: Optional[int] = None
|
|
||||||
bump_stamp_to_fully_insert: Optional[int] = None
|
bump_stamp_to_fully_insert: Optional[int] = None
|
||||||
if not delta_state.no_longer_in_room:
|
if not delta_state.no_longer_in_room:
|
||||||
current_state_ids_map = {}
|
current_state_ids_map = {}
|
||||||
@ -632,9 +631,7 @@ class PersistEventsStore:
|
|||||||
|
|
||||||
# Otherwise, we need to find a couple events that we were reset to.
|
# Otherwise, we need to find a couple events that we were reset to.
|
||||||
if missing_event_ids:
|
if missing_event_ids:
|
||||||
remaining_events = await self.store.get_events(
|
remaining_events = await self.store.get_events(missing_event_ids)
|
||||||
current_state_ids_map.values()
|
|
||||||
)
|
|
||||||
# There shouldn't be any missing events
|
# There shouldn't be any missing events
|
||||||
assert (
|
assert (
|
||||||
remaining_events.keys() == missing_event_ids
|
remaining_events.keys() == missing_event_ids
|
||||||
@ -657,52 +654,9 @@ class PersistEventsStore:
|
|||||||
elif state_key == (EventTypes.Name, ""):
|
elif state_key == (EventTypes.Name, ""):
|
||||||
joined_room_updates["room_name"] = None
|
joined_room_updates["room_name"] = None
|
||||||
|
|
||||||
# Figure out `best_effort_most_recent_stream_ordering`. This doesn't need to
|
|
||||||
# be perfect, we just need *some* answer that points to a real event in the
|
|
||||||
# room in case we are the first ones inserting into the
|
|
||||||
# `sliding_sync_joined_rooms` table because of the `NON NULL` constraint on
|
|
||||||
# `event_stream_ordering`. In reality,
|
|
||||||
# `_update_sliding_sync_tables_with_new_persisted_events_txn()` is run after
|
|
||||||
# `_update_current_state_txn()` whenever a new event is persisted to update
|
|
||||||
# it to the correct latest value.
|
|
||||||
#
|
|
||||||
if len(events_and_contexts) > 0:
|
|
||||||
# Since the list is sorted ascending by `stream_ordering`, the last event
|
|
||||||
# should have the highest `stream_ordering`.
|
|
||||||
best_effort_most_recent_stream_ordering = events_and_contexts[-1][
|
|
||||||
0
|
|
||||||
].internal_metadata.stream_ordering
|
|
||||||
else:
|
|
||||||
# If there are no `events_and_contexts`, we assume it's one of two scenarios:
|
|
||||||
# 1. If there are new state `to_insert` but no `events_and_contexts`,
|
|
||||||
# then it's a state reset.
|
|
||||||
# 2. Otherwise, it's some partial-state room re-syncing the current state and
|
|
||||||
# going through un-partial process.
|
|
||||||
#
|
|
||||||
# Either way, we assume no new events are being persisted and we can
|
|
||||||
# find the latest already in the database. Since this is a best-effort
|
|
||||||
# value, we don't need to be perfect although I think we're pretty close
|
|
||||||
# here.
|
|
||||||
most_recent_event_pos_results = (
|
|
||||||
await self.store.get_last_event_pos_in_room(
|
|
||||||
room_id, event_types=None
|
|
||||||
)
|
|
||||||
)
|
|
||||||
assert most_recent_event_pos_results, (
|
|
||||||
f"We should not be seeing `None` here because we are still in the room ({room_id}) and "
|
|
||||||
+ "it should at-least have a join membership event that's keeping us here."
|
|
||||||
)
|
|
||||||
best_effort_most_recent_stream_ordering = most_recent_event_pos_results[
|
|
||||||
1
|
|
||||||
].stream
|
|
||||||
|
|
||||||
# We should have found a value if we are still in the room
|
|
||||||
assert best_effort_most_recent_stream_ordering is not None
|
|
||||||
|
|
||||||
return SlidingSyncTableChanges(
|
return SlidingSyncTableChanges(
|
||||||
room_id=room_id,
|
room_id=room_id,
|
||||||
# For `sliding_sync_joined_rooms`
|
# For `sliding_sync_joined_rooms`
|
||||||
joined_room_best_effort_most_recent_stream_ordering=best_effort_most_recent_stream_ordering,
|
|
||||||
joined_room_bump_stamp_to_fully_insert=bump_stamp_to_fully_insert,
|
joined_room_bump_stamp_to_fully_insert=bump_stamp_to_fully_insert,
|
||||||
joined_room_updates=joined_room_updates,
|
joined_room_updates=joined_room_updates,
|
||||||
# For `sliding_sync_membership_snapshots`
|
# For `sliding_sync_membership_snapshots`
|
||||||
@ -1773,31 +1727,53 @@ class PersistEventsStore:
|
|||||||
#
|
#
|
||||||
# We only need to update when one of the relevant state values has changed
|
# We only need to update when one of the relevant state values has changed
|
||||||
if sliding_sync_table_changes.joined_room_updates:
|
if sliding_sync_table_changes.joined_room_updates:
|
||||||
# This should be *some* value that points to a real event in the room if
|
sliding_sync_updates_keys = (
|
||||||
# we are still joined to the room.
|
sliding_sync_table_changes.joined_room_updates.keys()
|
||||||
assert (
|
)
|
||||||
sliding_sync_table_changes.joined_room_best_effort_most_recent_stream_ordering
|
sliding_sync_updates_values = (
|
||||||
is not None
|
sliding_sync_table_changes.joined_room_updates.values()
|
||||||
)
|
)
|
||||||
|
|
||||||
self.db_pool.simple_upsert_txn(
|
args: List[Any] = [
|
||||||
txn,
|
room_id,
|
||||||
table="sliding_sync_joined_rooms",
|
room_id,
|
||||||
keyvalues={"room_id": room_id},
|
sliding_sync_table_changes.joined_room_bump_stamp_to_fully_insert,
|
||||||
values=sliding_sync_table_changes.joined_room_updates,
|
]
|
||||||
insertion_values={
|
args.extend(iter(sliding_sync_updates_values))
|
||||||
# The reason we're only *inserting* (not *updating*)
|
|
||||||
# `event_stream_ordering` here is because the column has a `NON
|
# XXX: We use a sub-query for `stream_ordering` because it's unreliable to
|
||||||
# NULL` constraint and we need *some* answer. And if the row
|
# pre-calculate from `events_and_contexts` at the time when
|
||||||
# already exists, it already has the correct value and it's
|
# `_calculate_sliding_sync_table_changes()` is ran. We could be working
|
||||||
# better to just rely on
|
# with events that were previously persisted as an `outlier` with one
|
||||||
# `_update_sliding_sync_tables_with_new_persisted_events_txn()`
|
# `stream_ordering` but are now being persisted again and de-outliered
|
||||||
# to do the right thing (same for `bump_stamp`).
|
# and assigned a different `stream_ordering`. Since we call
|
||||||
"event_stream_ordering": sliding_sync_table_changes.joined_room_best_effort_most_recent_stream_ordering,
|
# `_calculate_sliding_sync_table_changes()` before
|
||||||
# If we're trying to fully-insert a row, we need to provide a
|
# `_update_outliers_txn()` which fixes this discrepancy (always use the
|
||||||
# value for `bump_stamp` if it exists for the room.
|
# `stream_ordering` from the first time it was persisted), we're working
|
||||||
"bump_stamp": sliding_sync_table_changes.joined_room_bump_stamp_to_fully_insert,
|
# with an unreliable `stream_ordering` value that will possibly be
|
||||||
},
|
# unused and not make it into the `events` table.
|
||||||
|
#
|
||||||
|
# We don't update `event_stream_ordering` `ON CONFLICT` because it's
|
||||||
|
# simpler and we can just rely on
|
||||||
|
# `_update_sliding_sync_tables_with_new_persisted_events_txn()` to do
|
||||||
|
# the right thing (same for `bump_stamp`). The only reason we're
|
||||||
|
# inserting `event_stream_ordering` here is because the column has a
|
||||||
|
# `NON NULL` constraint and we need some answer.
|
||||||
|
txn.execute(
|
||||||
|
f"""
|
||||||
|
INSERT INTO sliding_sync_joined_rooms
|
||||||
|
(room_id, event_stream_ordering, bump_stamp, {", ".join(sliding_sync_updates_keys)})
|
||||||
|
VALUES (
|
||||||
|
?,
|
||||||
|
(SELECT stream_ordering FROM events WHERE room_id = ? ORDER BY stream_ordering DESC LIMIT 1),
|
||||||
|
?,
|
||||||
|
{", ".join("?" for _ in sliding_sync_updates_values)}
|
||||||
|
)
|
||||||
|
ON CONFLICT (room_id)
|
||||||
|
DO UPDATE SET
|
||||||
|
{", ".join(f"{key} = EXCLUDED.{key}" for key in sliding_sync_updates_keys)}
|
||||||
|
""",
|
||||||
|
args,
|
||||||
)
|
)
|
||||||
|
|
||||||
# We now update `local_current_membership`. We do this regardless
|
# We now update `local_current_membership`. We do this regardless
|
||||||
@ -1854,38 +1830,63 @@ class PersistEventsStore:
|
|||||||
if sliding_sync_table_changes.to_insert_membership_snapshots:
|
if sliding_sync_table_changes.to_insert_membership_snapshots:
|
||||||
# Update the `sliding_sync_membership_snapshots` table
|
# Update the `sliding_sync_membership_snapshots` table
|
||||||
#
|
#
|
||||||
# We need to insert/update regardless of whether we have `sliding_sync_snapshot_keys`
|
sliding_sync_snapshot_keys = (
|
||||||
# because there are other fields in the `ON CONFLICT` upsert to run (see
|
sliding_sync_table_changes.membership_snapshot_shared_insert_values.keys()
|
||||||
# inherit case above for more context when this happens).
|
)
|
||||||
self.db_pool.simple_upsert_many_txn(
|
sliding_sync_snapshot_values = (
|
||||||
txn=txn,
|
sliding_sync_table_changes.membership_snapshot_shared_insert_values.values()
|
||||||
table="sliding_sync_membership_snapshots",
|
)
|
||||||
key_names=("room_id", "user_id"),
|
# We need to insert/update regardless of whether we have
|
||||||
key_values=[
|
# `sliding_sync_snapshot_keys` because there are other fields in the `ON
|
||||||
(room_id, membership_info.user_id)
|
# CONFLICT` upsert to run (see inherit case (explained in
|
||||||
for membership_info in sliding_sync_table_changes.to_insert_membership_snapshots
|
# `_calculate_sliding_sync_table_changes()`) for more context when this
|
||||||
],
|
# happens).
|
||||||
value_names=[
|
#
|
||||||
"sender",
|
# XXX: We use a sub-query for `stream_ordering` because it's unreliable to
|
||||||
"membership_event_id",
|
# pre-calculate from `events_and_contexts` at the time when
|
||||||
"membership",
|
# `_calculate_sliding_sync_table_changes()` is ran. We could be working with
|
||||||
"event_stream_ordering",
|
# events that were previously persisted as an `outlier` with one
|
||||||
"event_instance_name",
|
# `stream_ordering` but are now being persisted again and de-outliered and
|
||||||
]
|
# assigned a different `stream_ordering` that won't end up being used. Since
|
||||||
+ list(
|
# we call `_calculate_sliding_sync_table_changes()` before
|
||||||
sliding_sync_table_changes.membership_snapshot_shared_insert_values.keys()
|
# `_update_outliers_txn()` which fixes this discrepancy (always use the
|
||||||
),
|
# `stream_ordering` from the first time it was persisted), we're working
|
||||||
value_values=[
|
# with an unreliable `stream_ordering` value that will possibly be unused
|
||||||
|
# and not make it into the `events` table.
|
||||||
|
txn.execute_batch(
|
||||||
|
f"""
|
||||||
|
INSERT INTO sliding_sync_membership_snapshots
|
||||||
|
(room_id, user_id, sender, membership_event_id, membership, event_stream_ordering, event_instance_name
|
||||||
|
{("," + ", ".join(sliding_sync_snapshot_keys)) if sliding_sync_snapshot_keys else ""})
|
||||||
|
VALUES (
|
||||||
|
?, ?, ?, ?, ?,
|
||||||
|
(SELECT stream_ordering FROM events WHERE event_id = ?),
|
||||||
|
(SELECT instance_name FROM events WHERE event_id = ?)
|
||||||
|
{("," + ", ".join("?" for _ in sliding_sync_snapshot_values)) if sliding_sync_snapshot_values else ""}
|
||||||
|
)
|
||||||
|
ON CONFLICT (room_id, user_id)
|
||||||
|
DO UPDATE SET
|
||||||
|
sender = EXCLUDED.sender,
|
||||||
|
membership_event_id = EXCLUDED.membership_event_id,
|
||||||
|
membership = EXCLUDED.membership,
|
||||||
|
event_stream_ordering = EXCLUDED.event_stream_ordering
|
||||||
|
{("," + ", ".join(f"{key} = EXCLUDED.{key}" for key in sliding_sync_snapshot_keys)) if sliding_sync_snapshot_keys else ""}
|
||||||
|
""",
|
||||||
|
[
|
||||||
[
|
[
|
||||||
|
room_id,
|
||||||
|
membership_info.user_id,
|
||||||
membership_info.sender,
|
membership_info.sender,
|
||||||
membership_info.membership_event_id,
|
membership_info.membership_event_id,
|
||||||
membership_info.membership,
|
membership_info.membership,
|
||||||
membership_info.membership_event_stream_ordering,
|
# XXX: We do not use `membership_info.membership_event_stream_ordering` here
|
||||||
membership_info.membership_event_instance_name,
|
# because it is an unreliable value. See XXX note above.
|
||||||
|
membership_info.membership_event_id,
|
||||||
|
# XXX: We do not use `membership_info.membership_event_instance_name` here
|
||||||
|
# because it is an unreliable value. See XXX note above.
|
||||||
|
membership_info.membership_event_id,
|
||||||
]
|
]
|
||||||
+ list(
|
+ list(sliding_sync_snapshot_values)
|
||||||
sliding_sync_table_changes.membership_snapshot_shared_insert_values.values()
|
|
||||||
)
|
|
||||||
for membership_info in sliding_sync_table_changes.to_insert_membership_snapshots
|
for membership_info in sliding_sync_table_changes.to_insert_membership_snapshots
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -37,7 +37,7 @@ from synapse.storage.database import (
|
|||||||
from synapse.storage.databases.main.events import (
|
from synapse.storage.databases.main.events import (
|
||||||
SLIDING_SYNC_RELEVANT_STATE_SET,
|
SLIDING_SYNC_RELEVANT_STATE_SET,
|
||||||
PersistEventsStore,
|
PersistEventsStore,
|
||||||
SlidingSyncMembershipInfo,
|
SlidingSyncMembershipInfoWithEventPos,
|
||||||
SlidingSyncMembershipSnapshotSharedInsertValues,
|
SlidingSyncMembershipSnapshotSharedInsertValues,
|
||||||
SlidingSyncStateInsertValues,
|
SlidingSyncStateInsertValues,
|
||||||
)
|
)
|
||||||
@ -1994,9 +1994,9 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
|
|||||||
to_insert_membership_snapshots: Dict[
|
to_insert_membership_snapshots: Dict[
|
||||||
Tuple[str, str], SlidingSyncMembershipSnapshotSharedInsertValues
|
Tuple[str, str], SlidingSyncMembershipSnapshotSharedInsertValues
|
||||||
] = {}
|
] = {}
|
||||||
to_insert_membership_infos: Dict[Tuple[str, str], SlidingSyncMembershipInfo] = (
|
to_insert_membership_infos: Dict[
|
||||||
{}
|
Tuple[str, str], SlidingSyncMembershipInfoWithEventPos
|
||||||
)
|
] = {}
|
||||||
for (
|
for (
|
||||||
room_id,
|
room_id,
|
||||||
room_id_from_rooms_table,
|
room_id_from_rooms_table,
|
||||||
@ -2185,15 +2185,17 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
|
|||||||
to_insert_membership_snapshots[(room_id, user_id)] = (
|
to_insert_membership_snapshots[(room_id, user_id)] = (
|
||||||
sliding_sync_membership_snapshots_insert_map
|
sliding_sync_membership_snapshots_insert_map
|
||||||
)
|
)
|
||||||
to_insert_membership_infos[(room_id, user_id)] = SlidingSyncMembershipInfo(
|
to_insert_membership_infos[(room_id, user_id)] = (
|
||||||
user_id=user_id,
|
SlidingSyncMembershipInfoWithEventPos(
|
||||||
sender=sender,
|
user_id=user_id,
|
||||||
membership_event_id=membership_event_id,
|
sender=sender,
|
||||||
membership=membership,
|
membership_event_id=membership_event_id,
|
||||||
membership_event_stream_ordering=membership_event_stream_ordering,
|
membership=membership,
|
||||||
# If instance_name is null we default to "master"
|
membership_event_stream_ordering=membership_event_stream_ordering,
|
||||||
membership_event_instance_name=membership_event_instance_name
|
# If instance_name is null we default to "master"
|
||||||
or "master",
|
membership_event_instance_name=membership_event_instance_name
|
||||||
|
or "master",
|
||||||
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
def _fill_table_txn(txn: LoggingTransaction) -> None:
|
def _fill_table_txn(txn: LoggingTransaction) -> None:
|
||||||
|
@ -38,6 +38,7 @@ from synapse.storage.databases.main.events_bg_updates import (
|
|||||||
_resolve_stale_data_in_sliding_sync_joined_rooms_table,
|
_resolve_stale_data_in_sliding_sync_joined_rooms_table,
|
||||||
_resolve_stale_data_in_sliding_sync_membership_snapshots_table,
|
_resolve_stale_data_in_sliding_sync_membership_snapshots_table,
|
||||||
)
|
)
|
||||||
|
from synapse.types import create_requester
|
||||||
from synapse.util import Clock
|
from synapse.util import Clock
|
||||||
|
|
||||||
from tests.test_utils.event_injection import create_event
|
from tests.test_utils.event_injection import create_event
|
||||||
@ -925,6 +926,128 @@ class SlidingSyncTablesTestCase(SlidingSyncTablesTestCaseBase):
|
|||||||
user2_snapshot,
|
user2_snapshot,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@parameterized.expand(
|
||||||
|
# Test both an insert an upsert into the
|
||||||
|
# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` to exercise
|
||||||
|
# more possibilities of things going wrong.
|
||||||
|
[
|
||||||
|
("insert", True),
|
||||||
|
("upsert", False),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
def test_joined_room_outlier_and_deoutlier(
|
||||||
|
self, description: str, should_insert: bool
|
||||||
|
) -> None:
|
||||||
|
"""
|
||||||
|
This is a regression test.
|
||||||
|
|
||||||
|
This is to simulate the case where an event is first persisted as an outlier
|
||||||
|
(like a remote invite) and then later persisted again to de-outlier it. The
|
||||||
|
first the time, the `outlier` is persisted with one `stream_ordering` but when
|
||||||
|
persisted again and de-outliered, it is assigned a different `stream_ordering`
|
||||||
|
that won't end up being used. Since we call
|
||||||
|
`_calculate_sliding_sync_table_changes()` before `_update_outliers_txn()` which
|
||||||
|
fixes this discrepancy (always use the `stream_ordering` from the first time it
|
||||||
|
was persisted), make sure we're not using an unreliable `stream_ordering` values
|
||||||
|
that will cause `FOREIGN KEY constraint failed` in the
|
||||||
|
`sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` tables.
|
||||||
|
"""
|
||||||
|
user1_id = self.register_user("user1", "pass")
|
||||||
|
_user1_tok = self.login(user1_id, "pass")
|
||||||
|
user2_id = self.register_user("user2", "pass")
|
||||||
|
user2_tok = self.login(user2_id, "pass")
|
||||||
|
|
||||||
|
room_version = RoomVersions.V10
|
||||||
|
room_id = self.helper.create_room_as(
|
||||||
|
user2_id, tok=user2_tok, room_version=room_version.identifier
|
||||||
|
)
|
||||||
|
|
||||||
|
if should_insert:
|
||||||
|
# Clear these out so we always insert
|
||||||
|
self.get_success(
|
||||||
|
self.store.db_pool.simple_delete(
|
||||||
|
table="sliding_sync_joined_rooms",
|
||||||
|
keyvalues={"room_id": room_id},
|
||||||
|
desc="TODO",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
self.get_success(
|
||||||
|
self.store.db_pool.simple_delete(
|
||||||
|
table="sliding_sync_membership_snapshots",
|
||||||
|
keyvalues={"room_id": room_id},
|
||||||
|
desc="TODO",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create a membership event (which triggers an insert into
|
||||||
|
# `sliding_sync_membership_snapshots`)
|
||||||
|
membership_event_dict = {
|
||||||
|
"type": EventTypes.Member,
|
||||||
|
"state_key": user1_id,
|
||||||
|
"sender": user1_id,
|
||||||
|
"room_id": room_id,
|
||||||
|
"content": {EventContentFields.MEMBERSHIP: Membership.JOIN},
|
||||||
|
}
|
||||||
|
# Create a relevant state event (which triggers an insert into
|
||||||
|
# `sliding_sync_joined_rooms`)
|
||||||
|
state_event_dict = {
|
||||||
|
"type": EventTypes.Name,
|
||||||
|
"state_key": "",
|
||||||
|
"sender": user2_id,
|
||||||
|
"room_id": room_id,
|
||||||
|
"content": {EventContentFields.ROOM_NAME: "my super room"},
|
||||||
|
}
|
||||||
|
event_dicts_to_persist = [
|
||||||
|
membership_event_dict,
|
||||||
|
state_event_dict,
|
||||||
|
]
|
||||||
|
|
||||||
|
for event_dict in event_dicts_to_persist:
|
||||||
|
events_to_persist = []
|
||||||
|
|
||||||
|
# Create the events as an outliers
|
||||||
|
(
|
||||||
|
event,
|
||||||
|
unpersisted_context,
|
||||||
|
) = self.get_success(
|
||||||
|
self.hs.get_event_creation_handler().create_event(
|
||||||
|
requester=create_requester(user1_id),
|
||||||
|
event_dict=event_dict,
|
||||||
|
outlier=True,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
# FIXME: Should we use an `EventContext.for_outlier(...)` here?
|
||||||
|
# Doesn't seem to matter for this test.
|
||||||
|
context = self.get_success(unpersisted_context.persist(event))
|
||||||
|
events_to_persist.append((event, context))
|
||||||
|
|
||||||
|
# Create the event again but as an non-outlier. This will de-outlier the event
|
||||||
|
# when we persist it.
|
||||||
|
(
|
||||||
|
event,
|
||||||
|
unpersisted_context,
|
||||||
|
) = self.get_success(
|
||||||
|
self.hs.get_event_creation_handler().create_event(
|
||||||
|
requester=create_requester(user1_id),
|
||||||
|
event_dict=event_dict,
|
||||||
|
outlier=False,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
context = self.get_success(unpersisted_context.persist(event))
|
||||||
|
events_to_persist.append((event, context))
|
||||||
|
|
||||||
|
persist_controller = self.hs.get_storage_controllers().persistence
|
||||||
|
assert persist_controller is not None
|
||||||
|
for event, context in events_to_persist:
|
||||||
|
self.get_success(
|
||||||
|
persist_controller.persist_event(
|
||||||
|
event,
|
||||||
|
context,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# We're just testing that it does not explode
|
||||||
|
|
||||||
def test_joined_room_meta_state_reset(self) -> None:
|
def test_joined_room_meta_state_reset(self) -> None:
|
||||||
"""
|
"""
|
||||||
Test that a state reset on the room name is reflected in the
|
Test that a state reset on the room name is reflected in the
|
||||||
|
Loading…
Reference in New Issue
Block a user