mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-08-05 23:54:14 -04:00
to_device updates could be dropped when consuming the replication stream (#15349)
Co-authored-by: reivilibre <oliverw@matrix.org>
This commit is contained in:
parent
91c3f32673
commit
6f68e32bfb
5 changed files with 98 additions and 15 deletions
|
@ -617,14 +617,14 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
|||
# We limit like this as we might have multiple rows per stream_id, and
|
||||
# we want to make sure we always get all entries for any stream_id
|
||||
# we return.
|
||||
upper_pos = min(current_id, last_id + limit)
|
||||
upto_token = min(current_id, last_id + limit)
|
||||
sql = (
|
||||
"SELECT max(stream_id), user_id"
|
||||
" FROM device_inbox"
|
||||
" WHERE ? < stream_id AND stream_id <= ?"
|
||||
" GROUP BY user_id"
|
||||
)
|
||||
txn.execute(sql, (last_id, upper_pos))
|
||||
txn.execute(sql, (last_id, upto_token))
|
||||
updates = [(row[0], row[1:]) for row in txn]
|
||||
|
||||
sql = (
|
||||
|
@ -633,19 +633,13 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
|||
" WHERE ? < stream_id AND stream_id <= ?"
|
||||
" GROUP BY destination"
|
||||
)
|
||||
txn.execute(sql, (last_id, upper_pos))
|
||||
txn.execute(sql, (last_id, upto_token))
|
||||
updates.extend((row[0], row[1:]) for row in txn)
|
||||
|
||||
# Order by ascending stream ordering
|
||||
updates.sort()
|
||||
|
||||
limited = False
|
||||
upto_token = current_id
|
||||
if len(updates) >= limit:
|
||||
upto_token = updates[-1][0]
|
||||
limited = True
|
||||
|
||||
return updates, upto_token, limited
|
||||
return updates, upto_token, upto_token < current_id
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_all_new_device_messages", get_all_new_device_messages_txn
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue