concurrency work in preparation for speeding things up

- refactor splash screen to process initialization in a better way
- more async tools for async cubit constructors
- greatly improved StateMapFollower class
Christien Rioux 2024-04-03 21:55:49 -04:00
parent 8da1dc7d32
commit 9bb20f4dd2
47 changed files with 886 additions and 579 deletions
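The recurring change across these diffs is how cubits with async constructors initialize: instead of kicking off setup with Future.delayed(Duration.zero, ...) or a stored _initFuture, the constructor registers its async work on a WaitSet from package:async_tools, and every public method (including close()) awaits that set before touching state. Below is a minimal sketch of the pattern; the WaitSet here is a simplified stand-in inferred from how add() and the call operator are used in this commit, not the real async_tools implementation.

import 'dart:async';

// Simplified stand-in for the async_tools WaitSet, inferred from its use in
// this commit: add() schedules async init work, and awaiting the call
// operator blocks until all scheduled work has finished.
class WaitSet {
  final List<Future<void>> _pending = [];
  void add(Future<void> Function() closure) => _pending.add(closure());
  Future<void> call() async {
    await Future.wait(_pending);
  }
}

// Hypothetical cubit-like class showing the async-constructor pattern this
// commit applies to DHTRecordCubit and DHTShortArrayCubit.
class ExampleCubit {
  ExampleCubit() {
    // Kick off async initialization from the synchronous constructor.
    _initWait.add(() async {
      await Future<void>.delayed(const Duration(milliseconds: 10));
      _value = 42;
    });
  }

  final WaitSet _initWait = WaitSet();
  late int _value;

  // Every public method awaits initialization before touching state.
  Future<int> read() async {
    await _initWait();
    return _value;
  }
}

Future<void> main() async {
  print(await ExampleCubit().read()); // prints: 42
}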

View file

@@ -1,3 +1,4 @@
export 'default_dht_record_cubit.dart';
export 'dht_record_crypto.dart';
export 'dht_record_cubit.dart';
export 'dht_record_pool.dart';

View file

@@ -0,0 +1,66 @@
import 'dart:typed_data';
import '../../../veilid_support.dart';
/// Cubit that watches the default subkey value of a dhtrecord
class DefaultDHTRecordCubit<T> extends DHTRecordCubit<T> {
DefaultDHTRecordCubit({
required super.open,
required T Function(List<int> data) decodeState,
}) : super(
initialStateFunction: _makeInitialStateFunction(decodeState),
stateFunction: _makeStateFunction(decodeState),
watchFunction: _makeWatchFunction());
// DefaultDHTRecordCubit.value({
// required super.record,
// required T Function(List<int> data) decodeState,
// }) : super.value(
// initialStateFunction: _makeInitialStateFunction(decodeState),
// stateFunction: _makeStateFunction(decodeState),
// watchFunction: _makeWatchFunction());
static InitialStateFunction<T> _makeInitialStateFunction<T>(
T Function(List<int> data) decodeState) =>
(record) async {
final initialData = await record.get();
if (initialData == null) {
return null;
}
return decodeState(initialData);
};
static StateFunction<T> _makeStateFunction<T>(
T Function(List<int> data) decodeState) =>
(record, subkeys, updatedata) async {
final defaultSubkey = record.subkeyOrDefault(-1);
if (subkeys.containsSubkey(defaultSubkey)) {
final Uint8List data;
final firstSubkey = subkeys.firstOrNull!.low;
if (firstSubkey != defaultSubkey || updatedata == null) {
final maybeData = await record.get(forceRefresh: true);
if (maybeData == null) {
return null;
}
data = maybeData;
} else {
data = updatedata;
}
final newState = decodeState(data);
return newState;
}
return null;
};
static WatchFunction _makeWatchFunction() => (record) async {
final defaultSubkey = record.subkeyOrDefault(-1);
await record.watch(subkeys: [ValueSubkeyRange.single(defaultSubkey)]);
};
Future<void> refreshDefault() async {
await initWait();
final defaultSubkey = record.subkeyOrDefault(-1);
await refresh([ValueSubkeyRange(low: defaultSubkey, high: defaultSubkey)]);
}
}
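A hedged usage sketch for the newly split-out class; the helper, record key, and debug name below are hypothetical, and openRead's signature follows the pool changes later in this commit:

import 'dart:convert';

import 'package:veilid_support/veilid_support.dart';

// Hypothetical helper: watch the default subkey of a record holding UTF-8
// text, decoding each update into the cubit's String state.
DefaultDHTRecordCubit<String> watchTextRecord(TypedKey recordKey) =>
    DefaultDHTRecordCubit<String>(
      open: () => DHTRecordPool.instance.openRead(recordKey,
          debugName: 'text_record_watcher'),
      decodeState: utf8.decode,
    );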

View file

@@ -16,12 +16,13 @@ class DHTRecordWatchChange extends Equatable {
/////////////////////////////////////////////////
class DHTRecord {
DHTRecord(
DHTRecord._(
{required VeilidRoutingContext routingContext,
required SharedDHTRecordData sharedDHTRecordData,
required int defaultSubkey,
required KeyPair? writer,
required DHTRecordCrypto crypto})
required DHTRecordCrypto crypto,
required this.debugName})
: _crypto = crypto,
_routingContext = routingContext,
_defaultSubkey = defaultSubkey,
@@ -34,6 +35,7 @@ class DHTRecord {
final int _defaultSubkey;
final KeyPair? _writer;
final DHTRecordCrypto _crypto;
final String debugName;
bool _open;
@internal

View file

@@ -3,6 +3,7 @@ import 'dart:typed_data';
import 'package:async_tools/async_tools.dart';
import 'package:bloc/bloc.dart';
import 'package:meta/meta.dart';
import '../../../veilid_support.dart';
@@ -20,7 +21,7 @@ class DHTRecordCubit<T> extends Cubit<AsyncValue<T>> {
}) : _wantsCloseRecord = false,
_stateFunction = stateFunction,
super(const AsyncValue.loading()) {
Future.delayed(Duration.zero, () async {
initWait.add(() async {
// Do record open/create
_record = await open();
_wantsCloseRecord = true;
@@ -73,6 +74,7 @@ class DHTRecordCubit<T> extends Cubit<AsyncValue<T>> {
@override
Future<void> close() async {
await initWait();
await _record.cancelWatch();
await _subscription?.cancel();
_subscription = null;
@@ -84,6 +86,8 @@ class DHTRecordCubit<T> extends Cubit<AsyncValue<T>> {
}
Future<void> refresh(List<ValueSubkeyRange> subkeys) async {
await initWait();
var updateSubkeys = [...subkeys];
for (final skr in subkeys) {
@@ -107,69 +111,11 @@ class DHTRecordCubit<T> extends Cubit<AsyncValue<T>> {
DHTRecord get record => _record;
@protected
final WaitSet initWait = WaitSet();
StreamSubscription<DHTRecordWatchChange>? _subscription;
late DHTRecord _record;
bool _wantsCloseRecord;
final StateFunction<T> _stateFunction;
}
// Cubit that watches the default subkey value of a dhtrecord
class DefaultDHTRecordCubit<T> extends DHTRecordCubit<T> {
DefaultDHTRecordCubit({
required super.open,
required T Function(List<int> data) decodeState,
}) : super(
initialStateFunction: _makeInitialStateFunction(decodeState),
stateFunction: _makeStateFunction(decodeState),
watchFunction: _makeWatchFunction());
// DefaultDHTRecordCubit.value({
// required super.record,
// required T Function(List<int> data) decodeState,
// }) : super.value(
// initialStateFunction: _makeInitialStateFunction(decodeState),
// stateFunction: _makeStateFunction(decodeState),
// watchFunction: _makeWatchFunction());
static InitialStateFunction<T> _makeInitialStateFunction<T>(
T Function(List<int> data) decodeState) =>
(record) async {
final initialData = await record.get();
if (initialData == null) {
return null;
}
return decodeState(initialData);
};
static StateFunction<T> _makeStateFunction<T>(
T Function(List<int> data) decodeState) =>
(record, subkeys, updatedata) async {
final defaultSubkey = record.subkeyOrDefault(-1);
if (subkeys.containsSubkey(defaultSubkey)) {
final Uint8List data;
final firstSubkey = subkeys.firstOrNull!.low;
if (firstSubkey != defaultSubkey || updatedata == null) {
final maybeData = await record.get(forceRefresh: true);
if (maybeData == null) {
return null;
}
data = maybeData;
} else {
data = updatedata;
}
final newState = decodeState(data);
return newState;
}
return null;
};
static WatchFunction _makeWatchFunction() => (record) async {
final defaultSubkey = record.subkeyOrDefault(-1);
await record.watch(subkeys: [ValueSubkeyRange.single(defaultSubkey)]);
};
Future<void> refreshDefault() async {
final defaultSubkey = _record.subkeyOrDefault(-1);
await refresh([ValueSubkeyRange(low: defaultSubkey, high: defaultSubkey)]);
}
}

View file

@@ -18,14 +18,16 @@ const int watchBackoffMultiplier = 2;
const int watchBackoffMax = 30;
/// Record pool that manages DHTRecords and allows for tagged deletion
/// String versions of keys due to IMap<> json unsupported in key
@freezed
class DHTRecordPoolAllocations with _$DHTRecordPoolAllocations {
const factory DHTRecordPoolAllocations({
required IMap<String, ISet<TypedKey>>
childrenByParent, // String key due to IMap<> json unsupported in key
required IMap<String, TypedKey>
parentByChild, // String key due to IMap<> json unsupported in key
required ISet<TypedKey> rootRecords,
@Default(IMapConst<String, ISet<TypedKey>>({}))
IMap<String, ISet<TypedKey>> childrenByParent,
@Default(IMapConst<String, TypedKey>({}))
IMap<String, TypedKey> parentByChild,
@Default(ISetConst<TypedKey>({})) ISet<TypedKey> rootRecords,
@Default(IMapConst<String, String>({})) IMap<String, String> debugNames,
}) = _DHTRecordPoolAllocations;
factory DHTRecordPoolAllocations.fromJson(dynamic json) =>
@@ -92,10 +94,7 @@ class OpenedRecordInfo {
class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
DHTRecordPool._(Veilid veilid, VeilidRoutingContext routingContext)
: _state = DHTRecordPoolAllocations(
childrenByParent: IMap(),
parentByChild: IMap(),
rootRecords: ISet()),
: _state = const DHTRecordPoolAllocations(),
_mutex = Mutex(),
_opened = <TypedKey, OpenedRecordInfo>{},
_routingContext = routingContext,
@@ -129,8 +128,7 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
@override
DHTRecordPoolAllocations valueFromJson(Object? obj) => obj != null
? DHTRecordPoolAllocations.fromJson(obj)
: DHTRecordPoolAllocations(
childrenByParent: IMap(), parentByChild: IMap(), rootRecords: ISet());
: const DHTRecordPoolAllocations();
@override
Object? valueToJson(DHTRecordPoolAllocations val) => val.toJson();
@@ -148,7 +146,8 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
Veilid get veilid => _veilid;
Future<OpenedRecordInfo> _recordCreateInner(
{required VeilidRoutingContext dhtctx,
{required String debugName,
required VeilidRoutingContext dhtctx,
required DHTSchema schema,
KeyPair? writer,
TypedKey? parent}) async {
@@ -169,13 +168,18 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
_opened[recordDescriptor.key] = openedRecordInfo;
// Register the dependency
await _addDependencyInner(parent, recordDescriptor.key);
await _addDependencyInner(
parent,
recordDescriptor.key,
debugName: debugName,
);
return openedRecordInfo;
}
Future<OpenedRecordInfo> _recordOpenInner(
{required VeilidRoutingContext dhtctx,
{required String debugName,
required VeilidRoutingContext dhtctx,
required TypedKey recordKey,
KeyPair? writer,
TypedKey? parent}) async {
@@ -198,7 +202,11 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
_opened[recordDescriptor.key] = newOpenedRecordInfo;
// Register the dependency
await _addDependencyInner(parent, recordKey);
await _addDependencyInner(
parent,
recordKey,
debugName: debugName,
);
return newOpenedRecordInfo;
}
@@ -218,7 +226,11 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
}
// Register the dependency
await _addDependencyInner(parent, recordKey);
await _addDependencyInner(
parent,
recordKey,
debugName: debugName,
);
return openedRecordInfo;
}
@@ -259,6 +271,18 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
return allDeps.reversedView;
}
void _debugPrintChildren(TypedKey recordKey, {List<TypedKey>? allDeps}) {
allDeps ??= _collectChildrenInner(recordKey);
// ignore: avoid_print
print('Parent: $recordKey (${_state.debugNames[recordKey.toString()]})');
for (final dep in allDeps) {
if (dep != recordKey) {
// ignore: avoid_print
print(' Child: $dep (${_state.debugNames[dep.toString()]})');
}
}
}
Future<void> _deleteInner(TypedKey recordKey) async {
// Remove this child from parents
await _removeDependenciesInner([recordKey]);
@@ -269,7 +293,10 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
await _mutex.protect(() async {
final allDeps = _collectChildrenInner(recordKey);
assert(allDeps.singleOrNull == recordKey, 'must delete children first');
if (allDeps.singleOrNull != recordKey) {
_debugPrintChildren(recordKey, allDeps: allDeps);
assert(false, 'must delete children first');
}
final ori = _opened[recordKey];
if (ori != null) {
@@ -301,15 +328,17 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
}
}
Future<void> _addDependencyInner(TypedKey? parent, TypedKey child) async {
Future<void> _addDependencyInner(TypedKey? parent, TypedKey child,
{required String debugName}) async {
assert(_mutex.isLocked, 'should be locked here');
if (parent == null) {
if (_state.rootRecords.contains(child)) {
// Dependency already added
return;
}
_state = await store(
_state.copyWith(rootRecords: _state.rootRecords.add(child)));
_state = await store(_state.copyWith(
rootRecords: _state.rootRecords.add(child),
debugNames: _state.debugNames.add(child.toJson(), debugName)));
} else {
final childrenOfParent =
_state.childrenByParent[parent.toJson()] ?? ISet<TypedKey>();
@@ -320,7 +349,8 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
_state = await store(_state.copyWith(
childrenByParent: _state.childrenByParent
.add(parent.toJson(), childrenOfParent.add(child)),
parentByChild: _state.parentByChild.add(child.toJson(), parent)));
parentByChild: _state.parentByChild.add(child.toJson(), parent),
debugNames: _state.debugNames.add(child.toJson(), debugName)));
}
}
@@ -331,7 +361,9 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
for (final child in childList) {
if (_state.rootRecords.contains(child)) {
state = state.copyWith(rootRecords: state.rootRecords.remove(child));
state = state.copyWith(
rootRecords: state.rootRecords.remove(child),
debugNames: state.debugNames.remove(child.toJson()));
} else {
final parent = state.parentByChild[child.toJson()];
if (parent == null) {
@@ -341,12 +373,14 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
if (children.isEmpty) {
state = state.copyWith(
childrenByParent: state.childrenByParent.remove(parent.toJson()),
parentByChild: state.parentByChild.remove(child.toJson()));
parentByChild: state.parentByChild.remove(child.toJson()),
debugNames: state.debugNames.remove(child.toJson()));
} else {
state = state.copyWith(
childrenByParent:
state.childrenByParent.add(parent.toJson(), children),
parentByChild: state.parentByChild.remove(child.toJson()));
parentByChild: state.parentByChild.remove(child.toJson()),
debugNames: state.debugNames.remove(child.toJson()));
}
}
}
@@ -360,6 +394,7 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
/// Create a root DHTRecord that has no dependent records
Future<DHTRecord> create({
required String debugName,
VeilidRoutingContext? routingContext,
TypedKey? parent,
DHTSchema schema = const DHTSchema.dflt(oCnt: 1),
@@ -371,9 +406,14 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
final dhtctx = routingContext ?? _routingContext;
final openedRecordInfo = await _recordCreateInner(
dhtctx: dhtctx, schema: schema, writer: writer, parent: parent);
debugName: debugName,
dhtctx: dhtctx,
schema: schema,
writer: writer,
parent: parent);
final rec = DHTRecord(
final rec = DHTRecord._(
debugName: debugName,
routingContext: dhtctx,
defaultSubkey: defaultSubkey,
sharedDHTRecordData: openedRecordInfo.shared,
@@ -391,7 +431,8 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
/// Open a DHTRecord readonly
Future<DHTRecord> openRead(TypedKey recordKey,
{VeilidRoutingContext? routingContext,
{required String debugName,
VeilidRoutingContext? routingContext,
TypedKey? parent,
int defaultSubkey = 0,
DHTRecordCrypto? crypto}) async =>
@@ -399,9 +440,13 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
final dhtctx = routingContext ?? _routingContext;
final openedRecordInfo = await _recordOpenInner(
dhtctx: dhtctx, recordKey: recordKey, parent: parent);
debugName: debugName,
dhtctx: dhtctx,
recordKey: recordKey,
parent: parent);
final rec = DHTRecord(
final rec = DHTRecord._(
debugName: debugName,
routingContext: dhtctx,
defaultSubkey: defaultSubkey,
sharedDHTRecordData: openedRecordInfo.shared,
@@ -417,6 +462,7 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
Future<DHTRecord> openWrite(
TypedKey recordKey,
KeyPair writer, {
required String debugName,
VeilidRoutingContext? routingContext,
TypedKey? parent,
int defaultSubkey = 0,
@@ -426,12 +472,14 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
final dhtctx = routingContext ?? _routingContext;
final openedRecordInfo = await _recordOpenInner(
debugName: debugName,
dhtctx: dhtctx,
recordKey: recordKey,
parent: parent,
writer: writer);
final rec = DHTRecord(
final rec = DHTRecord._(
debugName: debugName,
routingContext: dhtctx,
defaultSubkey: defaultSubkey,
writer: writer,
@@ -453,6 +501,7 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
/// parent must be specified.
Future<DHTRecord> openOwned(
OwnedDHTRecordPointer ownedDHTRecordPointer, {
required String debugName,
required TypedKey parent,
VeilidRoutingContext? routingContext,
int defaultSubkey = 0,
@@ -461,6 +510,7 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
openWrite(
ownedDHTRecordPointer.recordKey,
ownedDHTRecordPointer.owner,
debugName: debugName,
routingContext: routingContext,
parent: parent,
defaultSubkey: defaultSubkey,
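Taken together, the pool changes thread a required debugName through create/openRead/openWrite/openOwned and persist it alongside the parent/child dependency graph. A sketch of the resulting API under stated assumptions: the names are hypothetical, and record.key and close() come from the existing DHTRecord surface rather than this diff.

import 'package:veilid_support/veilid_support.dart';

// Hypothetical setup: create a parent record, then a child registered as its
// dependency; both debug names land in DHTRecordPoolAllocations.debugNames.
Future<void> createTaggedRecords() async {
  final pool = DHTRecordPool.instance;

  final parent = await pool.create(debugName: 'example_parent');
  final child =
      await pool.create(debugName: 'example_child', parent: parent.key);

  // Deleting the parent while the child still exists would now print both
  // debug names via _debugPrintChildren before asserting
  // 'must delete children first'.
  await child.close();
  await parent.close();
}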

View file

@@ -22,11 +22,12 @@ DHTRecordPoolAllocations _$DHTRecordPoolAllocationsFromJson(
/// @nodoc
mixin _$DHTRecordPoolAllocations {
IMap<String, ISet<Typed<FixedEncodedString43>>> get childrenByParent =>
throw _privateConstructorUsedError; // String key due to IMap<> json unsupported in key
throw _privateConstructorUsedError;
IMap<String, Typed<FixedEncodedString43>> get parentByChild =>
throw _privateConstructorUsedError; // String key due to IMap<> json unsupported in key
throw _privateConstructorUsedError;
ISet<Typed<FixedEncodedString43>> get rootRecords =>
throw _privateConstructorUsedError;
IMap<String, String> get debugNames => throw _privateConstructorUsedError;
Map<String, dynamic> toJson() => throw _privateConstructorUsedError;
@JsonKey(ignore: true)
@@ -43,7 +44,8 @@ abstract class $DHTRecordPoolAllocationsCopyWith<$Res> {
$Res call(
{IMap<String, ISet<Typed<FixedEncodedString43>>> childrenByParent,
IMap<String, Typed<FixedEncodedString43>> parentByChild,
ISet<Typed<FixedEncodedString43>> rootRecords});
ISet<Typed<FixedEncodedString43>> rootRecords,
IMap<String, String> debugNames});
}
/// @nodoc
@@ -63,6 +65,7 @@ class _$DHTRecordPoolAllocationsCopyWithImpl<$Res,
Object? childrenByParent = null,
Object? parentByChild = null,
Object? rootRecords = null,
Object? debugNames = null,
}) {
return _then(_value.copyWith(
childrenByParent: null == childrenByParent
@@ -77,6 +80,10 @@ class _$DHTRecordPoolAllocationsCopyWithImpl<$Res,
? _value.rootRecords
: rootRecords // ignore: cast_nullable_to_non_nullable
as ISet<Typed<FixedEncodedString43>>,
debugNames: null == debugNames
? _value.debugNames
: debugNames // ignore: cast_nullable_to_non_nullable
as IMap<String, String>,
) as $Val);
}
}
@@ -93,7 +100,8 @@ abstract class _$$DHTRecordPoolAllocationsImplCopyWith<$Res>
$Res call(
{IMap<String, ISet<Typed<FixedEncodedString43>>> childrenByParent,
IMap<String, Typed<FixedEncodedString43>> parentByChild,
ISet<Typed<FixedEncodedString43>> rootRecords});
ISet<Typed<FixedEncodedString43>> rootRecords,
IMap<String, String> debugNames});
}
/// @nodoc
@@ -112,6 +120,7 @@ class __$$DHTRecordPoolAllocationsImplCopyWithImpl<$Res>
Object? childrenByParent = null,
Object? parentByChild = null,
Object? rootRecords = null,
Object? debugNames = null,
}) {
return _then(_$DHTRecordPoolAllocationsImpl(
childrenByParent: null == childrenByParent
@@ -126,6 +135,10 @@ class __$$DHTRecordPoolAllocationsImplCopyWithImpl<$Res>
? _value.rootRecords
: rootRecords // ignore: cast_nullable_to_non_nullable
as ISet<Typed<FixedEncodedString43>>,
debugNames: null == debugNames
? _value.debugNames
: debugNames // ignore: cast_nullable_to_non_nullable
as IMap<String, String>,
));
}
}
@@ -134,25 +147,30 @@ class __$$DHTRecordPoolAllocationsImplCopyWithImpl<$Res>
@JsonSerializable()
class _$DHTRecordPoolAllocationsImpl implements _DHTRecordPoolAllocations {
const _$DHTRecordPoolAllocationsImpl(
{required this.childrenByParent,
required this.parentByChild,
required this.rootRecords});
{this.childrenByParent = const IMapConst<String, ISet<TypedKey>>({}),
this.parentByChild = const IMapConst<String, TypedKey>({}),
this.rootRecords = const ISetConst<TypedKey>({}),
this.debugNames = const IMapConst<String, String>({})});
factory _$DHTRecordPoolAllocationsImpl.fromJson(Map<String, dynamic> json) =>
_$$DHTRecordPoolAllocationsImplFromJson(json);
@override
@JsonKey()
final IMap<String, ISet<Typed<FixedEncodedString43>>> childrenByParent;
// String key due to IMap<> json unsupported in key
@override
@JsonKey()
final IMap<String, Typed<FixedEncodedString43>> parentByChild;
// String key due to IMap<> json unsupported in key
@override
@JsonKey()
final ISet<Typed<FixedEncodedString43>> rootRecords;
@override
@JsonKey()
final IMap<String, String> debugNames;
@override
String toString() {
return 'DHTRecordPoolAllocations(childrenByParent: $childrenByParent, parentByChild: $parentByChild, rootRecords: $rootRecords)';
return 'DHTRecordPoolAllocations(childrenByParent: $childrenByParent, parentByChild: $parentByChild, rootRecords: $rootRecords, debugNames: $debugNames)';
}
@override
@@ -165,13 +183,15 @@ class _$DHTRecordPoolAllocationsImpl implements _DHTRecordPoolAllocations {
(identical(other.parentByChild, parentByChild) ||
other.parentByChild == parentByChild) &&
const DeepCollectionEquality()
.equals(other.rootRecords, rootRecords));
.equals(other.rootRecords, rootRecords) &&
(identical(other.debugNames, debugNames) ||
other.debugNames == debugNames));
}
@JsonKey(ignore: true)
@override
int get hashCode => Object.hash(runtimeType, childrenByParent, parentByChild,
const DeepCollectionEquality().hash(rootRecords));
const DeepCollectionEquality().hash(rootRecords), debugNames);
@JsonKey(ignore: true)
@override
@@ -190,22 +210,23 @@ class _$DHTRecordPoolAllocationsImpl implements _DHTRecordPoolAllocations {
abstract class _DHTRecordPoolAllocations implements DHTRecordPoolAllocations {
const factory _DHTRecordPoolAllocations(
{required final IMap<String, ISet<Typed<FixedEncodedString43>>>
childrenByParent,
required final IMap<String, Typed<FixedEncodedString43>> parentByChild,
required final ISet<Typed<FixedEncodedString43>>
rootRecords}) = _$DHTRecordPoolAllocationsImpl;
{final IMap<String, ISet<Typed<FixedEncodedString43>>> childrenByParent,
final IMap<String, Typed<FixedEncodedString43>> parentByChild,
final ISet<Typed<FixedEncodedString43>> rootRecords,
final IMap<String, String> debugNames}) = _$DHTRecordPoolAllocationsImpl;
factory _DHTRecordPoolAllocations.fromJson(Map<String, dynamic> json) =
_$DHTRecordPoolAllocationsImpl.fromJson;
@override
IMap<String, ISet<Typed<FixedEncodedString43>>> get childrenByParent;
@override // String key due to IMap<> json unsupported in key
@override
IMap<String, Typed<FixedEncodedString43>> get parentByChild;
@override // String key due to IMap<> json unsupported in key
@override
ISet<Typed<FixedEncodedString43>> get rootRecords;
@override
IMap<String, String> get debugNames;
@override
@JsonKey(ignore: true)
_$$DHTRecordPoolAllocationsImplCopyWith<_$DHTRecordPoolAllocationsImpl>
get copyWith => throw _privateConstructorUsedError;

View file

@@ -9,19 +9,29 @@ part of 'dht_record_pool.dart';
_$DHTRecordPoolAllocationsImpl _$$DHTRecordPoolAllocationsImplFromJson(
Map<String, dynamic> json) =>
_$DHTRecordPoolAllocationsImpl(
childrenByParent:
IMap<String, ISet<Typed<FixedEncodedString43>>>.fromJson(
childrenByParent: json['childrenByParent'] == null
? const IMapConst<String, ISet<TypedKey>>({})
: IMap<String, ISet<Typed<FixedEncodedString43>>>.fromJson(
json['childrenByParent'] as Map<String, dynamic>,
(value) => value as String,
(value) => ISet<Typed<FixedEncodedString43>>.fromJson(value,
(value) => Typed<FixedEncodedString43>.fromJson(value))),
parentByChild: IMap<String, Typed<FixedEncodedString43>>.fromJson(
json['parentByChild'] as Map<String, dynamic>,
(value) => value as String,
(value) => Typed<FixedEncodedString43>.fromJson(value)),
rootRecords: ISet<Typed<FixedEncodedString43>>.fromJson(
json['rootRecords'],
(value) => Typed<FixedEncodedString43>.fromJson(value)),
parentByChild: json['parentByChild'] == null
? const IMapConst<String, TypedKey>({})
: IMap<String, Typed<FixedEncodedString43>>.fromJson(
json['parentByChild'] as Map<String, dynamic>,
(value) => value as String,
(value) => Typed<FixedEncodedString43>.fromJson(value)),
rootRecords: json['rootRecords'] == null
? const ISetConst<TypedKey>({})
: ISet<Typed<FixedEncodedString43>>.fromJson(json['rootRecords'],
(value) => Typed<FixedEncodedString43>.fromJson(value)),
debugNames: json['debugNames'] == null
? const IMapConst<String, String>({})
: IMap<String, String>.fromJson(
json['debugNames'] as Map<String, dynamic>,
(value) => value as String,
(value) => value as String),
);
Map<String, dynamic> _$$DHTRecordPoolAllocationsImplToJson(
@@ -40,6 +50,10 @@ Map<String, dynamic> _$$DHTRecordPoolAllocationsImplToJson(
'rootRecords': instance.rootRecords.toJson(
(value) => value,
),
'debugNames': instance.debugNames.toJson(
(value) => value,
(value) => value,
),
};
_$OwnedDHTRecordPointerImpl _$$OwnedDHTRecordPointerImplFromJson(
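The null checks above are what keep pool state persisted before this commit loadable: each missing key falls back to its @Default empty collection. A minimal sketch, assuming fromJson accepts a plain decoded map as shown in dht_record_pool.dart:

import 'package:veilid_support/veilid_support.dart';

void main() {
  // Allocations persisted before this commit have none of the new keys;
  // every field falls back to its @Default empty IMapConst/ISetConst.
  final allocations = DHTRecordPoolAllocations.fromJson(<String, dynamic>{});
  assert(allocations.debugNames.isEmpty);
  assert(allocations.rootRecords.isEmpty);
}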

View file

@@ -28,7 +28,8 @@ class DHTShortArray {
// if smplWriter is specified, uses a SMPL schema with a single writer
// rather than the key owner
static Future<DHTShortArray> create(
{int stride = maxElements,
{required String debugName,
int stride = maxElements,
VeilidRoutingContext? routingContext,
TypedKey? parent,
DHTRecordCrypto? crypto,
@@ -42,6 +43,7 @@
oCnt: 0,
members: [DHTSchemaMember(mKey: smplWriter.key, mCnt: stride + 1)]);
dhtRecord = await pool.create(
debugName: debugName,
parent: parent,
routingContext: routingContext,
schema: schema,
@ -50,6 +52,7 @@ class DHTShortArray {
} else {
final schema = DHTSchema.dflt(oCnt: stride + 1);
dhtRecord = await pool.create(
debugName: debugName,
parent: parent,
routingContext: routingContext,
schema: schema,
@@ -72,11 +75,15 @@
}
static Future<DHTShortArray> openRead(TypedKey headRecordKey,
{VeilidRoutingContext? routingContext,
{required String debugName,
VeilidRoutingContext? routingContext,
TypedKey? parent,
DHTRecordCrypto? crypto}) async {
final dhtRecord = await DHTRecordPool.instance.openRead(headRecordKey,
parent: parent, routingContext: routingContext, crypto: crypto);
debugName: debugName,
parent: parent,
routingContext: routingContext,
crypto: crypto);
try {
final dhtShortArray = DHTShortArray._(headRecord: dhtRecord);
await dhtShortArray._head.operate((head) => head._loadHead());
@@ -90,13 +97,17 @@
static Future<DHTShortArray> openWrite(
TypedKey headRecordKey,
KeyPair writer, {
required String debugName,
VeilidRoutingContext? routingContext,
TypedKey? parent,
DHTRecordCrypto? crypto,
}) async {
final dhtRecord = await DHTRecordPool.instance.openWrite(
headRecordKey, writer,
parent: parent, routingContext: routingContext, crypto: crypto);
debugName: debugName,
parent: parent,
routingContext: routingContext,
crypto: crypto);
try {
final dhtShortArray = DHTShortArray._(headRecord: dhtRecord);
await dhtShortArray._head.operate((head) => head._loadHead());
@@ -109,6 +120,7 @@
static Future<DHTShortArray> openOwned(
OwnedDHTRecordPointer ownedDHTRecordPointer, {
required String debugName,
required TypedKey parent,
VeilidRoutingContext? routingContext,
DHTRecordCrypto? crypto,
@ -116,6 +128,7 @@ class DHTShortArray {
openWrite(
ownedDHTRecordPointer.recordKey,
ownedDHTRecordPointer.owner,
debugName: debugName,
routingContext: routingContext,
parent: parent,
crypto: crypto,
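DHTShortArray forwards its debugName to the pool, and the _DHTShortArrayHead changes below derive names like '<head>_linked_<n>' for the linked records it allocates. A minimal sketch with a hypothetical name:

import 'package:veilid_support/veilid_support.dart';

// Hypothetical: create a short array whose head record is tagged 'my_list';
// linked records it allocates get tagged 'my_list_linked_1', and so on.
Future<DHTShortArray> createTaggedList() =>
    DHTShortArray.create(debugName: 'my_list');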

View file

@@ -17,13 +17,13 @@ class DHTShortArrayCubit<T> extends Cubit<DHTShortArrayBusyState<T>>
required T Function(List<int> data) decodeElement,
}) : _decodeElement = decodeElement,
super(const BlocBusyState(AsyncValue.loading())) {
_initFuture = Future(() async {
_initWait.add(() async {
// Open DHT record
_shortArray = await open();
_wantsCloseRecord = true;
// Make initial state update
unawaited(_refreshNoWait());
await _refreshNoWait();
_subscription = await _shortArray.listen(_update);
});
}
@@ -42,7 +42,7 @@ class DHTShortArrayCubit<T> extends Cubit<DHTShortArrayBusyState<T>>
// }
Future<void> refresh({bool forceRefresh = false}) async {
await _initFuture;
await _initWait();
await _refreshNoWait(forceRefresh: forceRefresh);
}
@@ -75,7 +75,7 @@ class DHTShortArrayCubit<T> extends Cubit<DHTShortArrayBusyState<T>>
@override
Future<void> close() async {
await _initFuture;
await _initWait();
await _subscription?.cancel();
_subscription = null;
if (_wantsCloseRecord) {
@@ -85,24 +85,24 @@ class DHTShortArrayCubit<T> extends Cubit<DHTShortArrayBusyState<T>>
}
Future<R?> operate<R>(Future<R?> Function(DHTShortArrayRead) closure) async {
await _initFuture;
await _initWait();
return _shortArray.operate(closure);
}
Future<(R?, bool)> operateWrite<R>(
Future<R?> Function(DHTShortArrayWrite) closure) async {
await _initFuture;
await _initWait();
return _shortArray.operateWrite(closure);
}
Future<void> operateWriteEventual(
Future<bool> Function(DHTShortArrayWrite) closure,
{Duration? timeout}) async {
await _initFuture;
await _initWait();
return _shortArray.operateWriteEventual(closure, timeout: timeout);
}
late final Future<void> _initFuture;
final WaitSet _initWait = WaitSet();
late final DHTShortArray _shortArray;
final T Function(List<int> data) _decodeElement;
StreamSubscription<void>? _subscription;

View file

@@ -184,7 +184,7 @@ class _DHTShortArrayHead {
final oldRecord = oldRecords[newKey];
if (oldRecord == null) {
// Open the new record
final newRecord = await _openLinkedRecord(newKey);
final newRecord = await _openLinkedRecord(newKey, n);
newRecords[newKey] = newRecord;
updatedLinkedRecords.add(newRecord);
} else {
@@ -263,6 +263,7 @@ class _DHTShortArrayHead {
oCnt: 0,
members: [DHTSchemaMember(mKey: smplWriter.key, mCnt: _stride)]);
final dhtRecord = await pool.create(
debugName: '${_headRecord.debugName}_linked_$recordNumber',
parent: parent,
routingContext: routingContext,
schema: schema,
@@ -279,17 +280,20 @@ class _DHTShortArrayHead {
}
/// Open a linked record for reading or writing, same as the head record
Future<DHTRecord> _openLinkedRecord(TypedKey recordKey) async {
Future<DHTRecord> _openLinkedRecord(
TypedKey recordKey, int recordNumber) async {
final writer = _headRecord.writer;
return (writer != null)
? await DHTRecordPool.instance.openWrite(
recordKey,
writer,
debugName: '${_headRecord.debugName}_linked_$recordNumber',
parent: _headRecord.key,
routingContext: _headRecord.routingContext,
)
: await DHTRecordPool.instance.openRead(
recordKey,
debugName: '${_headRecord.debugName}_linked_$recordNumber',
parent: _headRecord.key,
routingContext: _headRecord.routingContext,
);