simplify reconciliation

This commit is contained in:
Christien Rioux 2025-04-19 22:21:40 -04:00
parent bf38c2c44d
commit 4797184a1a
12 changed files with 512 additions and 345 deletions

View File

@ -15,6 +15,7 @@ import '../../account_manager/account_manager.dart';
import '../../contacts/contacts.dart';
import '../../conversation/conversation.dart';
import '../../proto/proto.dart' as proto;
import '../../tools/tools.dart';
import '../models/chat_component_state.dart';
import '../models/message_state.dart';
import '../models/window_state.dart';
@ -383,13 +384,13 @@ class ChatComponentCubit extends Cubit<ChatComponentState> {
if (chatMessage == null) {
continue;
}
chatMessages.insert(0, chatMessage);
if (!tsSet.add(chatMessage.id)) {
// ignore: avoid_print
print('duplicate id found: ${chatMessage.id}:\n'
'Messages:\n${messagesState.window}\n'
'ChatMessages:\n$chatMessages');
assert(false, 'should not have duplicate id');
log.error('duplicate id found: ${chatMessage.id}'
// '\nMessages:\n${messagesState.window}'
// '\nChatMessages:\n$chatMessages'
);
} else {
chatMessages.insert(0, chatMessage);
}
}
return currentState.copyWith(

View File

@ -1,4 +1,5 @@
import 'dart:async';
import 'dart:math';
import 'package:veilid_support/veilid_support.dart';
import '../../../proto/proto.dart' as proto;
@ -6,106 +7,127 @@ import '../../../proto/proto.dart' as proto;
import '../../../tools/tools.dart';
import 'author_input_source.dart';
import 'message_integrity.dart';
import 'output_position.dart';
class AuthorInputQueue {
AuthorInputQueue._({
required TypedKey author,
required AuthorInputSource inputSource,
required OutputPosition? outputPosition,
required int inputPosition,
required proto.Message? previousMessage,
required void Function(Object, StackTrace?) onError,
required MessageIntegrity messageIntegrity,
}) : _author = author,
_onError = onError,
_inputSource = inputSource,
_outputPosition = outputPosition,
_lastMessage = outputPosition?.message.content,
_previousMessage = previousMessage,
_messageIntegrity = messageIntegrity,
_currentPosition = inputSource.currentWindow.last;
_inputPosition = inputPosition;
static Future<AuthorInputQueue?> create({
required TypedKey author,
required AuthorInputSource inputSource,
required OutputPosition? outputPosition,
required proto.Message? previousMessage,
required void Function(Object, StackTrace?) onError,
}) async {
// Get ending input position
final inputPosition = await inputSource.getTailPosition() - 1;
// Create an input queue for the input source
final queue = AuthorInputQueue._(
author: author,
inputSource: inputSource,
outputPosition: outputPosition,
inputPosition: inputPosition,
previousMessage: previousMessage,
onError: onError,
messageIntegrity: await MessageIntegrity.create(author: author));
if (!await queue._findStartOfWork()) {
// Rewind the queue's 'inputPosition' to the first unreconciled message
if (!await queue._rewindInputToAfterLastMessage()) {
return null;
}
return queue;
}
////////////////////////////////////////////////////////////////////////////
// Public interface
// Check if there are no messages left in this queue to reconcile
bool get isDone => _isDone;
/// Get the input source for this queue
AuthorInputSource get inputSource => _inputSource;
// Get the current message that needs reconciliation
proto.Message? get current => _currentMessage;
// Get the earliest output position to start inserting
OutputPosition? get outputPosition => _outputPosition;
// Get the author of this queue
/// Get the author of this queue
TypedKey get author => _author;
// Remove a reconciled message and move to the next message
// Returns true if there is more work to do
Future<bool> consume() async {
if (_isDone) {
/// Get the current message that needs reconciliation
Future<proto.Message?> getCurrentMessage() async {
try {
// if we have a current message already, return it
if (_currentMessage != null) {
return _currentMessage;
}
// Get the window
final currentWindow = await _updateWindow(clampInputPosition: false);
if (currentWindow == null) {
return null;
}
final currentElement =
currentWindow.elements[_inputPosition - currentWindow.firstPosition];
return _currentMessage = currentElement.value;
// Catch everything so we can avoid ParallelWaitError
// ignore: avoid_catches_without_on_clauses
} catch (e, st) {
log.error('Exception getting current message: $e:\n$st\n');
_currentMessage = null;
return null;
}
}
/// Remove a reconciled message and move to the next message
/// Returns true if there is more work to do
Future<bool> advance() async {
final currentMessage = await getCurrentMessage();
if (currentMessage == null) {
return false;
}
while (true) {
_lastMessage = _currentMessage;
_currentPosition++;
// Move current message to previous
_previousMessage = _currentMessage;
_currentMessage = null;
while (true) {
// Advance to next position
_inputPosition++;
// Get more window if we need to
if (!await _updateWindow()) {
// Window is not available so this queue can't work right now
_isDone = true;
final currentMessage = await getCurrentMessage();
if (currentMessage == null) {
return false;
}
final nextMessage = _inputSource.currentWindow
.elements[_currentPosition - _inputSource.currentWindow.first];
// Drop the 'offline' elements because we don't reconcile
// anything until it has been confirmed to be committed to the DHT
// if (nextMessage.isOffline) {
// continue;
// }
if (_lastMessage != null) {
if (_previousMessage != null) {
// Ensure the timestamp is not moving backward
if (nextMessage.value.timestamp < _lastMessage!.timestamp) {
log.warning('timestamp backward: ${nextMessage.value.timestamp}'
' < ${_lastMessage!.timestamp}');
if (currentMessage.timestamp < _previousMessage!.timestamp) {
log.warning('timestamp backward: ${currentMessage.timestamp}'
' < ${_previousMessage!.timestamp}');
continue;
}
}
// Verify the id chain for the message
final matchId = await _messageIntegrity.generateMessageId(_lastMessage);
if (matchId.compare(nextMessage.value.idBytes) != 0) {
log.warning(
'id chain invalid: $matchId != ${nextMessage.value.idBytes}');
final matchId =
await _messageIntegrity.generateMessageId(_previousMessage);
if (matchId.compare(currentMessage.idBytes) != 0) {
log.warning('id chain invalid: $matchId != ${currentMessage.idBytes}');
continue;
}
// Verify the signature for the message
if (!await _messageIntegrity.verifyMessage(nextMessage.value)) {
log.warning('invalid message signature: ${nextMessage.value}');
if (!await _messageIntegrity.verifyMessage(currentMessage)) {
log.warning('invalid message signature: $currentMessage');
continue;
}
_currentMessage = nextMessage.value;
break;
}
return true;
@ -114,106 +136,166 @@ class AuthorInputQueue {
////////////////////////////////////////////////////////////////////////////
// Internal implementation
// Walk backward from the tail of the input queue to find the first
// message newer than our last reconciled message from this author
// Returns false if no work is needed
Future<bool> _findStartOfWork() async {
/// Walk backward from the tail of the input queue to find the first
/// message newer than our last reconciled message from this author
/// Returns false if no work is needed
Future<bool> _rewindInputToAfterLastMessage() async {
// Iterate windows over the inputSource
InputWindow? currentWindow;
outer:
while (true) {
// Get more window if we need to
currentWindow = await _updateWindow(clampInputPosition: true);
if (currentWindow == null) {
// Window is not available or things are empty so this
// queue can't work right now
return false;
}
// Iterate through current window backward
for (var i = _inputSource.currentWindow.elements.length - 1;
i >= 0 && _currentPosition >= 0;
i--, _currentPosition--) {
final elem = _inputSource.currentWindow.elements[i];
for (var i = currentWindow.elements.length - 1;
i >= 0 && _inputPosition >= 0;
i--, _inputPosition--) {
final elem = currentWindow.elements[i];
// If we've found an input element that is older or same time as our
// last reconciled message for this author, or we find the message
// itself then we stop
if (_lastMessage != null) {
if (_previousMessage != null) {
if (elem.value.authorUniqueIdBytes
.compare(_lastMessage!.authorUniqueIdBytes) ==
.compare(_previousMessage!.authorUniqueIdBytes) ==
0 ||
elem.value.timestamp <= _lastMessage!.timestamp) {
elem.value.timestamp <= _previousMessage!.timestamp) {
break outer;
}
}
}
// If we're at the beginning of the inputSource then we stop
if (_currentPosition < 0) {
if (_inputPosition < 0) {
break;
}
// Get more window if we need to
if (!await _updateWindow()) {
// Window is not available or things are empty so this
// queue can't work right now
_isDone = true;
return false;
}
}
// _currentPosition points to either before the input source starts
// _inputPosition points to either before the input source starts
// or the position of the previous element. We still need to set the
// _currentMessage to the previous element so consume() can compare
// against it if we can.
if (_currentPosition >= 0) {
_currentMessage = _inputSource.currentWindow
.elements[_currentPosition - _inputSource.currentWindow.first].value;
if (_inputPosition >= 0) {
_currentMessage = currentWindow
.elements[_inputPosition - currentWindow.firstPosition].value;
}
// After this consume(), the currentPosition and _currentMessage should
// After this advance(), the _inputPosition and _currentMessage should
// be equal to the first message to process and the current window to
// process should not be empty
return consume();
// process should not be empty if there is work to do
return advance();
}
// Slide the window toward the current position and load the batch around it
Future<bool> _updateWindow() async {
/// Slide the window toward the current position and load the batch around it
Future<InputWindow?> _updateWindow({required bool clampInputPosition}) async {
final inputTailPosition = await _inputSource.getTailPosition();
if (inputTailPosition == 0) {
return null;
}
// Handle out-of-range input position
if (clampInputPosition) {
_inputPosition = min(max(_inputPosition, 0), inputTailPosition - 1);
} else if (_inputPosition < 0 || _inputPosition >= inputTailPosition) {
return null;
}
// Check if we are still in the window
if (_currentPosition >= _inputSource.currentWindow.first &&
_currentPosition <= _inputSource.currentWindow.last) {
return true;
final currentWindow = _currentWindow;
int firstPosition;
int lastPosition;
if (currentWindow != null) {
firstPosition = currentWindow.firstPosition;
lastPosition = currentWindow.lastPosition;
// Slide the window if we need to
if (_inputPosition >= firstPosition && _inputPosition <= lastPosition) {
return currentWindow;
} else if (_inputPosition < firstPosition) {
// Slide it backward, current position is now last
firstPosition = max((_inputPosition - _maxWindowLength) + 1, 0);
lastPosition = _inputPosition;
} else if (_inputPosition > lastPosition) {
// Slide it forward, current position is now first
firstPosition = _inputPosition;
lastPosition =
min((_inputPosition + _maxWindowLength) - 1, inputTailPosition - 1);
}
} else {
// need a new window, start with the input position at the end
lastPosition = _inputPosition;
firstPosition = max((_inputPosition - _maxWindowLength) + 1, 0);
}
// Get another input batch further back
final avOk =
await _inputSource.updateWindow(_currentPosition, _maxWindowLength);
final avCurrentWindow = await _inputSource.getWindow(
firstPosition, lastPosition - firstPosition + 1);
final asErr = avOk.asError;
final asErr = avCurrentWindow.asError;
if (asErr != null) {
_onError(asErr.error, asErr.stackTrace);
return false;
_currentWindow = null;
return null;
}
final asLoading = avOk.asLoading;
final asLoading = avCurrentWindow.asLoading;
if (asLoading != null) {
// xxx: no need to block the cubit here for this
// xxx: might want to switch to a 'busy' state though
// xxx: to let the messages view show a spinner at the bottom
// xxx: while we reconcile...
// emit(const AsyncValue.loading());
return false;
_currentWindow = null;
return null;
}
return avOk.asData!.value;
final nextWindow = avCurrentWindow.asData!.value;
if (nextWindow == null || nextWindow.length == 0) {
_currentWindow = null;
return null;
}
// Handle out-of-range input position
// Doing this again because getWindow is allowed to return a smaller
// window than the one requested, possibly due to DHT consistency
// fluctuations and race conditions
if (clampInputPosition) {
_inputPosition = min(max(_inputPosition, nextWindow.firstPosition),
nextWindow.lastPosition);
} else if (_inputPosition < nextWindow.firstPosition ||
_inputPosition > nextWindow.lastPosition) {
return null;
}
return _currentWindow = nextWindow;
}
////////////////////////////////////////////////////////////////////////////
/// The author of the messages in the input source
final TypedKey _author;
/// The input source we're pulling messages from
final AuthorInputSource _inputSource;
final OutputPosition? _outputPosition;
/// What to call if an error happens
final void Function(Object, StackTrace?) _onError;
/// The message integrity validator
final MessageIntegrity _messageIntegrity;
// The last message we've consumed
proto.Message? _lastMessage;
// The current position in the input log that we are looking at
int _currentPosition;
// The current message we're looking at
proto.Message? _currentMessage;
// If we have reached the end
bool _isDone = false;
/// The last message we reconciled/output
proto.Message? _previousMessage;
// Desired maximum window length
/// The current message we're looking at
proto.Message? _currentMessage;
/// The current position in the input source that we are looking at
int _inputPosition;
/// The current input window from the InputSource.
InputWindow? _currentWindow;
/// Desired maximum window length
static const int _maxWindowLength = 256;
}

View File

@ -9,64 +9,68 @@ import '../../../proto/proto.dart' as proto;
@immutable
class InputWindow {
const InputWindow(
{required this.elements, required this.first, required this.last});
const InputWindow({required this.elements, required this.firstPosition})
: lastPosition = firstPosition + elements.length - 1,
isEmpty = elements.length == 0,
length = elements.length;
final IList<OnlineElementState<proto.Message>> elements;
final int first;
final int last;
final int firstPosition;
final int lastPosition;
final bool isEmpty;
final int length;
}
class AuthorInputSource {
AuthorInputSource.fromCubit(
{required DHTLogStateData<proto.Message> cubitState,
required this.cubit}) {
_currentWindow = InputWindow(
elements: cubitState.window,
first: (cubitState.windowTail - cubitState.window.length) %
cubitState.length,
last: (cubitState.windowTail - 1) % cubitState.length);
}
AuthorInputSource.fromDHTLog(DHTLog dhtLog) : _dhtLog = dhtLog;
////////////////////////////////////////////////////////////////////////////
InputWindow get currentWindow => _currentWindow;
Future<int> getTailPosition() async =>
_dhtLog.operate((reader) async => reader.length);
Future<AsyncValue<bool>> updateWindow(
int currentPosition, int windowLength) async =>
cubit.operate((reader) async {
// See if we're beyond the input source
if (currentPosition < 0 || currentPosition >= reader.length) {
return const AsyncValue.data(false);
}
// Slide the window if we need to
var first = _currentWindow.first;
var last = _currentWindow.last;
if (currentPosition < first) {
// Slide it backward, current position is now last
first = max((currentPosition - windowLength) + 1, 0);
last = currentPosition;
} else if (currentPosition > last) {
// Slide it forward, current position is now first
first = currentPosition;
last = min((currentPosition + windowLength) - 1, reader.length - 1);
} else {
return const AsyncValue.data(true);
Future<AsyncValue<InputWindow?>> getWindow(
int startPosition, int windowLength) async =>
_dhtLog.operate((reader) async {
// Don't allow negative length
if (windowLength <= 0) {
return const AsyncValue.data(null);
}
// Trim if we're beyond input source
var endPosition = startPosition + windowLength - 1;
startPosition = max(startPosition, 0);
endPosition = max(endPosition, 0);
// Get another input batch further back
final nextWindow = await cubit.loadElementsFromReader(
reader, last + 1, (last + 1) - first);
if (nextWindow == null) {
return const AsyncValue.loading();
try {
Set<int>? offlinePositions;
if (_dhtLog.writer != null) {
offlinePositions = await reader.getOfflinePositions();
}
final messages = await reader.getRangeProtobuf(
proto.Message.fromBuffer, startPosition,
length: endPosition - startPosition + 1);
if (messages == null) {
return const AsyncValue.loading();
}
final elements = messages.indexed
.map((x) => OnlineElementState(
value: x.$2,
isOffline: offlinePositions?.contains(x.$1 + startPosition) ??
false))
.toIList();
final window =
InputWindow(elements: elements, firstPosition: startPosition);
return AsyncValue.data(window);
} on Exception catch (e, st) {
return AsyncValue.error(e, st);
}
_currentWindow =
InputWindow(elements: nextWindow, first: first, last: last);
return const AsyncValue.data(true);
});
////////////////////////////////////////////////////////////////////////////
final DHTLogCubit<proto.Message> cubit;
late InputWindow _currentWindow;
final DHTLog _dhtLog;
}

View File

@ -20,66 +20,113 @@ class MessageReconciliation {
////////////////////////////////////////////////////////////////////////////
void reconcileMessages(
TypedKey author,
DHTLogStateData<proto.Message> inputMessagesCubitState,
DHTLogCubit<proto.Message> inputMessagesCubit) {
if (inputMessagesCubitState.window.isEmpty) {
return;
}
void addInputSourceFromDHTLog(TypedKey author, DHTLog inputMessagesDHTLog) {
_inputSources[author] = AuthorInputSource.fromDHTLog(inputMessagesDHTLog);
}
_inputSources[author] = AuthorInputSource.fromCubit(
cubitState: inputMessagesCubitState, cubit: inputMessagesCubit);
void reconcileMessages(TypedKey? author) {
// xxx: can we use 'author' here to optimize _updateAuthorInputQueues?
singleFuture(this, onError: _onError, () async {
// Take entire list of input sources we have currently and process them
final inputSources = _inputSources;
_inputSources = {};
final inputFuts = <Future<AuthorInputQueue?>>[];
for (final kv in inputSources.entries) {
final author = kv.key;
final inputSource = kv.value;
inputFuts
.add(_enqueueAuthorInput(author: author, inputSource: inputSource));
}
final inputQueues = await inputFuts.wait;
// Make this safe to cast by removing inputs that were rejected or empty
inputQueues.removeNulls();
// Update queues
final activeInputQueues = await _updateAuthorInputQueues();
// Process all input queues together
await _outputCubit
.operate((reconciledArray) async => _reconcileInputQueues(
reconciledArray: reconciledArray,
inputQueues: inputQueues.cast<AuthorInputQueue>(),
activeInputQueues: activeInputQueues,
));
});
}
////////////////////////////////////////////////////////////////////////////
// Set up a single author's message reconciliation
Future<AuthorInputQueue?> _enqueueAuthorInput(
{required TypedKey author,
required AuthorInputSource inputSource}) async {
try {
// Get the position of our most recent reconciled message from this author
final outputPosition = await _findLastOutputPosition(author: author);
// Prepare author input queues by removing dead ones and adding new ones
// Queues that are empty are not added until they have something in them
// Active input queues with a current message are returned in a list
Future<List<AuthorInputQueue>> _updateAuthorInputQueues() async {
// Remove any dead input queues
final deadQueues = <TypedKey>[];
for (final author in _inputQueues.keys) {
if (!_inputSources.containsKey(author)) {
deadQueues.add(author);
}
}
for (final author in deadQueues) {
_inputQueues.remove(author);
_outputPositions.remove(author);
}
// Find oldest message we have not yet reconciled
final inputQueue = await AuthorInputQueue.create(
author: author,
inputSource: inputSource,
outputPosition: outputPosition,
onError: _onError,
);
return inputQueue;
// Catch everything so we can avoid ParallelWaitError
// ignore: avoid_catches_without_on_clauses
} catch (e, st) {
log.error('Exception enqueing author input: $e:\n$st\n');
return null;
await _outputCubit.operate((outputArray) async {
final dws = DelayedWaitSet<void, void>();
for (final kv in _inputSources.entries) {
final author = kv.key;
final inputSource = kv.value;
final iqExisting = _inputQueues[author];
if (iqExisting == null || iqExisting.inputSource != inputSource) {
dws.add((_) async {
try {
await _enqueueAuthorInput(
author: author,
inputSource: inputSource,
outputArray: outputArray);
// Catch everything so we can avoid ParallelWaitError
// ignore: avoid_catches_without_on_clauses
} catch (e, st) {
log.error('Exception updating author input queue: $e:\n$st\n');
_inputQueues.remove(author);
_outputPositions.remove(author);
}
});
}
}
await dws();
});
// Get the active input queues
final activeInputQueues = await _inputQueues.entries
.map((entry) async {
if (await entry.value.getCurrentMessage() != null) {
return entry.value;
} else {
return null;
}
})
.toList()
.wait
..removeNulls();
return activeInputQueues.cast<AuthorInputQueue>();
}
// Set up a single author's message reconciliation
Future<void> _enqueueAuthorInput(
{required TypedKey author,
required AuthorInputSource inputSource,
required TableDBArrayProtobuf<proto.ReconciledMessage>
outputArray}) async {
// Get the position of our most recent reconciled message from this author
final outputPosition =
await _findLastOutputPosition(author: author, outputArray: outputArray);
// Find oldest message we have not yet reconciled
final inputQueue = await AuthorInputQueue.create(
author: author,
inputSource: inputSource,
previousMessage: outputPosition?.message.content,
onError: _onError,
);
if (inputQueue != null) {
_inputQueues[author] = inputQueue;
_outputPositions[author] = outputPosition;
} else {
_inputQueues.remove(author);
_outputPositions.remove(author);
}
}
@ -87,36 +134,38 @@ class MessageReconciliation {
// XXX: For a group chat, this should find when the author
// was added to the membership so we don't just go back in time forever
Future<OutputPosition?> _findLastOutputPosition(
{required TypedKey author}) async =>
_outputCubit.operate((arr) async {
var pos = arr.length - 1;
while (pos >= 0) {
final message = await arr.get(pos);
if (message.content.author.toVeilid() == author) {
return OutputPosition(message, pos);
}
pos--;
}
return null;
});
{required TypedKey author,
required TableDBArrayProtobuf<proto.ReconciledMessage>
outputArray}) async {
var pos = outputArray.length - 1;
while (pos >= 0) {
final message = await outputArray.get(pos);
if (message.content.author.toVeilid() == author) {
return OutputPosition(message, pos);
}
pos--;
}
return null;
}
// Process a list of author input queues and insert their messages
// into the output array, performing validation steps along the way
Future<void> _reconcileInputQueues({
required TableDBArrayProtobuf<proto.ReconciledMessage> reconciledArray,
required List<AuthorInputQueue> inputQueues,
required List<AuthorInputQueue> activeInputQueues,
}) async {
// Ensure queues all have something to do
inputQueues.removeWhere((q) => q.isDone);
if (inputQueues.isEmpty) {
// Ensure we have active queues to process
if (activeInputQueues.isEmpty) {
return;
}
// Sort queues from earliest to latest and then by author
// to ensure a deterministic insert order
inputQueues.sort((a, b) {
final acmp = a.outputPosition?.pos ?? -1;
final bcmp = b.outputPosition?.pos ?? -1;
activeInputQueues.sort((a, b) {
final aout = _outputPositions[a.author];
final bout = _outputPositions[b.author];
final acmp = aout?.pos ?? -1;
final bcmp = bout?.pos ?? -1;
if (acmp == bcmp) {
return a.author.toString().compareTo(b.author.toString());
}
@ -124,21 +173,28 @@ class MessageReconciliation {
});
// Start at the earliest position we know about in all the queues
var currentOutputPosition = inputQueues.first.outputPosition;
var currentOutputPosition =
_outputPositions[activeInputQueues.first.author];
final toInsert =
SortedList<proto.Message>(proto.MessageExt.compareTimestamp);
while (inputQueues.isNotEmpty) {
while (activeInputQueues.isNotEmpty) {
// Get up to '_maxReconcileChunk' of the items from the queues
// that we can insert at this location
bool added;
do {
added = false;
var someQueueEmpty = false;
for (final inputQueue in inputQueues) {
final inputCurrent = inputQueue.current!;
final emptyQueues = <AuthorInputQueue>{};
for (final inputQueue in activeInputQueues) {
final inputCurrent = await inputQueue.getCurrentMessage();
if (inputCurrent == null) {
log.error('Active input queue did not have a current message: '
'${inputQueue.author}');
continue;
}
if (currentOutputPosition == null ||
inputCurrent.timestamp <
currentOutputPosition.message.content.timestamp) {
@ -146,16 +202,14 @@ class MessageReconciliation {
added = true;
// Advance this queue
if (!await inputQueue.consume()) {
// Queue is empty now, run a queue purge
someQueueEmpty = true;
if (!await inputQueue.advance()) {
// Mark queue as empty for removal
emptyQueues.add(inputQueue);
}
}
}
// Remove empty queues now that we're done iterating
if (someQueueEmpty) {
inputQueues.removeWhere((q) => q.isDone);
}
// Remove finished queues now that we're done iterating
activeInputQueues.removeWhere(emptyQueues.contains);
if (toInsert.length >= _maxReconcileChunk) {
break;
@ -173,9 +227,27 @@ class MessageReconciliation {
..content = message)
.toList();
await reconciledArray.insertAll(
currentOutputPosition?.pos ?? reconciledArray.length,
reconciledInserts);
// Figure out where to insert the reconciled messages
final insertPos = currentOutputPosition?.pos ?? reconciledArray.length;
// Insert them all at once
await reconciledArray.insertAll(insertPos, reconciledInserts);
// Update output positions for input queues
final updatePositions = _outputPositions.keys.toSet();
var outputPos = insertPos + reconciledInserts.length;
for (final inserted in reconciledInserts.reversed) {
if (updatePositions.isEmpty) {
// Last seen positions already recorded for each active author
break;
}
outputPos--;
final author = inserted.content.author.toVeilid();
if (updatePositions.contains(author)) {
_outputPositions[author] = OutputPosition(inserted, outputPos);
updatePositions.remove(author);
}
}
toInsert.clear();
} else {
@ -195,7 +267,9 @@ class MessageReconciliation {
////////////////////////////////////////////////////////////////////////////
Map<TypedKey, AuthorInputSource> _inputSources = {};
final Map<TypedKey, AuthorInputSource> _inputSources = {};
final Map<TypedKey, AuthorInputQueue> _inputQueues = {};
final Map<TypedKey, OutputPosition?> _outputPositions = {};
final TableDBArrayProtobufCubit<proto.ReconciledMessage> _outputCubit;
final void Function(Object, StackTrace?) _onError;

View File

@ -77,8 +77,8 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
await _sentSubscription?.cancel();
await _rcvdSubscription?.cancel();
await _reconciledSubscription?.cancel();
await _sentMessagesCubit?.close();
await _rcvdMessagesCubit?.close();
await _sentMessagesDHTLog?.close();
await _rcvdMessagesDHTLog?.close();
await _reconciledMessagesCubit?.close();
// If the local conversation record is gone, then delete the reconciled
@ -111,10 +111,10 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
await _initReconciledMessagesCubit();
// Local messages key
await _initSentMessagesCubit();
await _initSentMessagesDHTLog();
// Remote messages key
await _initRcvdMessagesCubit();
await _initRcvdMessagesDHTLog();
// Command execution background process
_commandRunnerFut = Future.delayed(Duration.zero, _commandRunner);
@ -129,39 +129,40 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
}
// Open local messages key
Future<void> _initSentMessagesCubit() async {
Future<void> _initSentMessagesDHTLog() async {
final writer = _accountInfo.identityWriter;
_sentMessagesCubit = DHTLogCubit(
open: () async => DHTLog.openWrite(_localMessagesRecordKey, writer,
final sentMessagesDHTLog =
await DHTLog.openWrite(_localMessagesRecordKey, writer,
debugName: 'SingleContactMessagesCubit::_initSentMessagesCubit::'
'SentMessages',
parent: _localConversationRecordKey,
crypto: _conversationCrypto),
decodeElement: proto.Message.fromBuffer);
_sentSubscription =
_sentMessagesCubit!.stream.listen(_updateSentMessagesState);
_updateSentMessagesState(_sentMessagesCubit!.state);
crypto: _conversationCrypto);
_sentSubscription = await sentMessagesDHTLog.listen(_updateSentMessages);
_sentMessagesDHTLog = sentMessagesDHTLog;
_reconciliation.addInputSourceFromDHTLog(
_accountInfo.identityTypedPublicKey, sentMessagesDHTLog);
}
// Open remote messages key
Future<void> _initRcvdMessagesCubit() async {
Future<void> _initRcvdMessagesDHTLog() async {
// Don't bother if we don't have a remote messages record key yet
if (_remoteMessagesRecordKey == null) {
return;
}
// Open new cubit if one is desired
_rcvdMessagesCubit = DHTLogCubit(
open: () async => DHTLog.openRead(_remoteMessagesRecordKey!,
debugName: 'SingleContactMessagesCubit::_initRcvdMessagesCubit::'
'RcvdMessages',
parent: _remoteConversationRecordKey,
crypto: _conversationCrypto),
decodeElement: proto.Message.fromBuffer);
_rcvdSubscription =
_rcvdMessagesCubit!.stream.listen(_updateRcvdMessagesState);
_updateRcvdMessagesState(_rcvdMessagesCubit!.state);
final rcvdMessagesDHTLog = await DHTLog.openRead(_remoteMessagesRecordKey!,
debugName: 'SingleContactMessagesCubit::_initRcvdMessagesCubit::'
'RcvdMessages',
parent: _remoteConversationRecordKey,
crypto: _conversationCrypto);
_rcvdSubscription = await rcvdMessagesDHTLog.listen(_updateRcvdMessages);
_rcvdMessagesDHTLog = rcvdMessagesDHTLog;
_reconciliation.addInputSourceFromDHTLog(
_remoteIdentityPublicKey, rcvdMessagesDHTLog);
}
Future<void> updateRemoteMessagesRecordKey(
@ -175,17 +176,17 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
return;
}
// Close existing cubit if we have one
final rcvdMessagesCubit = _rcvdMessagesCubit;
_rcvdMessagesCubit = null;
// Close existing DHTLog if we have one
final rcvdMessagesDHTLog = _rcvdMessagesDHTLog;
_rcvdMessagesDHTLog = null;
_remoteMessagesRecordKey = null;
await _rcvdSubscription?.cancel();
_rcvdSubscription = null;
await rcvdMessagesCubit?.close();
await rcvdMessagesDHTLog?.close();
// Init the new cubit if we should
// Init the new DHTLog if we should
_remoteMessagesRecordKey = remoteMessagesRecordKey;
await _initRcvdMessagesCubit();
await _initRcvdMessagesDHTLog();
});
}
@ -275,30 +276,15 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
////////////////////////////////////////////////////////////////////////////
// Internal implementation
// Called when the sent messages cubit gets a change
// Called when the sent messages DHTLog gets a change
// This will re-render when messages are sent from another machine
void _updateSentMessagesState(DHTLogBusyState<proto.Message> avmessages) {
final sentMessages = avmessages.state.asData?.value;
if (sentMessages == null) {
return;
}
_reconciliation.reconcileMessages(
_accountInfo.identityTypedPublicKey, sentMessages, _sentMessagesCubit!);
// Update the view
_renderState();
void _updateSentMessages(DHTLogUpdate upd) {
_reconciliation.reconcileMessages(_accountInfo.identityTypedPublicKey);
}
// Called when the received messages cubit gets a change
void _updateRcvdMessagesState(DHTLogBusyState<proto.Message> avmessages) {
final rcvdMessages = avmessages.state.asData?.value;
if (rcvdMessages == null) {
return;
}
_reconciliation.reconcileMessages(
_remoteIdentityPublicKey, rcvdMessages, _rcvdMessagesCubit!);
// Called when the received messages DHTLog gets a change
void _updateRcvdMessages(DHTLogUpdate upd) {
_reconciliation.reconcileMessages(_remoteIdentityPublicKey);
}
// Called when the reconciled messages window gets a change
@ -327,7 +313,7 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
// _renderState();
try {
await _sentMessagesCubit!.operateAppendEventual((writer) async {
await _sentMessagesDHTLog!.operateAppendEventual((writer) async {
// Get the previous message if we have one
var previousMessage = writer.length == 0
? null
@ -357,16 +343,17 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
// Produce a state for this cubit from the input cubits and queues
void _renderState() {
// Get all reconciled messages
// Get all reconciled messages in the cubit window
final reconciledMessages =
_reconciledMessagesCubit?.state.state.asData?.value;
// Get all sent messages
final sentMessages = _sentMessagesCubit?.state.state.asData?.value;
// Get all sent messages that are still offline
//final sentMessages = _sentMessagesDHTLog.
//Get all items in the unsent queue
//final unsentMessages = _unsentMessagesQueue.queue;
// If we aren't ready to render a state, say we're loading
if (reconciledMessages == null || sentMessages == null) {
if (reconciledMessages == null) {
emit(const AsyncLoading());
return;
}
@ -377,11 +364,11 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
// keyMapper: (x) => x.content.authorUniqueIdString,
// values: reconciledMessages.windowElements,
// );
final sentMessagesMap =
IMap<String, OnlineElementState<proto.Message>>.fromValues(
keyMapper: (x) => x.value.authorUniqueIdString,
values: sentMessages.window,
);
// final sentMessagesMap =
// IMap<String, OnlineElementState<proto.Message>>.fromValues(
// keyMapper: (x) => x.value.authorUniqueIdString,
// values: sentMessages.window,
// );
// final unsentMessagesMap = IMap<String, proto.Message>.fromValues(
// keyMapper: (x) => x.authorUniqueIdString,
// values: unsentMessages,
@ -393,10 +380,12 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
final isLocal =
m.content.author.toVeilid() == _accountInfo.identityTypedPublicKey;
final reconciledTimestamp = Timestamp.fromInt64(m.reconciledTime);
final sm =
isLocal ? sentMessagesMap[m.content.authorUniqueIdString] : null;
final sent = isLocal && sm != null;
final sentOffline = isLocal && sm != null && sm.isOffline;
//final sm =
//isLocal ? sentMessagesMap[m.content.authorUniqueIdString] : null;
//final sent = isLocal && sm != null;
//final sentOffline = isLocal && sm != null && sm.isOffline;
final sent = isLocal;
final sentOffline = false; //
renderedElements.add(RenderStateElement(
message: m.content,
@ -491,16 +480,16 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
late final VeilidCrypto _conversationCrypto;
late final MessageIntegrity _senderMessageIntegrity;
DHTLogCubit<proto.Message>? _sentMessagesCubit;
DHTLogCubit<proto.Message>? _rcvdMessagesCubit;
DHTLog? _sentMessagesDHTLog;
DHTLog? _rcvdMessagesDHTLog;
TableDBArrayProtobufCubit<proto.ReconciledMessage>? _reconciledMessagesCubit;
late final MessageReconciliation _reconciliation;
late final PersistentQueue<proto.Message> _unsentMessagesQueue;
// IList<proto.Message> _sendingMessages = const IList.empty();
StreamSubscription<DHTLogBusyState<proto.Message>>? _sentSubscription;
StreamSubscription<DHTLogBusyState<proto.Message>>? _rcvdSubscription;
StreamSubscription<void>? _sentSubscription;
StreamSubscription<void>? _rcvdSubscription;
StreamSubscription<TableDBArrayProtobufBusyState<proto.ReconciledMessage>>?
_reconciledSubscription;
final StreamController<Future<void> Function()> _commandController;

View File

@ -213,7 +213,7 @@ class DHTLog implements DHTDeleteable<DHTLog> {
/// Runs a closure allowing read-only access to the log
Future<T> operate<T>(Future<T> Function(DHTLogReadOperations) closure) async {
if (!isOpen) {
throw StateError('log is not open"');
throw StateError('log is not open');
}
return _spine.operate((spine) async {
@ -230,7 +230,7 @@ class DHTLog implements DHTDeleteable<DHTLog> {
Future<T> operateAppend<T>(
Future<T> Function(DHTLogWriteOperations) closure) async {
if (!isOpen) {
throw StateError('log is not open"');
throw StateError('log is not open');
}
return _spine.operateAppend((spine) async {
@ -249,7 +249,7 @@ class DHTLog implements DHTDeleteable<DHTLog> {
Future<T> Function(DHTLogWriteOperations) closure,
{Duration? timeout}) async {
if (!isOpen) {
throw StateError('log is not open"');
throw StateError('log is not open');
}
return _spine.operateAppendEventual((spine) async {
@ -264,7 +264,7 @@ class DHTLog implements DHTDeleteable<DHTLog> {
void Function(DHTLogUpdate) onChanged,
) {
if (!isOpen) {
throw StateError('log is not open"');
throw StateError('log is not open');
}
return _listenMutex.protect(() async {

View File

@ -112,33 +112,34 @@ class DHTLogCubit<T> extends Cubit<DHTLogBusyState<T>>
Future<void> _refreshInner(void Function(AsyncValue<DHTLogStateData<T>>) emit,
{bool forceRefresh = false}) async {
late final int length;
final window = await _log.operate((reader) async {
final windowElements = await _log.operate((reader) async {
length = reader.length;
return loadElementsFromReader(reader, _windowTail, _windowSize);
return _loadElementsFromReader(reader, _windowTail, _windowSize);
});
if (window == null) {
if (windowElements == null) {
setWantsRefresh();
return;
}
emit(AsyncValue.data(DHTLogStateData(
length: length,
window: window,
windowTail: _windowTail,
windowSize: _windowSize,
window: windowElements.$2,
windowTail: windowElements.$1 + windowElements.$2.length,
windowSize: windowElements.$2.length,
follow: _follow)));
setRefreshed();
}
// Tail is one past the last element to load
Future<IList<OnlineElementState<T>>?> loadElementsFromReader(
Future<(int, IList<OnlineElementState<T>>)?> _loadElementsFromReader(
DHTLogReadOperations reader, int tail, int count,
{bool forceRefresh = false}) async {
final length = reader.length;
if (length == 0) {
return const IList.empty();
}
final end = ((tail - 1) % length) + 1;
final start = (count < end) ? end - count : 0;
if (length == 0) {
return (start, IList<OnlineElementState<T>>.empty());
}
// If this is writeable get the offline positions
Set<int>? offlinePositions;
@ -154,8 +155,11 @@ class DHTLogCubit<T> extends Cubit<DHTLogBusyState<T>>
value: _decodeElement(x.$2),
isOffline: offlinePositions?.contains(x.$1) ?? false))
.toIList();
if (allItems == null) {
return null;
}
return allItems;
return (start, allItems);
}
void _update(DHTLogUpdate upd) {

View File

@ -21,8 +21,14 @@ class _DHTLogRead implements DHTLogReadOperations {
return null;
}
return lookup.scope((sa) =>
sa.operate((read) => read.get(lookup.pos, forceRefresh: forceRefresh)));
return lookup.scope((sa) => sa.operate((read) async {
if (lookup.pos >= read.length) {
veilidLoggy.error('DHTLog shortarray read @ ${lookup.pos}'
' >= length ${read.length}');
return null;
}
return read.get(lookup.pos, forceRefresh: forceRefresh);
}));
}
(int, int) _clampStartLen(int start, int? len) {
@ -49,7 +55,7 @@ class _DHTLogRead implements DHTLogReadOperations {
.slices(kMaxDHTConcurrency)
.map((chunk) => chunk.map((pos) async {
try {
return get(pos + start, forceRefresh: forceRefresh);
return await get(pos + start, forceRefresh: forceRefresh);
// Need some way to debug ParallelWaitError
// ignore: avoid_catches_without_on_clauses
} catch (e, st) {
@ -59,36 +65,42 @@ class _DHTLogRead implements DHTLogReadOperations {
}));
for (final chunk in chunks) {
final elems = await chunk.wait;
var elems = await chunk.wait;
// If any element was unavailable, return null
if (elems.contains(null)) {
return null;
// Return only the first contiguous range, anything else is garbage
// due to a representational error in the head or shortarray legnth
final nullPos = elems.indexOf(null);
if (nullPos != -1) {
elems = elems.sublist(0, nullPos);
}
out.addAll(elems.cast<Uint8List>());
if (nullPos != -1) {
break;
}
}
return out;
}
@override
Future<Set<int>?> getOfflinePositions() async {
Future<Set<int>> getOfflinePositions() async {
final positionOffline = <int>{};
// Iterate positions backward from most recent
for (var pos = _spine.length - 1; pos >= 0; pos--) {
// Get position
final lookup = await _spine.lookupPosition(pos);
// If position doesn't exist then it definitely wasn't written to offline
if (lookup == null) {
return null;
continue;
}
// Check each segment for offline positions
var foundOffline = false;
final success = await lookup.scope((sa) => sa.operate((read) async {
await lookup.scope((sa) => sa.operate((read) async {
final segmentOffline = await read.getOfflinePositions();
if (segmentOffline == null) {
return false;
}
// For each shortarray segment go through their segment positions
// in reverse order and see if they are offline
@ -102,11 +114,7 @@ class _DHTLogRead implements DHTLogReadOperations {
foundOffline = true;
}
}
return true;
}));
if (!success) {
return null;
}
// If we found nothing offline in this segment then we can stop
if (!foundOffline) {
break;

View File

@ -107,9 +107,7 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations {
await write.clear();
} else if (lookup.pos != write.length) {
// We should always be appending at the length
throw DHTExceptionInvalidData(
'_DHTLogWrite::add lookup.pos=${lookup.pos} '
'write.length=${write.length}');
await write.truncate(lookup.pos);
}
return write.add(value);
}));

View File

@ -62,9 +62,6 @@ class DHTShortArrayCubit<T> extends Cubit<DHTShortArrayCubitState<T>>
Set<int>? offlinePositions;
if (_shortArray.writer != null) {
offlinePositions = await reader.getOfflinePositions();
if (offlinePositions == null) {
return null;
}
}
// Get the items

View File

@ -62,7 +62,7 @@ class _DHTShortArrayRead implements DHTShortArrayReadOperations {
.slices(kMaxDHTConcurrency)
.map((chunk) => chunk.map((pos) async {
try {
return get(pos + start, forceRefresh: forceRefresh);
return await get(pos + start, forceRefresh: forceRefresh);
// Need some way to debug ParallelWaitError
// ignore: avoid_catches_without_on_clauses
} catch (e, st) {
@ -72,13 +72,20 @@ class _DHTShortArrayRead implements DHTShortArrayReadOperations {
}));
for (final chunk in chunks) {
final elems = await chunk.wait;
var elems = await chunk.wait;
// If any element was unavailable, return null
if (elems.contains(null)) {
return null;
// Return only the first contiguous range, anything else is garbage
// due to a representational error in the head or shortarray legnth
final nullPos = elems.indexOf(null);
if (nullPos != -1) {
elems = elems.sublist(0, nullPos);
}
out.addAll(elems.cast<Uint8List>());
if (nullPos != -1) {
break;
}
}
return out;

View File

@ -14,19 +14,22 @@ abstract class DHTRandomRead {
/// is specified, the network will always be checked for newer values
/// rather than returning the existing locally stored copy of the elements.
/// Throws an IndexError if the 'pos' is not within the length
/// of the container.
/// of the container. May return null if the item is not available at this
/// time.
Future<Uint8List?> get(int pos, {bool forceRefresh = false});
/// Return a list of a range of items in the DHTArray. If 'forceRefresh'
/// is specified, the network will always be checked for newer values
/// rather than returning the existing locally stored copy of the elements.
/// Throws an IndexError if either 'start' or '(start+length)' is not within
/// the length of the container.
/// the length of the container. May return fewer items than the length
/// expected if the requested items are not available, but will always
/// return a contiguous range starting at 'start'.
Future<List<Uint8List>?> getRange(int start,
{int? length, bool forceRefresh = false});
/// Get a list of the positions that were written offline and not flushed yet
Future<Set<int>?> getOfflinePositions();
Future<Set<int>> getOfflinePositions();
}
extension DHTRandomReadExt on DHTRandomRead {