parallelize getAllItems

This commit is contained in:
Christien Rioux 2024-04-28 21:18:30 -04:00
parent 0b4de3ad13
commit 7e9254faac
5 changed files with 15 additions and 5 deletions

View File

@ -24,6 +24,9 @@ const int? defaultWatchDurationSecs = null; // 600
const int watchRenewalNumerator = 4;
const int watchRenewalDenominator = 5;
// Maximum number of concurrent DHT operations to perform on the network
const int maxDHTConcurrency = 8;
typedef DHTRecordPoolLogger = void Function(String message);
/// Record pool that managed DHTRecords and allows for tagged deletion

View File

@ -1,6 +1,7 @@
import 'dart:async';
import 'dart:typed_data';
import 'package:collection/collection.dart';
import 'package:mutex/mutex.dart';
import 'package:protobuf/protobuf.dart';

View File

@ -93,12 +93,17 @@ class _DHTShortArrayRead implements DHTShortArrayRead {
Future<List<Uint8List>?> getAllItems({bool forceRefresh = false}) async {
final out = <Uint8List>[];
for (var pos = 0; pos < _head.length; pos++) {
final elem = await getItem(pos, forceRefresh: forceRefresh);
if (elem == null) {
final chunks = Iterable<int>.generate(_head.length)
.slices(maxDHTConcurrency)
.map((chunk) =>
chunk.map((pos) => getItem(pos, forceRefresh: forceRefresh)));
for (final chunk in chunks) {
final elems = await chunk.wait;
if (elems.contains(null)) {
return null;
}
out.add(elem);
out.addAll(elems.cast<Uint8List>());
}
return out;

View File

@ -168,7 +168,7 @@ packages:
source: hosted
version: "4.10.0"
collection:
dependency: transitive
dependency: "direct main"
description:
name: collection
sha256: ee67cb0715911d28db6bf4af1026078bd6f0128b07a5f66fb2ed94ec6783c09a

View File

@ -12,6 +12,7 @@ dependencies:
bloc: ^8.1.3
bloc_tools:
path: ../bloc_tools
collection: ^1.18.0
equatable: ^2.0.5
fast_immutable_collections: ^10.1.1
freezed_annotation: ^2.4.1