import 'dart:async';
import 'dart:math';

import 'package:veilid_support/veilid_support.dart';

import '../../../proto/proto.dart' as proto;
import '../../../tools/tools.dart';
import 'author_input_source.dart';
import 'message_integrity.dart';

/// A cursor over one author's [AuthorInputSource].
///
/// The queue reads the input source in windows of at most
/// [_maxWindowLength] elements, tracks a position within it, and exposes the
/// message at that position via [getCurrentMessage]. [advance] moves the
/// position forward, skipping messages that fail the timestamp, id-chain or
/// signature checks.
class AuthorInputQueue {
  AuthorInputQueue._({
    required TypedKey author,
    required AuthorInputSource inputSource,
    required int inputPosition,
    required proto.Message? previousMessage,
    required void Function(Object, StackTrace?) onError,
    required MessageIntegrity messageIntegrity,
  })  : _author = author,
        _onError = onError,
        _inputSource = inputSource,
        _previousMessage = previousMessage,
        _messageIntegrity = messageIntegrity,
        _inputPosition = inputPosition;

  /// Creates a queue for [author] reading from [inputSource].
  ///
  /// [previousMessage] is the last message already reconciled for this
  /// author, or null if there is none. [onError] is invoked when fetching a
  /// window from the input source reports an error.
  ///
  /// Returns null when rewinding the input finds no work to do (see
  /// [_rewindInputToAfterLastMessage]).
  static Future<AuthorInputQueue?> create({
    required TypedKey author,
    required AuthorInputSource inputSource,
    required proto.Message? previousMessage,
    required void Function(Object, StackTrace?) onError,
  }) async {
    // Start at the last element of the input source
    // (this is -1 when the source is empty)
    final inputPosition = await inputSource.getTailPosition() - 1;

    // Create an input queue for the input source
    final queue = AuthorInputQueue._(
        author: author,
        inputSource: inputSource,
        inputPosition: inputPosition,
        previousMessage: previousMessage,
        onError: onError,
        messageIntegrity: await MessageIntegrity.create(author: author));

    // Rewind the queue's 'inputPosition' to the first unreconciled message
    if (!await queue._rewindInputToAfterLastMessage()) {
      return null;
    }

    return queue;
  }

  ////////////////////////////////////////////////////////////////////////////
  // Public interface

  /// Get the input source for this queue
  AuthorInputSource get inputSource => _inputSource;

  /// Get the author of this queue
  TypedKey get author => _author;

  /// Get the current message that needs reconciliation
  ///
  /// Returns the cached message if there is one, otherwise loads the element
  /// at the current input position. Returns null if no window is available
  /// for that position or if loading throws; exceptions are logged and
  /// never propagated.
  Future<proto.Message?> getCurrentMessage() async {
    try {
      // If we have a current message already, return it
      final cachedMessage = _currentMessage;
      if (cachedMessage != null) {
        return cachedMessage;
      }

      // Get the window; the position is not clamped here so that running
      // off either end of the input yields null
      final currentWindow = await _updateWindow(clampInputPosition: false);
      if (currentWindow == null) {
        return null;
      }
      final currentElement = currentWindow
          .elements[_inputPosition - currentWindow.firstPosition];
      return _currentMessage = currentElement.value;
      // Catch everything so we can avoid ParallelWaitError
      // ignore: avoid_catches_without_on_clauses
    } catch (e, st) {
      log.error('Exception getting current message: $e:\n$st\n');
      _currentMessage = null;
      return null;
    }
  }

  /// Move the reconciliation cursor (_inputPosition) forward on the input
  /// queue and tees up the next message for processing
  ///
  /// Messages whose timestamp moves backward relative to the previous
  /// message, whose id does not match the id generated from the previous
  /// message, or whose signature does not verify are logged and skipped.
  ///
  /// Returns true if there is more work to do
  /// Returns false if there are no more messages to reconcile in this queue
  Future<bool> advance() async {
    // Move current message to previous
    final previousMessage = _previousMessage = await getCurrentMessage();

    while (true) {
      // Drop the cached message on every iteration. getCurrentMessage()
      // returns the cache whenever it is set, so without this a rejected
      // message would be handed back again after the position moves on and
      // this loop would never terminate.
      _currentMessage = null;

      // Advance to next position
      _inputPosition++;

      // Get more window if we need to
      final currentMessage = await getCurrentMessage();
      if (currentMessage == null) {
        return false;
      }

      // Ensure the timestamp is not moving backward
      if (previousMessage != null &&
          currentMessage.timestamp < previousMessage.timestamp) {
        log.warning('timestamp backward: ${currentMessage.timestamp}'
            ' < ${previousMessage.timestamp}');
        continue;
      }

      // Verify the id chain for the message
      final matchId =
          await _messageIntegrity.generateMessageId(previousMessage);
      if (matchId.compare(currentMessage.idBytes) != 0) {
        log.warning('id chain invalid: $matchId != ${currentMessage.idBytes}');
        continue;
      }

      // Verify the signature for the message
      if (!await _messageIntegrity.verifyMessage(currentMessage)) {
        log.warning('invalid message signature: $currentMessage');
        continue;
      }

      // Message passed every check; it stays cached as the current message
      return true;
    }
  }

  ////////////////////////////////////////////////////////////////////////////
  // Internal implementation

  /// Walk backward from the tail of the input queue to find the first
  /// message newer than our last reconciled message from this author
  /// Returns false if no work is needed
  Future<bool> _rewindInputToAfterLastMessage() async {
    // Nothing in the loop below reassigns _previousMessage, so a local copy
    // is safe and lets the null check promote
    final previousMessage = _previousMessage;

    // Iterate windows over the inputSource
    InputWindow? currentWindow;
    outer:
    while (true) {
      // Get more window if we need to
      currentWindow = await _updateWindow(clampInputPosition: true);
      if (currentWindow == null) {
        // Window is not available or things are empty so this
        // queue can't work right now
        return false;
      }

      // Iterate through current window backward, moving the input position
      // in step with the element index
      for (var i = currentWindow.elements.length - 1;
          i >= 0 && _inputPosition >= 0;
          i--, _inputPosition--) {
        final elem = currentWindow.elements[i];

        // If we've found an input element that is older or same time as our
        // last reconciled message for this author, or we find the message
        // itself then we stop
        if (previousMessage != null) {
          final isSameMessage = elem.value.authorUniqueIdBytes
                  .compare(previousMessage.authorUniqueIdBytes) ==
              0;
          if (isSameMessage ||
              elem.value.timestamp <= previousMessage.timestamp) {
            break outer;
          }
        }
      }

      // If we're at the beginning of the inputSource then we stop
      if (_inputPosition < 0) {
        break;
      }
    }

    // _inputPosition points to either before the input source starts
    // or the position of the previous element. We still need to set the
    // _currentMessage to the previous element so advance() can compare
    // against it if we can.
    if (_inputPosition >= 0) {
      _currentMessage = currentWindow
          .elements[_inputPosition - currentWindow.firstPosition].value;
    }

    // After this advance(), the _inputPosition and _currentMessage should
    // be equal to the first message to process and the current window to
    // process should not be empty if there is work to do
    return advance();
  }

  /// Slide the window toward the current position and load the batch around it
  ///
  /// When [clampInputPosition] is true, an out-of-range input position is
  /// moved into range; when false, an out-of-range position makes this
  /// return null instead.
  ///
  /// Returns the window containing the input position, or null if the input
  /// source is empty, the fetch is still loading or failed (failures are
  /// reported through the error callback), or the fetched window is empty.
  Future<InputWindow?> _updateWindow({required bool clampInputPosition}) async {
    final inputTailPosition = await _inputSource.getTailPosition();
    if (inputTailPosition == 0) {
      return null;
    }

    // Handle out-of-range input position
    if (clampInputPosition) {
      _inputPosition = min(max(_inputPosition, 0), inputTailPosition - 1);
    } else if (_inputPosition < 0 || _inputPosition >= inputTailPosition) {
      return null;
    }

    // Check if we are still in the window
    final currentWindow = _currentWindow;
    if (currentWindow != null &&
        _inputPosition >= currentWindow.firstPosition &&
        _inputPosition <= currentWindow.lastPosition) {
      return currentWindow;
    }

    final int firstPosition;
    final int lastPosition;
    if (currentWindow != null && _inputPosition > currentWindow.lastPosition) {
      // Slide it forward, current position is now first
      firstPosition = _inputPosition;
      lastPosition =
          min((_inputPosition + _maxWindowLength) - 1, inputTailPosition - 1);
    } else {
      // Either there is no window yet or we need to slide it backward;
      // in both cases the current position becomes the last in the window
      firstPosition = max((_inputPosition - _maxWindowLength) + 1, 0);
      lastPosition = _inputPosition;
    }

    // Fetch the batch of input covering the new window
    final avCurrentWindow = await _inputSource.getWindow(
        firstPosition, lastPosition - firstPosition + 1);

    final asErr = avCurrentWindow.asError;
    if (asErr != null) {
      _onError(asErr.error, asErr.stackTrace);
      _currentWindow = null;
      return null;
    }
    final asLoading = avCurrentWindow.asLoading;
    if (asLoading != null) {
      _currentWindow = null;
      return null;
    }

    final nextWindow = avCurrentWindow.asData!.value;
    if (nextWindow == null || nextWindow.length == 0) {
      _currentWindow = null;
      return null;
    }

    // Handle out-of-range input position
    // Doing this again because getWindow is allowed to return a smaller
    // window than the one requested, possibly due to DHT consistency
    // fluctuations and race conditions
    if (clampInputPosition) {
      _inputPosition = min(max(_inputPosition, nextWindow.firstPosition),
          nextWindow.lastPosition);
    } else if (_inputPosition < nextWindow.firstPosition ||
        _inputPosition > nextWindow.lastPosition) {
      return null;
    }

    return _currentWindow = nextWindow;
  }

  ////////////////////////////////////////////////////////////////////////////

  /// The author of the messages in the input source
  final TypedKey _author;

  /// The input source we're pulling messages from
  final AuthorInputSource _inputSource;

  /// What to call if an error happens
  final void Function(Object, StackTrace?) _onError;

  /// The message integrity validator
  final MessageIntegrity _messageIntegrity;

  /// The last message we reconciled/output
  proto.Message? _previousMessage;

  /// The current message we're looking at
  proto.Message? _currentMessage;

  /// The current position in the input source that we are looking at
  int _inputPosition;

  /// The current input window from the InputSource
  InputWindow? _currentWindow;

  /// Desired maximum window length
  static const int _maxWindowLength = 256;
}