From 0b4de3ad13572d967e4fe879585f5904d315eeeb Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Wed, 24 Apr 2024 22:43:42 -0400 Subject: [PATCH] better handling of subkey sequence numbers --- .../cubits/single_contact_messages_cubit.dart | 1 + lib/chat/views/chat_component.dart | 8 ++ lib/tools/loggy.dart | 5 +- lib/tools/state_logger.dart | 1 + .../src/dht_record/dht_record.dart | 90 ++++++++++++++----- .../src/dht_record/dht_record_pool.dart | 83 +++++++++++++---- .../dht_short_array/dht_short_array_head.dart | 9 +- .../dht_short_array/dht_short_array_read.dart | 11 ++- .../dht_short_array_write.dart | 34 +++++-- 9 files changed, 185 insertions(+), 57 deletions(-) diff --git a/lib/chat/cubits/single_contact_messages_cubit.dart b/lib/chat/cubits/single_contact_messages_cubit.dart index 41c872d..6605ab6 100644 --- a/lib/chat/cubits/single_contact_messages_cubit.dart +++ b/lib/chat/cubits/single_contact_messages_cubit.dart @@ -379,6 +379,7 @@ class SingleContactMessagesCubit extends Cubit { .toIList(); // Emit the rendered state + emit(AsyncValue.data(renderedState)); } diff --git a/lib/chat/views/chat_component.dart b/lib/chat/views/chat_component.dart index d0d3f39..8ca471e 100644 --- a/lib/chat/views/chat_component.dart +++ b/lib/chat/views/chat_component.dart @@ -158,9 +158,17 @@ class ChatComponent extends StatelessWidget { // Convert protobuf messages to chat messages final chatMessages = []; + final tsSet = {}; for (final message in messages) { final chatMessage = messageToChatMessage(message); chatMessages.insert(0, chatMessage); + if (!tsSet.add(chatMessage.id)) { + // ignore: avoid_print + print('duplicate id found: ${chatMessage.id}:\n' + 'Messages:\n$messages\n' + 'ChatMessages:\n$chatMessages'); + assert(false, 'should not have duplicate id'); + } } return DefaultTextStyle( diff --git a/lib/tools/loggy.dart b/lib/tools/loggy.dart index 9947308..c422ec8 100644 --- a/lib/tools/loggy.dart +++ b/lib/tools/loggy.dart @@ -9,6 +9,7 @@ import 'package:loggy/loggy.dart'; import 'package:veilid_support/veilid_support.dart'; import '../veilid_processor/views/developer.dart'; +import 'responsive.dart'; import 'state_logger.dart'; String wrapWithLogColor(LogLevel? level, String text) { @@ -111,7 +112,9 @@ class CallbackPrinter extends LoggyPrinter { @override void onLog(LogRecord record) { final out = record.pretty(); - debugPrint(out); + if (isDesktop) { + debugPrintSynchronously(out); + } globalDebugTerminal.write('$out\n'.replaceAll('\n', '\r\n')); callback?.call(record); } diff --git a/lib/tools/state_logger.dart b/lib/tools/state_logger.dart index b17727f..4c8e17a 100644 --- a/lib/tools/state_logger.dart +++ b/lib/tools/state_logger.dart @@ -8,6 +8,7 @@ const Map _blocChangeLogLevels = { 'ActiveConversationsBlocMapCubit': LogLevel.off, 'DHTShortArrayCubit': LogLevel.off, 'PersistentQueueCubit': LogLevel.off, + 'SingleContactMessagesCubit': LogLevel.off, }; const Map _blocCreateCloseLogLevels = {}; const Map _blocErrorLogLevels = {}; diff --git a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record.dart b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record.dart index f7df9a9..7d29292 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record.dart @@ -104,7 +104,8 @@ class DHTRecord { {int subkey = -1, DHTRecordCrypto? crypto, bool forceRefresh = false, - bool onlyUpdates = false}) async { + bool onlyUpdates = false, + Output? outSeqNum}) async { subkey = subkeyOrDefault(subkey); final valueData = await _routingContext.getDHTValue(key, subkey, forceRefresh: forceRefresh); @@ -116,6 +117,9 @@ class DHTRecord { return null; } final out = (crypto ?? _crypto).decrypt(valueData.data, subkey); + if (outSeqNum != null) { + outSeqNum.save(valueData.seq); + } _sharedDHTRecordData.subkeySeqCache[subkey] = valueData.seq; return out; } @@ -124,12 +128,14 @@ class DHTRecord { {int subkey = -1, DHTRecordCrypto? crypto, bool forceRefresh = false, - bool onlyUpdates = false}) async { + bool onlyUpdates = false, + Output? outSeqNum}) async { final data = await get( subkey: subkey, crypto: crypto, forceRefresh: forceRefresh, - onlyUpdates: onlyUpdates); + onlyUpdates: onlyUpdates, + outSeqNum: outSeqNum); if (data == null) { return null; } @@ -141,12 +147,14 @@ class DHTRecord { {int subkey = -1, DHTRecordCrypto? crypto, bool forceRefresh = false, - bool onlyUpdates = false}) async { + bool onlyUpdates = false, + Output? outSeqNum}) async { final data = await get( subkey: subkey, crypto: crypto, forceRefresh: forceRefresh, - onlyUpdates: onlyUpdates); + onlyUpdates: onlyUpdates, + outSeqNum: outSeqNum); if (data == null) { return null; } @@ -154,7 +162,10 @@ class DHTRecord { } Future tryWriteBytes(Uint8List newValue, - {int subkey = -1, DHTRecordCrypto? crypto, KeyPair? writer}) async { + {int subkey = -1, + DHTRecordCrypto? crypto, + KeyPair? writer, + Output? outSeqNum}) async { subkey = subkeyOrDefault(subkey); final lastSeq = _sharedDHTRecordData.subkeySeqCache[subkey]; final encryptedNewValue = @@ -175,6 +186,9 @@ class DHTRecord { // Record new sequence number final isUpdated = newValueData.seq != lastSeq; + if (isUpdated && outSeqNum != null) { + outSeqNum.save(newValueData.seq); + } _sharedDHTRecordData.subkeySeqCache[subkey] = newValueData.seq; // See if the encrypted data returned is exactly the same @@ -197,7 +211,10 @@ class DHTRecord { } Future eventualWriteBytes(Uint8List newValue, - {int subkey = -1, DHTRecordCrypto? crypto, KeyPair? writer}) async { + {int subkey = -1, + DHTRecordCrypto? crypto, + KeyPair? writer, + Output? outSeqNum}) async { subkey = subkeyOrDefault(subkey); final lastSeq = _sharedDHTRecordData.subkeySeqCache[subkey]; final encryptedNewValue = @@ -222,6 +239,9 @@ class DHTRecord { } // Record new sequence number + if (outSeqNum != null) { + outSeqNum.save(newValueData.seq); + } _sharedDHTRecordData.subkeySeqCache[subkey] = newValueData.seq; // The encrypted data returned should be exactly the same @@ -239,12 +259,14 @@ class DHTRecord { Future Function(Uint8List? oldValue) update, {int subkey = -1, DHTRecordCrypto? crypto, - KeyPair? writer}) async { + KeyPair? writer, + Output? outSeqNum}) async { subkey = subkeyOrDefault(subkey); // Get the existing data, do not allow force refresh here // because if we need a refresh the setDHTValue will fail anyway - var oldValue = await get(subkey: subkey, crypto: crypto); + var oldValue = + await get(subkey: subkey, crypto: crypto, outSeqNum: outSeqNum); do { // Update the data @@ -252,16 +274,22 @@ class DHTRecord { // Try to write it back to the network oldValue = await tryWriteBytes(updatedValue, - subkey: subkey, crypto: crypto, writer: writer); + subkey: subkey, crypto: crypto, writer: writer, outSeqNum: outSeqNum); // Repeat update if newer data on the network was found } while (oldValue != null); } Future tryWriteJson(T Function(dynamic) fromJson, T newValue, - {int subkey = -1, DHTRecordCrypto? crypto, KeyPair? writer}) => + {int subkey = -1, + DHTRecordCrypto? crypto, + KeyPair? writer, + Output? outSeqNum}) => tryWriteBytes(jsonEncodeBytes(newValue), - subkey: subkey, crypto: crypto, writer: writer) + subkey: subkey, + crypto: crypto, + writer: writer, + outSeqNum: outSeqNum) .then((out) { if (out == null) { return null; @@ -271,9 +299,15 @@ class DHTRecord { Future tryWriteProtobuf( T Function(List) fromBuffer, T newValue, - {int subkey = -1, DHTRecordCrypto? crypto, KeyPair? writer}) => + {int subkey = -1, + DHTRecordCrypto? crypto, + KeyPair? writer, + Output? outSeqNum}) => tryWriteBytes(newValue.writeToBuffer(), - subkey: subkey, crypto: crypto, writer: writer) + subkey: subkey, + crypto: crypto, + writer: writer, + outSeqNum: outSeqNum) .then((out) { if (out == null) { return null; @@ -282,26 +316,38 @@ class DHTRecord { }); Future eventualWriteJson(T newValue, - {int subkey = -1, DHTRecordCrypto? crypto, KeyPair? writer}) => + {int subkey = -1, + DHTRecordCrypto? crypto, + KeyPair? writer, + Output? outSeqNum}) => eventualWriteBytes(jsonEncodeBytes(newValue), - subkey: subkey, crypto: crypto, writer: writer); + subkey: subkey, crypto: crypto, writer: writer, outSeqNum: outSeqNum); Future eventualWriteProtobuf(T newValue, - {int subkey = -1, DHTRecordCrypto? crypto, KeyPair? writer}) => + {int subkey = -1, + DHTRecordCrypto? crypto, + KeyPair? writer, + Output? outSeqNum}) => eventualWriteBytes(newValue.writeToBuffer(), - subkey: subkey, crypto: crypto, writer: writer); + subkey: subkey, crypto: crypto, writer: writer, outSeqNum: outSeqNum); Future eventualUpdateJson( T Function(dynamic) fromJson, Future Function(T?) update, - {int subkey = -1, DHTRecordCrypto? crypto, KeyPair? writer}) => + {int subkey = -1, + DHTRecordCrypto? crypto, + KeyPair? writer, + Output? outSeqNum}) => eventualUpdateBytes(jsonUpdate(fromJson, update), - subkey: subkey, crypto: crypto, writer: writer); + subkey: subkey, crypto: crypto, writer: writer, outSeqNum: outSeqNum); Future eventualUpdateProtobuf( T Function(List) fromBuffer, Future Function(T?) update, - {int subkey = -1, DHTRecordCrypto? crypto, KeyPair? writer}) => + {int subkey = -1, + DHTRecordCrypto? crypto, + KeyPair? writer, + Output? outSeqNum}) => eventualUpdateBytes(protobufUpdate(fromBuffer, update), - subkey: subkey, crypto: crypto, writer: writer); + subkey: subkey, crypto: crypto, writer: writer, outSeqNum: outSeqNum); Future watch( {List? subkeys, diff --git a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart index 333558e..0be4f25 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart @@ -10,6 +10,9 @@ import 'package:protobuf/protobuf.dart'; import '../../../../veilid_support.dart'; +export 'package:fast_immutable_collections/fast_immutable_collections.dart' + show Output; + part 'dht_record_pool.freezed.dart'; part 'dht_record_pool.g.dart'; part 'dht_record.dart'; @@ -17,6 +20,10 @@ part 'dht_record.dart'; const int watchBackoffMultiplier = 2; const int watchBackoffMax = 30; +const int? defaultWatchDurationSecs = null; // 600 +const int watchRenewalNumerator = 4; +const int watchRenewalDenominator = 5; + typedef DHTRecordPoolLogger = void Function(String message); /// Record pool that managed DHTRecords and allows for tagged deletion @@ -56,14 +63,17 @@ class WatchState extends Equatable { {required this.subkeys, required this.expiration, required this.count, - this.realExpiration}); + this.realExpiration, + this.renewalTime}); final List? subkeys; final Timestamp? expiration; final int? count; final Timestamp? realExpiration; + final Timestamp? renewalTime; @override - List get props => [subkeys, expiration, count, realExpiration]; + List get props => + [subkeys, expiration, count, realExpiration, renewalTime]; } /// Data shared amongst all DHTRecord instances @@ -77,6 +87,7 @@ class SharedDHTRecordData { VeilidRoutingContext defaultRoutingContext; Map subkeySeqCache = {}; bool needsWatchStateUpdate = false; + WatchState? unionWatchState; bool deleteOnClose = false; } @@ -616,6 +627,7 @@ class DHTRecordPool with TableDBBackedJson { int? totalCount; Timestamp? maxExpiration; List? allSubkeys; + Timestamp? earliestRenewalTime; var noExpiration = false; var everySubkey = false; @@ -648,6 +660,15 @@ class DHTRecordPool with TableDBBackedJson { } else { everySubkey = true; } + final wsRenewalTime = ws.renewalTime; + if (wsRenewalTime != null) { + earliestRenewalTime = earliestRenewalTime == null + ? wsRenewalTime + : Timestamp( + value: (wsRenewalTime.value < earliestRenewalTime.value + ? wsRenewalTime.value + : earliestRenewalTime.value)); + } } } if (noExpiration) { @@ -661,11 +682,14 @@ class DHTRecordPool with TableDBBackedJson { } return WatchState( - subkeys: allSubkeys, expiration: maxExpiration, count: totalCount); + subkeys: allSubkeys, + expiration: maxExpiration, + count: totalCount, + renewalTime: earliestRenewalTime); } - void _updateWatchRealExpirations( - Iterable records, Timestamp realExpiration) { + void _updateWatchRealExpirations(Iterable records, + Timestamp realExpiration, Timestamp renewalTime) { for (final rec in records) { final ws = rec.watchState; if (ws != null) { @@ -673,7 +697,8 @@ class DHTRecordPool with TableDBBackedJson { subkeys: ws.subkeys, expiration: ws.expiration, count: ws.count, - realExpiration: realExpiration); + realExpiration: realExpiration, + renewalTime: renewalTime); } } } @@ -689,6 +714,7 @@ class DHTRecordPool with TableDBBackedJson { } _inTick = true; _tickCount = 0; + final now = veilid.now(); try { final allSuccess = await _mutex.protect(() async { @@ -700,12 +726,24 @@ class DHTRecordPool with TableDBBackedJson { final openedRecordInfo = kv.value; final dhtctx = openedRecordInfo.shared.defaultRoutingContext; - if (openedRecordInfo.shared.needsWatchStateUpdate) { - final watchState = + var wantsWatchStateUpdate = + openedRecordInfo.shared.needsWatchStateUpdate; + + // Check if we have reached renewal time for the watch + if (openedRecordInfo.shared.unionWatchState != null && + openedRecordInfo.shared.unionWatchState!.renewalTime != null && + now.value > + openedRecordInfo.shared.unionWatchState!.renewalTime!.value) { + wantsWatchStateUpdate = true; + } + + if (wantsWatchStateUpdate) { + // Update union watch state + final unionWatchState = openedRecordInfo.shared.unionWatchState = _collectUnionWatchState(openedRecordInfo.records); // Apply watch changes for record - if (watchState == null) { + if (unionWatchState == null) { unord.add(() async { // Record needs watch cancel var success = false; @@ -727,26 +765,39 @@ class DHTRecordPool with TableDBBackedJson { // Record needs new watch var success = false; try { - final subkeys = watchState.subkeys?.toList(); - final count = watchState.count; - final expiration = watchState.expiration; + final subkeys = unionWatchState.subkeys?.toList(); + final count = unionWatchState.count; + final expiration = unionWatchState.expiration; + final now = veilid.now(); final realExpiration = await dhtctx.watchDHTValues( openedRecordKey, - subkeys: watchState.subkeys?.toList(), - count: watchState.count, - expiration: watchState.expiration); + subkeys: unionWatchState.subkeys?.toList(), + count: unionWatchState.count, + expiration: unionWatchState.expiration ?? + (defaultWatchDurationSecs == null + ? null + : veilid.now().offset( + TimestampDuration.fromMillis( + defaultWatchDurationSecs! * 1000)))); + + final expirationDuration = realExpiration.diff(now); + final renewalTime = now.offset(TimestampDuration( + value: expirationDuration.value * + BigInt.from(watchRenewalNumerator) ~/ + BigInt.from(watchRenewalDenominator))); log('watchDHTValues: key=$openedRecordKey, subkeys=$subkeys, ' 'count=$count, expiration=$expiration, ' 'realExpiration=$realExpiration, ' + 'renewalTime=$renewalTime, ' 'debugNames=${openedRecordInfo.debugNames}'); // Update watch states with real expiration if (realExpiration.value != BigInt.zero) { openedRecordInfo.shared.needsWatchStateUpdate = false; _updateWatchRealExpirations( - openedRecordInfo.records, realExpiration); + openedRecordInfo.records, realExpiration, renewalTime); success = true; } } on VeilidAPIException catch (e) { diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart index 2fd1a60..f6f3a5a 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart @@ -424,21 +424,18 @@ class _DHTShortArrayHead { /// Update the sequence number for a particular index in /// our local sequence number list. /// If a write is happening, update the network copy as well. - Future updatePositionSeq(int pos, bool write) async { + void updatePositionSeq(int pos, bool write, int newSeq) { final idx = _index[pos]; - final lookup = await lookupIndex(idx); - final report = await lookup.record - .inspect(subkeys: [ValueSubkeyRange.single(lookup.recordSubkey)]); while (_localSeqs.length <= idx) { _localSeqs.add(0xFFFFFFFF); } - _localSeqs[idx] = report.localSeqs[0]; + _localSeqs[idx] = newSeq; if (write) { while (_seqs.length <= idx) { _seqs.add(0xFFFFFFFF); } - _seqs[idx] = report.localSeqs[0]; + _seqs[idx] = newSeq; } } diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_read.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_read.dart index 44e565d..0dbe51e 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_read.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_read.dart @@ -74,9 +74,14 @@ class _DHTShortArrayRead implements DHTShortArrayRead { final lookup = await _head.lookupPosition(pos); final refresh = forceRefresh || _head.positionNeedsRefresh(pos); - final out = - lookup.record.get(subkey: lookup.recordSubkey, forceRefresh: refresh); - await _head.updatePositionSeq(pos, false); + final outSeqNum = Output(); + final out = lookup.record.get( + subkey: lookup.recordSubkey, + forceRefresh: refresh, + outSeqNum: outSeqNum); + if (outSeqNum.value != null) { + _head.updatePositionSeq(pos, false, outSeqNum.value!); + } return out; } diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_write.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_write.dart index af6204e..d1c8b2f 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_write.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_write.dart @@ -110,9 +110,6 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead return false; } - // Get sequence number written - await _head.updatePositionSeq(pos, true); - return true; } @@ -127,9 +124,6 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead return false; } - // Get sequence number written - await _head.updatePositionSeq(pos, true); - return true; } @@ -153,9 +147,17 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead throw IndexError.withLength(pos, _head.length); } final lookup = await _head.lookupPosition(pos); + + final outSeqNum = Output(); + final result = lookup.seq == 0xFFFFFFFF ? null : await lookup.record.get(subkey: lookup.recordSubkey); + + if (outSeqNum.value != null) { + _head.updatePositionSeq(pos, false, outSeqNum.value!); + } + if (result == null) { throw StateError('Element does not exist'); } @@ -175,11 +177,25 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead throw IndexError.withLength(pos, _head.length); } final lookup = await _head.lookupPosition(pos); + + final outSeqNum = Output(); + final oldValue = lookup.seq == 0xFFFFFFFF ? null - : await lookup.record.get(subkey: lookup.recordSubkey); - final result = await lookup.record - .tryWriteBytes(newValue, subkey: lookup.recordSubkey); + : await lookup.record + .get(subkey: lookup.recordSubkey, outSeqNum: outSeqNum); + + if (outSeqNum.value != null) { + _head.updatePositionSeq(pos, false, outSeqNum.value!); + } + + final result = await lookup.record.tryWriteBytes(newValue, + subkey: lookup.recordSubkey, outSeqNum: outSeqNum); + + if (outSeqNum.value != null) { + _head.updatePositionSeq(pos, true, outSeqNum.value!); + } + if (result != null) { // A result coming back means the element was overwritten already return (result, false);