diff --git a/lib/chat/cubits/reconciliation/author_input_queue.dart b/lib/chat/cubits/reconciliation/author_input_queue.dart index 009604d..d7be3eb 100644 --- a/lib/chat/cubits/reconciliation/author_input_queue.dart +++ b/lib/chat/cubits/reconciliation/author_input_queue.dart @@ -109,7 +109,7 @@ class AuthorInputQueue { // Internal implementation // Walk backward from the tail of the input queue to find the first - // message newer than our last reconcicled message from this author + // message newer than our last reconciled message from this author // Returns false if no work is needed Future _findStartOfWork() async { // Iterate windows over the inputSource @@ -121,10 +121,14 @@ class AuthorInputQueue { i--, _currentPosition--) { final elem = _inputSource.currentWindow.elements[i]; - // If we've found an input element that is older than our last - // reconciled message for this author, then we stop + // If we've found an input element that is older or same time as our + // last reconciled message for this author, or we find the message + // itself then we stop if (_lastMessage != null) { - if (elem.value.timestamp < _lastMessage!.timestamp) { + if (elem.value.authorUniqueIdBytes + .compare(_lastMessage!.authorUniqueIdBytes) == + 0 || + elem.value.timestamp <= _lastMessage!.timestamp) { break outer; } } diff --git a/lib/chat/cubits/reconciliation/message_reconciliation.dart b/lib/chat/cubits/reconciliation/message_reconciliation.dart index aa53f49..f0b8c4c 100644 --- a/lib/chat/cubits/reconciliation/message_reconciliation.dart +++ b/lib/chat/cubits/reconciliation/message_reconciliation.dart @@ -75,8 +75,7 @@ class MessageReconciliation { return inputQueue; } - // Get the position of our most recent - // reconciled message from this author + // Get the position of our most recent reconciled message from this author // XXX: For a group chat, this should find when the author // was added to the membership so we don't just go back in time forever Future _findLastOutputPosition( @@ -85,9 +84,6 @@ class MessageReconciliation { var pos = arr.length - 1; while (pos >= 0) { final message = await arr.get(pos); - if (message == null) { - throw StateError('should have gotten last message'); - } if (message.content.author.toVeilid() == author) { return OutputPosition(message, pos); } @@ -120,13 +116,7 @@ class MessageReconciliation { }); // Start at the earliest position we know about in all the queues - final firstOutputPos = inputQueues.first.outputPosition?.pos; - // Get the timestamp for this output position - var currentOutputMessage = firstOutputPos == null - ? null - : await reconciledArray.get(firstOutputPos); - - var currentOutputPos = firstOutputPos ?? 0; + var currentOutputPosition = inputQueues.first.outputPosition; final toInsert = SortedList(proto.MessageExt.compareTimestamp); @@ -141,8 +131,9 @@ class MessageReconciliation { var someQueueEmpty = false; for (final inputQueue in inputQueues) { final inputCurrent = inputQueue.current!; - if (currentOutputMessage == null || - inputCurrent.timestamp < currentOutputMessage.content.timestamp) { + if (currentOutputPosition == null || + inputCurrent.timestamp < + currentOutputPosition.message.content.timestamp) { toInsert.add(inputCurrent); added = true; @@ -174,15 +165,22 @@ class MessageReconciliation { ..content = message) .toList(); - await reconciledArray.insertAll(currentOutputPos, reconciledInserts); + await reconciledArray.insertAll( + currentOutputPosition?.pos ?? reconciledArray.length, + reconciledInserts); toInsert.clear(); } else { // If there's nothing to insert at this position move to the next one - currentOutputPos++; - currentOutputMessage = (currentOutputPos == reconciledArray.length) - ? null - : await reconciledArray.get(currentOutputPos); + final nextOutputPos = (currentOutputPosition != null) + ? currentOutputPosition.pos + 1 + : reconciledArray.length; + if (nextOutputPos == reconciledArray.length) { + currentOutputPosition = null; + } else { + currentOutputPosition = OutputPosition( + await reconciledArray.get(nextOutputPos), nextOutputPos); + } } } } diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart index 6b5665d..bad7f80 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart @@ -200,8 +200,12 @@ class _DHTLogSpine { throw TimeoutException('timeout reached'); } } - if (await closure(this)) { - break; + try { + if (await closure(this)) { + break; + } + } on DHTExceptionTryAgain { + // } // Failed to write in closure resets state _head = oldHead; diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart index 501892d..68c2a18 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart @@ -139,9 +139,14 @@ class _DHTShortArrayHead { throw TimeoutException('timeout reached'); } } - if (await closure(this)) { - break; + try { + if (await closure(this)) { + break; + } + } on DHTExceptionTryAgain { + // } + // Failed to write in closure resets state _linkedRecords = List.of(oldLinkedRecords); _index = List.of(oldIndex); diff --git a/packages/veilid_support/lib/src/table_db_array.dart b/packages/veilid_support/lib/src/table_db_array.dart index bc0eca5..ad4c586 100644 --- a/packages/veilid_support/lib/src/table_db_array.dart +++ b/packages/veilid_support/lib/src/table_db_array.dart @@ -710,10 +710,10 @@ class TableDBArrayJson extends _TableDBArrayBase { Future insertAll(int pos, List values) async => _insertAll(pos, values.map(jsonEncodeBytes).toList()); - Future get( + Future get( int pos, ) => - _get(pos).then((out) => jsonDecodeOptBytes(_fromJson, out)); + _get(pos).then((out) => jsonDecodeBytes(_fromJson, out)); Future> getRange(int start, [int? end]) => _getRange(start, end).then((out) => out.map(_fromJson).toList()); @@ -773,7 +773,7 @@ class TableDBArrayProtobuf Future insertAll(int pos, List values) async => _insertAll(pos, values.map((x) => x.writeToBuffer()).toList()); - Future get( + Future get( int pos, ) => _get(pos).then(_fromBuffer);