diff --git a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart index 0be4f25..608131c 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart @@ -24,6 +24,9 @@ const int? defaultWatchDurationSecs = null; // 600 const int watchRenewalNumerator = 4; const int watchRenewalDenominator = 5; +// Maximum number of concurrent DHT operations to perform on the network +const int maxDHTConcurrency = 8; + typedef DHTRecordPoolLogger = void Function(String message); /// Record pool that managed DHTRecords and allows for tagged deletion diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array.dart index 5a4210f..de17b62 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array.dart @@ -1,6 +1,7 @@ import 'dart:async'; import 'dart:typed_data'; +import 'package:collection/collection.dart'; import 'package:mutex/mutex.dart'; import 'package:protobuf/protobuf.dart'; diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_read.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_read.dart index 0dbe51e..342e67a 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_read.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_read.dart @@ -93,12 +93,17 @@ class _DHTShortArrayRead implements DHTShortArrayRead { Future?> getAllItems({bool forceRefresh = false}) async { final out = []; - for (var pos = 0; pos < _head.length; pos++) { - final elem = await getItem(pos, forceRefresh: forceRefresh); - if (elem == null) { + final chunks = Iterable.generate(_head.length) + .slices(maxDHTConcurrency) + .map((chunk) => + chunk.map((pos) => getItem(pos, forceRefresh: forceRefresh))); + + for (final chunk in chunks) { + final elems = await chunk.wait; + if (elems.contains(null)) { return null; } - out.add(elem); + out.addAll(elems.cast()); } return out; diff --git a/packages/veilid_support/pubspec.lock b/packages/veilid_support/pubspec.lock index 00f3cdd..9dcb93e 100644 --- a/packages/veilid_support/pubspec.lock +++ b/packages/veilid_support/pubspec.lock @@ -168,7 +168,7 @@ packages: source: hosted version: "4.10.0" collection: - dependency: transitive + dependency: "direct main" description: name: collection sha256: ee67cb0715911d28db6bf4af1026078bd6f0128b07a5f66fb2ed94ec6783c09a diff --git a/packages/veilid_support/pubspec.yaml b/packages/veilid_support/pubspec.yaml index d7344ef..0d2d439 100644 --- a/packages/veilid_support/pubspec.yaml +++ b/packages/veilid_support/pubspec.yaml @@ -12,6 +12,7 @@ dependencies: bloc: ^8.1.3 bloc_tools: path: ../bloc_tools + collection: ^1.18.0 equatable: ^2.0.5 fast_immutable_collections: ^10.1.1 freezed_annotation: ^2.4.1