diff --git a/changelog.d/17192.misc b/changelog.d/17192.misc new file mode 100644 index 000000000..25e157a50 --- /dev/null +++ b/changelog.d/17192.misc @@ -0,0 +1 @@ +Improve performance by fixing a reactor pause. diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index ba257d34e..5e5387fdc 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -55,6 +55,7 @@ from synapse.replication.tcp.streams.partial_state import ( ) from synapse.types import PersistedEventPosition, ReadReceipt, StreamKeyType, UserID from synapse.util.async_helpers import Linearizer, timeout_deferred +from synapse.util.iterutils import batch_iter from synapse.util.metrics import Measure if TYPE_CHECKING: @@ -150,9 +151,15 @@ class ReplicationDataHandler: if row.entity.startswith("@") and not row.is_signature: room_ids = await self.store.get_rooms_for_user(row.entity) all_room_ids.update(room_ids) - self.notifier.on_new_event( - StreamKeyType.DEVICE_LIST, token, rooms=all_room_ids - ) + + # `all_room_ids` can be large, so let's wake up those streams in batches + for batched_room_ids in batch_iter(all_room_ids, 100): + self.notifier.on_new_event( + StreamKeyType.DEVICE_LIST, token, rooms=batched_room_ids + ) + + # Yield to reactor so that we don't block. + await self._clock.sleep(0) elif stream_name == PushersStream.NAME: for row in rows: if row.deleted: