Fix backwards compatibility with upcoming threads schema changes. (#14045)

Ensure that the upsert will work properly by first updating any existing
rows (in the same way that the background update to backfill data works).
This commit is contained in:
Patrick Cloke 2022-10-05 07:56:05 -04:00 committed by GitHub
parent 17bc4ecff2
commit e3d4755454
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 24 additions and 11 deletions

1
changelog.d/14045.misc Normal file
View File

@ -0,0 +1 @@
Ensure Synapse v1.69 works with upcoming database changes in v1.70.

View File

@ -1103,19 +1103,26 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
txn, room_id, user_id, stream_ordering, old_rotate_stream_ordering txn, room_id, user_id, stream_ordering, old_rotate_stream_ordering
) )
# First ensure that the existing rows have an updated thread_id field.
self.db_pool.simple_update_txn(
txn,
table="event_push_summary",
keyvalues={"room_id": room_id, "user_id": user_id, "thread_id": None},
updatevalues={"thread_id": "main"},
)
# Replace the previous summary with the new counts. # Replace the previous summary with the new counts.
# #
# TODO(threads): Upsert per-thread instead of setting them all to main. # TODO(threads): Upsert per-thread instead of setting them all to main.
self.db_pool.simple_upsert_txn( self.db_pool.simple_upsert_txn(
txn, txn,
table="event_push_summary", table="event_push_summary",
keyvalues={"room_id": room_id, "user_id": user_id}, keyvalues={"room_id": room_id, "user_id": user_id, "thread_id": "main"},
values={ values={
"notif_count": notif_count, "notif_count": notif_count,
"unread_count": unread_count, "unread_count": unread_count,
"stream_ordering": old_rotate_stream_ordering, "stream_ordering": old_rotate_stream_ordering,
"last_receipt_stream_ordering": stream_ordering, "last_receipt_stream_ordering": stream_ordering,
"thread_id": "main",
}, },
) )
@ -1264,20 +1271,25 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
logger.info("Rotating notifications, handling %d rows", len(summaries)) logger.info("Rotating notifications, handling %d rows", len(summaries))
# Ensure that any updated threads have an updated thread_id.
self.db_pool.simple_update_many_txn(
txn,
table="event_push_summary",
key_names=("user_id", "room_id", "thread_id"),
key_values=[(user_id, room_id, None) for user_id, room_id in summaries],
value_names=("thread_id",),
value_values=[("main",) for _ in summaries],
)
# TODO(threads): Update on a per-thread basis. # TODO(threads): Update on a per-thread basis.
self.db_pool.simple_upsert_many_txn( self.db_pool.simple_upsert_many_txn(
txn, txn,
table="event_push_summary", table="event_push_summary",
key_names=("user_id", "room_id"), key_names=("user_id", "room_id", "thread_id"),
key_values=[(user_id, room_id) for user_id, room_id in summaries], key_values=[(user_id, room_id, "main") for user_id, room_id in summaries],
value_names=("notif_count", "unread_count", "stream_ordering", "thread_id"), value_names=("notif_count", "unread_count", "stream_ordering"),
value_values=[ value_values=[
( (summary.notif_count, summary.unread_count, summary.stream_ordering)
summary.notif_count,
summary.unread_count,
summary.stream_ordering,
"main",
)
for summary in summaries.values() for summary in summaries.values()
], ],
) )