record pool / watch work

This commit is contained in:
Christien Rioux 2024-01-06 20:47:10 -05:00
parent 31f562119a
commit ba4ef05a28
4 changed files with 178 additions and 156 deletions

View File

@ -58,6 +58,8 @@ Future<AccountInfo> fetchAccountInfo(FetchAccountInfoRef ref,
return AccountInfo(status: AccountInfoStatus.accountLocked, active: active); return AccountInfo(status: AccountInfoStatus.accountLocked, active: active);
} }
xxx login should open this key and leave it open, logout should close it
// Pull the account DHT key, decode it and return it // Pull the account DHT key, decode it and return it
final pool = await DHTRecordPool.instance(); final pool = await DHTRecordPool.instance();
final account = await (await pool.openOwned( final account = await (await pool.openOwned(

View File

@ -19,7 +19,10 @@ class DHTRecord {
_writer = writer, _writer = writer,
_open = true, _open = true,
_valid = true, _valid = true,
_subkeySeqCache = {}; _subkeySeqCache = {},
needsWatchStateUpdate = false,
inWatchStateUpdate = false;
final VeilidRoutingContext _routingContext; final VeilidRoutingContext _routingContext;
final DHTRecordDescriptor _recordDescriptor; final DHTRecordDescriptor _recordDescriptor;
final int _defaultSubkey; final int _defaultSubkey;
@ -28,7 +31,10 @@ class DHTRecord {
final DHTRecordCrypto _crypto; final DHTRecordCrypto _crypto;
bool _open; bool _open;
bool _valid; bool _valid;
StreamSubscription<VeilidUpdateValueChange>? _watchSubscription; StreamController<VeilidUpdateValueChange>? watchController;
bool needsWatchStateUpdate;
bool inWatchStateUpdate;
WatchState? watchState;
int subkeyOrDefault(int subkey) => (subkey == -1) ? _defaultSubkey : subkey; int subkeyOrDefault(int subkey) => (subkey == -1) ? _defaultSubkey : subkey;
@ -37,6 +43,7 @@ class DHTRecord {
PublicKey get owner => _recordDescriptor.owner; PublicKey get owner => _recordDescriptor.owner;
KeyPair? get ownerKeyPair => _recordDescriptor.ownerKeyPair(); KeyPair? get ownerKeyPair => _recordDescriptor.ownerKeyPair();
DHTSchema get schema => _recordDescriptor.schema; DHTSchema get schema => _recordDescriptor.schema;
int get subkeyCount => _recordDescriptor.schema.subkeyCount();
KeyPair? get writer => _writer; KeyPair? get writer => _writer;
OwnedDHTRecordPointer get ownedDHTRecordPointer => OwnedDHTRecordPointer get ownedDHTRecordPointer =>
OwnedDHTRecordPointer(recordKey: key, owner: ownerKeyPair!); OwnedDHTRecordPointer(recordKey: key, owner: ownerKeyPair!);
@ -48,8 +55,9 @@ class DHTRecord {
if (!_open) { if (!_open) {
return; return;
} }
await watchController?.close();
await _routingContext.closeDHTRecord(_recordDescriptor.key); await _routingContext.closeDHTRecord(_recordDescriptor.key);
await DHTRecordPool.instance.recordClosed(_recordDescriptor.key); DHTRecordPool.instance.recordClosed(_recordDescriptor.key);
_open = false; _open = false;
} }
@ -258,14 +266,36 @@ class DHTRecord {
{List<ValueSubkeyRange>? subkeys, {List<ValueSubkeyRange>? subkeys,
Timestamp? expiration, Timestamp? expiration,
int? count}) async { int? count}) async {
// register watch with pool // Set up watch requirements which will get picked up by the next tick
_watchSubscription = await DHTRecordPool.instance.recordWatch( watchState =
_recordDescriptor.key, onUpdate, WatchState(subkeys: subkeys, expiration: expiration, count: count);
subkeys: subkeys, expiration: expiration, count: count); needsWatchStateUpdate = true;
}
Future<StreamSubscription<VeilidUpdateValueChange>> listen(
Future<void> Function(VeilidUpdateValueChange update) onUpdate,
) async {
// Set up watch requirements
watchController ??=
StreamController<VeilidUpdateValueChange>.broadcast(onCancel: () {
// If there are no more listeners then we can get rid of the controller
watchController = null;
});
return watchController!.stream.listen(
(update) {
Future.delayed(Duration.zero, () => onUpdate(update));
},
cancelOnError: true,
onError: (e) async {
await watchController!.close();
watchController = null;
});
} }
Future<void> cancelWatch() async { Future<void> cancelWatch() async {
// register watch with pool // Tear down watch requirements
await _watchSubscription?.cancel(); watchState = null;
needsWatchStateUpdate = true;
} }
} }

View File

@ -1,3 +1,6 @@
import 'dart:async';
import 'dart:typed_data';
import 'package:bloc/bloc.dart'; import 'package:bloc/bloc.dart';
import '../../veilid_support.dart'; import '../../veilid_support.dart';
@ -5,49 +8,76 @@ import '../../veilid_support.dart';
class DhtRecordCubit<T> extends Cubit<AsyncValue<T>> { class DhtRecordCubit<T> extends Cubit<AsyncValue<T>> {
DhtRecordCubit({ DhtRecordCubit({
required DHTRecord record, required DHTRecord record,
required Future<T?> Function(DHTRecord, VeilidUpdateValueChange) required Future<T?> Function(DHTRecord) initialStateFunction,
required Future<T?> Function(DHTRecord, List<ValueSubkeyRange>, ValueData)
stateFunction, stateFunction,
List<ValueSubkeyRange> watchSubkeys = const [], }) : super(const AsyncValue.loading()) {
}) : _record = record,
super(const AsyncValue.loading()) {
Future.delayed(Duration.zero, () async { Future.delayed(Duration.zero, () async {
await record.watch((update) async { // Make initial state update
try { try {
final newState = await stateFunction(record, update); final initialState = await initialStateFunction(record);
if (initialState != null) {
emit(AsyncValue.data(initialState));
}
} on Exception catch (e) {
emit(AsyncValue.error(e));
}
_subscription = await record.listen((update) async {
try {
final newState =
await stateFunction(record, update.subkeys, update.valueData);
if (newState != null) { if (newState != null) {
emit(AsyncValue.data(newState)); emit(AsyncValue.data(newState));
} }
} on Exception catch (e) { } on Exception catch (e) {
emit(AsyncValue.error(e)); emit(AsyncValue.error(e));
} }
}, subkeys: watchSubkeys); });
}); });
} }
@override @override
Future<void> close() async { Future<void> close() async {
await _record.cancelWatch(); await _subscription?.cancel();
_subscription = null;
await super.close(); await super.close();
} }
DHTRecord _record; StreamSubscription<VeilidUpdateValueChange>? _subscription;
} }
class SingleDHTRecordCubit<T> extends DhtRecordCubit<T> { // Cubit that watches the default subkey value of a dhtrecord
SingleDHTRecordCubit( class DefaultDHTRecordCubit<T> extends DhtRecordCubit<T> {
{required super.record, DefaultDHTRecordCubit({
required T? Function(List<int> data) decodeState, required super.record,
int singleSubkey = 0}) required T Function(List<int> data) decodeState,
: super( }) : super(
stateFunction: (record, update) async { initialStateFunction: (record) async {
// final initialData = await record.get();
if (update.subkeys.isNotEmpty) { if (initialData == null) {
final newState = decodeState(update.valueData.data); return null;
}
return decodeState(initialData);
},
stateFunction: (record, subkeys, valueData) async {
final defaultSubkey = record.subkeyOrDefault(-1);
if (subkeys.containsSubkey(defaultSubkey)) {
final Uint8List data;
final firstSubkey = subkeys.firstOrNull!.low;
if (firstSubkey != defaultSubkey) {
final maybeData = await record.get(forceRefresh: true);
if (maybeData == null) {
return null;
}
data = maybeData;
} else {
data = valueData.data;
}
final newState = decodeState(data);
return newState; return newState;
} }
return null; return null;
}, },
watchSubkeys: [ );
ValueSubkeyRange(low: singleSubkey, high: singleSubkey)
]);
} }

View File

@ -2,7 +2,6 @@ import 'dart:async';
import 'package:fast_immutable_collections/fast_immutable_collections.dart'; import 'package:fast_immutable_collections/fast_immutable_collections.dart';
import 'package:freezed_annotation/freezed_annotation.dart'; import 'package:freezed_annotation/freezed_annotation.dart';
import 'package:mutex/mutex.dart';
import '../../../../veilid_support.dart'; import '../../../../veilid_support.dart';
@ -38,8 +37,8 @@ class OwnedDHTRecordPointer with _$OwnedDHTRecordPointer {
} }
/// Watch state /// Watch state
class _WatchState { class WatchState {
_WatchState( WatchState(
{required this.subkeys, required this.expiration, required this.count}); {required this.subkeys, required this.expiration, required this.count});
List<ValueSubkeyRange>? subkeys; List<ValueSubkeyRange>? subkeys;
Timestamp? expiration; Timestamp? expiration;
@ -47,39 +46,20 @@ class _WatchState {
Timestamp? realExpiration; Timestamp? realExpiration;
} }
/// Opened DHTRecord state
class _OpenedDHTRecord {
_OpenedDHTRecord(this.routingContext)
: mutex = Mutex(),
needsWatchStateUpdate = false,
inWatchStateUpdate = false;
Future<void> close() async {
await watchController?.close();
}
Mutex mutex;
StreamController<VeilidUpdateValueChange>? watchController;
bool needsWatchStateUpdate;
bool inWatchStateUpdate;
_WatchState? watchState;
VeilidRoutingContext routingContext;
}
class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> { class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
DHTRecordPool._(Veilid veilid, VeilidRoutingContext routingContext) DHTRecordPool._(Veilid veilid, VeilidRoutingContext routingContext)
: _state = DHTRecordPoolAllocations( : _state = DHTRecordPoolAllocations(
childrenByParent: IMap(), childrenByParent: IMap(),
parentByChild: IMap(), parentByChild: IMap(),
rootRecords: ISet()), rootRecords: ISet()),
_opened = <TypedKey, _OpenedDHTRecord>{}, _opened = <TypedKey, DHTRecord>{},
_routingContext = routingContext, _routingContext = routingContext,
_veilid = veilid; _veilid = veilid;
// Persistent DHT record list // Persistent DHT record list
DHTRecordPoolAllocations _state; DHTRecordPoolAllocations _state;
// Which DHT records are currently open // Which DHT records are currently open
final Map<TypedKey, _OpenedDHTRecord> _opened; final Map<TypedKey, DHTRecord> _opened;
// Default routing context to use for new keys // Default routing context to use for new keys
final VeilidRoutingContext _routingContext; final VeilidRoutingContext _routingContext;
// Convenience accessor // Convenience accessor
@ -116,59 +96,18 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
Veilid get veilid => _veilid; Veilid get veilid => _veilid;
Future<void> _recordOpened( void _recordOpened(DHTRecord record) {
TypedKey key, VeilidRoutingContext routingContext) async { if (_opened.containsKey(record.key)) {
// no race because dart is single threaded until async breaks throw StateError('record already opened');
final odr = _opened[key] ?? _OpenedDHTRecord(routingContext); }
_opened[key] = odr; _opened[record.key] = record;
await odr.mutex.acquire();
} }
Future<StreamSubscription<VeilidUpdateValueChange>> recordWatch( void recordClosed(TypedKey key) {
TypedKey key, Future<void> Function(VeilidUpdateValueChange) onUpdate, final rec = _opened.remove(key);
{required List<ValueSubkeyRange>? subkeys, if (rec == null) {
required Timestamp? expiration,
required int? count}) async {
final odr = _opened[key];
if (odr == null) {
throw StateError("can't watch unopened record");
}
// Set up watch requirements
odr
..watchState =
_WatchState(subkeys: subkeys, expiration: expiration, count: count)
..needsWatchStateUpdate = true
..watchController ??=
StreamController<VeilidUpdateValueChange>.broadcast(onCancel: () {
// Request watch state change for cancel
odr
..watchState = null
..needsWatchStateUpdate = true;
// If there are no more listeners then we can get rid of the controller
if (!(odr.watchController?.hasListener ?? true)) {
odr.watchController = null;
}
});
return odr.watchController!.stream.listen(
(update) {
Future.delayed(Duration.zero, () => onUpdate(update));
},
cancelOnError: true,
onError: (e) async {
await odr.watchController!.close();
odr.watchController = null;
});
}
Future<void> recordClosed(TypedKey key) async {
final odr = _opened.remove(key);
if (odr == null) {
throw StateError('record already closed'); throw StateError('record already closed');
} }
await odr.close();
odr.mutex.release();
} }
Future<void> deleteDeep(TypedKey parent) async { Future<void> deleteDeep(TypedKey parent) async {
@ -178,10 +117,6 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
while (currentDeps.isNotEmpty) { while (currentDeps.isNotEmpty) {
final nextDep = currentDeps.removeLast(); final nextDep = currentDeps.removeLast();
// Ensure we get the exclusive lock on this record
// Can use default routing context here because we are only deleting
await _recordOpened(nextDep, _routingContext);
// Remove this child from its parent // Remove this child from its parent
await _removeDependency(nextDep); await _removeDependency(nextDep);
@ -191,11 +126,16 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
currentDeps.addAll(childDeps); currentDeps.addAll(childDeps);
} }
// Delete all records // Delete all dependent records in parallel
final allFutures = <Future<void>>[]; final allFutures = <Future<void>>[];
for (final dep in allDeps) { for (final dep in allDeps) {
// If record is opened, close it first
final rec = _opened[dep];
if (rec != null) {
await rec.close();
}
// Then delete
allFutures.add(_routingContext.deleteDHTRecord(dep)); allFutures.add(_routingContext.deleteDHTRecord(dep));
await recordClosed(dep);
} }
await Future.wait(allFutures); await Future.wait(allFutures);
} }
@ -288,7 +228,8 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
recordDescriptor.ownerTypedKeyPair()!)); recordDescriptor.ownerTypedKeyPair()!));
await _addDependency(parent, rec.key); await _addDependency(parent, rec.key);
await _recordOpened(rec.key, dhtctx);
_recordOpened(rec);
return rec; return rec;
} }
@ -301,10 +242,7 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
DHTRecordCrypto? crypto}) async { DHTRecordCrypto? crypto}) async {
final dhtctx = routingContext ?? _routingContext; final dhtctx = routingContext ?? _routingContext;
await _recordOpened(recordKey, dhtctx);
late final DHTRecord rec; late final DHTRecord rec;
try {
// If we are opening a key that already exists // If we are opening a key that already exists
// make sure we are using the same parent if one was specified // make sure we are using the same parent if one was specified
_validateParent(parent, recordKey); _validateParent(parent, recordKey);
@ -319,10 +257,7 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
// Register the dependency // Register the dependency
await _addDependency(parent, rec.key); await _addDependency(parent, rec.key);
} on Exception catch (_) { _recordOpened(rec);
await recordClosed(recordKey);
rethrow;
}
return rec; return rec;
} }
@ -338,10 +273,7 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
}) async { }) async {
final dhtctx = routingContext ?? _routingContext; final dhtctx = routingContext ?? _routingContext;
await _recordOpened(recordKey, dhtctx);
late final DHTRecord rec; late final DHTRecord rec;
try {
// If we are opening a key that already exists // If we are opening a key that already exists
// make sure we are using the same parent if one was specified // make sure we are using the same parent if one was specified
_validateParent(parent, recordKey); _validateParent(parent, recordKey);
@ -359,10 +291,7 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
// Register the dependency if specified // Register the dependency if specified
await _addDependency(parent, rec.key); await _addDependency(parent, rec.key);
} on Exception catch (_) { _recordOpened(rec);
await recordClosed(recordKey);
rethrow;
}
return rec; return rec;
} }
@ -389,15 +318,46 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
crypto: crypto, crypto: crypto,
); );
/// Look up an opened DHRRecord
DHTRecord? getOpenedRecord(TypedKey recordKey) => _opened[recordKey];
/// Get the parent of a DHTRecord key if it exists /// Get the parent of a DHTRecord key if it exists
TypedKey? getParentRecord(TypedKey child) { TypedKey? getParentRecordKey(TypedKey child) {
final childJson = child.toJson(); final childJson = child.toJson();
return _state.parentByChild[childJson]; return _state.parentByChild[childJson];
} }
/// Handle the DHT record updates coming from Veilid /// Handle the DHT record updates coming from Veilid
void processUpdateValueChange(VeilidUpdateValueChange updateValueChange) { void processUpdateValueChange(VeilidUpdateValueChange updateValueChange) {
if (updateValueChange.subkeys.isNotEmpty) {} if (updateValueChange.subkeys.isNotEmpty) {
// Change
for (final kv in _opened.entries) {
if (kv.key == updateValueChange.key) {
kv.value.watchController?.add(updateValueChange);
break;
}
}
} else {
// Expired, process renewal if desired
for (final kv in _opened.entries) {
if (kv.key == updateValueChange.key) {
// Renew watch state
kv.value.needsWatchStateUpdate = true;
// See if the watch had an expiration and if it has expired
// otherwise the renewal will keep the same parameters
final watchState = kv.value.watchState;
if (watchState != null) {
final exp = watchState.expiration;
if (exp != null && exp.value < Veilid.instance.now().value) {
// Has expiration, and it has expired, clear watch state
kv.value.watchState = null;
}
}
break;
}
}
}
} }
/// Ticker to check watch state change requests /// Ticker to check watch state change requests