watch for dhtshortarray

This commit is contained in:
Christien Rioux 2024-01-24 20:59:39 -05:00
parent b99e387dac
commit 1534a77ab1
6 changed files with 190 additions and 43 deletions

View File

@ -1,6 +1,8 @@
import 'dart:async'; import 'dart:async';
import 'dart:typed_data'; import 'dart:typed_data';
import 'package:fast_immutable_collections/fast_immutable_collections.dart';
import 'package:meta/meta.dart';
import 'package:protobuf/protobuf.dart'; import 'package:protobuf/protobuf.dart';
import '../../../veilid_support.dart'; import '../../../veilid_support.dart';
@ -31,9 +33,13 @@ class DHTRecord {
final DHTRecordCrypto _crypto; final DHTRecordCrypto _crypto;
bool _open; bool _open;
bool _valid; bool _valid;
@internal
StreamController<VeilidUpdateValueChange>? watchController; StreamController<VeilidUpdateValueChange>? watchController;
@internal
bool needsWatchStateUpdate; bool needsWatchStateUpdate;
@internal
bool inWatchStateUpdate; bool inWatchStateUpdate;
@internal
WatchState? watchState; WatchState? watchState;
int subkeyOrDefault(int subkey) => (subkey == -1) ? _defaultSubkey : subkey; int subkeyOrDefault(int subkey) => (subkey == -1) ? _defaultSubkey : subkey;
@ -45,6 +51,7 @@ class DHTRecord {
DHTSchema get schema => _recordDescriptor.schema; DHTSchema get schema => _recordDescriptor.schema;
int get subkeyCount => _recordDescriptor.schema.subkeyCount(); int get subkeyCount => _recordDescriptor.schema.subkeyCount();
KeyPair? get writer => _writer; KeyPair? get writer => _writer;
DHTRecordCrypto get crypto => _crypto;
OwnedDHTRecordPointer get ownedDHTRecordPointer => OwnedDHTRecordPointer get ownedDHTRecordPointer =>
OwnedDHTRecordPointer(recordKey: key, owner: ownerKeyPair!); OwnedDHTRecordPointer(recordKey: key, owner: ownerKeyPair!);
@ -266,10 +273,13 @@ class DHTRecord {
Timestamp? expiration, Timestamp? expiration,
int? count}) async { int? count}) async {
// Set up watch requirements which will get picked up by the next tick // Set up watch requirements which will get picked up by the next tick
watchState = final oldWatchState = watchState;
WatchState(subkeys: subkeys, expiration: expiration, count: count); watchState = WatchState(
subkeys: subkeys?.lock, expiration: expiration, count: count);
if (oldWatchState != watchState) {
needsWatchStateUpdate = true; needsWatchStateUpdate = true;
} }
}
Future<StreamSubscription<VeilidUpdateValueChange>> listen( Future<StreamSubscription<VeilidUpdateValueChange>> listen(
Future<void> Function(VeilidUpdateValueChange update) onUpdate, Future<void> Function(VeilidUpdateValueChange update) onUpdate,
@ -294,7 +304,9 @@ class DHTRecord {
Future<void> cancelWatch() async { Future<void> cancelWatch() async {
// Tear down watch requirements // Tear down watch requirements
if (watchState != null) {
watchState = null; watchState = null;
needsWatchStateUpdate = true; needsWatchStateUpdate = true;
} }
} }
}

View File

@ -1,5 +1,6 @@
import 'dart:async'; import 'dart:async';
import 'package:equatable/equatable.dart';
import 'package:fast_immutable_collections/fast_immutable_collections.dart'; import 'package:fast_immutable_collections/fast_immutable_collections.dart';
import 'package:freezed_annotation/freezed_annotation.dart'; import 'package:freezed_annotation/freezed_annotation.dart';
@ -37,13 +38,19 @@ class OwnedDHTRecordPointer with _$OwnedDHTRecordPointer {
} }
/// Watch state /// Watch state
class WatchState { class WatchState extends Equatable {
WatchState( const WatchState(
{required this.subkeys, required this.expiration, required this.count}); {required this.subkeys,
List<ValueSubkeyRange>? subkeys; required this.expiration,
Timestamp? expiration; required this.count,
int? count; this.realExpiration});
Timestamp? realExpiration; final IList<ValueSubkeyRange>? subkeys;
final Timestamp? expiration;
final int? count;
final Timestamp? realExpiration;
@override
List<Object?> get props => [subkeys, expiration, count, realExpiration];
} }
class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> { class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
@ -400,11 +407,17 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
try { try {
final realExpiration = await kv.value.routingContext final realExpiration = await kv.value.routingContext
.watchDHTValues(kv.key, .watchDHTValues(kv.key,
subkeys: ws.subkeys, subkeys: ws.subkeys?.toList(),
count: ws.count, count: ws.count,
expiration: ws.expiration); expiration: ws.expiration);
kv.value.needsWatchStateUpdate = false; kv.value.needsWatchStateUpdate = false;
ws.realExpiration = realExpiration;
// Update watch state with real expiration
kv.value.watchState = WatchState(
subkeys: ws.subkeys,
expiration: ws.expiration,
count: ws.count,
realExpiration: realExpiration);
} on VeilidAPIException { } on VeilidAPIException {
// Failed to cancel DHT watch, try again next tick // Failed to cancel DHT watch, try again next tick
} }

View File

@ -1,6 +1,7 @@
import 'dart:async'; import 'dart:async';
import 'dart:typed_data'; import 'dart:typed_data';
import 'package:mutex/mutex.dart';
import 'package:protobuf/protobuf.dart'; import 'package:protobuf/protobuf.dart';
import '../../../../veilid_support.dart'; import '../../../../veilid_support.dart';
@ -32,7 +33,9 @@ class _DHTShortArrayCache {
class DHTShortArray { class DHTShortArray {
DHTShortArray._({required DHTRecord headRecord}) DHTShortArray._({required DHTRecord headRecord})
: _headRecord = headRecord, : _headRecord = headRecord,
_head = _DHTShortArrayCache() { _head = _DHTShortArrayCache(),
_subscriptions = {},
_listenMutex = Mutex() {
late final int stride; late final int stride;
switch (headRecord.schema) { switch (headRecord.schema) {
case DHTSchemaDFLT(oCnt: final oCnt): case DHTSchemaDFLT(oCnt: final oCnt):
@ -59,6 +62,14 @@ class DHTShortArray {
// Cached representation refreshed from head record // Cached representation refreshed from head record
_DHTShortArrayCache _head; _DHTShortArrayCache _head;
// Subscription to head and linked record internal changes
final Map<TypedKey, StreamSubscription<VeilidUpdateValueChange>>
_subscriptions;
// Stream of external changes
StreamController<void>? _watchController;
// Watch mutex to ensure we keep the representation valid
final Mutex _listenMutex;
// Create a DHTShortArray // Create a DHTShortArray
// if smplWriter is specified, uses a SMPL schema with a single writer // if smplWriter is specified, uses a SMPL schema with a single writer
// rather than the key owner // rather than the key owner
@ -273,6 +284,11 @@ class DHTShortArray {
linkedKeys.map((key) => (sameRecords[key] ?? newRecords[key])!)) linkedKeys.map((key) => (sameRecords[key] ?? newRecords[key])!))
..index.addAll(index) ..index.addAll(index)
..free.addAll(free); ..free.addAll(free);
// Update watch if we have one in case linked records have been added
if (_watchController != null) {
await _watchAllRecords();
}
} }
/// Pull the latest or updated copy of the head record from the network /// Pull the latest or updated copy of the head record from the network
@ -280,7 +296,7 @@ class DHTShortArray {
{bool forceRefresh = true, bool onlyUpdates = false}) async { {bool forceRefresh = true, bool onlyUpdates = false}) async {
// Get an updated head record copy if one exists // Get an updated head record copy if one exists
final head = await _headRecord.getProtobuf(proto.DHTShortArray.fromBuffer, final head = await _headRecord.getProtobuf(proto.DHTShortArray.fromBuffer,
forceRefresh: forceRefresh, onlyUpdates: onlyUpdates); subkey: 0, forceRefresh: forceRefresh, onlyUpdates: onlyUpdates);
if (head == null) { if (head == null) {
if (onlyUpdates) { if (onlyUpdates) {
// No update // No update
@ -297,6 +313,7 @@ class DHTShortArray {
//////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////
Future<void> close() async { Future<void> close() async {
await _watchController?.close();
final futures = <Future<void>>[_headRecord.close()]; final futures = <Future<void>>[_headRecord.close()];
for (final lr in _head.linkedRecords) { for (final lr in _head.linkedRecords) {
futures.add(lr.close()); futures.add(lr.close());
@ -305,7 +322,9 @@ class DHTShortArray {
} }
Future<void> delete() async { Future<void> delete() async {
final futures = <Future<void>>[_headRecord.close()]; await _watchController?.close();
final futures = <Future<void>>[_headRecord.delete()];
for (final lr in _head.linkedRecords) { for (final lr in _head.linkedRecords) {
futures.add(lr.delete()); futures.add(lr.delete());
} }
@ -332,7 +351,7 @@ class DHTShortArray {
} }
} }
DHTRecord? _getRecord(int recordNumber) { DHTRecord? _getLinkedRecord(int recordNumber) {
if (recordNumber == 0) { if (recordNumber == 0) {
return _headRecord; return _headRecord;
} }
@ -343,6 +362,43 @@ class DHTShortArray {
return _head.linkedRecords[recordNumber]; return _head.linkedRecords[recordNumber];
} }
Future<DHTRecord> _getOrCreateLinkedRecord(int recordNumber) async {
if (recordNumber == 0) {
return _headRecord;
}
final pool = DHTRecordPool.instance;
recordNumber--;
while (recordNumber >= _head.linkedRecords.length) {
// Linked records must use SMPL schema so writer can be specified
// Use the same writer as the head record
final smplWriter = _headRecord.writer!;
final parent = pool.getParentRecordKey(_headRecord.key);
final routingContext = _headRecord.routingContext;
final crypto = _headRecord.crypto;
final schema = DHTSchema.smpl(
oCnt: 0,
members: [DHTSchemaMember(mKey: smplWriter.key, mCnt: _stride)]);
final dhtCreateRecord = await pool.create(
parent: parent,
routingContext: routingContext,
schema: schema,
crypto: crypto,
writer: smplWriter);
// Reopen with SMPL writer
await dhtCreateRecord.close();
final dhtRecord = await pool.openWrite(dhtCreateRecord.key, smplWriter,
parent: parent, routingContext: routingContext, crypto: crypto);
// Add to linked records
_head.linkedRecords.add(dhtRecord);
if (!await _tryWriteHead()) {
await _refreshHead();
}
}
return _head.linkedRecords[recordNumber];
}
int _emptyIndex() { int _emptyIndex() {
if (_head.free.isNotEmpty) { if (_head.free.isNotEmpty) {
return _head.free.removeLast(); return _head.free.removeLast();
@ -368,7 +424,7 @@ class DHTShortArray {
} }
final index = _head.index[pos]; final index = _head.index[pos];
final recordNumber = index ~/ _stride; final recordNumber = index ~/ _stride;
final record = _getRecord(recordNumber); final record = _getLinkedRecord(recordNumber);
assert(record != null, 'Record does not exist'); assert(record != null, 'Record does not exist');
final recordSubkey = (index % _stride) + ((recordNumber == 0) ? 1 : 0); final recordSubkey = (index % _stride) + ((recordNumber == 0) ? 1 : 0);
@ -472,7 +528,7 @@ class DHTShortArray {
final removedIdx = _head.index.removeAt(pos); final removedIdx = _head.index.removeAt(pos);
_freeIndex(removedIdx); _freeIndex(removedIdx);
final recordNumber = removedIdx ~/ _stride; final recordNumber = removedIdx ~/ _stride;
final record = _getRecord(recordNumber); final record = _getLinkedRecord(recordNumber);
assert(record != null, 'Record does not exist'); assert(record != null, 'Record does not exist');
final recordSubkey = final recordSubkey =
(removedIdx % _stride) + ((recordNumber == 0) ? 1 : 0); (removedIdx % _stride) + ((recordNumber == 0) ? 1 : 0);
@ -532,11 +588,10 @@ class DHTShortArray {
final index = _head.index[pos]; final index = _head.index[pos];
final recordNumber = index ~/ _stride; final recordNumber = index ~/ _stride;
final record = _getRecord(recordNumber); final record = await _getOrCreateLinkedRecord(recordNumber);
assert(record != null, 'Record does not exist');
final recordSubkey = (index % _stride) + ((recordNumber == 0) ? 1 : 0); final recordSubkey = (index % _stride) + ((recordNumber == 0) ? 1 : 0);
return record!.tryWriteBytes(newValue, subkey: recordSubkey); return record.tryWriteBytes(newValue, subkey: recordSubkey);
} }
Future<void> eventualWriteItem(int pos, Uint8List newValue) async { Future<void> eventualWriteItem(int pos, Uint8List newValue) async {
@ -609,32 +664,97 @@ class DHTShortArray {
) => ) =>
eventualUpdateItem(pos, protobufUpdate(fromBuffer, update)); eventualUpdateItem(pos, protobufUpdate(fromBuffer, update));
Future<void> watch() async {
// Watch head and all linked records // Watch head and all linked records
Future<void> _watchAllRecords() async {
// This will update any existing watches if necessary
try { try {
await [_headRecord.watch(), ..._head.linkedRecords.map((r) => r.watch())] await [_headRecord.watch(), ..._head.linkedRecords.map((r) => r.watch())]
.wait; .wait;
} finally {
await [ // Update changes to the head record
_headRecord.cancelWatch(), if (!_subscriptions.containsKey(_headRecord.key)) {
..._head.linkedRecords.map((r) => r.cancelWatch()) _subscriptions[_headRecord.key] =
].wait; await _headRecord.listen(_onUpdateRecord);
}
// Update changes to any linked records
for (final lr in _head.linkedRecords) {
if (!_subscriptions.containsKey(lr.key)) {
_subscriptions[lr.key] = await lr.listen(_onUpdateRecord);
}
}
} on Exception {
// If anything fails, try to cancel the watches
await _cancelRecordWatches();
rethrow;
} }
} }
Future<void> listen( // Stop watching for changes to head and linked records
Future<void> Function() onChanged, Future<void> _cancelRecordWatches() async {
) async {
_headRecord.listen((update) => {
xxx
}
}
Future<void> cancelWatch() async {
// Watch head and all linked records
await _headRecord.cancelWatch(); await _headRecord.cancelWatch();
for (final lr in _head.linkedRecords) { for (final lr in _head.linkedRecords) {
await lr.cancelWatch(); await lr.cancelWatch();
} }
await _subscriptions.values.map((s) => s.cancel()).wait;
_subscriptions.clear();
}
// Called when a head or linked record changes
Future<void> _onUpdateRecord(VeilidUpdateValueChange update) async {
final record = _head.linkedRecords.firstWhere(
(element) => element.key == update.key,
orElse: () => _headRecord);
// If head record subkey zero changes, then the layout
// of the dhtshortarray has changed
var updateHead = false;
if (record == _headRecord && update.subkeys.containsSubkey(0)) {
updateHead = true;
}
// If we have any other subkeys to update, do them first
final unord = List<Future<Uint8List?>>.empty(growable: true);
for (final skr in update.subkeys) {
for (var subkey = skr.low; subkey <= skr.high; subkey++) {
// Skip head subkey
if (subkey == 0) {
continue;
}
// Get the subkey, which caches the result in the local record store
unord.add(record.get(subkey: subkey, forceRefresh: true));
} }
} }
await unord.wait;
// Then update the head record
if (updateHead) {
await _refreshHead(forceRefresh: false);
}
// Then commit the change to any listeners
_watchController?.sink.add(null);
}
Future<StreamSubscription<void>> listen(
void Function() onChanged,
) =>
_listenMutex.protect(() async {
// If don't have a controller yet, set it up
if (_watchController == null) {
// Set up watch requirements
_watchController = StreamController<void>.broadcast(onCancel: () {
// If there are no more listeners then we can get
// rid of the controller and drop our subscriptions
unawaited(_listenMutex.protect(() async {
// Cancel watches of head and linked records
await _cancelRecordWatches();
_watchController = null;
}));
});
// Start watching head and linked records
await _watchAllRecords();
}
// Return subscription
return _watchController!.stream.listen((_) => onChanged());
});
}

View File

@ -21,7 +21,7 @@ class DHTShortArrayCubit<T> extends Cubit<AsyncValue<IList<T>>> {
} on Exception catch (e) { } on Exception catch (e) {
emit(AsyncValue.error(e)); emit(AsyncValue.error(e));
} }
xxx do this now
shortArray. xxx add listen to head and linked records in dht_short_array shortArray. xxx add listen to head and linked records in dht_short_array
_subscription = await record.listen((update) async { _subscription = await record.listen((update) async {

View File

@ -194,7 +194,7 @@ packages:
source: hosted source: hosted
version: "2.3.4" version: "2.3.4"
equatable: equatable:
dependency: transitive dependency: "direct main"
description: description:
name: equatable name: equatable
sha256: c2b87cb7756efdf69892005af546c56c0b5037f54d2a88269b4f347a505e3ca2 sha256: c2b87cb7756efdf69892005af546c56c0b5037f54d2a88269b4f347a505e3ca2
@ -388,7 +388,7 @@ packages:
source: hosted source: hosted
version: "0.5.0" version: "0.5.0"
meta: meta:
dependency: transitive dependency: "direct main"
description: description:
name: meta name: meta
sha256: a6e590c838b18133bb482a2745ad77c5bb7715fb0451209e1a7567d416678b8e sha256: a6e590c838b18133bb482a2745ad77c5bb7715fb0451209e1a7567d416678b8e

View File

@ -8,10 +8,12 @@ environment:
dependencies: dependencies:
bloc: ^8.1.2 bloc: ^8.1.2
equatable: ^2.0.5
fast_immutable_collections: ^9.1.5 fast_immutable_collections: ^9.1.5
freezed_annotation: ^2.2.0 freezed_annotation: ^2.2.0
json_annotation: ^4.8.1 json_annotation: ^4.8.1
loggy: ^2.0.3 loggy: ^2.0.3
meta: ^1.10.0
mutex: ^3.1.0 mutex: ^3.1.0
protobuf: ^3.0.0 protobuf: ^3.0.0
veilid: veilid: