From 79fb64e417686c91eeb64016e91dffeac9115a80 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 11 Feb 2022 13:38:05 +0000 Subject: [PATCH] Fix to-device being dropped in limited sync in SQLite. (#11966) If ther are more than 100 to-device messages pending for a device `/sync` will only return the first 100, however the next batch token was incorrectly calculated and so all other pending messages would be dropped. This is due to `txn.rowcount` only returning the number of rows that *changed*, rather than the number *selected* in SQLite. --- changelog.d/11966.bugfix | 1 + synapse/storage/databases/main/deviceinbox.py | 5 ++- tests/rest/client/test_sendtodevice.py | 40 +++++++++++++++++++ 3 files changed, 45 insertions(+), 1 deletion(-) create mode 100644 changelog.d/11966.bugfix diff --git a/changelog.d/11966.bugfix b/changelog.d/11966.bugfix new file mode 100644 index 000000000..af8e09666 --- /dev/null +++ b/changelog.d/11966.bugfix @@ -0,0 +1 @@ +Fix to-device messages being dropped during limited sync when using SQLite. diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 8801b7b2d..1392363de 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -362,7 +362,10 @@ class DeviceInboxWorkerStore(SQLBaseStore): # intended for each device. last_processed_stream_pos = to_stream_id recipient_device_to_messages: Dict[Tuple[str, str], List[JsonDict]] = {} + rowcount = 0 for row in txn: + rowcount += 1 + last_processed_stream_pos = row[0] recipient_user_id = row[1] recipient_device_id = row[2] @@ -373,7 +376,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): (recipient_user_id, recipient_device_id), [] ).append(message_dict) - if limit is not None and txn.rowcount == limit: + if limit is not None and rowcount == limit: # We ended up bumping up against the message limit. There may be more messages # to retrieve. Return what we have, as well as the last stream position that # was processed. diff --git a/tests/rest/client/test_sendtodevice.py b/tests/rest/client/test_sendtodevice.py index 6db7062a8..e2ed14457 100644 --- a/tests/rest/client/test_sendtodevice.py +++ b/tests/rest/client/test_sendtodevice.py @@ -198,3 +198,43 @@ class SendToDeviceTestCase(HomeserverTestCase): "content": {"idx": 3}, }, ) + + def test_limited_sync(self): + """If a limited sync for to-devices happens the next /sync should respond immediately.""" + + self.register_user("u1", "pass") + user1_tok = self.login("u1", "pass", "d1") + + user2 = self.register_user("u2", "pass") + user2_tok = self.login("u2", "pass", "d2") + + # Do an initial sync + channel = self.make_request("GET", "/sync", access_token=user2_tok) + self.assertEqual(channel.code, 200, channel.result) + sync_token = channel.json_body["next_batch"] + + # Send 150 to-device messages. We limit to 100 in `/sync` + for i in range(150): + test_msg = {"foo": "bar"} + chan = self.make_request( + "PUT", + f"/_matrix/client/r0/sendToDevice/m.test/1234-{i}", + content={"messages": {user2: {"d2": test_msg}}}, + access_token=user1_tok, + ) + self.assertEqual(chan.code, 200, chan.result) + + channel = self.make_request( + "GET", f"/sync?since={sync_token}&timeout=300000", access_token=user2_tok + ) + self.assertEqual(channel.code, 200, channel.result) + messages = channel.json_body.get("to_device", {}).get("events", []) + self.assertEqual(len(messages), 100) + sync_token = channel.json_body["next_batch"] + + channel = self.make_request( + "GET", f"/sync?since={sync_token}&timeout=300000", access_token=user2_tok + ) + self.assertEqual(channel.code, 200, channel.result) + messages = channel.json_body.get("to_device", {}).get("events", []) + self.assertEqual(len(messages), 50)