From bf38c2c44df0d88d6e80fc7002dd022f8822ea65 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Thu, 17 Apr 2025 18:55:43 -0400 Subject: [PATCH] clean up a bunch of exceptions --- devtools_options.yaml | 3 +- ios/Podfile.lock | 2 +- .../message_reconciliation.dart | 28 +++-- .../cubits/single_contact_messages_cubit.dart | 56 +++++---- lib/router/cubits/router_cubit.dart | 3 +- lib/veilid_processor/views/developer.dart | 16 ++- packages/veilid_support/example/pubspec.lock | 2 +- .../lib/dht_support/src/dht_log/dht_log.dart | 1 + .../dht_support/src/dht_log/dht_log_read.dart | 12 +- .../src/dht_log/dht_log_spine.dart | 7 +- .../src/dht_log/dht_log_write.dart | 37 ++++-- .../src/dht_record/dht_record_pool.dart | 117 +++++------------- .../dht_record/dht_record_pool_private.dart | 20 +-- .../src/dht_short_array/dht_short_array.dart | 1 + .../dht_short_array/dht_short_array_head.dart | 18 +++ .../dht_short_array/dht_short_array_read.dart | 12 +- .../dht_short_array_write.dart | 20 ++- .../src/interfaces/exceptions.dart | 14 ++- .../lib/src/persistent_queue.dart | 27 +++- .../lib/src/table_db_array.dart | 12 +- packages/veilid_support/pubspec.lock | 2 +- 21 files changed, 244 insertions(+), 166 deletions(-) diff --git a/devtools_options.yaml b/devtools_options.yaml index 5c27c3e..7093540 100644 --- a/devtools_options.yaml +++ b/devtools_options.yaml @@ -1,2 +1,3 @@ extensions: - - provider: true \ No newline at end of file + - provider: true + - shared_preferences: true \ No newline at end of file diff --git a/ios/Podfile.lock b/ios/Podfile.lock index d943756..2528d2d 100644 --- a/ios/Podfile.lock +++ b/ios/Podfile.lock @@ -168,7 +168,7 @@ SPEC CHECKSUMS: sqflite_darwin: 20b2a3a3b70e43edae938624ce550a3cbf66a3d0 system_info_plus: 555ce7047fbbf29154726db942ae785c29211740 url_launcher_ios: 694010445543906933d732453a59da0a173ae33d - veilid: b3b9418ae6b083e662396bfa2c635fb115c8510e + veilid: 3ce560a4f2b568a77a9fd5e23090f2fa97581019 PODFILE CHECKSUM: c8bf5b16c34712d5790b0b8d2472cc66ac0a8487 diff --git a/lib/chat/cubits/reconciliation/message_reconciliation.dart b/lib/chat/cubits/reconciliation/message_reconciliation.dart index f0b8c4c..9b183b5 100644 --- a/lib/chat/cubits/reconciliation/message_reconciliation.dart +++ b/lib/chat/cubits/reconciliation/message_reconciliation.dart @@ -6,6 +6,7 @@ import 'package:sorted_list/sorted_list.dart'; import 'package:veilid_support/veilid_support.dart'; import '../../../proto/proto.dart' as proto; +import '../../../tools/tools.dart'; import 'author_input_queue.dart'; import 'author_input_source.dart'; import 'output_position.dart'; @@ -62,17 +63,24 @@ class MessageReconciliation { Future _enqueueAuthorInput( {required TypedKey author, required AuthorInputSource inputSource}) async { - // Get the position of our most recent reconciled message from this author - final outputPosition = await _findLastOutputPosition(author: author); + try { + // Get the position of our most recent reconciled message from this author + final outputPosition = await _findLastOutputPosition(author: author); - // Find oldest message we have not yet reconciled - final inputQueue = await AuthorInputQueue.create( - author: author, - inputSource: inputSource, - outputPosition: outputPosition, - onError: _onError, - ); - return inputQueue; + // Find oldest message we have not yet reconciled + final inputQueue = await AuthorInputQueue.create( + author: author, + inputSource: inputSource, + outputPosition: outputPosition, + onError: _onError, + ); + return inputQueue; + // Catch everything so we can avoid ParallelWaitError + // ignore: avoid_catches_without_on_clauses + } catch (e, st) { + log.error('Exception enqueing author input: $e:\n$st\n'); + return null; + } } // Get the position of our most recent reconciled message from this author diff --git a/lib/chat/cubits/single_contact_messages_cubit.dart b/lib/chat/cubits/single_contact_messages_cubit.dart index b4e77db..559eae2 100644 --- a/lib/chat/cubits/single_contact_messages_cubit.dart +++ b/lib/chat/cubits/single_contact_messages_cubit.dart @@ -100,8 +100,8 @@ class SingleContactMessagesCubit extends Cubit { key: _remoteConversationRecordKey.toString(), fromBuffer: proto.Message.fromBuffer, closure: _processUnsentMessages, - onError: (e, sp) { - log.error('Exception while processing unsent messages: $e\n$sp\n'); + onError: (e, st) { + log.error('Exception while processing unsent messages: $e\n$st\n'); }); // Make crypto @@ -310,14 +310,11 @@ class SingleContactMessagesCubit extends Cubit { Future _processMessageToSend( proto.Message message, proto.Message? previousMessage) async { - // Get the previous message if we don't have one - previousMessage ??= await _sentMessagesCubit!.operate((r) async => - r.length == 0 - ? null - : await r.getProtobuf(proto.Message.fromBuffer, r.length - 1)); - - message.id = - await _senderMessageIntegrity.generateMessageId(previousMessage); + // It's possible we had a signature from a previous + // operateAppendEventual attempt, so clear it and make a new message id too + message + ..clearSignature() + ..id = await _senderMessageIntegrity.generateMessageId(previousMessage); // Now sign it await _senderMessageIntegrity.signMessage( @@ -326,26 +323,33 @@ class SingleContactMessagesCubit extends Cubit { // Async process to send messages in the background Future _processUnsentMessages(IList messages) async { - // Go through and assign ids to all the messages in order - proto.Message? previousMessage; - final processedMessages = messages.toList(); - for (final message in processedMessages) { - try { - await _processMessageToSend(message, previousMessage); - previousMessage = message; - } on Exception catch (e) { - log.error('Exception processing unsent message: $e'); - } - } - // _sendingMessages = messages; // _renderState(); try { - await _sentMessagesCubit!.operateAppendEventual((writer) => - writer.addAll(messages.map((m) => m.writeToBuffer()).toList())); - } on Exception catch (e) { - log.error('Exception appending unsent messages: $e'); + await _sentMessagesCubit!.operateAppendEventual((writer) async { + // Get the previous message if we have one + var previousMessage = writer.length == 0 + ? null + : await writer.getProtobuf( + proto.Message.fromBuffer, writer.length - 1); + + // Sign all messages + final processedMessages = messages.toList(); + for (final message in processedMessages) { + try { + await _processMessageToSend(message, previousMessage); + previousMessage = message; + } on Exception catch (e, st) { + log.error('Exception processing unsent message: $e:\n$st\n'); + } + } + final byteMessages = messages.map((m) => m.writeToBuffer()).toList(); + + return writer.addAll(byteMessages); + }); + } on Exception catch (e, st) { + log.error('Exception appending unsent messages: $e:\n$st\n'); } // _sendingMessages = const IList.empty(); diff --git a/lib/router/cubits/router_cubit.dart b/lib/router/cubits/router_cubit.dart index e5a2024..d651611 100644 --- a/lib/router/cubits/router_cubit.dart +++ b/lib/router/cubits/router_cubit.dart @@ -138,7 +138,8 @@ class RouterCubit extends Cubit { return null; case '/developer': return null; - // Otherwise, if there's no account, we need to go to the new account page. + // Otherwise, if there's no account, + // we need to go to the new account page. default: return state.hasAnyAccount ? null : '/new_account'; } diff --git a/lib/veilid_processor/views/developer.dart b/lib/veilid_processor/views/developer.dart index cb64d3d..08463fb 100644 --- a/lib/veilid_processor/views/developer.dart +++ b/lib/veilid_processor/views/developer.dart @@ -124,7 +124,21 @@ class _DeveloperPageState extends State { _debugOut('DEBUG >>>\n$debugCommand\n'); try { - final out = await Veilid.instance.debug(debugCommand); + var out = await Veilid.instance.debug(debugCommand); + + if (debugCommand == 'help') { + out = 'VeilidChat Commands:\n' + ' pool allocations - List DHTRecordPool allocations\n' + ' pool opened - List opened DHTRecord instances' + ' from the pool\n' + ' change_log_ignore change the log' + ' target ignore list for a tracing layer\n' + ' targets to add to the ignore list can be separated by' + ' a comma.\n' + ' to remove a target from the ignore list, prepend it' + ' with a minus.\n\n$out'; + } + _debugOut('<<< DEBUG\n$out\n'); } on Exception catch (e, st) { _debugOut('<<< ERROR\n$e\n<<< STACK\n$st'); diff --git a/packages/veilid_support/example/pubspec.lock b/packages/veilid_support/example/pubspec.lock index 6ef291b..2e87d8c 100644 --- a/packages/veilid_support/example/pubspec.lock +++ b/packages/veilid_support/example/pubspec.lock @@ -650,7 +650,7 @@ packages: path: "../../../../veilid/veilid-flutter" relative: true source: path - version: "0.4.3" + version: "0.4.4" veilid_support: dependency: "direct main" description: diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log.dart index 8f88ce1..1d3fb89 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log.dart @@ -7,6 +7,7 @@ import 'package:collection/collection.dart'; import 'package:equatable/equatable.dart'; import 'package:meta/meta.dart'; +import '../../../src/veilid_log.dart'; import '../../../veilid_support.dart'; import '../../proto/proto.dart' as proto; diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_read.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_read.dart index 6281d6e..d8634c6 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_read.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_read.dart @@ -47,8 +47,16 @@ class _DHTLogRead implements DHTLogReadOperations { final chunks = Iterable.generate(length) .slices(kMaxDHTConcurrency) - .map((chunk) => chunk - .map((pos) async => get(pos + start, forceRefresh: forceRefresh))); + .map((chunk) => chunk.map((pos) async { + try { + return get(pos + start, forceRefresh: forceRefresh); + // Need some way to debug ParallelWaitError + // ignore: avoid_catches_without_on_clauses + } catch (e, st) { + veilidLoggy.error('$e\n$st\n'); + rethrow; + } + })); for (final chunk in chunks) { final elems = await chunk.wait; diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart index bb27e04..d9f5df2 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart @@ -248,7 +248,12 @@ class _DHTLogSpine { final headDelta = _ringDistance(newHead, oldHead); final tailDelta = _ringDistance(newTail, oldTail); if (headDelta > _positionLimit ~/ 2 || tailDelta > _positionLimit ~/ 2) { - throw const DHTExceptionInvalidData(); + throw DHTExceptionInvalidData('_DHTLogSpine::_updateHead ' + '_head=$_head _tail=$_tail ' + 'oldHead=$oldHead oldTail=$oldTail ' + 'newHead=$newHead newTail=$newTail ' + 'headDelta=$headDelta tailDelta=$tailDelta ' + '_positionLimit=$_positionLimit'); } } diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_write.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_write.dart index 1b5c09f..8d34280 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_write.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_write.dart @@ -17,7 +17,8 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations { } final lookup = await _spine.lookupPosition(pos); if (lookup == null) { - throw const DHTExceptionInvalidData(); + throw DHTExceptionInvalidData( + '_DHTLogRead::tryWriteItem pos=$pos _spine.length=${_spine.length}'); } // Write item to the segment @@ -45,12 +46,14 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations { } final aLookup = await _spine.lookupPosition(aPos); if (aLookup == null) { - throw const DHTExceptionInvalidData(); + throw DHTExceptionInvalidData('_DHTLogWrite::swap aPos=$aPos bPos=$bPos ' + '_spine.length=${_spine.length}'); } final bLookup = await _spine.lookupPosition(bPos); if (bLookup == null) { await aLookup.close(); - throw const DHTExceptionInvalidData(); + throw DHTExceptionInvalidData('_DHTLogWrite::swap aPos=$aPos bPos=$bPos ' + '_spine.length=${_spine.length}'); } // Swap items in the segments @@ -65,7 +68,10 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations { if (bItem.value == null) { final aItem = await aWrite.get(aLookup.pos); if (aItem == null) { - throw const DHTExceptionInvalidData(); + throw DHTExceptionInvalidData( + '_DHTLogWrite::swap aPos=$aPos bPos=$bPos ' + 'aLookup.pos=${aLookup.pos} bLookup.pos=${bLookup.pos} ' + '_spine.length=${_spine.length}'); } await sb.operateWriteEventual((bWrite) async { final success = await bWrite @@ -101,7 +107,9 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations { await write.clear(); } else if (lookup.pos != write.length) { // We should always be appending at the length - throw const DHTExceptionInvalidData(); + throw DHTExceptionInvalidData( + '_DHTLogWrite::add lookup.pos=${lookup.pos} ' + 'write.length=${write.length}'); } return write.add(value); })); @@ -117,12 +125,16 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations { final dws = DelayedWaitSet(); var success = true; - for (var valueIdx = 0; valueIdx < values.length;) { + for (var valueIdxIter = 0; valueIdxIter < values.length;) { + final valueIdx = valueIdxIter; final remaining = values.length - valueIdx; final lookup = await _spine.lookupPosition(insertPos + valueIdx); if (lookup == null) { - throw const DHTExceptionInvalidData(); + throw DHTExceptionInvalidData('_DHTLogWrite::addAll ' + '_spine.length=${_spine.length}' + 'insertPos=$insertPos valueIdx=$valueIdx ' + 'values.length=${values.length} '); } final sacount = min(remaining, DHTShortArray.maxElements - lookup.pos); @@ -137,16 +149,21 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations { await write.clear(); } else if (lookup.pos != write.length) { // We should always be appending at the length - throw const DHTExceptionInvalidData(); + await write.truncate(lookup.pos); } - return write.addAll(sublistValues); + await write.addAll(sublistValues); + success = true; })); } on DHTExceptionOutdated { success = false; + // Need some way to debug ParallelWaitError + // ignore: avoid_catches_without_on_clauses + } catch (e, st) { + veilidLoggy.error('$e\n$st\n'); } }); - valueIdx += sacount; + valueIdxIter += sacount; } await dws(); diff --git a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart index 9027799..e3c9abe 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart @@ -246,11 +246,13 @@ class DHTRecordPool with TableDBBackedJson { /// Print children String debugChildren(TypedKey recordKey, {List? allDeps}) { allDeps ??= _collectChildrenInner(recordKey); + // Debugging // ignore: avoid_print var out = 'Parent: $recordKey (${_state.debugNames[recordKey.toString()]})\n'; for (final dep in allDeps) { if (dep != recordKey) { + // Debugging // ignore: avoid_print out += ' Child: $dep (${_state.debugNames[dep.toString()]})\n'; } @@ -270,32 +272,25 @@ class DHTRecordPool with TableDBBackedJson { break; } } - } else { - final now = Veilid.instance.now().value; - // Expired, process renewal if desired - for (final entry in _opened.entries) { - final openedKey = entry.key; - final openedRecordInfo = entry.value; - - if (openedKey == updateValueChange.key) { - // Renew watch state for each opened record - for (final rec in openedRecordInfo.records) { - // See if the watch had an expiration and if it has expired - // otherwise the renewal will keep the same parameters - final watchState = rec._watchState; - if (watchState != null) { - final exp = watchState.expiration; - if (exp != null && exp.value < now) { - // Has expiration, and it has expired, clear watch state - rec._watchState = null; - } - } - } - openedRecordInfo.shared.needsWatchStateUpdate = true; - break; - } - } } + // else { + + // XXX: should no longer be necessary + // // Remove watch state + // + // for (final entry in _opened.entries) { + // final openedKey = entry.key; + // final openedRecordInfo = entry.value; + + // if (openedKey == updateValueChange.key) { + // for (final rec in openedRecordInfo.records) { + // rec._watchState = null; + // } + // openedRecordInfo.shared.needsWatchStateUpdate = true; + // break; + // } + // } + //} } /// Log the current record allocations @@ -735,7 +730,6 @@ class DHTRecordPool with TableDBBackedJson { int? totalCount; Timestamp? maxExpiration; List? allSubkeys; - Timestamp? earliestRenewalTime; var noExpiration = false; var everySubkey = false; @@ -768,15 +762,6 @@ class DHTRecordPool with TableDBBackedJson { } else { everySubkey = true; } - final wsRenewalTime = ws.renewalTime; - if (wsRenewalTime != null) { - earliestRenewalTime = earliestRenewalTime == null - ? wsRenewalTime - : Timestamp( - value: (wsRenewalTime.value < earliestRenewalTime.value - ? wsRenewalTime.value - : earliestRenewalTime.value)); - } } } if (noExpiration) { @@ -790,25 +775,10 @@ class DHTRecordPool with TableDBBackedJson { } return _WatchState( - subkeys: allSubkeys, - expiration: maxExpiration, - count: totalCount, - renewalTime: earliestRenewalTime); - } - - static void _updateWatchRealExpirations(Iterable records, - Timestamp realExpiration, Timestamp renewalTime) { - for (final rec in records) { - final ws = rec._watchState; - if (ws != null) { - rec._watchState = _WatchState( - subkeys: ws.subkeys, - expiration: ws.expiration, - count: ws.count, - realExpiration: realExpiration, - renewalTime: renewalTime); - } - } + subkeys: allSubkeys, + expiration: maxExpiration, + count: totalCount, + ); } Future _watchStateChange( @@ -833,9 +803,9 @@ class DHTRecordPool with TableDBBackedJson { // Only try this once, if it doesn't succeed then it can just expire // on its own. try { - final cancelled = await dhtctx.cancelDHTWatch(openedRecordKey); + final stillActive = await dhtctx.cancelDHTWatch(openedRecordKey); - log('cancelDHTWatch: key=$openedRecordKey, cancelled=$cancelled, ' + log('cancelDHTWatch: key=$openedRecordKey, stillActive=$stillActive, ' 'debugNames=${openedRecordInfo.debugNames}'); openedRecordInfo.shared.unionWatchState = null; @@ -858,34 +828,20 @@ class DHTRecordPool with TableDBBackedJson { final subkeys = unionWatchState.subkeys?.toList(); final count = unionWatchState.count; final expiration = unionWatchState.expiration; - final now = veilid.now(); - final realExpiration = await dhtctx.watchDHTValues(openedRecordKey, + final active = await dhtctx.watchDHTValues(openedRecordKey, subkeys: unionWatchState.subkeys?.toList(), count: unionWatchState.count, - expiration: unionWatchState.expiration ?? - (_defaultWatchDurationSecs == null - ? null - : veilid.now().offset(TimestampDuration.fromMillis( - _defaultWatchDurationSecs! * 1000)))); + expiration: unionWatchState.expiration); - final expirationDuration = realExpiration.diff(now); - final renewalTime = now.offset(TimestampDuration( - value: expirationDuration.value * - BigInt.from(_watchRenewalNumerator) ~/ - BigInt.from(_watchRenewalDenominator))); - - log('watchDHTValues: key=$openedRecordKey, subkeys=$subkeys, ' + log('watchDHTValues(active=$active): ' + 'key=$openedRecordKey, subkeys=$subkeys, ' 'count=$count, expiration=$expiration, ' - 'realExpiration=$realExpiration, ' - 'renewalTime=$renewalTime, ' 'debugNames=${openedRecordInfo.debugNames}'); // Update watch states with real expiration - if (realExpiration.value != BigInt.zero) { + if (active) { openedRecordInfo.shared.unionWatchState = unionWatchState; - _updateWatchRealExpirations( - openedRecordInfo.records, realExpiration, renewalTime); openedRecordInfo.shared.needsWatchStateUpdate = false; } } on VeilidAPIExceptionTimeout { @@ -944,22 +900,13 @@ class DHTRecordPool with TableDBBackedJson { /// Ticker to check watch state change requests Future tick() async => _mutex.protect(() async { // See if any opened records need watch state changes - final now = veilid.now(); for (final kv in _opened.entries) { final openedRecordKey = kv.key; final openedRecordInfo = kv.value; - var wantsWatchStateUpdate = + final wantsWatchStateUpdate = openedRecordInfo.shared.needsWatchStateUpdate; - // Check if we have reached renewal time for the watch - if (openedRecordInfo.shared.unionWatchState != null && - openedRecordInfo.shared.unionWatchState!.renewalTime != null && - now.value > - openedRecordInfo.shared.unionWatchState!.renewalTime!.value) { - wantsWatchStateUpdate = true; - } - if (wantsWatchStateUpdate) { // Update union watch state final unionWatchState = diff --git a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool_private.dart b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool_private.dart index d1fb5d1..05b93b0 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool_private.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool_private.dart @@ -1,9 +1,5 @@ part of 'dht_record_pool.dart'; -const int? _defaultWatchDurationSecs = null; // 600 -const int _watchRenewalNumerator = 4; -const int _watchRenewalDenominator = 5; - // DHT crypto domain const String _cryptoDomainDHT = 'dht'; @@ -14,21 +10,17 @@ const _sfListen = 'listen'; /// Watch state @immutable class _WatchState extends Equatable { - const _WatchState( - {required this.subkeys, - required this.expiration, - required this.count, - this.realExpiration, - this.renewalTime}); + const _WatchState({ + required this.subkeys, + required this.expiration, + required this.count, + }); final List? subkeys; final Timestamp? expiration; final int? count; - final Timestamp? realExpiration; - final Timestamp? renewalTime; @override - List get props => - [subkeys, expiration, count, realExpiration, renewalTime]; + List get props => [subkeys, expiration, count]; } /// Data shared amongst all DHTRecord instances diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array.dart index 8101a7a..ccf7d18 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array.dart @@ -4,6 +4,7 @@ import 'dart:typed_data'; import 'package:async_tools/async_tools.dart'; import 'package:collection/collection.dart'; +import '../../../src/veilid_log.dart'; import '../../../veilid_support.dart'; import '../../proto/proto.dart' as proto; diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart index 49659cd..b0cc41b 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart @@ -383,6 +383,24 @@ class _DHTShortArrayHead { // xxx: free list optimization here? } + /// Truncate index to a particular length + void truncate(int newLength) { + if (newLength >= _index.length) { + return; + } else if (newLength == 0) { + clearIndex(); + return; + } else if (newLength < 0) { + throw StateError('can not truncate to negative length'); + } + + final newIndex = _index.sublist(0, newLength); + final freed = _index.sublist(newLength); + + _index = newIndex; + _free.addAll(freed); + } + /// Validate the head from the DHT is properly formatted /// and calculate the free list from it while we're here List _makeFreeList( diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_read.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_read.dart index ddfdedc..747a892 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_read.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_read.dart @@ -60,8 +60,16 @@ class _DHTShortArrayRead implements DHTShortArrayReadOperations { final chunks = Iterable.generate(length) .slices(kMaxDHTConcurrency) - .map((chunk) => chunk - .map((pos) async => get(pos + start, forceRefresh: forceRefresh))); + .map((chunk) => chunk.map((pos) async { + try { + return get(pos + start, forceRefresh: forceRefresh); + // Need some way to debug ParallelWaitError + // ignore: avoid_catches_without_on_clauses + } catch (e, st) { + veilidLoggy.error('$e\n$st\n'); + rethrow; + } + })); for (final chunk in chunks) { final elems = await chunk.wait; diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_write.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_write.dart index 51950f6..1705bc0 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_write.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_write.dart @@ -9,6 +9,7 @@ abstract class DHTShortArrayWriteOperations DHTRandomWrite, DHTInsertRemove, DHTAdd, + DHTTruncate, DHTClear {} class _DHTShortArrayWrite extends _DHTShortArrayRead @@ -72,10 +73,16 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead final value = values[i]; final outSeqNum = outSeqNums[i]; dws.add((_) async { - final outValue = await lookup.record.tryWriteBytes(value, - subkey: lookup.recordSubkey, outSeqNum: outSeqNum); - if (outValue != null) { - success = false; + try { + final outValue = await lookup.record.tryWriteBytes(value, + subkey: lookup.recordSubkey, outSeqNum: outSeqNum); + if (outValue != null) { + success = false; + } + // Need some way to debug ParallelWaitError + // ignore: avoid_catches_without_on_clauses + } catch (e, st) { + veilidLoggy.error('$e\n$st\n'); } }); } @@ -142,6 +149,11 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead _head.clearIndex(); } + @override + Future truncate(int newLength) async { + _head.truncate(newLength); + } + @override Future tryWriteItem(int pos, Uint8List newValue, {Output? output}) async { diff --git a/packages/veilid_support/lib/dht_support/src/interfaces/exceptions.dart b/packages/veilid_support/lib/dht_support/src/interfaces/exceptions.dart index b17dbee..134f5fa 100644 --- a/packages/veilid_support/lib/dht_support/src/interfaces/exceptions.dart +++ b/packages/veilid_support/lib/dht_support/src/interfaces/exceptions.dart @@ -2,20 +2,32 @@ class DHTExceptionOutdated implements Exception { const DHTExceptionOutdated( [this.cause = 'operation failed due to newer dht value']); final String cause; + + @override + String toString() => 'DHTExceptionOutdated: $cause'; } class DHTExceptionInvalidData implements Exception { - const DHTExceptionInvalidData([this.cause = 'dht data structure is corrupt']); + const DHTExceptionInvalidData(this.cause); final String cause; + + @override + String toString() => 'DHTExceptionInvalidData: $cause'; } class DHTExceptionCancelled implements Exception { const DHTExceptionCancelled([this.cause = 'operation was cancelled']); final String cause; + + @override + String toString() => 'DHTExceptionCancelled: $cause'; } class DHTExceptionNotAvailable implements Exception { const DHTExceptionNotAvailable( [this.cause = 'request could not be completed at this time']); final String cause; + + @override + String toString() => 'DHTExceptionNotAvailable: $cause'; } diff --git a/packages/veilid_support/lib/src/persistent_queue.dart b/packages/veilid_support/lib/src/persistent_queue.dart index 750c48e..939a5b3 100644 --- a/packages/veilid_support/lib/src/persistent_queue.dart +++ b/packages/veilid_support/lib/src/persistent_queue.dart @@ -7,6 +7,7 @@ import 'package:protobuf/protobuf.dart'; import 'config.dart'; import 'table_db.dart'; +import 'veilid_log.dart'; class PersistentQueue with TableDBBackedFromBuffer> { @@ -46,7 +47,7 @@ class PersistentQueue } } - Future _init(_) async { + Future _init(Completer _) async { // Start the processor unawaited(Future.delayed(Duration.zero, () async { await _initWait(); @@ -182,10 +183,28 @@ class PersistentQueue @override IList valueFromBuffer(Uint8List bytes) { - final reader = CodedBufferReader(bytes); var out = IList(); - while (!reader.isAtEnd()) { - out = out.add(_fromBuffer(reader.readBytesAsView())); + try { + final reader = CodedBufferReader(bytes); + while (!reader.isAtEnd()) { + final bytes = reader.readBytesAsView(); + try { + final item = _fromBuffer(bytes); + out = out.add(item); + } on Exception catch (e, st) { + veilidLoggy.debug( + 'Dropping invalid item from persistent queue: $bytes\n' + 'tableName=${tableName()}:tableKeyName=${tableKeyName()}\n', + e, + st); + } + } + } on Exception catch (e, st) { + veilidLoggy.debug( + 'Dropping remainder of invalid persistent queue\n' + 'tableName=${tableName()}:tableKeyName=${tableKeyName()}\n', + e, + st); } return out; } diff --git a/packages/veilid_support/lib/src/table_db_array.dart b/packages/veilid_support/lib/src/table_db_array.dart index c1d54cf..8b59336 100644 --- a/packages/veilid_support/lib/src/table_db_array.dart +++ b/packages/veilid_support/lib/src/table_db_array.dart @@ -9,6 +9,7 @@ import 'package:meta/meta.dart'; import 'package:protobuf/protobuf.dart'; import '../veilid_support.dart'; +import 'veilid_log.dart'; @immutable class TableDBArrayUpdate extends Equatable { @@ -262,7 +263,16 @@ class _TableDBArrayBase { final dws = DelayedWaitSet(); while (batchLen > 0) { final entry = await _getIndexEntry(pos); - dws.add((_) async => (await _loadEntry(entry))!); + dws.add((_) async { + try { + return (await _loadEntry(entry))!; + // Need some way to debug ParallelWaitError + // ignore: avoid_catches_without_on_clauses + } catch (e, st) { + veilidLoggy.error('$e\n$st\n'); + rethrow; + } + }); pos++; batchLen--; } diff --git a/packages/veilid_support/pubspec.lock b/packages/veilid_support/pubspec.lock index 0c8ca3d..11c2db6 100644 --- a/packages/veilid_support/pubspec.lock +++ b/packages/veilid_support/pubspec.lock @@ -726,7 +726,7 @@ packages: path: "../../../veilid/veilid-flutter" relative: true source: path - version: "0.4.3" + version: "0.4.4" vm_service: dependency: transitive description: