From ae154f1bed097515023e0886c0c07cf6a835e017 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Mon, 12 May 2025 10:15:53 -0400 Subject: [PATCH] Dht perf --- CHANGELOG.md | 8 + .../reconciliation/author_input_queue.dart | 13 +- lib/veilid_processor/views/developer.dart | 17 +- packages/veilid_support/example/pubspec.lock | 8 + .../dht_support/src/dht_record/barrel.dart | 1 + .../src/dht_record/dht_record.dart | 301 +++++++++--------- .../src/dht_record/dht_record_pool.dart | 264 ++++++++------- packages/veilid_support/pubspec.lock | 8 + packages/veilid_support/pubspec.yaml | 1 + pubspec.lock | 8 + 10 files changed, 340 insertions(+), 289 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5cd3873..9182300 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,11 @@ +## UNRELEASED ## + +- Fix reconciliation `advance()` +- Add `pool stats` command +- Fixed issue with Android 'back' button exiting the app (#331) +- Deprecated accounts no longer crash application at startup +- Simplify SingleContactMessagesCubit and MessageReconciliation + ## v0.4.7 ## - *Community Contributions* - Fix getting stuck on splash screen when veilid is already started @bmv437 / @bgrift diff --git a/lib/chat/cubits/reconciliation/author_input_queue.dart b/lib/chat/cubits/reconciliation/author_input_queue.dart index 73dddc6..9a65e82 100644 --- a/lib/chat/cubits/reconciliation/author_input_queue.dart +++ b/lib/chat/cubits/reconciliation/author_input_queue.dart @@ -83,16 +83,13 @@ class AuthorInputQueue { } } - /// Remove a reconciled message and move to the next message + /// Move the reconciliation cursor (_inputPosition) forward on the input + /// queue and tees up the next message for processing /// Returns true if there is more work to do + /// Returns false if there are no more messages to reconcile in this queue Future advance() async { - final currentMessage = await getCurrentMessage(); - if (currentMessage == null) { - return false; - } - // Move current message to previous - _previousMessage = _currentMessage; + _previousMessage = await getCurrentMessage(); _currentMessage = null; while (true) { @@ -178,7 +175,7 @@ class AuthorInputQueue { // _inputPosition points to either before the input source starts // or the position of the previous element. We still need to set the - // _currentMessage to the previous element so consume() can compare + // _currentMessage to the previous element so advance() can compare // against it if we can. if (_inputPosition >= 0) { _currentMessage = currentWindow diff --git a/lib/veilid_processor/views/developer.dart b/lib/veilid_processor/views/developer.dart index e329d47..d749a9c 100644 --- a/lib/veilid_processor/views/developer.dart +++ b/lib/veilid_processor/views/developer.dart @@ -98,6 +98,16 @@ class _DeveloperPageState extends State { return true; } + if (debugCommand == 'pool stats') { + try { + DHTRecordPool.instance.debugPrintStats(); + } on Exception catch (e, st) { + _debugOut('<<< ERROR\n$e\n<<< STACK\n$st'); + return false; + } + return true; + } + if (debugCommand.startsWith('change_log_ignore ')) { final args = debugCommand.split(' '); if (args.length < 3) { @@ -129,9 +139,10 @@ class _DeveloperPageState extends State { if (debugCommand == 'help') { out = 'VeilidChat Commands:\n' - ' pool allocations - List DHTRecordPool allocations\n' - ' pool opened - List opened DHTRecord instances' - ' from the pool\n' + ' pool \n' + ' allocations - List DHTRecordPool allocations\n' + ' opened - List opened DHTRecord instances\n' + ' stats - Dump DHTRecordPool statistics\n' ' change_log_ignore change the log' ' target ignore list for a tracing layer\n' ' targets to add to the ignore list can be separated by' diff --git a/packages/veilid_support/example/pubspec.lock b/packages/veilid_support/example/pubspec.lock index 2e87d8c..c40540e 100644 --- a/packages/veilid_support/example/pubspec.lock +++ b/packages/veilid_support/example/pubspec.lock @@ -258,6 +258,14 @@ packages: url: "https://pub.dev" source: hosted version: "4.1.2" + indent: + dependency: transitive + description: + name: indent + sha256: "819319a5c185f7fe412750c798953378b37a0d0d32564ce33e7c5acfd1372d2a" + url: "https://pub.dev" + source: hosted + version: "2.0.0" integration_test: dependency: "direct dev" description: flutter diff --git a/packages/veilid_support/lib/dht_support/src/dht_record/barrel.dart b/packages/veilid_support/lib/dht_support/src/dht_record/barrel.dart index 06933be..2d7e677 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_record/barrel.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_record/barrel.dart @@ -1,3 +1,4 @@ export 'default_dht_record_cubit.dart'; export 'dht_record_cubit.dart'; export 'dht_record_pool.dart'; +export 'stats.dart'; diff --git a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record.dart b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record.dart index d632b58..b9218e4 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record.dart @@ -119,55 +119,56 @@ class DHTRecord implements DHTDeleteable { /// * 'outSeqNum' optionally returns the sequence number of the value being /// returned if one was returned. Future get( - {int subkey = -1, - VeilidCrypto? crypto, - DHTRecordRefreshMode refreshMode = DHTRecordRefreshMode.cached, - Output? outSeqNum}) async { - subkey = subkeyOrDefault(subkey); + {int subkey = -1, + VeilidCrypto? crypto, + DHTRecordRefreshMode refreshMode = DHTRecordRefreshMode.cached, + Output? outSeqNum}) async => + _wrapStats('get', () async { + subkey = subkeyOrDefault(subkey); - // Get the last sequence number if we need it - final lastSeq = - refreshMode._inspectLocal ? await _localSubkeySeq(subkey) : null; + // Get the last sequence number if we need it + final lastSeq = + refreshMode._inspectLocal ? await _localSubkeySeq(subkey) : null; - // See if we only ever want the locally stored value - if (refreshMode == DHTRecordRefreshMode.local && lastSeq == null) { - // If it's not available locally already just return null now - return null; - } - - var retry = kDHTTryAgainTries; - ValueData? valueData; - while (true) { - try { - valueData = await _routingContext.getDHTValue(key, subkey, - forceRefresh: refreshMode._forceRefresh); - break; - } on VeilidAPIExceptionTryAgain { - retry--; - if (retry == 0) { - throw const DHTExceptionNotAvailable(); + // See if we only ever want the locally stored value + if (refreshMode == DHTRecordRefreshMode.local && lastSeq == null) { + // If it's not available locally already just return null now + return null; } - await asyncSleep(); - } - } - if (valueData == null) { - return null; - } - // See if this get resulted in a newer sequence number - if (refreshMode == DHTRecordRefreshMode.update && - lastSeq != null && - valueData.seq <= lastSeq) { - // If we're only returning updates then punt now - return null; - } - // If we're returning a value, decrypt it - final out = (crypto ?? _crypto).decrypt(valueData.data); - if (outSeqNum != null) { - outSeqNum.save(valueData.seq); - } - return out; - } + var retry = kDHTTryAgainTries; + ValueData? valueData; + while (true) { + try { + valueData = await _routingContext.getDHTValue(key, subkey, + forceRefresh: refreshMode._forceRefresh); + break; + } on VeilidAPIExceptionTryAgain { + retry--; + if (retry == 0) { + throw const DHTExceptionNotAvailable(); + } + await asyncSleep(); + } + } + if (valueData == null) { + return null; + } + + // See if this get resulted in a newer sequence number + if (refreshMode == DHTRecordRefreshMode.update && + lastSeq != null && + valueData.seq <= lastSeq) { + // If we're only returning updates then punt now + return null; + } + // If we're returning a value, decrypt it + final out = (crypto ?? _crypto).decrypt(valueData.data); + if (outSeqNum != null) { + outSeqNum.save(valueData.seq); + } + return out; + }); /// Get a subkey value from this record. /// Process the record returned with a JSON unmarshal function 'fromJson'. @@ -223,97 +224,102 @@ class DHTRecord implements DHTDeleteable { /// If a newer value was found on the network, it is returned /// If the value was succesfully written, null is returned Future tryWriteBytes(Uint8List newValue, - {int subkey = -1, - VeilidCrypto? crypto, - KeyPair? writer, - Output? outSeqNum}) async { - subkey = subkeyOrDefault(subkey); - final lastSeq = await _localSubkeySeq(subkey); - final encryptedNewValue = await (crypto ?? _crypto).encrypt(newValue); + {int subkey = -1, + VeilidCrypto? crypto, + KeyPair? writer, + Output? outSeqNum}) async => + _wrapStats('tryWriteBytes', () async { + subkey = subkeyOrDefault(subkey); + final lastSeq = await _localSubkeySeq(subkey); + final encryptedNewValue = await (crypto ?? _crypto).encrypt(newValue); - // Set the new data if possible - var newValueData = await _routingContext - .setDHTValue(key, subkey, encryptedNewValue, writer: writer ?? _writer); - if (newValueData == null) { - // A newer value wasn't found on the set, but - // we may get a newer value when getting the value for the sequence number - newValueData = await _routingContext.getDHTValue(key, subkey); - if (newValueData == null) { - assert(newValueData != null, "can't get value that was just set"); - return null; - } - } + // Set the new data if possible + var newValueData = await _routingContext.setDHTValue( + key, subkey, encryptedNewValue, + writer: writer ?? _writer); + if (newValueData == null) { + // A newer value wasn't found on the set, but + // we may get a newer value when getting the value for the sequence number + newValueData = await _routingContext.getDHTValue(key, subkey); + if (newValueData == null) { + assert(newValueData != null, "can't get value that was just set"); + return null; + } + } - // Record new sequence number - final isUpdated = newValueData.seq != lastSeq; - if (isUpdated && outSeqNum != null) { - outSeqNum.save(newValueData.seq); - } + // Record new sequence number + final isUpdated = newValueData.seq != lastSeq; + if (isUpdated && outSeqNum != null) { + outSeqNum.save(newValueData.seq); + } - // See if the encrypted data returned is exactly the same - // if so, shortcut and don't bother decrypting it - if (newValueData.data.equals(encryptedNewValue)) { - if (isUpdated) { - DHTRecordPool.instance._processLocalValueChange(key, newValue, subkey); - } - return null; - } + // See if the encrypted data returned is exactly the same + // if so, shortcut and don't bother decrypting it + if (newValueData.data.equals(encryptedNewValue)) { + if (isUpdated) { + DHTRecordPool.instance + ._processLocalValueChange(key, newValue, subkey); + } + return null; + } - // Decrypt value to return it - final decryptedNewValue = - await (crypto ?? _crypto).decrypt(newValueData.data); - if (isUpdated) { - DHTRecordPool.instance - ._processLocalValueChange(key, decryptedNewValue, subkey); - } - return decryptedNewValue; - } + // Decrypt value to return it + final decryptedNewValue = + await (crypto ?? _crypto).decrypt(newValueData.data); + if (isUpdated) { + DHTRecordPool.instance + ._processLocalValueChange(key, decryptedNewValue, subkey); + } + return decryptedNewValue; + }); /// Attempt to write a byte buffer to a DHTRecord subkey /// If a newer value was found on the network, another attempt /// will be made to write the subkey until this succeeds Future eventualWriteBytes(Uint8List newValue, - {int subkey = -1, - VeilidCrypto? crypto, - KeyPair? writer, - Output? outSeqNum}) async { - subkey = subkeyOrDefault(subkey); - final lastSeq = await _localSubkeySeq(subkey); - final encryptedNewValue = await (crypto ?? _crypto).encrypt(newValue); + {int subkey = -1, + VeilidCrypto? crypto, + KeyPair? writer, + Output? outSeqNum}) async => + _wrapStats('eventualWriteBytes', () async { + subkey = subkeyOrDefault(subkey); + final lastSeq = await _localSubkeySeq(subkey); + final encryptedNewValue = await (crypto ?? _crypto).encrypt(newValue); - ValueData? newValueData; - do { - do { - // Set the new data - newValueData = await _routingContext.setDHTValue( - key, subkey, encryptedNewValue, - writer: writer ?? _writer); + ValueData? newValueData; + do { + do { + // Set the new data + newValueData = await _routingContext.setDHTValue( + key, subkey, encryptedNewValue, + writer: writer ?? _writer); - // Repeat if newer data on the network was found - } while (newValueData != null); + // Repeat if newer data on the network was found + } while (newValueData != null); - // Get the data to check its sequence number - newValueData = await _routingContext.getDHTValue(key, subkey); - if (newValueData == null) { - assert(newValueData != null, "can't get value that was just set"); - return; - } + // Get the data to check its sequence number + newValueData = await _routingContext.getDHTValue(key, subkey); + if (newValueData == null) { + assert(newValueData != null, "can't get value that was just set"); + return; + } - // Record new sequence number - if (outSeqNum != null) { - outSeqNum.save(newValueData.seq); - } + // Record new sequence number + if (outSeqNum != null) { + outSeqNum.save(newValueData.seq); + } - // The encrypted data returned should be exactly the same - // as what we are trying to set, - // otherwise we still need to keep trying to set the value - } while (!newValueData.data.equals(encryptedNewValue)); + // The encrypted data returned should be exactly the same + // as what we are trying to set, + // otherwise we still need to keep trying to set the value + } while (!newValueData.data.equals(encryptedNewValue)); - final isUpdated = newValueData.seq != lastSeq; - if (isUpdated) { - DHTRecordPool.instance._processLocalValueChange(key, newValue, subkey); - } - } + final isUpdated = newValueData.seq != lastSeq; + if (isUpdated) { + DHTRecordPool.instance + ._processLocalValueChange(key, newValue, subkey); + } + }); /// Attempt to write a byte buffer to a DHTRecord subkey /// If a newer value was found on the network, another attempt @@ -321,32 +327,36 @@ class DHTRecord implements DHTDeleteable { /// Each attempt to write the value calls an update function with the /// old value to determine what new value should be attempted for that write. Future eventualUpdateBytes( - Future Function(Uint8List? oldValue) update, - {int subkey = -1, - VeilidCrypto? crypto, - KeyPair? writer, - Output? outSeqNum}) async { - subkey = subkeyOrDefault(subkey); + Future Function(Uint8List? oldValue) update, + {int subkey = -1, + VeilidCrypto? crypto, + KeyPair? writer, + Output? outSeqNum}) async => + _wrapStats('eventualUpdateBytes', () async { + subkey = subkeyOrDefault(subkey); - // Get the existing data, do not allow force refresh here - // because if we need a refresh the setDHTValue will fail anyway - var oldValue = - await get(subkey: subkey, crypto: crypto, outSeqNum: outSeqNum); + // Get the existing data, do not allow force refresh here + // because if we need a refresh the setDHTValue will fail anyway + var oldValue = + await get(subkey: subkey, crypto: crypto, outSeqNum: outSeqNum); - do { - // Update the data - final updatedValue = await update(oldValue); - if (updatedValue == null) { - // If null is returned from the update, stop trying to do the update - break; - } - // Try to write it back to the network - oldValue = await tryWriteBytes(updatedValue, - subkey: subkey, crypto: crypto, writer: writer, outSeqNum: outSeqNum); + do { + // Update the data + final updatedValue = await update(oldValue); + if (updatedValue == null) { + // If null is returned from the update, stop trying to do the update + break; + } + // Try to write it back to the network + oldValue = await tryWriteBytes(updatedValue, + subkey: subkey, + crypto: crypto, + writer: writer, + outSeqNum: outSeqNum); - // Repeat update if newer data on the network was found - } while (oldValue != null); - } + // Repeat update if newer data on the network was found + } while (oldValue != null); + }); /// Like 'tryWriteBytes' but with JSON marshal/unmarshal of the value Future tryWriteJson(T Function(dynamic) fromJson, T newValue, @@ -555,6 +565,9 @@ class DHTRecord implements DHTDeleteable { local: false, data: update.value?.data, subkeys: update.subkeys); } + Future _wrapStats(String func, Future Function() closure) => + DHTRecordPool.instance._stats.measure(key, debugName, func, closure); + ////////////////////////////////////////////////////////////// final _SharedDHTRecordData _sharedDHTRecordData; diff --git a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart index 3ee9adc..6dc0634 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart @@ -273,24 +273,6 @@ class DHTRecordPool with TableDBBackedJson { } } } - // else { - - // XXX: should no longer be necessary - // // Remove watch state - // - // for (final entry in _opened.entries) { - // final openedKey = entry.key; - // final openedRecordInfo = entry.value; - - // if (openedKey == updateValueChange.key) { - // for (final rec in openedRecordInfo.records) { - // rec._watchState = null; - // } - // openedRecordInfo.shared.needsWatchStateUpdate = true; - // break; - // } - // } - //} } /// Log the current record allocations @@ -320,6 +302,11 @@ class DHTRecordPool with TableDBBackedJson { } } + /// Log the performance stats + void debugPrintStats() { + log('DHTRecordPool Stats:\n${_stats.debugString()}'); + } + /// Public interface to DHTRecordPool logger void log(String message) { _logger?.call(message); @@ -369,109 +356,110 @@ class DHTRecordPool with TableDBBackedJson { } Future _recordOpenCommon( - {required String debugName, - required VeilidRoutingContext dhtctx, - required TypedKey recordKey, - required VeilidCrypto crypto, - required KeyPair? writer, - required TypedKey? parent, - required int defaultSubkey}) async { - log('openDHTRecord: debugName=$debugName key=$recordKey'); + {required String debugName, + required VeilidRoutingContext dhtctx, + required TypedKey recordKey, + required VeilidCrypto crypto, + required KeyPair? writer, + required TypedKey? parent, + required int defaultSubkey}) async => + _stats.measure(recordKey, debugName, '_recordOpenCommon', () async { + log('openDHTRecord: debugName=$debugName key=$recordKey'); - // See if this has been opened yet - final openedRecordInfo = await _mutex.protect(() async { - // If we are opening a key that already exists - // make sure we are using the same parent if one was specified - _validateParentInner(parent, recordKey); + // See if this has been opened yet + final openedRecordInfo = await _mutex.protect(() async { + // If we are opening a key that already exists + // make sure we are using the same parent if one was specified + _validateParentInner(parent, recordKey); - return _opened[recordKey]; - }); + return _opened[recordKey]; + }); - if (openedRecordInfo == null) { - // Fresh open, just open the record - var retry = kDHTKeyNotFoundTries; - late final DHTRecordDescriptor recordDescriptor; - while (true) { - try { - recordDescriptor = - await dhtctx.openDHTRecord(recordKey, writer: writer); - break; - } on VeilidAPIExceptionTryAgain { - throw const DHTExceptionNotAvailable(); - } on VeilidAPIExceptionKeyNotFound { - await asyncSleep(); - retry--; - if (retry == 0) { - throw const DHTExceptionNotAvailable(); + if (openedRecordInfo == null) { + // Fresh open, just open the record + var retry = kDHTKeyNotFoundTries; + late final DHTRecordDescriptor recordDescriptor; + while (true) { + try { + recordDescriptor = + await dhtctx.openDHTRecord(recordKey, writer: writer); + break; + } on VeilidAPIExceptionTryAgain { + throw const DHTExceptionNotAvailable(); + } on VeilidAPIExceptionKeyNotFound { + await asyncSleep(); + retry--; + if (retry == 0) { + throw const DHTExceptionNotAvailable(); + } + } } + + final newOpenedRecordInfo = _OpenedRecordInfo( + recordDescriptor: recordDescriptor, + defaultWriter: writer, + defaultRoutingContext: dhtctx); + + final rec = DHTRecord._( + debugName: debugName, + routingContext: dhtctx, + defaultSubkey: defaultSubkey, + sharedDHTRecordData: newOpenedRecordInfo.shared, + writer: writer, + crypto: crypto); + + await _mutex.protect(() async { + // Register the opened record + _opened[recordDescriptor.key] = newOpenedRecordInfo; + + // Register the dependency + await _addDependencyInner( + parent, + recordKey, + debugName: debugName, + ); + + // Register the newly opened record + newOpenedRecordInfo.records.add(rec); + }); + + return rec; } - } - final newOpenedRecordInfo = _OpenedRecordInfo( - recordDescriptor: recordDescriptor, - defaultWriter: writer, - defaultRoutingContext: dhtctx); + // Already opened - final rec = DHTRecord._( - debugName: debugName, - routingContext: dhtctx, - defaultSubkey: defaultSubkey, - sharedDHTRecordData: newOpenedRecordInfo.shared, - writer: writer, - crypto: crypto); + // See if we need to reopen the record with a default writer and possibly + // a different routing context + if (writer != null && openedRecordInfo.shared.defaultWriter == null) { + await dhtctx.openDHTRecord(recordKey, writer: writer); + // New writer if we didn't specify one before + openedRecordInfo.shared.defaultWriter = writer; + // New default routing context if we opened it again + openedRecordInfo.shared.defaultRoutingContext = dhtctx; + } - await _mutex.protect(() async { - // Register the opened record - _opened[recordDescriptor.key] = newOpenedRecordInfo; + final rec = DHTRecord._( + debugName: debugName, + routingContext: dhtctx, + defaultSubkey: defaultSubkey, + sharedDHTRecordData: openedRecordInfo.shared, + writer: writer, + crypto: crypto); - // Register the dependency - await _addDependencyInner( - parent, - recordKey, - debugName: debugName, - ); + await _mutex.protect(() async { + // Register the dependency + await _addDependencyInner( + parent, + recordKey, + debugName: debugName, + ); - // Register the newly opened record - newOpenedRecordInfo.records.add(rec); + openedRecordInfo.records.add(rec); + }); + + return rec; }); - return rec; - } - - // Already opened - - // See if we need to reopen the record with a default writer and possibly - // a different routing context - if (writer != null && openedRecordInfo.shared.defaultWriter == null) { - await dhtctx.openDHTRecord(recordKey, writer: writer); - // New writer if we didn't specify one before - openedRecordInfo.shared.defaultWriter = writer; - // New default routing context if we opened it again - openedRecordInfo.shared.defaultRoutingContext = dhtctx; - } - - final rec = DHTRecord._( - debugName: debugName, - routingContext: dhtctx, - defaultSubkey: defaultSubkey, - sharedDHTRecordData: openedRecordInfo.shared, - writer: writer, - crypto: crypto); - - await _mutex.protect(() async { - // Register the dependency - await _addDependencyInner( - parent, - recordKey, - debugName: debugName, - ); - - openedRecordInfo.records.add(rec); - }); - - return rec; - } - // Called when a DHTRecord is closed // Cleans up the opened record housekeeping and processes any late deletions Future _recordClosed(DHTRecord record) async { @@ -866,34 +854,37 @@ class DHTRecordPool with TableDBBackedJson { void _pollWatch(TypedKey openedRecordKey, _OpenedRecordInfo openedRecordInfo, _WatchState unionWatchState) { singleFuture((this, _sfPollWatch, openedRecordKey), () async { - final dhtctx = openedRecordInfo.shared.defaultRoutingContext; + await _stats.measure( + openedRecordKey, openedRecordInfo.debugNames, '_pollWatch', () async { + final dhtctx = openedRecordInfo.shared.defaultRoutingContext; - final currentReport = await dhtctx.inspectDHTRecord(openedRecordKey, - subkeys: unionWatchState.subkeys, scope: DHTReportScope.syncGet); + final currentReport = await dhtctx.inspectDHTRecord(openedRecordKey, + subkeys: unionWatchState.subkeys, scope: DHTReportScope.syncGet); - final fsc = currentReport.firstSeqChange; - if (fsc == null) { - return null; - } - final newerSubkeys = currentReport.newerOnlineSubkeys; + final fsc = currentReport.firstSeqChange; + if (fsc == null) { + return null; + } + final newerSubkeys = currentReport.newerOnlineSubkeys; - final valueData = await dhtctx.getDHTValue(openedRecordKey, fsc.subkey, - forceRefresh: true); - if (valueData == null) { - return; - } + final valueData = await dhtctx.getDHTValue(openedRecordKey, fsc.subkey, + forceRefresh: true); + if (valueData == null) { + return; + } - if (valueData.seq < fsc.newSeq) { - log('inspect returned a newer seq than get: ${valueData.seq} < $fsc'); - } + if (valueData.seq < fsc.newSeq) { + log('inspect returned a newer seq than get: ${valueData.seq} < $fsc'); + } - if (fsc.oldSeq == null || valueData.seq > fsc.oldSeq!) { - processRemoteValueChange(VeilidUpdateValueChange( - key: openedRecordKey, - subkeys: newerSubkeys, - count: 0xFFFFFFFF, - value: valueData)); - } + if (fsc.oldSeq == null || valueData.seq > fsc.oldSeq!) { + processRemoteValueChange(VeilidUpdateValueChange( + key: openedRecordKey, + subkeys: newerSubkeys, + count: 0xFFFFFFFF, + value: valueData)); + } + }); }); } @@ -915,8 +906,11 @@ class DHTRecordPool with TableDBBackedJson { _watchStateProcessors.updateState( openedRecordKey, unionWatchState, - (newState) => - _watchStateChange(openedRecordKey, unionWatchState)); + (newState) => _stats.measure( + openedRecordKey, + openedRecordInfo.debugNames, + '_watchStateChange', + () => _watchStateChange(openedRecordKey, unionWatchState))); } } }); @@ -958,6 +952,8 @@ class DHTRecordPool with TableDBBackedJson { // Watch state processors final _watchStateProcessors = SingleStateProcessorMap(); + // Statistics + final _stats = DHTStats(); static DHTRecordPool? _singleton; } diff --git a/packages/veilid_support/pubspec.lock b/packages/veilid_support/pubspec.lock index 11c2db6..d86402b 100644 --- a/packages/veilid_support/pubspec.lock +++ b/packages/veilid_support/pubspec.lock @@ -331,6 +331,14 @@ packages: url: "https://pub.dev" source: hosted version: "4.1.2" + indent: + dependency: "direct main" + description: + name: indent + sha256: "819319a5c185f7fe412750c798953378b37a0d0d32564ce33e7c5acfd1372d2a" + url: "https://pub.dev" + source: hosted + version: "2.0.0" io: dependency: transitive description: diff --git a/packages/veilid_support/pubspec.yaml b/packages/veilid_support/pubspec.yaml index 548c40e..65ba78d 100644 --- a/packages/veilid_support/pubspec.yaml +++ b/packages/veilid_support/pubspec.yaml @@ -16,6 +16,7 @@ dependencies: equatable: ^2.0.7 fast_immutable_collections: ^11.0.3 freezed_annotation: ^3.0.0 + indent: ^2.0.0 json_annotation: ^4.9.0 loggy: ^2.0.3 meta: ^1.16.0 diff --git a/pubspec.lock b/pubspec.lock index db2adfc..e7252a3 100644 --- a/pubspec.lock +++ b/pubspec.lock @@ -809,6 +809,14 @@ packages: url: "https://pub.dev" source: hosted version: "4.5.3" + indent: + dependency: transitive + description: + name: indent + sha256: "819319a5c185f7fe412750c798953378b37a0d0d32564ce33e7c5acfd1372d2a" + url: "https://pub.dev" + source: hosted + version: "2.0.0" intl: dependency: "direct main" description: