watch value fix, invitation fix, logging

This commit is contained in:
Christien Rioux 2024-04-06 22:36:30 -04:00
parent b3e9cbd4f3
commit 8335e36876
7 changed files with 133 additions and 77 deletions

View file

@ -17,6 +17,8 @@ part 'dht_record.dart';
const int watchBackoffMultiplier = 2;
const int watchBackoffMax = 30;
typedef DHTRecordPoolLogger = void Function(String message);
/// Record pool that managed DHTRecords and allows for tagged deletion
/// String versions of keys due to IMap<> json unsupported in key
@freezed
@ -90,6 +92,12 @@ class OpenedRecordInfo {
defaultRoutingContext: defaultRoutingContext);
SharedDHTRecordData shared;
Set<DHTRecord> records = {};
String get debugNames {
final r = records.toList()
..sort((a, b) => a.key.toString().compareTo(b.key.toString()));
return '[${r.map((x) => x.debugName).join(',')}]';
}
}
class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
@ -100,6 +108,9 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
_routingContext = routingContext,
_veilid = veilid;
// Logger
DHTRecordPoolLogger? _logger;
// Persistent DHT record list
DHTRecordPoolAllocations _state;
// Create/open Mutex
@ -136,15 +147,21 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
static DHTRecordPool get instance => _singleton!;
static Future<void> init() async {
static Future<void> init({DHTRecordPoolLogger? logger}) async {
final routingContext = await Veilid.instance.routingContext();
final globalPool = DHTRecordPool._(Veilid.instance, routingContext);
globalPool._state = await globalPool.load();
globalPool
.._logger = logger
.._state = await globalPool.load();
_singleton = globalPool;
}
Veilid get veilid => _veilid;
void log(String message) {
_logger?.call(message);
}
Future<OpenedRecordInfo> _recordCreateInner(
{required String debugName,
required VeilidRoutingContext dhtctx,
@ -156,6 +173,8 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
// Create the record
final recordDescriptor = await dhtctx.createDHTRecord(schema);
log('createDHTRecord: debugName=$debugName key=${recordDescriptor.key}');
// Reopen if a writer is specified to ensure
// we switch the default writer
if (writer != null) {
@ -185,6 +204,8 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
TypedKey? parent}) async {
assert(_mutex.isLocked, 'should be locked here');
log('openDHTRecord: debugName=$debugName key=$recordKey');
// If we are opening a key that already exists
// make sure we are using the same parent if one was specified
_validateParentInner(parent, recordKey);
@ -238,6 +259,9 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
Future<void> _recordClosed(DHTRecord record) async {
await _mutex.protect(() async {
final key = record.key;
log('closeDHTRecord: debugName=${record.debugName} key=$key');
final openedRecordInfo = _opened[key];
if (openedRecordInfo == null ||
!openedRecordInfo.records.remove(record)) {
@ -284,6 +308,8 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
}
Future<void> _deleteInner(TypedKey recordKey) async {
log('deleteDHTRecord: key=$recordKey');
// Remove this child from parents
await _removeDependenciesInner([recordKey]);
await _routingContext.deleteDHTRecord(recordKey);
@ -676,9 +702,14 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
var success = false;
try {
success = await dhtctx.cancelDHTWatch(openedRecordKey);
log('cancelDHTWatch: key=$openedRecordKey, success=$success, '
'debugNames=${openedRecordInfo.debugNames}');
openedRecordInfo.shared.needsWatchStateUpdate = false;
} on VeilidAPIException {
} on VeilidAPIException catch (e) {
// Failed to cancel DHT watch, try again next tick
log('Exception in watch cancel: $e');
}
return success;
});
@ -687,12 +718,21 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
// Record needs new watch
var success = false;
try {
final subkeys = watchState.subkeys?.toList();
final count = watchState.count;
final expiration = watchState.expiration;
final realExpiration = await dhtctx.watchDHTValues(
openedRecordKey,
subkeys: watchState.subkeys?.toList(),
count: watchState.count,
expiration: watchState.expiration);
log('watchDHTValues: key=$openedRecordKey, subkeys=$subkeys, '
'count=$count, expiration=$expiration, '
'realExpiration=$realExpiration, '
'debugNames=${openedRecordInfo.debugNames}');
// Update watch states with real expiration
if (realExpiration.value != BigInt.zero) {
openedRecordInfo.shared.needsWatchStateUpdate = false;
@ -700,8 +740,9 @@ class DHTRecordPool with TableDBBacked<DHTRecordPoolAllocations> {
openedRecordInfo.records, realExpiration);
success = true;
}
} on VeilidAPIException {
} on VeilidAPIException catch (e) {
// Failed to cancel DHT watch, try again next tick
log('Exception in watch update: $e');
}
return success;
});