import 'dart:async';

import 'package:async_tools/async_tools.dart';
import 'package:fast_immutable_collections/fast_immutable_collections.dart';
import 'package:sorted_list/sorted_list.dart';
import 'package:veilid_support/veilid_support.dart';

import '../../../proto/proto.dart' as proto;
import '../../../tools/tools.dart';
import 'author_input_queue.dart';
import 'author_input_source.dart';
import 'output_position.dart';

class MessageReconciliation {
  MessageReconciliation(
      {required TableDBArrayProtobufCubit<proto.ReconciledMessage> output,
      required void Function(Object, StackTrace?) onError})
      : _outputCubit = output,
        _onError = onError;

  ////////////////////////////////////////////////////////////////////////////

  void addInputSourceFromDHTLog(TypedKey author, DHTLog inputMessagesDHTLog) {
    _inputSources[author] = AuthorInputSource.fromDHTLog(inputMessagesDHTLog);
  }

  void reconcileMessages(TypedKey? author) {
    // xxx: can we use 'author' here to optimize _updateAuthorInputQueues?
    singleFuture(this, onError: _onError, () async {
      // Update queues
      final activeInputQueues = await _updateAuthorInputQueues();

      // Process all input queues together
      await _outputCubit
          .operate((reconciledArray) async => _reconcileInputQueues(
                reconciledArray: reconciledArray,
                activeInputQueues: activeInputQueues,
              ));
    });
  }

  ////////////////////////////////////////////////////////////////////////////

  // Prepare author input queues by removing dead ones and adding new ones
  // Queues that are empty are not added until they have something in them
  // Active input queues with a current message are returned in a list
  Future<List<AuthorInputQueue>> _updateAuthorInputQueues() async {
    // Remove any dead input queues
    final deadQueues = <TypedKey>[];
    for (final author in _inputQueues.keys) {
      if (!_inputSources.containsKey(author)) {
        deadQueues.add(author);
      }
    }
    for (final author in deadQueues) {
      _inputQueues.remove(author);
      _outputPositions.remove(author);
    }

    await _outputCubit.operate((outputArray) async {
      final dws = DelayedWaitSet();

      for (final kv in _inputSources.entries) {
        final author = kv.key;
        final inputSource = kv.value;

        final iqExisting = _inputQueues[author];
        if (iqExisting == null || iqExisting.inputSource != inputSource) {
          dws.add((_) async {
            try {
              await _enqueueAuthorInput(
                  author: author,
                  inputSource: inputSource,
                  outputArray: outputArray);
              // Catch everything so we can avoid ParallelWaitError
              // ignore: avoid_catches_without_on_clauses
            } catch (e, st) {
              log.error('Exception updating author input queue: $e:\n$st\n');
              _inputQueues.remove(author);
              _outputPositions.remove(author);
            }
          });
        }
      }

      await dws();
    });

    // Get the active input queues
    final activeInputQueues = await _inputQueues.entries
        .map((entry) async {
          if (await entry.value.getCurrentMessage() != null) {
            return entry.value;
          } else {
            return null;
          }
        })
        .toList()
        .wait
      ..removeNulls();

    return activeInputQueues.cast<AuthorInputQueue>();
  }

  // Set up a single author's message reconciliation
  Future<void> _enqueueAuthorInput(
      {required TypedKey author,
      required AuthorInputSource inputSource,
      required TableDBArrayProtobuf<proto.ReconciledMessage>
          outputArray}) async {
    // Get the position of our most recent reconciled message from this author
    final outputPosition = await _findLastOutputPosition(
        author: author, outputArray: outputArray);

    // Find oldest message we have not yet reconciled
    final inputQueue = await AuthorInputQueue.create(
      author: author,
      inputSource: inputSource,
      previousMessage: outputPosition?.message.content,
      onError: _onError,
    );
    if (inputQueue != null) {
      _inputQueues[author] = inputQueue;
      _outputPositions[author] = outputPosition;
    } else {
      _inputQueues.remove(author);
      _outputPositions.remove(author);
    }
  }

  // Get the position of our most recent reconciled message from this author
  // XXX: For a group chat, this should find when the author
  // was added to the membership so we don't just go back in time forever
  Future<OutputPosition?> _findLastOutputPosition(
      {required TypedKey author,
      required TableDBArrayProtobuf<proto.ReconciledMessage>
          outputArray}) async {
    var pos = outputArray.length - 1;
    while (pos >= 0) {
      final message = await outputArray.get(pos);
      if (message.content.author.toVeilid() == author) {
        return OutputPosition(message, pos);
      }
      pos--;
    }
    return null;
  }

  // Process a list of author input queues and insert their messages
  // into the output array, performing validation steps along the way
  Future<void> _reconcileInputQueues({
    required TableDBArrayProtobuf<proto.ReconciledMessage> reconciledArray,
    required List<AuthorInputQueue> activeInputQueues,
  }) async {
    // Ensure we have active queues to process
    if (activeInputQueues.isEmpty) {
      return;
    }

    // Sort queues from earliest to latest and then by author
    // to ensure a deterministic insert order
    activeInputQueues.sort((a, b) {
      final aout = _outputPositions[a.author];
      final bout = _outputPositions[b.author];
      final acmp = aout?.pos ?? -1;
      final bcmp = bout?.pos ?? -1;
      if (acmp == bcmp) {
        return a.author.toString().compareTo(b.author.toString());
      }
      return acmp.compareTo(bcmp);
    });

    // Start at the earliest position we know about in all the queues
    var currentOutputPosition =
        _outputPositions[activeInputQueues.first.author];

    final toInsert =
        SortedList<proto.Message>(proto.MessageExt.compareTimestamp);

    while (activeInputQueues.isNotEmpty) {
      // Get up to '_maxReconcileChunk' of the items from the queues
      // that we can insert at this location

      bool added;
      do {
        added = false;
        final emptyQueues = <AuthorInputQueue>{};
        for (final inputQueue in activeInputQueues) {
          final inputCurrent = await inputQueue.getCurrentMessage();
          if (inputCurrent == null) {
            log.error('Active input queue did not have a current message: '
                '${inputQueue.author}');
            continue;
          }
          if (currentOutputPosition == null ||
              inputCurrent.timestamp <
                  currentOutputPosition.message.content.timestamp) {
            toInsert.add(inputCurrent);
            added = true;

            // Advance this queue
            if (!await inputQueue.advance()) {
              // Mark queue as empty for removal
              emptyQueues.add(inputQueue);
            }
          }
        }
        // Remove finished queues now that we're done iterating
        activeInputQueues.removeWhere(emptyQueues.contains);

        if (toInsert.length >= _maxReconcileChunk) {
          break;
        }
      } while (added);

      // Perform insertions in bulk
      if (toInsert.isNotEmpty) {
        final reconciledTime = Veilid.instance.now().toInt64();

        // Add reconciled timestamps
        final reconciledInserts = toInsert
            .map((message) => proto.ReconciledMessage()
              ..reconciledTime = reconciledTime
              ..content = message)
            .toList();

        // Figure out where to insert the reconciled messages
        final insertPos = currentOutputPosition?.pos ?? reconciledArray.length;

        // Insert them all at once
        await reconciledArray.insertAll(insertPos, reconciledInserts);

        // Update output positions for input queues
        final updatePositions = _outputPositions.keys.toSet();
        var outputPos = insertPos + reconciledInserts.length;
        for (final inserted in reconciledInserts.reversed) {
          if (updatePositions.isEmpty) {
            // Last seen positions already recorded for each active author
            break;
          }
          outputPos--;
          final author = inserted.content.author.toVeilid();
          if (updatePositions.contains(author)) {
            _outputPositions[author] = OutputPosition(inserted, outputPos);
            updatePositions.remove(author);
          }
        }

        toInsert.clear();
      } else {
        // If there's nothing to insert at this position, move to the next one
        final nextOutputPos = (currentOutputPosition != null)
            ? currentOutputPosition.pos + 1
            : reconciledArray.length;
        if (nextOutputPos == reconciledArray.length) {
          currentOutputPosition = null;
        } else {
          currentOutputPosition = OutputPosition(
              await reconciledArray.get(nextOutputPos), nextOutputPos);
        }
      }
    }
  }

  ////////////////////////////////////////////////////////////////////////////

  final Map<TypedKey, AuthorInputSource> _inputSources = {};
  final Map<TypedKey, AuthorInputQueue> _inputQueues = {};
  final Map<TypedKey, OutputPosition?> _outputPositions = {};

  final TableDBArrayProtobufCubit<proto.ReconciledMessage> _outputCubit;
  final void Function(Object, StackTrace?) _onError;

  static const int _maxReconcileChunk = 65536;
}
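
// ---------------------------------------------------------------------------
// Illustrative usage sketch (not part of this class's API surface). It shows
// roughly how a caller might wire up MessageReconciliation, assuming the
// caller owns a TableDBArrayProtobufCubit<proto.ReconciledMessage> for the
// reconciled output and one DHTLog of messages per conversation member. The
// names `reconciledMessagesCubit`, `memberKeys`, `memberMessageLogs`, and
// `changedAuthor` are hypothetical placeholders, not identifiers from this
// codebase.
//
//   final reconciler = MessageReconciliation(
//       output: reconciledMessagesCubit,
//       onError: (e, st) => log.error('reconciliation failed: $e\n$st'));
//
//   // Register one input source per conversation member
//   for (final memberKey in memberKeys) {
//     reconciler.addInputSourceFromDHTLog(
//         memberKey, memberMessageLogs[memberKey]!);
//   }
//
//   // Re-run reconciliation whenever an author's message log changes; the
//   // author argument is currently only a hint (see reconcileMessages above).
//   reconciler.reconcileMessages(changedAuthor);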