diff --git a/lib/chat/cubits/chat_component_cubit.dart b/lib/chat/cubits/chat_component_cubit.dart index 7ea9e95..6112384 100644 --- a/lib/chat/cubits/chat_component_cubit.dart +++ b/lib/chat/cubits/chat_component_cubit.dart @@ -15,6 +15,7 @@ import '../../account_manager/account_manager.dart'; import '../../contacts/contacts.dart'; import '../../conversation/conversation.dart'; import '../../proto/proto.dart' as proto; +import '../../tools/tools.dart'; import '../models/chat_component_state.dart'; import '../models/message_state.dart'; import '../models/window_state.dart'; @@ -383,13 +384,13 @@ class ChatComponentCubit extends Cubit { if (chatMessage == null) { continue; } - chatMessages.insert(0, chatMessage); if (!tsSet.add(chatMessage.id)) { - // ignore: avoid_print - print('duplicate id found: ${chatMessage.id}:\n' - 'Messages:\n${messagesState.window}\n' - 'ChatMessages:\n$chatMessages'); - assert(false, 'should not have duplicate id'); + log.error('duplicate id found: ${chatMessage.id}' + // '\nMessages:\n${messagesState.window}' + // '\nChatMessages:\n$chatMessages' + ); + } else { + chatMessages.insert(0, chatMessage); } } return currentState.copyWith( diff --git a/lib/chat/cubits/reconciliation/author_input_queue.dart b/lib/chat/cubits/reconciliation/author_input_queue.dart index b15d92c..73dddc6 100644 --- a/lib/chat/cubits/reconciliation/author_input_queue.dart +++ b/lib/chat/cubits/reconciliation/author_input_queue.dart @@ -1,4 +1,5 @@ import 'dart:async'; +import 'dart:math'; import 'package:veilid_support/veilid_support.dart'; import '../../../proto/proto.dart' as proto; @@ -6,106 +7,127 @@ import '../../../proto/proto.dart' as proto; import '../../../tools/tools.dart'; import 'author_input_source.dart'; import 'message_integrity.dart'; -import 'output_position.dart'; class AuthorInputQueue { AuthorInputQueue._({ required TypedKey author, required AuthorInputSource inputSource, - required OutputPosition? outputPosition, + required int inputPosition, + required proto.Message? previousMessage, required void Function(Object, StackTrace?) onError, required MessageIntegrity messageIntegrity, }) : _author = author, _onError = onError, _inputSource = inputSource, - _outputPosition = outputPosition, - _lastMessage = outputPosition?.message.content, + _previousMessage = previousMessage, _messageIntegrity = messageIntegrity, - _currentPosition = inputSource.currentWindow.last; + _inputPosition = inputPosition; static Future create({ required TypedKey author, required AuthorInputSource inputSource, - required OutputPosition? outputPosition, + required proto.Message? previousMessage, required void Function(Object, StackTrace?) onError, }) async { + // Get ending input position + final inputPosition = await inputSource.getTailPosition() - 1; + + // Create an input queue for the input source final queue = AuthorInputQueue._( author: author, inputSource: inputSource, - outputPosition: outputPosition, + inputPosition: inputPosition, + previousMessage: previousMessage, onError: onError, messageIntegrity: await MessageIntegrity.create(author: author)); - if (!await queue._findStartOfWork()) { + + // Rewind the queue's 'inputPosition' to the first unreconciled message + if (!await queue._rewindInputToAfterLastMessage()) { return null; } + return queue; } //////////////////////////////////////////////////////////////////////////// // Public interface - // Check if there are no messages left in this queue to reconcile - bool get isDone => _isDone; + /// Get the input source for this queue + AuthorInputSource get inputSource => _inputSource; - // Get the current message that needs reconciliation - proto.Message? get current => _currentMessage; - - // Get the earliest output position to start inserting - OutputPosition? get outputPosition => _outputPosition; - - // Get the author of this queue + /// Get the author of this queue TypedKey get author => _author; - // Remove a reconciled message and move to the next message - // Returns true if there is more work to do - Future consume() async { - if (_isDone) { + /// Get the current message that needs reconciliation + Future getCurrentMessage() async { + try { + // if we have a current message already, return it + if (_currentMessage != null) { + return _currentMessage; + } + + // Get the window + final currentWindow = await _updateWindow(clampInputPosition: false); + if (currentWindow == null) { + return null; + } + final currentElement = + currentWindow.elements[_inputPosition - currentWindow.firstPosition]; + return _currentMessage = currentElement.value; + // Catch everything so we can avoid ParallelWaitError + // ignore: avoid_catches_without_on_clauses + } catch (e, st) { + log.error('Exception getting current message: $e:\n$st\n'); + _currentMessage = null; + return null; + } + } + + /// Remove a reconciled message and move to the next message + /// Returns true if there is more work to do + Future advance() async { + final currentMessage = await getCurrentMessage(); + if (currentMessage == null) { return false; } - while (true) { - _lastMessage = _currentMessage; - _currentPosition++; + // Move current message to previous + _previousMessage = _currentMessage; + _currentMessage = null; + + while (true) { + // Advance to next position + _inputPosition++; // Get more window if we need to - if (!await _updateWindow()) { - // Window is not available so this queue can't work right now - _isDone = true; + final currentMessage = await getCurrentMessage(); + if (currentMessage == null) { return false; } - final nextMessage = _inputSource.currentWindow - .elements[_currentPosition - _inputSource.currentWindow.first]; - // Drop the 'offline' elements because we don't reconcile - // anything until it has been confirmed to be committed to the DHT - // if (nextMessage.isOffline) { - // continue; - // } - - if (_lastMessage != null) { + if (_previousMessage != null) { // Ensure the timestamp is not moving backward - if (nextMessage.value.timestamp < _lastMessage!.timestamp) { - log.warning('timestamp backward: ${nextMessage.value.timestamp}' - ' < ${_lastMessage!.timestamp}'); + if (currentMessage.timestamp < _previousMessage!.timestamp) { + log.warning('timestamp backward: ${currentMessage.timestamp}' + ' < ${_previousMessage!.timestamp}'); continue; } } // Verify the id chain for the message - final matchId = await _messageIntegrity.generateMessageId(_lastMessage); - if (matchId.compare(nextMessage.value.idBytes) != 0) { - log.warning( - 'id chain invalid: $matchId != ${nextMessage.value.idBytes}'); + final matchId = + await _messageIntegrity.generateMessageId(_previousMessage); + if (matchId.compare(currentMessage.idBytes) != 0) { + log.warning('id chain invalid: $matchId != ${currentMessage.idBytes}'); continue; } // Verify the signature for the message - if (!await _messageIntegrity.verifyMessage(nextMessage.value)) { - log.warning('invalid message signature: ${nextMessage.value}'); + if (!await _messageIntegrity.verifyMessage(currentMessage)) { + log.warning('invalid message signature: $currentMessage'); continue; } - _currentMessage = nextMessage.value; break; } return true; @@ -114,106 +136,166 @@ class AuthorInputQueue { //////////////////////////////////////////////////////////////////////////// // Internal implementation - // Walk backward from the tail of the input queue to find the first - // message newer than our last reconciled message from this author - // Returns false if no work is needed - Future _findStartOfWork() async { + /// Walk backward from the tail of the input queue to find the first + /// message newer than our last reconciled message from this author + /// Returns false if no work is needed + Future _rewindInputToAfterLastMessage() async { // Iterate windows over the inputSource + InputWindow? currentWindow; outer: while (true) { + // Get more window if we need to + currentWindow = await _updateWindow(clampInputPosition: true); + if (currentWindow == null) { + // Window is not available or things are empty so this + // queue can't work right now + return false; + } + // Iterate through current window backward - for (var i = _inputSource.currentWindow.elements.length - 1; - i >= 0 && _currentPosition >= 0; - i--, _currentPosition--) { - final elem = _inputSource.currentWindow.elements[i]; + for (var i = currentWindow.elements.length - 1; + i >= 0 && _inputPosition >= 0; + i--, _inputPosition--) { + final elem = currentWindow.elements[i]; // If we've found an input element that is older or same time as our // last reconciled message for this author, or we find the message // itself then we stop - if (_lastMessage != null) { + if (_previousMessage != null) { if (elem.value.authorUniqueIdBytes - .compare(_lastMessage!.authorUniqueIdBytes) == + .compare(_previousMessage!.authorUniqueIdBytes) == 0 || - elem.value.timestamp <= _lastMessage!.timestamp) { + elem.value.timestamp <= _previousMessage!.timestamp) { break outer; } } } // If we're at the beginning of the inputSource then we stop - if (_currentPosition < 0) { + if (_inputPosition < 0) { break; } - - // Get more window if we need to - if (!await _updateWindow()) { - // Window is not available or things are empty so this - // queue can't work right now - _isDone = true; - return false; - } } - // _currentPosition points to either before the input source starts + // _inputPosition points to either before the input source starts // or the position of the previous element. We still need to set the // _currentMessage to the previous element so consume() can compare // against it if we can. - if (_currentPosition >= 0) { - _currentMessage = _inputSource.currentWindow - .elements[_currentPosition - _inputSource.currentWindow.first].value; + if (_inputPosition >= 0) { + _currentMessage = currentWindow + .elements[_inputPosition - currentWindow.firstPosition].value; } - // After this consume(), the currentPosition and _currentMessage should + // After this advance(), the _inputPosition and _currentMessage should // be equal to the first message to process and the current window to - // process should not be empty - return consume(); + // process should not be empty if there is work to do + return advance(); } - // Slide the window toward the current position and load the batch around it - Future _updateWindow() async { + /// Slide the window toward the current position and load the batch around it + Future _updateWindow({required bool clampInputPosition}) async { + final inputTailPosition = await _inputSource.getTailPosition(); + if (inputTailPosition == 0) { + return null; + } + + // Handle out-of-range input position + if (clampInputPosition) { + _inputPosition = min(max(_inputPosition, 0), inputTailPosition - 1); + } else if (_inputPosition < 0 || _inputPosition >= inputTailPosition) { + return null; + } + // Check if we are still in the window - if (_currentPosition >= _inputSource.currentWindow.first && - _currentPosition <= _inputSource.currentWindow.last) { - return true; + final currentWindow = _currentWindow; + + int firstPosition; + int lastPosition; + if (currentWindow != null) { + firstPosition = currentWindow.firstPosition; + lastPosition = currentWindow.lastPosition; + + // Slide the window if we need to + if (_inputPosition >= firstPosition && _inputPosition <= lastPosition) { + return currentWindow; + } else if (_inputPosition < firstPosition) { + // Slide it backward, current position is now last + firstPosition = max((_inputPosition - _maxWindowLength) + 1, 0); + lastPosition = _inputPosition; + } else if (_inputPosition > lastPosition) { + // Slide it forward, current position is now first + firstPosition = _inputPosition; + lastPosition = + min((_inputPosition + _maxWindowLength) - 1, inputTailPosition - 1); + } + } else { + // need a new window, start with the input position at the end + lastPosition = _inputPosition; + firstPosition = max((_inputPosition - _maxWindowLength) + 1, 0); } // Get another input batch futher back - final avOk = - await _inputSource.updateWindow(_currentPosition, _maxWindowLength); + final avCurrentWindow = await _inputSource.getWindow( + firstPosition, lastPosition - firstPosition + 1); - final asErr = avOk.asError; + final asErr = avCurrentWindow.asError; if (asErr != null) { _onError(asErr.error, asErr.stackTrace); - return false; + _currentWindow = null; + return null; } - final asLoading = avOk.asLoading; + final asLoading = avCurrentWindow.asLoading; if (asLoading != null) { - // xxx: no need to block the cubit here for this - // xxx: might want to switch to a 'busy' state though - // xxx: to let the messages view show a spinner at the bottom - // xxx: while we reconcile... - // emit(const AsyncValue.loading()); - return false; + _currentWindow = null; + return null; } - return avOk.asData!.value; + + final nextWindow = avCurrentWindow.asData!.value; + if (nextWindow == null || nextWindow.length == 0) { + _currentWindow = null; + return null; + } + + // Handle out-of-range input position + // Doing this again because getWindow is allowed to return a smaller + // window than the one requested, possibly due to DHT consistency + // fluctuations and race conditions + if (clampInputPosition) { + _inputPosition = min(max(_inputPosition, nextWindow.firstPosition), + nextWindow.lastPosition); + } else if (_inputPosition < nextWindow.firstPosition || + _inputPosition > nextWindow.lastPosition) { + return null; + } + + return _currentWindow = nextWindow; } //////////////////////////////////////////////////////////////////////////// + /// The author of this messages in the input source final TypedKey _author; + + /// The input source we're pulling messages from final AuthorInputSource _inputSource; - final OutputPosition? _outputPosition; + + /// What to call if an error happens final void Function(Object, StackTrace?) _onError; + + /// The message integrity validator final MessageIntegrity _messageIntegrity; - // The last message we've consumed - proto.Message? _lastMessage; - // The current position in the input log that we are looking at - int _currentPosition; - // The current message we're looking at - proto.Message? _currentMessage; - // If we have reached the end - bool _isDone = false; + /// The last message we reconciled/output + proto.Message? _previousMessage; - // Desired maximum window length + /// The current message we're looking at + proto.Message? _currentMessage; + + /// The current position in the input source that we are looking at + int _inputPosition; + + /// The current input window from the InputSource; + InputWindow? _currentWindow; + + /// Desired maximum window length static const int _maxWindowLength = 256; } diff --git a/lib/chat/cubits/reconciliation/author_input_source.dart b/lib/chat/cubits/reconciliation/author_input_source.dart index 0bd1afb..f974dae 100644 --- a/lib/chat/cubits/reconciliation/author_input_source.dart +++ b/lib/chat/cubits/reconciliation/author_input_source.dart @@ -9,64 +9,68 @@ import '../../../proto/proto.dart' as proto; @immutable class InputWindow { - const InputWindow( - {required this.elements, required this.first, required this.last}); + const InputWindow({required this.elements, required this.firstPosition}) + : lastPosition = firstPosition + elements.length - 1, + isEmpty = elements.length == 0, + length = elements.length; + final IList> elements; - final int first; - final int last; + final int firstPosition; + final int lastPosition; + final bool isEmpty; + final int length; } class AuthorInputSource { - AuthorInputSource.fromCubit( - {required DHTLogStateData cubitState, - required this.cubit}) { - _currentWindow = InputWindow( - elements: cubitState.window, - first: (cubitState.windowTail - cubitState.window.length) % - cubitState.length, - last: (cubitState.windowTail - 1) % cubitState.length); - } + AuthorInputSource.fromDHTLog(DHTLog dhtLog) : _dhtLog = dhtLog; //////////////////////////////////////////////////////////////////////////// - InputWindow get currentWindow => _currentWindow; + Future getTailPosition() async => + _dhtLog.operate((reader) async => reader.length); - Future> updateWindow( - int currentPosition, int windowLength) async => - cubit.operate((reader) async { - // See if we're beyond the input source - if (currentPosition < 0 || currentPosition >= reader.length) { - return const AsyncValue.data(false); - } - - // Slide the window if we need to - var first = _currentWindow.first; - var last = _currentWindow.last; - if (currentPosition < first) { - // Slide it backward, current position is now last - first = max((currentPosition - windowLength) + 1, 0); - last = currentPosition; - } else if (currentPosition > last) { - // Slide it forward, current position is now first - first = currentPosition; - last = min((currentPosition + windowLength) - 1, reader.length - 1); - } else { - return const AsyncValue.data(true); + Future> getWindow( + int startPosition, int windowLength) async => + _dhtLog.operate((reader) async { + // Don't allow negative length + if (windowLength <= 0) { + return const AsyncValue.data(null); } + // Trim if we're beyond input source + var endPosition = startPosition + windowLength - 1; + startPosition = max(startPosition, 0); + endPosition = max(endPosition, 0); // Get another input batch futher back - final nextWindow = await cubit.loadElementsFromReader( - reader, last + 1, (last + 1) - first); - if (nextWindow == null) { - return const AsyncValue.loading(); + try { + Set? offlinePositions; + if (_dhtLog.writer != null) { + offlinePositions = await reader.getOfflinePositions(); + } + + final messages = await reader.getRangeProtobuf( + proto.Message.fromBuffer, startPosition, + length: endPosition - startPosition + 1); + if (messages == null) { + return const AsyncValue.loading(); + } + + final elements = messages.indexed + .map((x) => OnlineElementState( + value: x.$2, + isOffline: offlinePositions?.contains(x.$1 + startPosition) ?? + false)) + .toIList(); + + final window = + InputWindow(elements: elements, firstPosition: startPosition); + + return AsyncValue.data(window); + } on Exception catch (e, st) { + return AsyncValue.error(e, st); } - _currentWindow = - InputWindow(elements: nextWindow, first: first, last: last); - return const AsyncValue.data(true); }); //////////////////////////////////////////////////////////////////////////// - final DHTLogCubit cubit; - - late InputWindow _currentWindow; + final DHTLog _dhtLog; } diff --git a/lib/chat/cubits/reconciliation/message_reconciliation.dart b/lib/chat/cubits/reconciliation/message_reconciliation.dart index 9b183b5..683b46d 100644 --- a/lib/chat/cubits/reconciliation/message_reconciliation.dart +++ b/lib/chat/cubits/reconciliation/message_reconciliation.dart @@ -20,66 +20,113 @@ class MessageReconciliation { //////////////////////////////////////////////////////////////////////////// - void reconcileMessages( - TypedKey author, - DHTLogStateData inputMessagesCubitState, - DHTLogCubit inputMessagesCubit) { - if (inputMessagesCubitState.window.isEmpty) { - return; - } + void addInputSourceFromDHTLog(TypedKey author, DHTLog inputMessagesDHTLog) { + _inputSources[author] = AuthorInputSource.fromDHTLog(inputMessagesDHTLog); + } - _inputSources[author] = AuthorInputSource.fromCubit( - cubitState: inputMessagesCubitState, cubit: inputMessagesCubit); + void reconcileMessages(TypedKey? author) { + // xxx: can we use 'author' here to optimize _updateAuthorInputQueues? singleFuture(this, onError: _onError, () async { - // Take entire list of input sources we have currently and process them - final inputSources = _inputSources; - _inputSources = {}; - - final inputFuts = >[]; - for (final kv in inputSources.entries) { - final author = kv.key; - final inputSource = kv.value; - inputFuts - .add(_enqueueAuthorInput(author: author, inputSource: inputSource)); - } - final inputQueues = await inputFuts.wait; - - // Make this safe to cast by removing inputs that were rejected or empty - inputQueues.removeNulls(); + // Update queues + final activeInputQueues = await _updateAuthorInputQueues(); // Process all input queues together await _outputCubit .operate((reconciledArray) async => _reconcileInputQueues( reconciledArray: reconciledArray, - inputQueues: inputQueues.cast(), + activeInputQueues: activeInputQueues, )); }); } //////////////////////////////////////////////////////////////////////////// - // Set up a single author's message reconciliation - Future _enqueueAuthorInput( - {required TypedKey author, - required AuthorInputSource inputSource}) async { - try { - // Get the position of our most recent reconciled message from this author - final outputPosition = await _findLastOutputPosition(author: author); + // Prepare author input queues by removing dead ones and adding new ones + // Queues that are empty are not added until they have something in them + // Active input queues with a current message are returned in a list + Future> _updateAuthorInputQueues() async { + // Remove any dead input queues + final deadQueues = []; + for (final author in _inputQueues.keys) { + if (!_inputSources.containsKey(author)) { + deadQueues.add(author); + } + } + for (final author in deadQueues) { + _inputQueues.remove(author); + _outputPositions.remove(author); + } - // Find oldest message we have not yet reconciled - final inputQueue = await AuthorInputQueue.create( - author: author, - inputSource: inputSource, - outputPosition: outputPosition, - onError: _onError, - ); - return inputQueue; - // Catch everything so we can avoid ParallelWaitError - // ignore: avoid_catches_without_on_clauses - } catch (e, st) { - log.error('Exception enqueing author input: $e:\n$st\n'); - return null; + await _outputCubit.operate((outputArray) async { + final dws = DelayedWaitSet(); + + for (final kv in _inputSources.entries) { + final author = kv.key; + final inputSource = kv.value; + + final iqExisting = _inputQueues[author]; + if (iqExisting == null || iqExisting.inputSource != inputSource) { + dws.add((_) async { + try { + await _enqueueAuthorInput( + author: author, + inputSource: inputSource, + outputArray: outputArray); + // Catch everything so we can avoid ParallelWaitError + // ignore: avoid_catches_without_on_clauses + } catch (e, st) { + log.error('Exception updating author input queue: $e:\n$st\n'); + _inputQueues.remove(author); + _outputPositions.remove(author); + } + }); + } + } + + await dws(); + }); + + // Get the active input queues + final activeInputQueues = await _inputQueues.entries + .map((entry) async { + if (await entry.value.getCurrentMessage() != null) { + return entry.value; + } else { + return null; + } + }) + .toList() + .wait + ..removeNulls(); + + return activeInputQueues.cast(); + } + + // Set up a single author's message reconciliation + Future _enqueueAuthorInput( + {required TypedKey author, + required AuthorInputSource inputSource, + required TableDBArrayProtobuf + outputArray}) async { + // Get the position of our most recent reconciled message from this author + final outputPosition = + await _findLastOutputPosition(author: author, outputArray: outputArray); + + // Find oldest message we have not yet reconciled + final inputQueue = await AuthorInputQueue.create( + author: author, + inputSource: inputSource, + previousMessage: outputPosition?.message.content, + onError: _onError, + ); + + if (inputQueue != null) { + _inputQueues[author] = inputQueue; + _outputPositions[author] = outputPosition; + } else { + _inputQueues.remove(author); + _outputPositions.remove(author); } } @@ -87,36 +134,38 @@ class MessageReconciliation { // XXX: For a group chat, this should find when the author // was added to the membership so we don't just go back in time forever Future _findLastOutputPosition( - {required TypedKey author}) async => - _outputCubit.operate((arr) async { - var pos = arr.length - 1; - while (pos >= 0) { - final message = await arr.get(pos); - if (message.content.author.toVeilid() == author) { - return OutputPosition(message, pos); - } - pos--; - } - return null; - }); + {required TypedKey author, + required TableDBArrayProtobuf + outputArray}) async { + var pos = outputArray.length - 1; + while (pos >= 0) { + final message = await outputArray.get(pos); + if (message.content.author.toVeilid() == author) { + return OutputPosition(message, pos); + } + pos--; + } + return null; + } // Process a list of author input queues and insert their messages // into the output array, performing validation steps along the way Future _reconcileInputQueues({ required TableDBArrayProtobuf reconciledArray, - required List inputQueues, + required List activeInputQueues, }) async { - // Ensure queues all have something to do - inputQueues.removeWhere((q) => q.isDone); - if (inputQueues.isEmpty) { + // Ensure we have active queues to process + if (activeInputQueues.isEmpty) { return; } // Sort queues from earliest to latest and then by author // to ensure a deterministic insert order - inputQueues.sort((a, b) { - final acmp = a.outputPosition?.pos ?? -1; - final bcmp = b.outputPosition?.pos ?? -1; + activeInputQueues.sort((a, b) { + final aout = _outputPositions[a.author]; + final bout = _outputPositions[b.author]; + final acmp = aout?.pos ?? -1; + final bcmp = bout?.pos ?? -1; if (acmp == bcmp) { return a.author.toString().compareTo(b.author.toString()); } @@ -124,21 +173,28 @@ class MessageReconciliation { }); // Start at the earliest position we know about in all the queues - var currentOutputPosition = inputQueues.first.outputPosition; + var currentOutputPosition = + _outputPositions[activeInputQueues.first.author]; final toInsert = SortedList(proto.MessageExt.compareTimestamp); - while (inputQueues.isNotEmpty) { + while (activeInputQueues.isNotEmpty) { // Get up to '_maxReconcileChunk' of the items from the queues // that we can insert at this location bool added; do { added = false; - var someQueueEmpty = false; - for (final inputQueue in inputQueues) { - final inputCurrent = inputQueue.current!; + + final emptyQueues = {}; + for (final inputQueue in activeInputQueues) { + final inputCurrent = await inputQueue.getCurrentMessage(); + if (inputCurrent == null) { + log.error('Active input queue did not have a current message: ' + '${inputQueue.author}'); + continue; + } if (currentOutputPosition == null || inputCurrent.timestamp < currentOutputPosition.message.content.timestamp) { @@ -146,16 +202,14 @@ class MessageReconciliation { added = true; // Advance this queue - if (!await inputQueue.consume()) { - // Queue is empty now, run a queue purge - someQueueEmpty = true; + if (!await inputQueue.advance()) { + // Mark queue as empty for removal + emptyQueues.add(inputQueue); } } } - // Remove empty queues now that we're done iterating - if (someQueueEmpty) { - inputQueues.removeWhere((q) => q.isDone); - } + // Remove finished queues now that we're done iterating + activeInputQueues.removeWhere(emptyQueues.contains); if (toInsert.length >= _maxReconcileChunk) { break; @@ -173,9 +227,27 @@ class MessageReconciliation { ..content = message) .toList(); - await reconciledArray.insertAll( - currentOutputPosition?.pos ?? reconciledArray.length, - reconciledInserts); + // Figure out where to insert the reconciled messages + final insertPos = currentOutputPosition?.pos ?? reconciledArray.length; + + // Insert them all at once + await reconciledArray.insertAll(insertPos, reconciledInserts); + + // Update output positions for input queues + final updatePositions = _outputPositions.keys.toSet(); + var outputPos = insertPos + reconciledInserts.length; + for (final inserted in reconciledInserts.reversed) { + if (updatePositions.isEmpty) { + // Last seen positions already recorded for each active author + break; + } + outputPos--; + final author = inserted.content.author.toVeilid(); + if (updatePositions.contains(author)) { + _outputPositions[author] = OutputPosition(inserted, outputPos); + updatePositions.remove(author); + } + } toInsert.clear(); } else { @@ -195,7 +267,9 @@ class MessageReconciliation { //////////////////////////////////////////////////////////////////////////// - Map _inputSources = {}; + final Map _inputSources = {}; + final Map _inputQueues = {}; + final Map _outputPositions = {}; final TableDBArrayProtobufCubit _outputCubit; final void Function(Object, StackTrace?) _onError; diff --git a/lib/chat/cubits/single_contact_messages_cubit.dart b/lib/chat/cubits/single_contact_messages_cubit.dart index 559eae2..66032f6 100644 --- a/lib/chat/cubits/single_contact_messages_cubit.dart +++ b/lib/chat/cubits/single_contact_messages_cubit.dart @@ -77,8 +77,8 @@ class SingleContactMessagesCubit extends Cubit { await _sentSubscription?.cancel(); await _rcvdSubscription?.cancel(); await _reconciledSubscription?.cancel(); - await _sentMessagesCubit?.close(); - await _rcvdMessagesCubit?.close(); + await _sentMessagesDHTLog?.close(); + await _rcvdMessagesDHTLog?.close(); await _reconciledMessagesCubit?.close(); // If the local conversation record is gone, then delete the reconciled @@ -111,10 +111,10 @@ class SingleContactMessagesCubit extends Cubit { await _initReconciledMessagesCubit(); // Local messages key - await _initSentMessagesCubit(); + await _initSentMessagesDHTLog(); // Remote messages key - await _initRcvdMessagesCubit(); + await _initRcvdMessagesDHTLog(); // Command execution background process _commandRunnerFut = Future.delayed(Duration.zero, _commandRunner); @@ -129,39 +129,40 @@ class SingleContactMessagesCubit extends Cubit { } // Open local messages key - Future _initSentMessagesCubit() async { + Future _initSentMessagesDHTLog() async { final writer = _accountInfo.identityWriter; - _sentMessagesCubit = DHTLogCubit( - open: () async => DHTLog.openWrite(_localMessagesRecordKey, writer, + final sentMessagesDHTLog = + await DHTLog.openWrite(_localMessagesRecordKey, writer, debugName: 'SingleContactMessagesCubit::_initSentMessagesCubit::' 'SentMessages', parent: _localConversationRecordKey, - crypto: _conversationCrypto), - decodeElement: proto.Message.fromBuffer); - _sentSubscription = - _sentMessagesCubit!.stream.listen(_updateSentMessagesState); - _updateSentMessagesState(_sentMessagesCubit!.state); + crypto: _conversationCrypto); + _sentSubscription = await sentMessagesDHTLog.listen(_updateSentMessages); + + _sentMessagesDHTLog = sentMessagesDHTLog; + _reconciliation.addInputSourceFromDHTLog( + _accountInfo.identityTypedPublicKey, sentMessagesDHTLog); } // Open remote messages key - Future _initRcvdMessagesCubit() async { + Future _initRcvdMessagesDHTLog() async { // Don't bother if we don't have a remote messages record key yet if (_remoteMessagesRecordKey == null) { return; } // Open new cubit if one is desired - _rcvdMessagesCubit = DHTLogCubit( - open: () async => DHTLog.openRead(_remoteMessagesRecordKey!, - debugName: 'SingleContactMessagesCubit::_initRcvdMessagesCubit::' - 'RcvdMessages', - parent: _remoteConversationRecordKey, - crypto: _conversationCrypto), - decodeElement: proto.Message.fromBuffer); - _rcvdSubscription = - _rcvdMessagesCubit!.stream.listen(_updateRcvdMessagesState); - _updateRcvdMessagesState(_rcvdMessagesCubit!.state); + final rcvdMessagesDHTLog = await DHTLog.openRead(_remoteMessagesRecordKey!, + debugName: 'SingleContactMessagesCubit::_initRcvdMessagesCubit::' + 'RcvdMessages', + parent: _remoteConversationRecordKey, + crypto: _conversationCrypto); + _rcvdSubscription = await rcvdMessagesDHTLog.listen(_updateRcvdMessages); + + _rcvdMessagesDHTLog = rcvdMessagesDHTLog; + _reconciliation.addInputSourceFromDHTLog( + _remoteIdentityPublicKey, rcvdMessagesDHTLog); } Future updateRemoteMessagesRecordKey( @@ -175,17 +176,17 @@ class SingleContactMessagesCubit extends Cubit { return; } - // Close existing cubit if we have one - final rcvdMessagesCubit = _rcvdMessagesCubit; - _rcvdMessagesCubit = null; + // Close existing DHTLog if we have one + final rcvdMessagesDHTLog = _rcvdMessagesDHTLog; + _rcvdMessagesDHTLog = null; _remoteMessagesRecordKey = null; await _rcvdSubscription?.cancel(); _rcvdSubscription = null; - await rcvdMessagesCubit?.close(); + await rcvdMessagesDHTLog?.close(); - // Init the new cubit if we should + // Init the new DHTLog if we should _remoteMessagesRecordKey = remoteMessagesRecordKey; - await _initRcvdMessagesCubit(); + await _initRcvdMessagesDHTLog(); }); } @@ -275,30 +276,15 @@ class SingleContactMessagesCubit extends Cubit { //////////////////////////////////////////////////////////////////////////// // Internal implementation - // Called when the sent messages cubit gets a change + // Called when the sent messages DHTLog gets a change // This will re-render when messages are sent from another machine - void _updateSentMessagesState(DHTLogBusyState avmessages) { - final sentMessages = avmessages.state.asData?.value; - if (sentMessages == null) { - return; - } - - _reconciliation.reconcileMessages( - _accountInfo.identityTypedPublicKey, sentMessages, _sentMessagesCubit!); - - // Update the view - _renderState(); + void _updateSentMessages(DHTLogUpdate upd) { + _reconciliation.reconcileMessages(_accountInfo.identityTypedPublicKey); } - // Called when the received messages cubit gets a change - void _updateRcvdMessagesState(DHTLogBusyState avmessages) { - final rcvdMessages = avmessages.state.asData?.value; - if (rcvdMessages == null) { - return; - } - - _reconciliation.reconcileMessages( - _remoteIdentityPublicKey, rcvdMessages, _rcvdMessagesCubit!); + // Called when the received messages DHTLog gets a change + void _updateRcvdMessages(DHTLogUpdate upd) { + _reconciliation.reconcileMessages(_remoteIdentityPublicKey); } // Called when the reconciled messages window gets a change @@ -327,7 +313,7 @@ class SingleContactMessagesCubit extends Cubit { // _renderState(); try { - await _sentMessagesCubit!.operateAppendEventual((writer) async { + await _sentMessagesDHTLog!.operateAppendEventual((writer) async { // Get the previous message if we have one var previousMessage = writer.length == 0 ? null @@ -357,16 +343,17 @@ class SingleContactMessagesCubit extends Cubit { // Produce a state for this cubit from the input cubits and queues void _renderState() { - // Get all reconciled messages + // Get all reconciled messages in the cubit window final reconciledMessages = _reconciledMessagesCubit?.state.state.asData?.value; - // Get all sent messages - final sentMessages = _sentMessagesCubit?.state.state.asData?.value; + + // Get all sent messages that are still offline + //final sentMessages = _sentMessagesDHTLog. //Get all items in the unsent queue //final unsentMessages = _unsentMessagesQueue.queue; // If we aren't ready to render a state, say we're loading - if (reconciledMessages == null || sentMessages == null) { + if (reconciledMessages == null) { emit(const AsyncLoading()); return; } @@ -377,11 +364,11 @@ class SingleContactMessagesCubit extends Cubit { // keyMapper: (x) => x.content.authorUniqueIdString, // values: reconciledMessages.windowElements, // ); - final sentMessagesMap = - IMap>.fromValues( - keyMapper: (x) => x.value.authorUniqueIdString, - values: sentMessages.window, - ); + // final sentMessagesMap = + // IMap>.fromValues( + // keyMapper: (x) => x.value.authorUniqueIdString, + // values: sentMessages.window, + // ); // final unsentMessagesMap = IMap.fromValues( // keyMapper: (x) => x.authorUniqueIdString, // values: unsentMessages, @@ -393,10 +380,12 @@ class SingleContactMessagesCubit extends Cubit { final isLocal = m.content.author.toVeilid() == _accountInfo.identityTypedPublicKey; final reconciledTimestamp = Timestamp.fromInt64(m.reconciledTime); - final sm = - isLocal ? sentMessagesMap[m.content.authorUniqueIdString] : null; - final sent = isLocal && sm != null; - final sentOffline = isLocal && sm != null && sm.isOffline; + //final sm = + //isLocal ? sentMessagesMap[m.content.authorUniqueIdString] : null; + //final sent = isLocal && sm != null; + //final sentOffline = isLocal && sm != null && sm.isOffline; + final sent = isLocal; + final sentOffline = false; // renderedElements.add(RenderStateElement( message: m.content, @@ -491,16 +480,16 @@ class SingleContactMessagesCubit extends Cubit { late final VeilidCrypto _conversationCrypto; late final MessageIntegrity _senderMessageIntegrity; - DHTLogCubit? _sentMessagesCubit; - DHTLogCubit? _rcvdMessagesCubit; + DHTLog? _sentMessagesDHTLog; + DHTLog? _rcvdMessagesDHTLog; TableDBArrayProtobufCubit? _reconciledMessagesCubit; late final MessageReconciliation _reconciliation; late final PersistentQueue _unsentMessagesQueue; // IList _sendingMessages = const IList.empty(); - StreamSubscription>? _sentSubscription; - StreamSubscription>? _rcvdSubscription; + StreamSubscription? _sentSubscription; + StreamSubscription? _rcvdSubscription; StreamSubscription>? _reconciledSubscription; final StreamController Function()> _commandController; diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log.dart index 1d3fb89..2687bdc 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log.dart @@ -213,7 +213,7 @@ class DHTLog implements DHTDeleteable { /// Runs a closure allowing read-only access to the log Future operate(Future Function(DHTLogReadOperations) closure) async { if (!isOpen) { - throw StateError('log is not open"'); + throw StateError('log is not open'); } return _spine.operate((spine) async { @@ -230,7 +230,7 @@ class DHTLog implements DHTDeleteable { Future operateAppend( Future Function(DHTLogWriteOperations) closure) async { if (!isOpen) { - throw StateError('log is not open"'); + throw StateError('log is not open'); } return _spine.operateAppend((spine) async { @@ -249,7 +249,7 @@ class DHTLog implements DHTDeleteable { Future Function(DHTLogWriteOperations) closure, {Duration? timeout}) async { if (!isOpen) { - throw StateError('log is not open"'); + throw StateError('log is not open'); } return _spine.operateAppendEventual((spine) async { @@ -264,7 +264,7 @@ class DHTLog implements DHTDeleteable { void Function(DHTLogUpdate) onChanged, ) { if (!isOpen) { - throw StateError('log is not open"'); + throw StateError('log is not open'); } return _listenMutex.protect(() async { diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_cubit.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_cubit.dart index 492312f..a7884f9 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_cubit.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_cubit.dart @@ -112,33 +112,34 @@ class DHTLogCubit extends Cubit> Future _refreshInner(void Function(AsyncValue>) emit, {bool forceRefresh = false}) async { late final int length; - final window = await _log.operate((reader) async { + final windowElements = await _log.operate((reader) async { length = reader.length; - return loadElementsFromReader(reader, _windowTail, _windowSize); + return _loadElementsFromReader(reader, _windowTail, _windowSize); }); - if (window == null) { + if (windowElements == null) { setWantsRefresh(); return; } + emit(AsyncValue.data(DHTLogStateData( length: length, - window: window, - windowTail: _windowTail, - windowSize: _windowSize, + window: windowElements.$2, + windowTail: windowElements.$1 + windowElements.$2.length, + windowSize: windowElements.$2.length, follow: _follow))); setRefreshed(); } // Tail is one past the last element to load - Future>?> loadElementsFromReader( + Future<(int, IList>)?> _loadElementsFromReader( DHTLogReadOperations reader, int tail, int count, {bool forceRefresh = false}) async { final length = reader.length; - if (length == 0) { - return const IList.empty(); - } final end = ((tail - 1) % length) + 1; final start = (count < end) ? end - count : 0; + if (length == 0) { + return (start, IList>.empty()); + } // If this is writeable get the offline positions Set? offlinePositions; @@ -154,8 +155,11 @@ class DHTLogCubit extends Cubit> value: _decodeElement(x.$2), isOffline: offlinePositions?.contains(x.$1) ?? false)) .toIList(); + if (allItems == null) { + return null; + } - return allItems; + return (start, allItems); } void _update(DHTLogUpdate upd) { diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_read.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_read.dart index d8634c6..3ebb2b8 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_read.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_read.dart @@ -21,8 +21,14 @@ class _DHTLogRead implements DHTLogReadOperations { return null; } - return lookup.scope((sa) => - sa.operate((read) => read.get(lookup.pos, forceRefresh: forceRefresh))); + return lookup.scope((sa) => sa.operate((read) async { + if (lookup.pos >= read.length) { + veilidLoggy.error('DHTLog shortarray read @ ${lookup.pos}' + ' >= length ${read.length}'); + return null; + } + return read.get(lookup.pos, forceRefresh: forceRefresh); + })); } (int, int) _clampStartLen(int start, int? len) { @@ -49,7 +55,7 @@ class _DHTLogRead implements DHTLogReadOperations { .slices(kMaxDHTConcurrency) .map((chunk) => chunk.map((pos) async { try { - return get(pos + start, forceRefresh: forceRefresh); + return await get(pos + start, forceRefresh: forceRefresh); // Need some way to debug ParallelWaitError // ignore: avoid_catches_without_on_clauses } catch (e, st) { @@ -59,36 +65,42 @@ class _DHTLogRead implements DHTLogReadOperations { })); for (final chunk in chunks) { - final elems = await chunk.wait; + var elems = await chunk.wait; - // If any element was unavailable, return null - if (elems.contains(null)) { - return null; + // Return only the first contiguous range, anything else is garbage + // due to a representational error in the head or shortarray legnth + final nullPos = elems.indexOf(null); + if (nullPos != -1) { + elems = elems.sublist(0, nullPos); } + out.addAll(elems.cast()); + + if (nullPos != -1) { + break; + } } return out; } @override - Future?> getOfflinePositions() async { + Future> getOfflinePositions() async { final positionOffline = {}; // Iterate positions backward from most recent for (var pos = _spine.length - 1; pos >= 0; pos--) { + // Get position final lookup = await _spine.lookupPosition(pos); + // If position doesn't exist then it definitely wasn't written to offline if (lookup == null) { - return null; + continue; } // Check each segment for offline positions var foundOffline = false; - final success = await lookup.scope((sa) => sa.operate((read) async { + await lookup.scope((sa) => sa.operate((read) async { final segmentOffline = await read.getOfflinePositions(); - if (segmentOffline == null) { - return false; - } // For each shortarray segment go through their segment positions // in reverse order and see if they are offline @@ -102,11 +114,7 @@ class _DHTLogRead implements DHTLogReadOperations { foundOffline = true; } } - return true; })); - if (!success) { - return null; - } // If we found nothing offline in this segment then we can stop if (!foundOffline) { break; diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_write.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_write.dart index 8d34280..397a1dc 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_write.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_write.dart @@ -107,9 +107,7 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations { await write.clear(); } else if (lookup.pos != write.length) { // We should always be appending at the length - throw DHTExceptionInvalidData( - '_DHTLogWrite::add lookup.pos=${lookup.pos} ' - 'write.length=${write.length}'); + await write.truncate(lookup.pos); } return write.add(value); })); diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_cubit.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_cubit.dart index 6ff6d95..246a990 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_cubit.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_cubit.dart @@ -62,9 +62,6 @@ class DHTShortArrayCubit extends Cubit> Set? offlinePositions; if (_shortArray.writer != null) { offlinePositions = await reader.getOfflinePositions(); - if (offlinePositions == null) { - return null; - } } // Get the items diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_read.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_read.dart index 747a892..eeb9648 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_read.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_read.dart @@ -62,7 +62,7 @@ class _DHTShortArrayRead implements DHTShortArrayReadOperations { .slices(kMaxDHTConcurrency) .map((chunk) => chunk.map((pos) async { try { - return get(pos + start, forceRefresh: forceRefresh); + return await get(pos + start, forceRefresh: forceRefresh); // Need some way to debug ParallelWaitError // ignore: avoid_catches_without_on_clauses } catch (e, st) { @@ -72,13 +72,20 @@ class _DHTShortArrayRead implements DHTShortArrayReadOperations { })); for (final chunk in chunks) { - final elems = await chunk.wait; + var elems = await chunk.wait; - // If any element was unavailable, return null - if (elems.contains(null)) { - return null; + // Return only the first contiguous range, anything else is garbage + // due to a representational error in the head or shortarray legnth + final nullPos = elems.indexOf(null); + if (nullPos != -1) { + elems = elems.sublist(0, nullPos); } + out.addAll(elems.cast()); + + if (nullPos != -1) { + break; + } } return out; diff --git a/packages/veilid_support/lib/dht_support/src/interfaces/dht_random_read.dart b/packages/veilid_support/lib/dht_support/src/interfaces/dht_random_read.dart index 0547332..d361757 100644 --- a/packages/veilid_support/lib/dht_support/src/interfaces/dht_random_read.dart +++ b/packages/veilid_support/lib/dht_support/src/interfaces/dht_random_read.dart @@ -14,19 +14,22 @@ abstract class DHTRandomRead { /// is specified, the network will always be checked for newer values /// rather than returning the existing locally stored copy of the elements. /// Throws an IndexError if the 'pos' is not within the length - /// of the container. + /// of the container. May return null if the item is not available at this + /// time. Future get(int pos, {bool forceRefresh = false}); /// Return a list of a range of items in the DHTArray. If 'forceRefresh' /// is specified, the network will always be checked for newer values /// rather than returning the existing locally stored copy of the elements. /// Throws an IndexError if either 'start' or '(start+length)' is not within - /// the length of the container. + /// the length of the container. May return fewer items than the length + /// expected if the requested items are not available, but will always + /// return a contiguous range starting at 'start'. Future?> getRange(int start, {int? length, bool forceRefresh = false}); /// Get a list of the positions that were written offline and not flushed yet - Future?> getOfflinePositions(); + Future> getOfflinePositions(); } extension DHTRandomReadExt on DHTRandomRead {