diff --git a/devtools_options.yaml b/devtools_options.yaml index 5c27c3e..7093540 100644 --- a/devtools_options.yaml +++ b/devtools_options.yaml @@ -1,2 +1,3 @@ extensions: - - provider: true \ No newline at end of file + - provider: true + - shared_preferences: true \ No newline at end of file diff --git a/ios/Podfile.lock b/ios/Podfile.lock index d943756..2528d2d 100644 --- a/ios/Podfile.lock +++ b/ios/Podfile.lock @@ -168,7 +168,7 @@ SPEC CHECKSUMS: sqflite_darwin: 20b2a3a3b70e43edae938624ce550a3cbf66a3d0 system_info_plus: 555ce7047fbbf29154726db942ae785c29211740 url_launcher_ios: 694010445543906933d732453a59da0a173ae33d - veilid: b3b9418ae6b083e662396bfa2c635fb115c8510e + veilid: 3ce560a4f2b568a77a9fd5e23090f2fa97581019 PODFILE CHECKSUM: c8bf5b16c34712d5790b0b8d2472cc66ac0a8487 diff --git a/lib/chat/cubits/chat_component_cubit.dart b/lib/chat/cubits/chat_component_cubit.dart index 7ea9e95..6112384 100644 --- a/lib/chat/cubits/chat_component_cubit.dart +++ b/lib/chat/cubits/chat_component_cubit.dart @@ -15,6 +15,7 @@ import '../../account_manager/account_manager.dart'; import '../../contacts/contacts.dart'; import '../../conversation/conversation.dart'; import '../../proto/proto.dart' as proto; +import '../../tools/tools.dart'; import '../models/chat_component_state.dart'; import '../models/message_state.dart'; import '../models/window_state.dart'; @@ -383,13 +384,13 @@ class ChatComponentCubit extends Cubit { if (chatMessage == null) { continue; } - chatMessages.insert(0, chatMessage); if (!tsSet.add(chatMessage.id)) { - // ignore: avoid_print - print('duplicate id found: ${chatMessage.id}:\n' - 'Messages:\n${messagesState.window}\n' - 'ChatMessages:\n$chatMessages'); - assert(false, 'should not have duplicate id'); + log.error('duplicate id found: ${chatMessage.id}' + // '\nMessages:\n${messagesState.window}' + // '\nChatMessages:\n$chatMessages' + ); + } else { + chatMessages.insert(0, chatMessage); } } return currentState.copyWith( diff --git a/lib/chat/cubits/reconciliation/author_input_queue.dart b/lib/chat/cubits/reconciliation/author_input_queue.dart index b15d92c..73dddc6 100644 --- a/lib/chat/cubits/reconciliation/author_input_queue.dart +++ b/lib/chat/cubits/reconciliation/author_input_queue.dart @@ -1,4 +1,5 @@ import 'dart:async'; +import 'dart:math'; import 'package:veilid_support/veilid_support.dart'; import '../../../proto/proto.dart' as proto; @@ -6,106 +7,127 @@ import '../../../proto/proto.dart' as proto; import '../../../tools/tools.dart'; import 'author_input_source.dart'; import 'message_integrity.dart'; -import 'output_position.dart'; class AuthorInputQueue { AuthorInputQueue._({ required TypedKey author, required AuthorInputSource inputSource, - required OutputPosition? outputPosition, + required int inputPosition, + required proto.Message? previousMessage, required void Function(Object, StackTrace?) onError, required MessageIntegrity messageIntegrity, }) : _author = author, _onError = onError, _inputSource = inputSource, - _outputPosition = outputPosition, - _lastMessage = outputPosition?.message.content, + _previousMessage = previousMessage, _messageIntegrity = messageIntegrity, - _currentPosition = inputSource.currentWindow.last; + _inputPosition = inputPosition; static Future create({ required TypedKey author, required AuthorInputSource inputSource, - required OutputPosition? outputPosition, + required proto.Message? previousMessage, required void Function(Object, StackTrace?) onError, }) async { + // Get ending input position + final inputPosition = await inputSource.getTailPosition() - 1; + + // Create an input queue for the input source final queue = AuthorInputQueue._( author: author, inputSource: inputSource, - outputPosition: outputPosition, + inputPosition: inputPosition, + previousMessage: previousMessage, onError: onError, messageIntegrity: await MessageIntegrity.create(author: author)); - if (!await queue._findStartOfWork()) { + + // Rewind the queue's 'inputPosition' to the first unreconciled message + if (!await queue._rewindInputToAfterLastMessage()) { return null; } + return queue; } //////////////////////////////////////////////////////////////////////////// // Public interface - // Check if there are no messages left in this queue to reconcile - bool get isDone => _isDone; + /// Get the input source for this queue + AuthorInputSource get inputSource => _inputSource; - // Get the current message that needs reconciliation - proto.Message? get current => _currentMessage; - - // Get the earliest output position to start inserting - OutputPosition? get outputPosition => _outputPosition; - - // Get the author of this queue + /// Get the author of this queue TypedKey get author => _author; - // Remove a reconciled message and move to the next message - // Returns true if there is more work to do - Future consume() async { - if (_isDone) { + /// Get the current message that needs reconciliation + Future getCurrentMessage() async { + try { + // if we have a current message already, return it + if (_currentMessage != null) { + return _currentMessage; + } + + // Get the window + final currentWindow = await _updateWindow(clampInputPosition: false); + if (currentWindow == null) { + return null; + } + final currentElement = + currentWindow.elements[_inputPosition - currentWindow.firstPosition]; + return _currentMessage = currentElement.value; + // Catch everything so we can avoid ParallelWaitError + // ignore: avoid_catches_without_on_clauses + } catch (e, st) { + log.error('Exception getting current message: $e:\n$st\n'); + _currentMessage = null; + return null; + } + } + + /// Remove a reconciled message and move to the next message + /// Returns true if there is more work to do + Future advance() async { + final currentMessage = await getCurrentMessage(); + if (currentMessage == null) { return false; } - while (true) { - _lastMessage = _currentMessage; - _currentPosition++; + // Move current message to previous + _previousMessage = _currentMessage; + _currentMessage = null; + + while (true) { + // Advance to next position + _inputPosition++; // Get more window if we need to - if (!await _updateWindow()) { - // Window is not available so this queue can't work right now - _isDone = true; + final currentMessage = await getCurrentMessage(); + if (currentMessage == null) { return false; } - final nextMessage = _inputSource.currentWindow - .elements[_currentPosition - _inputSource.currentWindow.first]; - // Drop the 'offline' elements because we don't reconcile - // anything until it has been confirmed to be committed to the DHT - // if (nextMessage.isOffline) { - // continue; - // } - - if (_lastMessage != null) { + if (_previousMessage != null) { // Ensure the timestamp is not moving backward - if (nextMessage.value.timestamp < _lastMessage!.timestamp) { - log.warning('timestamp backward: ${nextMessage.value.timestamp}' - ' < ${_lastMessage!.timestamp}'); + if (currentMessage.timestamp < _previousMessage!.timestamp) { + log.warning('timestamp backward: ${currentMessage.timestamp}' + ' < ${_previousMessage!.timestamp}'); continue; } } // Verify the id chain for the message - final matchId = await _messageIntegrity.generateMessageId(_lastMessage); - if (matchId.compare(nextMessage.value.idBytes) != 0) { - log.warning( - 'id chain invalid: $matchId != ${nextMessage.value.idBytes}'); + final matchId = + await _messageIntegrity.generateMessageId(_previousMessage); + if (matchId.compare(currentMessage.idBytes) != 0) { + log.warning('id chain invalid: $matchId != ${currentMessage.idBytes}'); continue; } // Verify the signature for the message - if (!await _messageIntegrity.verifyMessage(nextMessage.value)) { - log.warning('invalid message signature: ${nextMessage.value}'); + if (!await _messageIntegrity.verifyMessage(currentMessage)) { + log.warning('invalid message signature: $currentMessage'); continue; } - _currentMessage = nextMessage.value; break; } return true; @@ -114,106 +136,166 @@ class AuthorInputQueue { //////////////////////////////////////////////////////////////////////////// // Internal implementation - // Walk backward from the tail of the input queue to find the first - // message newer than our last reconciled message from this author - // Returns false if no work is needed - Future _findStartOfWork() async { + /// Walk backward from the tail of the input queue to find the first + /// message newer than our last reconciled message from this author + /// Returns false if no work is needed + Future _rewindInputToAfterLastMessage() async { // Iterate windows over the inputSource + InputWindow? currentWindow; outer: while (true) { + // Get more window if we need to + currentWindow = await _updateWindow(clampInputPosition: true); + if (currentWindow == null) { + // Window is not available or things are empty so this + // queue can't work right now + return false; + } + // Iterate through current window backward - for (var i = _inputSource.currentWindow.elements.length - 1; - i >= 0 && _currentPosition >= 0; - i--, _currentPosition--) { - final elem = _inputSource.currentWindow.elements[i]; + for (var i = currentWindow.elements.length - 1; + i >= 0 && _inputPosition >= 0; + i--, _inputPosition--) { + final elem = currentWindow.elements[i]; // If we've found an input element that is older or same time as our // last reconciled message for this author, or we find the message // itself then we stop - if (_lastMessage != null) { + if (_previousMessage != null) { if (elem.value.authorUniqueIdBytes - .compare(_lastMessage!.authorUniqueIdBytes) == + .compare(_previousMessage!.authorUniqueIdBytes) == 0 || - elem.value.timestamp <= _lastMessage!.timestamp) { + elem.value.timestamp <= _previousMessage!.timestamp) { break outer; } } } // If we're at the beginning of the inputSource then we stop - if (_currentPosition < 0) { + if (_inputPosition < 0) { break; } - - // Get more window if we need to - if (!await _updateWindow()) { - // Window is not available or things are empty so this - // queue can't work right now - _isDone = true; - return false; - } } - // _currentPosition points to either before the input source starts + // _inputPosition points to either before the input source starts // or the position of the previous element. We still need to set the // _currentMessage to the previous element so consume() can compare // against it if we can. - if (_currentPosition >= 0) { - _currentMessage = _inputSource.currentWindow - .elements[_currentPosition - _inputSource.currentWindow.first].value; + if (_inputPosition >= 0) { + _currentMessage = currentWindow + .elements[_inputPosition - currentWindow.firstPosition].value; } - // After this consume(), the currentPosition and _currentMessage should + // After this advance(), the _inputPosition and _currentMessage should // be equal to the first message to process and the current window to - // process should not be empty - return consume(); + // process should not be empty if there is work to do + return advance(); } - // Slide the window toward the current position and load the batch around it - Future _updateWindow() async { + /// Slide the window toward the current position and load the batch around it + Future _updateWindow({required bool clampInputPosition}) async { + final inputTailPosition = await _inputSource.getTailPosition(); + if (inputTailPosition == 0) { + return null; + } + + // Handle out-of-range input position + if (clampInputPosition) { + _inputPosition = min(max(_inputPosition, 0), inputTailPosition - 1); + } else if (_inputPosition < 0 || _inputPosition >= inputTailPosition) { + return null; + } + // Check if we are still in the window - if (_currentPosition >= _inputSource.currentWindow.first && - _currentPosition <= _inputSource.currentWindow.last) { - return true; + final currentWindow = _currentWindow; + + int firstPosition; + int lastPosition; + if (currentWindow != null) { + firstPosition = currentWindow.firstPosition; + lastPosition = currentWindow.lastPosition; + + // Slide the window if we need to + if (_inputPosition >= firstPosition && _inputPosition <= lastPosition) { + return currentWindow; + } else if (_inputPosition < firstPosition) { + // Slide it backward, current position is now last + firstPosition = max((_inputPosition - _maxWindowLength) + 1, 0); + lastPosition = _inputPosition; + } else if (_inputPosition > lastPosition) { + // Slide it forward, current position is now first + firstPosition = _inputPosition; + lastPosition = + min((_inputPosition + _maxWindowLength) - 1, inputTailPosition - 1); + } + } else { + // need a new window, start with the input position at the end + lastPosition = _inputPosition; + firstPosition = max((_inputPosition - _maxWindowLength) + 1, 0); } // Get another input batch futher back - final avOk = - await _inputSource.updateWindow(_currentPosition, _maxWindowLength); + final avCurrentWindow = await _inputSource.getWindow( + firstPosition, lastPosition - firstPosition + 1); - final asErr = avOk.asError; + final asErr = avCurrentWindow.asError; if (asErr != null) { _onError(asErr.error, asErr.stackTrace); - return false; + _currentWindow = null; + return null; } - final asLoading = avOk.asLoading; + final asLoading = avCurrentWindow.asLoading; if (asLoading != null) { - // xxx: no need to block the cubit here for this - // xxx: might want to switch to a 'busy' state though - // xxx: to let the messages view show a spinner at the bottom - // xxx: while we reconcile... - // emit(const AsyncValue.loading()); - return false; + _currentWindow = null; + return null; } - return avOk.asData!.value; + + final nextWindow = avCurrentWindow.asData!.value; + if (nextWindow == null || nextWindow.length == 0) { + _currentWindow = null; + return null; + } + + // Handle out-of-range input position + // Doing this again because getWindow is allowed to return a smaller + // window than the one requested, possibly due to DHT consistency + // fluctuations and race conditions + if (clampInputPosition) { + _inputPosition = min(max(_inputPosition, nextWindow.firstPosition), + nextWindow.lastPosition); + } else if (_inputPosition < nextWindow.firstPosition || + _inputPosition > nextWindow.lastPosition) { + return null; + } + + return _currentWindow = nextWindow; } //////////////////////////////////////////////////////////////////////////// + /// The author of this messages in the input source final TypedKey _author; + + /// The input source we're pulling messages from final AuthorInputSource _inputSource; - final OutputPosition? _outputPosition; + + /// What to call if an error happens final void Function(Object, StackTrace?) _onError; + + /// The message integrity validator final MessageIntegrity _messageIntegrity; - // The last message we've consumed - proto.Message? _lastMessage; - // The current position in the input log that we are looking at - int _currentPosition; - // The current message we're looking at - proto.Message? _currentMessage; - // If we have reached the end - bool _isDone = false; + /// The last message we reconciled/output + proto.Message? _previousMessage; - // Desired maximum window length + /// The current message we're looking at + proto.Message? _currentMessage; + + /// The current position in the input source that we are looking at + int _inputPosition; + + /// The current input window from the InputSource; + InputWindow? _currentWindow; + + /// Desired maximum window length static const int _maxWindowLength = 256; } diff --git a/lib/chat/cubits/reconciliation/author_input_source.dart b/lib/chat/cubits/reconciliation/author_input_source.dart index 0bd1afb..f974dae 100644 --- a/lib/chat/cubits/reconciliation/author_input_source.dart +++ b/lib/chat/cubits/reconciliation/author_input_source.dart @@ -9,64 +9,68 @@ import '../../../proto/proto.dart' as proto; @immutable class InputWindow { - const InputWindow( - {required this.elements, required this.first, required this.last}); + const InputWindow({required this.elements, required this.firstPosition}) + : lastPosition = firstPosition + elements.length - 1, + isEmpty = elements.length == 0, + length = elements.length; + final IList> elements; - final int first; - final int last; + final int firstPosition; + final int lastPosition; + final bool isEmpty; + final int length; } class AuthorInputSource { - AuthorInputSource.fromCubit( - {required DHTLogStateData cubitState, - required this.cubit}) { - _currentWindow = InputWindow( - elements: cubitState.window, - first: (cubitState.windowTail - cubitState.window.length) % - cubitState.length, - last: (cubitState.windowTail - 1) % cubitState.length); - } + AuthorInputSource.fromDHTLog(DHTLog dhtLog) : _dhtLog = dhtLog; //////////////////////////////////////////////////////////////////////////// - InputWindow get currentWindow => _currentWindow; + Future getTailPosition() async => + _dhtLog.operate((reader) async => reader.length); - Future> updateWindow( - int currentPosition, int windowLength) async => - cubit.operate((reader) async { - // See if we're beyond the input source - if (currentPosition < 0 || currentPosition >= reader.length) { - return const AsyncValue.data(false); - } - - // Slide the window if we need to - var first = _currentWindow.first; - var last = _currentWindow.last; - if (currentPosition < first) { - // Slide it backward, current position is now last - first = max((currentPosition - windowLength) + 1, 0); - last = currentPosition; - } else if (currentPosition > last) { - // Slide it forward, current position is now first - first = currentPosition; - last = min((currentPosition + windowLength) - 1, reader.length - 1); - } else { - return const AsyncValue.data(true); + Future> getWindow( + int startPosition, int windowLength) async => + _dhtLog.operate((reader) async { + // Don't allow negative length + if (windowLength <= 0) { + return const AsyncValue.data(null); } + // Trim if we're beyond input source + var endPosition = startPosition + windowLength - 1; + startPosition = max(startPosition, 0); + endPosition = max(endPosition, 0); // Get another input batch futher back - final nextWindow = await cubit.loadElementsFromReader( - reader, last + 1, (last + 1) - first); - if (nextWindow == null) { - return const AsyncValue.loading(); + try { + Set? offlinePositions; + if (_dhtLog.writer != null) { + offlinePositions = await reader.getOfflinePositions(); + } + + final messages = await reader.getRangeProtobuf( + proto.Message.fromBuffer, startPosition, + length: endPosition - startPosition + 1); + if (messages == null) { + return const AsyncValue.loading(); + } + + final elements = messages.indexed + .map((x) => OnlineElementState( + value: x.$2, + isOffline: offlinePositions?.contains(x.$1 + startPosition) ?? + false)) + .toIList(); + + final window = + InputWindow(elements: elements, firstPosition: startPosition); + + return AsyncValue.data(window); + } on Exception catch (e, st) { + return AsyncValue.error(e, st); } - _currentWindow = - InputWindow(elements: nextWindow, first: first, last: last); - return const AsyncValue.data(true); }); //////////////////////////////////////////////////////////////////////////// - final DHTLogCubit cubit; - - late InputWindow _currentWindow; + final DHTLog _dhtLog; } diff --git a/lib/chat/cubits/reconciliation/message_reconciliation.dart b/lib/chat/cubits/reconciliation/message_reconciliation.dart index f0b8c4c..683b46d 100644 --- a/lib/chat/cubits/reconciliation/message_reconciliation.dart +++ b/lib/chat/cubits/reconciliation/message_reconciliation.dart @@ -6,6 +6,7 @@ import 'package:sorted_list/sorted_list.dart'; import 'package:veilid_support/veilid_support.dart'; import '../../../proto/proto.dart' as proto; +import '../../../tools/tools.dart'; import 'author_input_queue.dart'; import 'author_input_source.dart'; import 'output_position.dart'; @@ -19,96 +20,152 @@ class MessageReconciliation { //////////////////////////////////////////////////////////////////////////// - void reconcileMessages( - TypedKey author, - DHTLogStateData inputMessagesCubitState, - DHTLogCubit inputMessagesCubit) { - if (inputMessagesCubitState.window.isEmpty) { - return; - } + void addInputSourceFromDHTLog(TypedKey author, DHTLog inputMessagesDHTLog) { + _inputSources[author] = AuthorInputSource.fromDHTLog(inputMessagesDHTLog); + } - _inputSources[author] = AuthorInputSource.fromCubit( - cubitState: inputMessagesCubitState, cubit: inputMessagesCubit); + void reconcileMessages(TypedKey? author) { + // xxx: can we use 'author' here to optimize _updateAuthorInputQueues? singleFuture(this, onError: _onError, () async { - // Take entire list of input sources we have currently and process them - final inputSources = _inputSources; - _inputSources = {}; - - final inputFuts = >[]; - for (final kv in inputSources.entries) { - final author = kv.key; - final inputSource = kv.value; - inputFuts - .add(_enqueueAuthorInput(author: author, inputSource: inputSource)); - } - final inputQueues = await inputFuts.wait; - - // Make this safe to cast by removing inputs that were rejected or empty - inputQueues.removeNulls(); + // Update queues + final activeInputQueues = await _updateAuthorInputQueues(); // Process all input queues together await _outputCubit .operate((reconciledArray) async => _reconcileInputQueues( reconciledArray: reconciledArray, - inputQueues: inputQueues.cast(), + activeInputQueues: activeInputQueues, )); }); } //////////////////////////////////////////////////////////////////////////// + // Prepare author input queues by removing dead ones and adding new ones + // Queues that are empty are not added until they have something in them + // Active input queues with a current message are returned in a list + Future> _updateAuthorInputQueues() async { + // Remove any dead input queues + final deadQueues = []; + for (final author in _inputQueues.keys) { + if (!_inputSources.containsKey(author)) { + deadQueues.add(author); + } + } + for (final author in deadQueues) { + _inputQueues.remove(author); + _outputPositions.remove(author); + } + + await _outputCubit.operate((outputArray) async { + final dws = DelayedWaitSet(); + + for (final kv in _inputSources.entries) { + final author = kv.key; + final inputSource = kv.value; + + final iqExisting = _inputQueues[author]; + if (iqExisting == null || iqExisting.inputSource != inputSource) { + dws.add((_) async { + try { + await _enqueueAuthorInput( + author: author, + inputSource: inputSource, + outputArray: outputArray); + // Catch everything so we can avoid ParallelWaitError + // ignore: avoid_catches_without_on_clauses + } catch (e, st) { + log.error('Exception updating author input queue: $e:\n$st\n'); + _inputQueues.remove(author); + _outputPositions.remove(author); + } + }); + } + } + + await dws(); + }); + + // Get the active input queues + final activeInputQueues = await _inputQueues.entries + .map((entry) async { + if (await entry.value.getCurrentMessage() != null) { + return entry.value; + } else { + return null; + } + }) + .toList() + .wait + ..removeNulls(); + + return activeInputQueues.cast(); + } + // Set up a single author's message reconciliation - Future _enqueueAuthorInput( + Future _enqueueAuthorInput( {required TypedKey author, - required AuthorInputSource inputSource}) async { + required AuthorInputSource inputSource, + required TableDBArrayProtobuf + outputArray}) async { // Get the position of our most recent reconciled message from this author - final outputPosition = await _findLastOutputPosition(author: author); + final outputPosition = + await _findLastOutputPosition(author: author, outputArray: outputArray); // Find oldest message we have not yet reconciled final inputQueue = await AuthorInputQueue.create( author: author, inputSource: inputSource, - outputPosition: outputPosition, + previousMessage: outputPosition?.message.content, onError: _onError, ); - return inputQueue; + + if (inputQueue != null) { + _inputQueues[author] = inputQueue; + _outputPositions[author] = outputPosition; + } else { + _inputQueues.remove(author); + _outputPositions.remove(author); + } } // Get the position of our most recent reconciled message from this author // XXX: For a group chat, this should find when the author // was added to the membership so we don't just go back in time forever Future _findLastOutputPosition( - {required TypedKey author}) async => - _outputCubit.operate((arr) async { - var pos = arr.length - 1; - while (pos >= 0) { - final message = await arr.get(pos); - if (message.content.author.toVeilid() == author) { - return OutputPosition(message, pos); - } - pos--; - } - return null; - }); + {required TypedKey author, + required TableDBArrayProtobuf + outputArray}) async { + var pos = outputArray.length - 1; + while (pos >= 0) { + final message = await outputArray.get(pos); + if (message.content.author.toVeilid() == author) { + return OutputPosition(message, pos); + } + pos--; + } + return null; + } // Process a list of author input queues and insert their messages // into the output array, performing validation steps along the way Future _reconcileInputQueues({ required TableDBArrayProtobuf reconciledArray, - required List inputQueues, + required List activeInputQueues, }) async { - // Ensure queues all have something to do - inputQueues.removeWhere((q) => q.isDone); - if (inputQueues.isEmpty) { + // Ensure we have active queues to process + if (activeInputQueues.isEmpty) { return; } // Sort queues from earliest to latest and then by author // to ensure a deterministic insert order - inputQueues.sort((a, b) { - final acmp = a.outputPosition?.pos ?? -1; - final bcmp = b.outputPosition?.pos ?? -1; + activeInputQueues.sort((a, b) { + final aout = _outputPositions[a.author]; + final bout = _outputPositions[b.author]; + final acmp = aout?.pos ?? -1; + final bcmp = bout?.pos ?? -1; if (acmp == bcmp) { return a.author.toString().compareTo(b.author.toString()); } @@ -116,21 +173,28 @@ class MessageReconciliation { }); // Start at the earliest position we know about in all the queues - var currentOutputPosition = inputQueues.first.outputPosition; + var currentOutputPosition = + _outputPositions[activeInputQueues.first.author]; final toInsert = SortedList(proto.MessageExt.compareTimestamp); - while (inputQueues.isNotEmpty) { + while (activeInputQueues.isNotEmpty) { // Get up to '_maxReconcileChunk' of the items from the queues // that we can insert at this location bool added; do { added = false; - var someQueueEmpty = false; - for (final inputQueue in inputQueues) { - final inputCurrent = inputQueue.current!; + + final emptyQueues = {}; + for (final inputQueue in activeInputQueues) { + final inputCurrent = await inputQueue.getCurrentMessage(); + if (inputCurrent == null) { + log.error('Active input queue did not have a current message: ' + '${inputQueue.author}'); + continue; + } if (currentOutputPosition == null || inputCurrent.timestamp < currentOutputPosition.message.content.timestamp) { @@ -138,16 +202,14 @@ class MessageReconciliation { added = true; // Advance this queue - if (!await inputQueue.consume()) { - // Queue is empty now, run a queue purge - someQueueEmpty = true; + if (!await inputQueue.advance()) { + // Mark queue as empty for removal + emptyQueues.add(inputQueue); } } } - // Remove empty queues now that we're done iterating - if (someQueueEmpty) { - inputQueues.removeWhere((q) => q.isDone); - } + // Remove finished queues now that we're done iterating + activeInputQueues.removeWhere(emptyQueues.contains); if (toInsert.length >= _maxReconcileChunk) { break; @@ -165,9 +227,27 @@ class MessageReconciliation { ..content = message) .toList(); - await reconciledArray.insertAll( - currentOutputPosition?.pos ?? reconciledArray.length, - reconciledInserts); + // Figure out where to insert the reconciled messages + final insertPos = currentOutputPosition?.pos ?? reconciledArray.length; + + // Insert them all at once + await reconciledArray.insertAll(insertPos, reconciledInserts); + + // Update output positions for input queues + final updatePositions = _outputPositions.keys.toSet(); + var outputPos = insertPos + reconciledInserts.length; + for (final inserted in reconciledInserts.reversed) { + if (updatePositions.isEmpty) { + // Last seen positions already recorded for each active author + break; + } + outputPos--; + final author = inserted.content.author.toVeilid(); + if (updatePositions.contains(author)) { + _outputPositions[author] = OutputPosition(inserted, outputPos); + updatePositions.remove(author); + } + } toInsert.clear(); } else { @@ -187,7 +267,9 @@ class MessageReconciliation { //////////////////////////////////////////////////////////////////////////// - Map _inputSources = {}; + final Map _inputSources = {}; + final Map _inputQueues = {}; + final Map _outputPositions = {}; final TableDBArrayProtobufCubit _outputCubit; final void Function(Object, StackTrace?) _onError; diff --git a/lib/chat/cubits/single_contact_messages_cubit.dart b/lib/chat/cubits/single_contact_messages_cubit.dart index b4e77db..66032f6 100644 --- a/lib/chat/cubits/single_contact_messages_cubit.dart +++ b/lib/chat/cubits/single_contact_messages_cubit.dart @@ -77,8 +77,8 @@ class SingleContactMessagesCubit extends Cubit { await _sentSubscription?.cancel(); await _rcvdSubscription?.cancel(); await _reconciledSubscription?.cancel(); - await _sentMessagesCubit?.close(); - await _rcvdMessagesCubit?.close(); + await _sentMessagesDHTLog?.close(); + await _rcvdMessagesDHTLog?.close(); await _reconciledMessagesCubit?.close(); // If the local conversation record is gone, then delete the reconciled @@ -100,8 +100,8 @@ class SingleContactMessagesCubit extends Cubit { key: _remoteConversationRecordKey.toString(), fromBuffer: proto.Message.fromBuffer, closure: _processUnsentMessages, - onError: (e, sp) { - log.error('Exception while processing unsent messages: $e\n$sp\n'); + onError: (e, st) { + log.error('Exception while processing unsent messages: $e\n$st\n'); }); // Make crypto @@ -111,10 +111,10 @@ class SingleContactMessagesCubit extends Cubit { await _initReconciledMessagesCubit(); // Local messages key - await _initSentMessagesCubit(); + await _initSentMessagesDHTLog(); // Remote messages key - await _initRcvdMessagesCubit(); + await _initRcvdMessagesDHTLog(); // Command execution background process _commandRunnerFut = Future.delayed(Duration.zero, _commandRunner); @@ -129,39 +129,40 @@ class SingleContactMessagesCubit extends Cubit { } // Open local messages key - Future _initSentMessagesCubit() async { + Future _initSentMessagesDHTLog() async { final writer = _accountInfo.identityWriter; - _sentMessagesCubit = DHTLogCubit( - open: () async => DHTLog.openWrite(_localMessagesRecordKey, writer, + final sentMessagesDHTLog = + await DHTLog.openWrite(_localMessagesRecordKey, writer, debugName: 'SingleContactMessagesCubit::_initSentMessagesCubit::' 'SentMessages', parent: _localConversationRecordKey, - crypto: _conversationCrypto), - decodeElement: proto.Message.fromBuffer); - _sentSubscription = - _sentMessagesCubit!.stream.listen(_updateSentMessagesState); - _updateSentMessagesState(_sentMessagesCubit!.state); + crypto: _conversationCrypto); + _sentSubscription = await sentMessagesDHTLog.listen(_updateSentMessages); + + _sentMessagesDHTLog = sentMessagesDHTLog; + _reconciliation.addInputSourceFromDHTLog( + _accountInfo.identityTypedPublicKey, sentMessagesDHTLog); } // Open remote messages key - Future _initRcvdMessagesCubit() async { + Future _initRcvdMessagesDHTLog() async { // Don't bother if we don't have a remote messages record key yet if (_remoteMessagesRecordKey == null) { return; } // Open new cubit if one is desired - _rcvdMessagesCubit = DHTLogCubit( - open: () async => DHTLog.openRead(_remoteMessagesRecordKey!, - debugName: 'SingleContactMessagesCubit::_initRcvdMessagesCubit::' - 'RcvdMessages', - parent: _remoteConversationRecordKey, - crypto: _conversationCrypto), - decodeElement: proto.Message.fromBuffer); - _rcvdSubscription = - _rcvdMessagesCubit!.stream.listen(_updateRcvdMessagesState); - _updateRcvdMessagesState(_rcvdMessagesCubit!.state); + final rcvdMessagesDHTLog = await DHTLog.openRead(_remoteMessagesRecordKey!, + debugName: 'SingleContactMessagesCubit::_initRcvdMessagesCubit::' + 'RcvdMessages', + parent: _remoteConversationRecordKey, + crypto: _conversationCrypto); + _rcvdSubscription = await rcvdMessagesDHTLog.listen(_updateRcvdMessages); + + _rcvdMessagesDHTLog = rcvdMessagesDHTLog; + _reconciliation.addInputSourceFromDHTLog( + _remoteIdentityPublicKey, rcvdMessagesDHTLog); } Future updateRemoteMessagesRecordKey( @@ -175,17 +176,17 @@ class SingleContactMessagesCubit extends Cubit { return; } - // Close existing cubit if we have one - final rcvdMessagesCubit = _rcvdMessagesCubit; - _rcvdMessagesCubit = null; + // Close existing DHTLog if we have one + final rcvdMessagesDHTLog = _rcvdMessagesDHTLog; + _rcvdMessagesDHTLog = null; _remoteMessagesRecordKey = null; await _rcvdSubscription?.cancel(); _rcvdSubscription = null; - await rcvdMessagesCubit?.close(); + await rcvdMessagesDHTLog?.close(); - // Init the new cubit if we should + // Init the new DHTLog if we should _remoteMessagesRecordKey = remoteMessagesRecordKey; - await _initRcvdMessagesCubit(); + await _initRcvdMessagesDHTLog(); }); } @@ -275,30 +276,15 @@ class SingleContactMessagesCubit extends Cubit { //////////////////////////////////////////////////////////////////////////// // Internal implementation - // Called when the sent messages cubit gets a change + // Called when the sent messages DHTLog gets a change // This will re-render when messages are sent from another machine - void _updateSentMessagesState(DHTLogBusyState avmessages) { - final sentMessages = avmessages.state.asData?.value; - if (sentMessages == null) { - return; - } - - _reconciliation.reconcileMessages( - _accountInfo.identityTypedPublicKey, sentMessages, _sentMessagesCubit!); - - // Update the view - _renderState(); + void _updateSentMessages(DHTLogUpdate upd) { + _reconciliation.reconcileMessages(_accountInfo.identityTypedPublicKey); } - // Called when the received messages cubit gets a change - void _updateRcvdMessagesState(DHTLogBusyState avmessages) { - final rcvdMessages = avmessages.state.asData?.value; - if (rcvdMessages == null) { - return; - } - - _reconciliation.reconcileMessages( - _remoteIdentityPublicKey, rcvdMessages, _rcvdMessagesCubit!); + // Called when the received messages DHTLog gets a change + void _updateRcvdMessages(DHTLogUpdate upd) { + _reconciliation.reconcileMessages(_remoteIdentityPublicKey); } // Called when the reconciled messages window gets a change @@ -310,14 +296,11 @@ class SingleContactMessagesCubit extends Cubit { Future _processMessageToSend( proto.Message message, proto.Message? previousMessage) async { - // Get the previous message if we don't have one - previousMessage ??= await _sentMessagesCubit!.operate((r) async => - r.length == 0 - ? null - : await r.getProtobuf(proto.Message.fromBuffer, r.length - 1)); - - message.id = - await _senderMessageIntegrity.generateMessageId(previousMessage); + // It's possible we had a signature from a previous + // operateAppendEventual attempt, so clear it and make a new message id too + message + ..clearSignature() + ..id = await _senderMessageIntegrity.generateMessageId(previousMessage); // Now sign it await _senderMessageIntegrity.signMessage( @@ -326,26 +309,33 @@ class SingleContactMessagesCubit extends Cubit { // Async process to send messages in the background Future _processUnsentMessages(IList messages) async { - // Go through and assign ids to all the messages in order - proto.Message? previousMessage; - final processedMessages = messages.toList(); - for (final message in processedMessages) { - try { - await _processMessageToSend(message, previousMessage); - previousMessage = message; - } on Exception catch (e) { - log.error('Exception processing unsent message: $e'); - } - } - // _sendingMessages = messages; // _renderState(); try { - await _sentMessagesCubit!.operateAppendEventual((writer) => - writer.addAll(messages.map((m) => m.writeToBuffer()).toList())); - } on Exception catch (e) { - log.error('Exception appending unsent messages: $e'); + await _sentMessagesDHTLog!.operateAppendEventual((writer) async { + // Get the previous message if we have one + var previousMessage = writer.length == 0 + ? null + : await writer.getProtobuf( + proto.Message.fromBuffer, writer.length - 1); + + // Sign all messages + final processedMessages = messages.toList(); + for (final message in processedMessages) { + try { + await _processMessageToSend(message, previousMessage); + previousMessage = message; + } on Exception catch (e, st) { + log.error('Exception processing unsent message: $e:\n$st\n'); + } + } + final byteMessages = messages.map((m) => m.writeToBuffer()).toList(); + + return writer.addAll(byteMessages); + }); + } on Exception catch (e, st) { + log.error('Exception appending unsent messages: $e:\n$st\n'); } // _sendingMessages = const IList.empty(); @@ -353,16 +343,17 @@ class SingleContactMessagesCubit extends Cubit { // Produce a state for this cubit from the input cubits and queues void _renderState() { - // Get all reconciled messages + // Get all reconciled messages in the cubit window final reconciledMessages = _reconciledMessagesCubit?.state.state.asData?.value; - // Get all sent messages - final sentMessages = _sentMessagesCubit?.state.state.asData?.value; + + // Get all sent messages that are still offline + //final sentMessages = _sentMessagesDHTLog. //Get all items in the unsent queue //final unsentMessages = _unsentMessagesQueue.queue; // If we aren't ready to render a state, say we're loading - if (reconciledMessages == null || sentMessages == null) { + if (reconciledMessages == null) { emit(const AsyncLoading()); return; } @@ -373,11 +364,11 @@ class SingleContactMessagesCubit extends Cubit { // keyMapper: (x) => x.content.authorUniqueIdString, // values: reconciledMessages.windowElements, // ); - final sentMessagesMap = - IMap>.fromValues( - keyMapper: (x) => x.value.authorUniqueIdString, - values: sentMessages.window, - ); + // final sentMessagesMap = + // IMap>.fromValues( + // keyMapper: (x) => x.value.authorUniqueIdString, + // values: sentMessages.window, + // ); // final unsentMessagesMap = IMap.fromValues( // keyMapper: (x) => x.authorUniqueIdString, // values: unsentMessages, @@ -389,10 +380,12 @@ class SingleContactMessagesCubit extends Cubit { final isLocal = m.content.author.toVeilid() == _accountInfo.identityTypedPublicKey; final reconciledTimestamp = Timestamp.fromInt64(m.reconciledTime); - final sm = - isLocal ? sentMessagesMap[m.content.authorUniqueIdString] : null; - final sent = isLocal && sm != null; - final sentOffline = isLocal && sm != null && sm.isOffline; + //final sm = + //isLocal ? sentMessagesMap[m.content.authorUniqueIdString] : null; + //final sent = isLocal && sm != null; + //final sentOffline = isLocal && sm != null && sm.isOffline; + final sent = isLocal; + final sentOffline = false; // renderedElements.add(RenderStateElement( message: m.content, @@ -487,16 +480,16 @@ class SingleContactMessagesCubit extends Cubit { late final VeilidCrypto _conversationCrypto; late final MessageIntegrity _senderMessageIntegrity; - DHTLogCubit? _sentMessagesCubit; - DHTLogCubit? _rcvdMessagesCubit; + DHTLog? _sentMessagesDHTLog; + DHTLog? _rcvdMessagesDHTLog; TableDBArrayProtobufCubit? _reconciledMessagesCubit; late final MessageReconciliation _reconciliation; late final PersistentQueue _unsentMessagesQueue; // IList _sendingMessages = const IList.empty(); - StreamSubscription>? _sentSubscription; - StreamSubscription>? _rcvdSubscription; + StreamSubscription? _sentSubscription; + StreamSubscription? _rcvdSubscription; StreamSubscription>? _reconciledSubscription; final StreamController Function()> _commandController; diff --git a/lib/router/cubits/router_cubit.dart b/lib/router/cubits/router_cubit.dart index e5a2024..d651611 100644 --- a/lib/router/cubits/router_cubit.dart +++ b/lib/router/cubits/router_cubit.dart @@ -138,7 +138,8 @@ class RouterCubit extends Cubit { return null; case '/developer': return null; - // Otherwise, if there's no account, we need to go to the new account page. + // Otherwise, if there's no account, + // we need to go to the new account page. default: return state.hasAnyAccount ? null : '/new_account'; } diff --git a/lib/veilid_processor/views/developer.dart b/lib/veilid_processor/views/developer.dart index cb64d3d..0bfcc0a 100644 --- a/lib/veilid_processor/views/developer.dart +++ b/lib/veilid_processor/views/developer.dart @@ -65,8 +65,9 @@ class _DeveloperPageState extends State { } void _debugOut(String out) { + final sanitizedOut = out.replaceAll('\uFFFD', ''); final pen = AnsiPen()..cyan(bold: true); - final colorOut = pen(out); + final colorOut = pen(sanitizedOut); debugPrint(colorOut); globalDebugTerminal.write(colorOut.replaceAll('\n', '\r\n')); } @@ -124,7 +125,21 @@ class _DeveloperPageState extends State { _debugOut('DEBUG >>>\n$debugCommand\n'); try { - final out = await Veilid.instance.debug(debugCommand); + var out = await Veilid.instance.debug(debugCommand); + + if (debugCommand == 'help') { + out = 'VeilidChat Commands:\n' + ' pool allocations - List DHTRecordPool allocations\n' + ' pool opened - List opened DHTRecord instances' + ' from the pool\n' + ' change_log_ignore change the log' + ' target ignore list for a tracing layer\n' + ' targets to add to the ignore list can be separated by' + ' a comma.\n' + ' to remove a target from the ignore list, prepend it' + ' with a minus.\n\n$out'; + } + _debugOut('<<< DEBUG\n$out\n'); } on Exception catch (e, st) { _debugOut('<<< ERROR\n$e\n<<< STACK\n$st'); diff --git a/packages/veilid_support/example/pubspec.lock b/packages/veilid_support/example/pubspec.lock index 6ef291b..2e87d8c 100644 --- a/packages/veilid_support/example/pubspec.lock +++ b/packages/veilid_support/example/pubspec.lock @@ -650,7 +650,7 @@ packages: path: "../../../../veilid/veilid-flutter" relative: true source: path - version: "0.4.3" + version: "0.4.4" veilid_support: dependency: "direct main" description: diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log.dart index 8f88ce1..2687bdc 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log.dart @@ -7,6 +7,7 @@ import 'package:collection/collection.dart'; import 'package:equatable/equatable.dart'; import 'package:meta/meta.dart'; +import '../../../src/veilid_log.dart'; import '../../../veilid_support.dart'; import '../../proto/proto.dart' as proto; @@ -212,7 +213,7 @@ class DHTLog implements DHTDeleteable { /// Runs a closure allowing read-only access to the log Future operate(Future Function(DHTLogReadOperations) closure) async { if (!isOpen) { - throw StateError('log is not open"'); + throw StateError('log is not open'); } return _spine.operate((spine) async { @@ -229,7 +230,7 @@ class DHTLog implements DHTDeleteable { Future operateAppend( Future Function(DHTLogWriteOperations) closure) async { if (!isOpen) { - throw StateError('log is not open"'); + throw StateError('log is not open'); } return _spine.operateAppend((spine) async { @@ -248,7 +249,7 @@ class DHTLog implements DHTDeleteable { Future Function(DHTLogWriteOperations) closure, {Duration? timeout}) async { if (!isOpen) { - throw StateError('log is not open"'); + throw StateError('log is not open'); } return _spine.operateAppendEventual((spine) async { @@ -263,7 +264,7 @@ class DHTLog implements DHTDeleteable { void Function(DHTLogUpdate) onChanged, ) { if (!isOpen) { - throw StateError('log is not open"'); + throw StateError('log is not open'); } return _listenMutex.protect(() async { diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_cubit.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_cubit.dart index 492312f..a7884f9 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_cubit.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_cubit.dart @@ -112,33 +112,34 @@ class DHTLogCubit extends Cubit> Future _refreshInner(void Function(AsyncValue>) emit, {bool forceRefresh = false}) async { late final int length; - final window = await _log.operate((reader) async { + final windowElements = await _log.operate((reader) async { length = reader.length; - return loadElementsFromReader(reader, _windowTail, _windowSize); + return _loadElementsFromReader(reader, _windowTail, _windowSize); }); - if (window == null) { + if (windowElements == null) { setWantsRefresh(); return; } + emit(AsyncValue.data(DHTLogStateData( length: length, - window: window, - windowTail: _windowTail, - windowSize: _windowSize, + window: windowElements.$2, + windowTail: windowElements.$1 + windowElements.$2.length, + windowSize: windowElements.$2.length, follow: _follow))); setRefreshed(); } // Tail is one past the last element to load - Future>?> loadElementsFromReader( + Future<(int, IList>)?> _loadElementsFromReader( DHTLogReadOperations reader, int tail, int count, {bool forceRefresh = false}) async { final length = reader.length; - if (length == 0) { - return const IList.empty(); - } final end = ((tail - 1) % length) + 1; final start = (count < end) ? end - count : 0; + if (length == 0) { + return (start, IList>.empty()); + } // If this is writeable get the offline positions Set? offlinePositions; @@ -154,8 +155,11 @@ class DHTLogCubit extends Cubit> value: _decodeElement(x.$2), isOffline: offlinePositions?.contains(x.$1) ?? false)) .toIList(); + if (allItems == null) { + return null; + } - return allItems; + return (start, allItems); } void _update(DHTLogUpdate upd) { diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_read.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_read.dart index 6281d6e..3ebb2b8 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_read.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_read.dart @@ -21,8 +21,14 @@ class _DHTLogRead implements DHTLogReadOperations { return null; } - return lookup.scope((sa) => - sa.operate((read) => read.get(lookup.pos, forceRefresh: forceRefresh))); + return lookup.scope((sa) => sa.operate((read) async { + if (lookup.pos >= read.length) { + veilidLoggy.error('DHTLog shortarray read @ ${lookup.pos}' + ' >= length ${read.length}'); + return null; + } + return read.get(lookup.pos, forceRefresh: forceRefresh); + })); } (int, int) _clampStartLen(int start, int? len) { @@ -47,40 +53,54 @@ class _DHTLogRead implements DHTLogReadOperations { final chunks = Iterable.generate(length) .slices(kMaxDHTConcurrency) - .map((chunk) => chunk - .map((pos) async => get(pos + start, forceRefresh: forceRefresh))); + .map((chunk) => chunk.map((pos) async { + try { + return await get(pos + start, forceRefresh: forceRefresh); + // Need some way to debug ParallelWaitError + // ignore: avoid_catches_without_on_clauses + } catch (e, st) { + veilidLoggy.error('$e\n$st\n'); + rethrow; + } + })); for (final chunk in chunks) { - final elems = await chunk.wait; + var elems = await chunk.wait; - // If any element was unavailable, return null - if (elems.contains(null)) { - return null; + // Return only the first contiguous range, anything else is garbage + // due to a representational error in the head or shortarray legnth + final nullPos = elems.indexOf(null); + if (nullPos != -1) { + elems = elems.sublist(0, nullPos); } + out.addAll(elems.cast()); + + if (nullPos != -1) { + break; + } } return out; } @override - Future?> getOfflinePositions() async { + Future> getOfflinePositions() async { final positionOffline = {}; // Iterate positions backward from most recent for (var pos = _spine.length - 1; pos >= 0; pos--) { + // Get position final lookup = await _spine.lookupPosition(pos); + // If position doesn't exist then it definitely wasn't written to offline if (lookup == null) { - return null; + continue; } // Check each segment for offline positions var foundOffline = false; - final success = await lookup.scope((sa) => sa.operate((read) async { + await lookup.scope((sa) => sa.operate((read) async { final segmentOffline = await read.getOfflinePositions(); - if (segmentOffline == null) { - return false; - } // For each shortarray segment go through their segment positions // in reverse order and see if they are offline @@ -94,11 +114,7 @@ class _DHTLogRead implements DHTLogReadOperations { foundOffline = true; } } - return true; })); - if (!success) { - return null; - } // If we found nothing offline in this segment then we can stop if (!foundOffline) { break; diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart index bb27e04..d9f5df2 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart @@ -248,7 +248,12 @@ class _DHTLogSpine { final headDelta = _ringDistance(newHead, oldHead); final tailDelta = _ringDistance(newTail, oldTail); if (headDelta > _positionLimit ~/ 2 || tailDelta > _positionLimit ~/ 2) { - throw const DHTExceptionInvalidData(); + throw DHTExceptionInvalidData('_DHTLogSpine::_updateHead ' + '_head=$_head _tail=$_tail ' + 'oldHead=$oldHead oldTail=$oldTail ' + 'newHead=$newHead newTail=$newTail ' + 'headDelta=$headDelta tailDelta=$tailDelta ' + '_positionLimit=$_positionLimit'); } } diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_write.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_write.dart index 1b5c09f..397a1dc 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_write.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_write.dart @@ -17,7 +17,8 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations { } final lookup = await _spine.lookupPosition(pos); if (lookup == null) { - throw const DHTExceptionInvalidData(); + throw DHTExceptionInvalidData( + '_DHTLogRead::tryWriteItem pos=$pos _spine.length=${_spine.length}'); } // Write item to the segment @@ -45,12 +46,14 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations { } final aLookup = await _spine.lookupPosition(aPos); if (aLookup == null) { - throw const DHTExceptionInvalidData(); + throw DHTExceptionInvalidData('_DHTLogWrite::swap aPos=$aPos bPos=$bPos ' + '_spine.length=${_spine.length}'); } final bLookup = await _spine.lookupPosition(bPos); if (bLookup == null) { await aLookup.close(); - throw const DHTExceptionInvalidData(); + throw DHTExceptionInvalidData('_DHTLogWrite::swap aPos=$aPos bPos=$bPos ' + '_spine.length=${_spine.length}'); } // Swap items in the segments @@ -65,7 +68,10 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations { if (bItem.value == null) { final aItem = await aWrite.get(aLookup.pos); if (aItem == null) { - throw const DHTExceptionInvalidData(); + throw DHTExceptionInvalidData( + '_DHTLogWrite::swap aPos=$aPos bPos=$bPos ' + 'aLookup.pos=${aLookup.pos} bLookup.pos=${bLookup.pos} ' + '_spine.length=${_spine.length}'); } await sb.operateWriteEventual((bWrite) async { final success = await bWrite @@ -101,7 +107,7 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations { await write.clear(); } else if (lookup.pos != write.length) { // We should always be appending at the length - throw const DHTExceptionInvalidData(); + await write.truncate(lookup.pos); } return write.add(value); })); @@ -117,12 +123,16 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations { final dws = DelayedWaitSet(); var success = true; - for (var valueIdx = 0; valueIdx < values.length;) { + for (var valueIdxIter = 0; valueIdxIter < values.length;) { + final valueIdx = valueIdxIter; final remaining = values.length - valueIdx; final lookup = await _spine.lookupPosition(insertPos + valueIdx); if (lookup == null) { - throw const DHTExceptionInvalidData(); + throw DHTExceptionInvalidData('_DHTLogWrite::addAll ' + '_spine.length=${_spine.length}' + 'insertPos=$insertPos valueIdx=$valueIdx ' + 'values.length=${values.length} '); } final sacount = min(remaining, DHTShortArray.maxElements - lookup.pos); @@ -137,16 +147,21 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations { await write.clear(); } else if (lookup.pos != write.length) { // We should always be appending at the length - throw const DHTExceptionInvalidData(); + await write.truncate(lookup.pos); } - return write.addAll(sublistValues); + await write.addAll(sublistValues); + success = true; })); } on DHTExceptionOutdated { success = false; + // Need some way to debug ParallelWaitError + // ignore: avoid_catches_without_on_clauses + } catch (e, st) { + veilidLoggy.error('$e\n$st\n'); } }); - valueIdx += sacount; + valueIdxIter += sacount; } await dws(); diff --git a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart index 9027799..e3c9abe 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart @@ -246,11 +246,13 @@ class DHTRecordPool with TableDBBackedJson { /// Print children String debugChildren(TypedKey recordKey, {List? allDeps}) { allDeps ??= _collectChildrenInner(recordKey); + // Debugging // ignore: avoid_print var out = 'Parent: $recordKey (${_state.debugNames[recordKey.toString()]})\n'; for (final dep in allDeps) { if (dep != recordKey) { + // Debugging // ignore: avoid_print out += ' Child: $dep (${_state.debugNames[dep.toString()]})\n'; } @@ -270,32 +272,25 @@ class DHTRecordPool with TableDBBackedJson { break; } } - } else { - final now = Veilid.instance.now().value; - // Expired, process renewal if desired - for (final entry in _opened.entries) { - final openedKey = entry.key; - final openedRecordInfo = entry.value; - - if (openedKey == updateValueChange.key) { - // Renew watch state for each opened record - for (final rec in openedRecordInfo.records) { - // See if the watch had an expiration and if it has expired - // otherwise the renewal will keep the same parameters - final watchState = rec._watchState; - if (watchState != null) { - final exp = watchState.expiration; - if (exp != null && exp.value < now) { - // Has expiration, and it has expired, clear watch state - rec._watchState = null; - } - } - } - openedRecordInfo.shared.needsWatchStateUpdate = true; - break; - } - } } + // else { + + // XXX: should no longer be necessary + // // Remove watch state + // + // for (final entry in _opened.entries) { + // final openedKey = entry.key; + // final openedRecordInfo = entry.value; + + // if (openedKey == updateValueChange.key) { + // for (final rec in openedRecordInfo.records) { + // rec._watchState = null; + // } + // openedRecordInfo.shared.needsWatchStateUpdate = true; + // break; + // } + // } + //} } /// Log the current record allocations @@ -735,7 +730,6 @@ class DHTRecordPool with TableDBBackedJson { int? totalCount; Timestamp? maxExpiration; List? allSubkeys; - Timestamp? earliestRenewalTime; var noExpiration = false; var everySubkey = false; @@ -768,15 +762,6 @@ class DHTRecordPool with TableDBBackedJson { } else { everySubkey = true; } - final wsRenewalTime = ws.renewalTime; - if (wsRenewalTime != null) { - earliestRenewalTime = earliestRenewalTime == null - ? wsRenewalTime - : Timestamp( - value: (wsRenewalTime.value < earliestRenewalTime.value - ? wsRenewalTime.value - : earliestRenewalTime.value)); - } } } if (noExpiration) { @@ -790,25 +775,10 @@ class DHTRecordPool with TableDBBackedJson { } return _WatchState( - subkeys: allSubkeys, - expiration: maxExpiration, - count: totalCount, - renewalTime: earliestRenewalTime); - } - - static void _updateWatchRealExpirations(Iterable records, - Timestamp realExpiration, Timestamp renewalTime) { - for (final rec in records) { - final ws = rec._watchState; - if (ws != null) { - rec._watchState = _WatchState( - subkeys: ws.subkeys, - expiration: ws.expiration, - count: ws.count, - realExpiration: realExpiration, - renewalTime: renewalTime); - } - } + subkeys: allSubkeys, + expiration: maxExpiration, + count: totalCount, + ); } Future _watchStateChange( @@ -833,9 +803,9 @@ class DHTRecordPool with TableDBBackedJson { // Only try this once, if it doesn't succeed then it can just expire // on its own. try { - final cancelled = await dhtctx.cancelDHTWatch(openedRecordKey); + final stillActive = await dhtctx.cancelDHTWatch(openedRecordKey); - log('cancelDHTWatch: key=$openedRecordKey, cancelled=$cancelled, ' + log('cancelDHTWatch: key=$openedRecordKey, stillActive=$stillActive, ' 'debugNames=${openedRecordInfo.debugNames}'); openedRecordInfo.shared.unionWatchState = null; @@ -858,34 +828,20 @@ class DHTRecordPool with TableDBBackedJson { final subkeys = unionWatchState.subkeys?.toList(); final count = unionWatchState.count; final expiration = unionWatchState.expiration; - final now = veilid.now(); - final realExpiration = await dhtctx.watchDHTValues(openedRecordKey, + final active = await dhtctx.watchDHTValues(openedRecordKey, subkeys: unionWatchState.subkeys?.toList(), count: unionWatchState.count, - expiration: unionWatchState.expiration ?? - (_defaultWatchDurationSecs == null - ? null - : veilid.now().offset(TimestampDuration.fromMillis( - _defaultWatchDurationSecs! * 1000)))); + expiration: unionWatchState.expiration); - final expirationDuration = realExpiration.diff(now); - final renewalTime = now.offset(TimestampDuration( - value: expirationDuration.value * - BigInt.from(_watchRenewalNumerator) ~/ - BigInt.from(_watchRenewalDenominator))); - - log('watchDHTValues: key=$openedRecordKey, subkeys=$subkeys, ' + log('watchDHTValues(active=$active): ' + 'key=$openedRecordKey, subkeys=$subkeys, ' 'count=$count, expiration=$expiration, ' - 'realExpiration=$realExpiration, ' - 'renewalTime=$renewalTime, ' 'debugNames=${openedRecordInfo.debugNames}'); // Update watch states with real expiration - if (realExpiration.value != BigInt.zero) { + if (active) { openedRecordInfo.shared.unionWatchState = unionWatchState; - _updateWatchRealExpirations( - openedRecordInfo.records, realExpiration, renewalTime); openedRecordInfo.shared.needsWatchStateUpdate = false; } } on VeilidAPIExceptionTimeout { @@ -944,22 +900,13 @@ class DHTRecordPool with TableDBBackedJson { /// Ticker to check watch state change requests Future tick() async => _mutex.protect(() async { // See if any opened records need watch state changes - final now = veilid.now(); for (final kv in _opened.entries) { final openedRecordKey = kv.key; final openedRecordInfo = kv.value; - var wantsWatchStateUpdate = + final wantsWatchStateUpdate = openedRecordInfo.shared.needsWatchStateUpdate; - // Check if we have reached renewal time for the watch - if (openedRecordInfo.shared.unionWatchState != null && - openedRecordInfo.shared.unionWatchState!.renewalTime != null && - now.value > - openedRecordInfo.shared.unionWatchState!.renewalTime!.value) { - wantsWatchStateUpdate = true; - } - if (wantsWatchStateUpdate) { // Update union watch state final unionWatchState = diff --git a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool_private.dart b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool_private.dart index d1fb5d1..05b93b0 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool_private.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool_private.dart @@ -1,9 +1,5 @@ part of 'dht_record_pool.dart'; -const int? _defaultWatchDurationSecs = null; // 600 -const int _watchRenewalNumerator = 4; -const int _watchRenewalDenominator = 5; - // DHT crypto domain const String _cryptoDomainDHT = 'dht'; @@ -14,21 +10,17 @@ const _sfListen = 'listen'; /// Watch state @immutable class _WatchState extends Equatable { - const _WatchState( - {required this.subkeys, - required this.expiration, - required this.count, - this.realExpiration, - this.renewalTime}); + const _WatchState({ + required this.subkeys, + required this.expiration, + required this.count, + }); final List? subkeys; final Timestamp? expiration; final int? count; - final Timestamp? realExpiration; - final Timestamp? renewalTime; @override - List get props => - [subkeys, expiration, count, realExpiration, renewalTime]; + List get props => [subkeys, expiration, count]; } /// Data shared amongst all DHTRecord instances diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array.dart index 8101a7a..ccf7d18 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array.dart @@ -4,6 +4,7 @@ import 'dart:typed_data'; import 'package:async_tools/async_tools.dart'; import 'package:collection/collection.dart'; +import '../../../src/veilid_log.dart'; import '../../../veilid_support.dart'; import '../../proto/proto.dart' as proto; diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_cubit.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_cubit.dart index 6ff6d95..246a990 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_cubit.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_cubit.dart @@ -62,9 +62,6 @@ class DHTShortArrayCubit extends Cubit> Set? offlinePositions; if (_shortArray.writer != null) { offlinePositions = await reader.getOfflinePositions(); - if (offlinePositions == null) { - return null; - } } // Get the items diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart index 49659cd..b0cc41b 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart @@ -383,6 +383,24 @@ class _DHTShortArrayHead { // xxx: free list optimization here? } + /// Truncate index to a particular length + void truncate(int newLength) { + if (newLength >= _index.length) { + return; + } else if (newLength == 0) { + clearIndex(); + return; + } else if (newLength < 0) { + throw StateError('can not truncate to negative length'); + } + + final newIndex = _index.sublist(0, newLength); + final freed = _index.sublist(newLength); + + _index = newIndex; + _free.addAll(freed); + } + /// Validate the head from the DHT is properly formatted /// and calculate the free list from it while we're here List _makeFreeList( diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_read.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_read.dart index ddfdedc..eeb9648 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_read.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_read.dart @@ -60,17 +60,32 @@ class _DHTShortArrayRead implements DHTShortArrayReadOperations { final chunks = Iterable.generate(length) .slices(kMaxDHTConcurrency) - .map((chunk) => chunk - .map((pos) async => get(pos + start, forceRefresh: forceRefresh))); + .map((chunk) => chunk.map((pos) async { + try { + return await get(pos + start, forceRefresh: forceRefresh); + // Need some way to debug ParallelWaitError + // ignore: avoid_catches_without_on_clauses + } catch (e, st) { + veilidLoggy.error('$e\n$st\n'); + rethrow; + } + })); for (final chunk in chunks) { - final elems = await chunk.wait; + var elems = await chunk.wait; - // If any element was unavailable, return null - if (elems.contains(null)) { - return null; + // Return only the first contiguous range, anything else is garbage + // due to a representational error in the head or shortarray legnth + final nullPos = elems.indexOf(null); + if (nullPos != -1) { + elems = elems.sublist(0, nullPos); } + out.addAll(elems.cast()); + + if (nullPos != -1) { + break; + } } return out; diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_write.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_write.dart index 51950f6..1705bc0 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_write.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_write.dart @@ -9,6 +9,7 @@ abstract class DHTShortArrayWriteOperations DHTRandomWrite, DHTInsertRemove, DHTAdd, + DHTTruncate, DHTClear {} class _DHTShortArrayWrite extends _DHTShortArrayRead @@ -72,10 +73,16 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead final value = values[i]; final outSeqNum = outSeqNums[i]; dws.add((_) async { - final outValue = await lookup.record.tryWriteBytes(value, - subkey: lookup.recordSubkey, outSeqNum: outSeqNum); - if (outValue != null) { - success = false; + try { + final outValue = await lookup.record.tryWriteBytes(value, + subkey: lookup.recordSubkey, outSeqNum: outSeqNum); + if (outValue != null) { + success = false; + } + // Need some way to debug ParallelWaitError + // ignore: avoid_catches_without_on_clauses + } catch (e, st) { + veilidLoggy.error('$e\n$st\n'); } }); } @@ -142,6 +149,11 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead _head.clearIndex(); } + @override + Future truncate(int newLength) async { + _head.truncate(newLength); + } + @override Future tryWriteItem(int pos, Uint8List newValue, {Output? output}) async { diff --git a/packages/veilid_support/lib/dht_support/src/interfaces/dht_random_read.dart b/packages/veilid_support/lib/dht_support/src/interfaces/dht_random_read.dart index 0547332..d361757 100644 --- a/packages/veilid_support/lib/dht_support/src/interfaces/dht_random_read.dart +++ b/packages/veilid_support/lib/dht_support/src/interfaces/dht_random_read.dart @@ -14,19 +14,22 @@ abstract class DHTRandomRead { /// is specified, the network will always be checked for newer values /// rather than returning the existing locally stored copy of the elements. /// Throws an IndexError if the 'pos' is not within the length - /// of the container. + /// of the container. May return null if the item is not available at this + /// time. Future get(int pos, {bool forceRefresh = false}); /// Return a list of a range of items in the DHTArray. If 'forceRefresh' /// is specified, the network will always be checked for newer values /// rather than returning the existing locally stored copy of the elements. /// Throws an IndexError if either 'start' or '(start+length)' is not within - /// the length of the container. + /// the length of the container. May return fewer items than the length + /// expected if the requested items are not available, but will always + /// return a contiguous range starting at 'start'. Future?> getRange(int start, {int? length, bool forceRefresh = false}); /// Get a list of the positions that were written offline and not flushed yet - Future?> getOfflinePositions(); + Future> getOfflinePositions(); } extension DHTRandomReadExt on DHTRandomRead { diff --git a/packages/veilid_support/lib/dht_support/src/interfaces/exceptions.dart b/packages/veilid_support/lib/dht_support/src/interfaces/exceptions.dart index b17dbee..134f5fa 100644 --- a/packages/veilid_support/lib/dht_support/src/interfaces/exceptions.dart +++ b/packages/veilid_support/lib/dht_support/src/interfaces/exceptions.dart @@ -2,20 +2,32 @@ class DHTExceptionOutdated implements Exception { const DHTExceptionOutdated( [this.cause = 'operation failed due to newer dht value']); final String cause; + + @override + String toString() => 'DHTExceptionOutdated: $cause'; } class DHTExceptionInvalidData implements Exception { - const DHTExceptionInvalidData([this.cause = 'dht data structure is corrupt']); + const DHTExceptionInvalidData(this.cause); final String cause; + + @override + String toString() => 'DHTExceptionInvalidData: $cause'; } class DHTExceptionCancelled implements Exception { const DHTExceptionCancelled([this.cause = 'operation was cancelled']); final String cause; + + @override + String toString() => 'DHTExceptionCancelled: $cause'; } class DHTExceptionNotAvailable implements Exception { const DHTExceptionNotAvailable( [this.cause = 'request could not be completed at this time']); final String cause; + + @override + String toString() => 'DHTExceptionNotAvailable: $cause'; } diff --git a/packages/veilid_support/lib/src/persistent_queue.dart b/packages/veilid_support/lib/src/persistent_queue.dart index 750c48e..939a5b3 100644 --- a/packages/veilid_support/lib/src/persistent_queue.dart +++ b/packages/veilid_support/lib/src/persistent_queue.dart @@ -7,6 +7,7 @@ import 'package:protobuf/protobuf.dart'; import 'config.dart'; import 'table_db.dart'; +import 'veilid_log.dart'; class PersistentQueue with TableDBBackedFromBuffer> { @@ -46,7 +47,7 @@ class PersistentQueue } } - Future _init(_) async { + Future _init(Completer _) async { // Start the processor unawaited(Future.delayed(Duration.zero, () async { await _initWait(); @@ -182,10 +183,28 @@ class PersistentQueue @override IList valueFromBuffer(Uint8List bytes) { - final reader = CodedBufferReader(bytes); var out = IList(); - while (!reader.isAtEnd()) { - out = out.add(_fromBuffer(reader.readBytesAsView())); + try { + final reader = CodedBufferReader(bytes); + while (!reader.isAtEnd()) { + final bytes = reader.readBytesAsView(); + try { + final item = _fromBuffer(bytes); + out = out.add(item); + } on Exception catch (e, st) { + veilidLoggy.debug( + 'Dropping invalid item from persistent queue: $bytes\n' + 'tableName=${tableName()}:tableKeyName=${tableKeyName()}\n', + e, + st); + } + } + } on Exception catch (e, st) { + veilidLoggy.debug( + 'Dropping remainder of invalid persistent queue\n' + 'tableName=${tableName()}:tableKeyName=${tableKeyName()}\n', + e, + st); } return out; } diff --git a/packages/veilid_support/lib/src/table_db_array.dart b/packages/veilid_support/lib/src/table_db_array.dart index c1d54cf..8b59336 100644 --- a/packages/veilid_support/lib/src/table_db_array.dart +++ b/packages/veilid_support/lib/src/table_db_array.dart @@ -9,6 +9,7 @@ import 'package:meta/meta.dart'; import 'package:protobuf/protobuf.dart'; import '../veilid_support.dart'; +import 'veilid_log.dart'; @immutable class TableDBArrayUpdate extends Equatable { @@ -262,7 +263,16 @@ class _TableDBArrayBase { final dws = DelayedWaitSet(); while (batchLen > 0) { final entry = await _getIndexEntry(pos); - dws.add((_) async => (await _loadEntry(entry))!); + dws.add((_) async { + try { + return (await _loadEntry(entry))!; + // Need some way to debug ParallelWaitError + // ignore: avoid_catches_without_on_clauses + } catch (e, st) { + veilidLoggy.error('$e\n$st\n'); + rethrow; + } + }); pos++; batchLen--; } diff --git a/packages/veilid_support/pubspec.lock b/packages/veilid_support/pubspec.lock index 0c8ca3d..11c2db6 100644 --- a/packages/veilid_support/pubspec.lock +++ b/packages/veilid_support/pubspec.lock @@ -726,7 +726,7 @@ packages: path: "../../../veilid/veilid-flutter" relative: true source: path - version: "0.4.3" + version: "0.4.4" vm_service: dependency: transitive description: