Merge branch 'simplify-watch' into 'main'

Simplify watches and reconciliation

See merge request veilid/veilidchat!49
Christien Rioux 2025-04-21 13:41:31 -04:00
commit db61908d09
27 changed files with 727 additions and 481 deletions

View File

@ -1,2 +1,3 @@
extensions:
- provider: true
- provider: true
- shared_preferences: true

View File

@ -168,7 +168,7 @@ SPEC CHECKSUMS:
sqflite_darwin: 20b2a3a3b70e43edae938624ce550a3cbf66a3d0
system_info_plus: 555ce7047fbbf29154726db942ae785c29211740
url_launcher_ios: 694010445543906933d732453a59da0a173ae33d
veilid: b3b9418ae6b083e662396bfa2c635fb115c8510e
veilid: 3ce560a4f2b568a77a9fd5e23090f2fa97581019
PODFILE CHECKSUM: c8bf5b16c34712d5790b0b8d2472cc66ac0a8487

View File

@ -15,6 +15,7 @@ import '../../account_manager/account_manager.dart';
import '../../contacts/contacts.dart';
import '../../conversation/conversation.dart';
import '../../proto/proto.dart' as proto;
import '../../tools/tools.dart';
import '../models/chat_component_state.dart';
import '../models/message_state.dart';
import '../models/window_state.dart';
@ -383,13 +384,13 @@ class ChatComponentCubit extends Cubit<ChatComponentState> {
if (chatMessage == null) {
continue;
}
chatMessages.insert(0, chatMessage);
if (!tsSet.add(chatMessage.id)) {
// ignore: avoid_print
print('duplicate id found: ${chatMessage.id}:\n'
'Messages:\n${messagesState.window}\n'
'ChatMessages:\n$chatMessages');
assert(false, 'should not have duplicate id');
log.error('duplicate id found: ${chatMessage.id}'
// '\nMessages:\n${messagesState.window}'
// '\nChatMessages:\n$chatMessages'
);
} else {
chatMessages.insert(0, chatMessage);
}
}
return currentState.copyWith(

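The fix above replaces the print-and-assert on a duplicate id with a set-membership check before insertion, so duplicates are logged and skipped rather than tripping debug asserts. A minimal sketch of the pattern (hypothetical helper, not from the commit):

// Sketch: keep only the first occurrence of each id, prepending like
// the cubit does so newer messages end up at the front.
List<M> dedupeById<M, K>(Iterable<M> items, K Function(M) idOf) {
  final seen = <K>{};
  final out = <M>[];
  for (final item in items) {
    if (seen.add(idOf(item))) {
      out.insert(0, item);
    }
  }
  return out;
}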
View File

@ -1,4 +1,5 @@
import 'dart:async';
import 'dart:math';
import 'package:veilid_support/veilid_support.dart';
import '../../../proto/proto.dart' as proto;
@ -6,106 +7,127 @@ import '../../../proto/proto.dart' as proto;
import '../../../tools/tools.dart';
import 'author_input_source.dart';
import 'message_integrity.dart';
import 'output_position.dart';
class AuthorInputQueue {
AuthorInputQueue._({
required TypedKey author,
required AuthorInputSource inputSource,
required OutputPosition? outputPosition,
required int inputPosition,
required proto.Message? previousMessage,
required void Function(Object, StackTrace?) onError,
required MessageIntegrity messageIntegrity,
}) : _author = author,
_onError = onError,
_inputSource = inputSource,
_outputPosition = outputPosition,
_lastMessage = outputPosition?.message.content,
_previousMessage = previousMessage,
_messageIntegrity = messageIntegrity,
_currentPosition = inputSource.currentWindow.last;
_inputPosition = inputPosition;
static Future<AuthorInputQueue?> create({
required TypedKey author,
required AuthorInputSource inputSource,
required OutputPosition? outputPosition,
required proto.Message? previousMessage,
required void Function(Object, StackTrace?) onError,
}) async {
// Get ending input position
final inputPosition = await inputSource.getTailPosition() - 1;
// Create an input queue for the input source
final queue = AuthorInputQueue._(
author: author,
inputSource: inputSource,
outputPosition: outputPosition,
inputPosition: inputPosition,
previousMessage: previousMessage,
onError: onError,
messageIntegrity: await MessageIntegrity.create(author: author));
if (!await queue._findStartOfWork()) {
// Rewind the queue's 'inputPosition' to the first unreconciled message
if (!await queue._rewindInputToAfterLastMessage()) {
return null;
}
return queue;
}
////////////////////////////////////////////////////////////////////////////
// Public interface
// Check if there are no messages left in this queue to reconcile
bool get isDone => _isDone;
/// Get the input source for this queue
AuthorInputSource get inputSource => _inputSource;
// Get the current message that needs reconciliation
proto.Message? get current => _currentMessage;
// Get the earliest output position to start inserting
OutputPosition? get outputPosition => _outputPosition;
// Get the author of this queue
/// Get the author of this queue
TypedKey get author => _author;
// Remove a reconciled message and move to the next message
// Returns true if there is more work to do
Future<bool> consume() async {
if (_isDone) {
/// Get the current message that needs reconciliation
Future<proto.Message?> getCurrentMessage() async {
try {
// if we have a current message already, return it
if (_currentMessage != null) {
return _currentMessage;
}
// Get the window
final currentWindow = await _updateWindow(clampInputPosition: false);
if (currentWindow == null) {
return null;
}
final currentElement =
currentWindow.elements[_inputPosition - currentWindow.firstPosition];
return _currentMessage = currentElement.value;
// Catch everything so we can avoid ParallelWaitError
// ignore: avoid_catches_without_on_clauses
} catch (e, st) {
log.error('Exception getting current message: $e:\n$st\n');
_currentMessage = null;
return null;
}
}
/// Remove a reconciled message and move to the next message
/// Returns true if there is more work to do
Future<bool> advance() async {
final currentMessage = await getCurrentMessage();
if (currentMessage == null) {
return false;
}
while (true) {
_lastMessage = _currentMessage;
_currentPosition++;
// Move current message to previous
_previousMessage = _currentMessage;
_currentMessage = null;
while (true) {
// Advance to next position
_inputPosition++;
// Get more window if we need to
if (!await _updateWindow()) {
// Window is not available so this queue can't work right now
_isDone = true;
final currentMessage = await getCurrentMessage();
if (currentMessage == null) {
return false;
}
final nextMessage = _inputSource.currentWindow
.elements[_currentPosition - _inputSource.currentWindow.first];
// Drop the 'offline' elements because we don't reconcile
// anything until it has been confirmed to be committed to the DHT
// if (nextMessage.isOffline) {
// continue;
// }
if (_lastMessage != null) {
if (_previousMessage != null) {
// Ensure the timestamp is not moving backward
if (nextMessage.value.timestamp < _lastMessage!.timestamp) {
log.warning('timestamp backward: ${nextMessage.value.timestamp}'
' < ${_lastMessage!.timestamp}');
if (currentMessage.timestamp < _previousMessage!.timestamp) {
log.warning('timestamp backward: ${currentMessage.timestamp}'
' < ${_previousMessage!.timestamp}');
continue;
}
}
// Verify the id chain for the message
final matchId = await _messageIntegrity.generateMessageId(_lastMessage);
if (matchId.compare(nextMessage.value.idBytes) != 0) {
log.warning(
'id chain invalid: $matchId != ${nextMessage.value.idBytes}');
final matchId =
await _messageIntegrity.generateMessageId(_previousMessage);
if (matchId.compare(currentMessage.idBytes) != 0) {
log.warning('id chain invalid: $matchId != ${currentMessage.idBytes}');
continue;
}
// Verify the signature for the message
if (!await _messageIntegrity.verifyMessage(nextMessage.value)) {
log.warning('invalid message signature: ${nextMessage.value}');
if (!await _messageIntegrity.verifyMessage(currentMessage)) {
log.warning('invalid message signature: $currentMessage');
continue;
}
_currentMessage = nextMessage.value;
break;
}
return true;
@ -114,106 +136,166 @@ class AuthorInputQueue {
////////////////////////////////////////////////////////////////////////////
// Internal implementation
// Walk backward from the tail of the input queue to find the first
// message newer than our last reconciled message from this author
// Returns false if no work is needed
Future<bool> _findStartOfWork() async {
/// Walk backward from the tail of the input queue to find the first
/// message newer than our last reconciled message from this author
/// Returns false if no work is needed
Future<bool> _rewindInputToAfterLastMessage() async {
// Iterate windows over the inputSource
InputWindow? currentWindow;
outer:
while (true) {
// Get more window if we need to
currentWindow = await _updateWindow(clampInputPosition: true);
if (currentWindow == null) {
// Window is not available or things are empty so this
// queue can't work right now
return false;
}
// Iterate through current window backward
for (var i = _inputSource.currentWindow.elements.length - 1;
i >= 0 && _currentPosition >= 0;
i--, _currentPosition--) {
final elem = _inputSource.currentWindow.elements[i];
for (var i = currentWindow.elements.length - 1;
i >= 0 && _inputPosition >= 0;
i--, _inputPosition--) {
final elem = currentWindow.elements[i];
// If we've found an input element that is older or same time as our
// last reconciled message for this author, or we find the message
// itself then we stop
if (_lastMessage != null) {
if (_previousMessage != null) {
if (elem.value.authorUniqueIdBytes
.compare(_lastMessage!.authorUniqueIdBytes) ==
.compare(_previousMessage!.authorUniqueIdBytes) ==
0 ||
elem.value.timestamp <= _lastMessage!.timestamp) {
elem.value.timestamp <= _previousMessage!.timestamp) {
break outer;
}
}
}
// If we're at the beginning of the inputSource then we stop
if (_currentPosition < 0) {
if (_inputPosition < 0) {
break;
}
// Get more window if we need to
if (!await _updateWindow()) {
// Window is not available or things are empty so this
// queue can't work right now
_isDone = true;
return false;
}
}
// _currentPosition points to either before the input source starts
// _inputPosition points to either before the input source starts
// or the position of the previous element. We still need to set the
// _currentMessage to the previous element so consume() can compare
// against it if we can.
if (_currentPosition >= 0) {
_currentMessage = _inputSource.currentWindow
.elements[_currentPosition - _inputSource.currentWindow.first].value;
if (_inputPosition >= 0) {
_currentMessage = currentWindow
.elements[_inputPosition - currentWindow.firstPosition].value;
}
// After this consume(), the currentPosition and _currentMessage should
// After this advance(), the _inputPosition and _currentMessage should
// be equal to the first message to process and the current window to
// process should not be empty
return consume();
// process should not be empty if there is work to do
return advance();
}
// Slide the window toward the current position and load the batch around it
Future<bool> _updateWindow() async {
/// Slide the window toward the current position and load the batch around it
Future<InputWindow?> _updateWindow({required bool clampInputPosition}) async {
final inputTailPosition = await _inputSource.getTailPosition();
if (inputTailPosition == 0) {
return null;
}
// Handle out-of-range input position
if (clampInputPosition) {
_inputPosition = min(max(_inputPosition, 0), inputTailPosition - 1);
} else if (_inputPosition < 0 || _inputPosition >= inputTailPosition) {
return null;
}
// Check if we are still in the window
if (_currentPosition >= _inputSource.currentWindow.first &&
_currentPosition <= _inputSource.currentWindow.last) {
return true;
final currentWindow = _currentWindow;
int firstPosition;
int lastPosition;
if (currentWindow != null) {
firstPosition = currentWindow.firstPosition;
lastPosition = currentWindow.lastPosition;
// Slide the window if we need to
if (_inputPosition >= firstPosition && _inputPosition <= lastPosition) {
return currentWindow;
} else if (_inputPosition < firstPosition) {
// Slide it backward, current position is now last
firstPosition = max((_inputPosition - _maxWindowLength) + 1, 0);
lastPosition = _inputPosition;
} else if (_inputPosition > lastPosition) {
// Slide it forward, current position is now first
firstPosition = _inputPosition;
lastPosition =
min((_inputPosition + _maxWindowLength) - 1, inputTailPosition - 1);
}
} else {
// need a new window, start with the input position at the end
lastPosition = _inputPosition;
firstPosition = max((_inputPosition - _maxWindowLength) + 1, 0);
}
// Get another input batch further back
final avOk =
await _inputSource.updateWindow(_currentPosition, _maxWindowLength);
final avCurrentWindow = await _inputSource.getWindow(
firstPosition, lastPosition - firstPosition + 1);
final asErr = avOk.asError;
final asErr = avCurrentWindow.asError;
if (asErr != null) {
_onError(asErr.error, asErr.stackTrace);
return false;
_currentWindow = null;
return null;
}
final asLoading = avOk.asLoading;
final asLoading = avCurrentWindow.asLoading;
if (asLoading != null) {
// xxx: no need to block the cubit here for this
// xxx: might want to switch to a 'busy' state though
// xxx: to let the messages view show a spinner at the bottom
// xxx: while we reconcile...
// emit(const AsyncValue.loading());
return false;
_currentWindow = null;
return null;
}
return avOk.asData!.value;
final nextWindow = avCurrentWindow.asData!.value;
if (nextWindow == null || nextWindow.length == 0) {
_currentWindow = null;
return null;
}
// Handle out-of-range input position
// Doing this again because getWindow is allowed to return a smaller
// window than the one requested, possibly due to DHT consistency
// fluctuations and race conditions
if (clampInputPosition) {
_inputPosition = min(max(_inputPosition, nextWindow.firstPosition),
nextWindow.lastPosition);
} else if (_inputPosition < nextWindow.firstPosition ||
_inputPosition > nextWindow.lastPosition) {
return null;
}
return _currentWindow = nextWindow;
}
////////////////////////////////////////////////////////////////////////////
/// The author of the messages in the input source
final TypedKey _author;
/// The input source we're pulling messages from
final AuthorInputSource _inputSource;
final OutputPosition? _outputPosition;
/// What to call if an error happens
final void Function(Object, StackTrace?) _onError;
/// The message integrity validator
final MessageIntegrity _messageIntegrity;
// The last message we've consumed
proto.Message? _lastMessage;
// The current position in the input log that we are looking at
int _currentPosition;
// The current message we're looking at
proto.Message? _currentMessage;
// If we have reached the end
bool _isDone = false;
/// The last message we reconciled/output
proto.Message? _previousMessage;
// Desired maximum window length
/// The current message we're looking at
proto.Message? _currentMessage;
/// The current position in the input source that we are looking at
int _inputPosition;
/// The current input window from the InputSource
InputWindow? _currentWindow;
/// Desired maximum window length
static const int _maxWindowLength = 256;
}

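For orientation, a minimal sketch of how a reconciler can drive this queue; drainQueue is a hypothetical helper and error handling is elided:

// Sketch: drain one author's input queue (hypothetical driver).
Future<void> drainQueue(AuthorInputQueue queue) async {
  while (true) {
    // Returns null when the window is unavailable or nothing is left
    final message = await queue.getCurrentMessage();
    if (message == null) {
      break;
    }
    // ... reconcile 'message' into the output array here ...
    // advance() moves past the current message; the next one must pass
    // the timestamp-ordering, id-chain, and signature checks.
    if (!await queue.advance()) {
      break;
    }
  }
}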
View File

@ -9,64 +9,68 @@ import '../../../proto/proto.dart' as proto;
@immutable
class InputWindow {
const InputWindow(
{required this.elements, required this.first, required this.last});
const InputWindow({required this.elements, required this.firstPosition})
: lastPosition = firstPosition + elements.length - 1,
isEmpty = elements.length == 0,
length = elements.length;
final IList<OnlineElementState<proto.Message>> elements;
final int first;
final int last;
final int firstPosition;
final int lastPosition;
final bool isEmpty;
final int length;
}
class AuthorInputSource {
AuthorInputSource.fromCubit(
{required DHTLogStateData<proto.Message> cubitState,
required this.cubit}) {
_currentWindow = InputWindow(
elements: cubitState.window,
first: (cubitState.windowTail - cubitState.window.length) %
cubitState.length,
last: (cubitState.windowTail - 1) % cubitState.length);
}
AuthorInputSource.fromDHTLog(DHTLog dhtLog) : _dhtLog = dhtLog;
////////////////////////////////////////////////////////////////////////////
InputWindow get currentWindow => _currentWindow;
Future<int> getTailPosition() async =>
_dhtLog.operate((reader) async => reader.length);
Future<AsyncValue<bool>> updateWindow(
int currentPosition, int windowLength) async =>
cubit.operate((reader) async {
// See if we're beyond the input source
if (currentPosition < 0 || currentPosition >= reader.length) {
return const AsyncValue.data(false);
}
// Slide the window if we need to
var first = _currentWindow.first;
var last = _currentWindow.last;
if (currentPosition < first) {
// Slide it backward, current position is now last
first = max((currentPosition - windowLength) + 1, 0);
last = currentPosition;
} else if (currentPosition > last) {
// Slide it forward, current position is now first
first = currentPosition;
last = min((currentPosition + windowLength) - 1, reader.length - 1);
} else {
return const AsyncValue.data(true);
Future<AsyncValue<InputWindow?>> getWindow(
int startPosition, int windowLength) async =>
_dhtLog.operate((reader) async {
// Don't allow a zero or negative length
if (windowLength <= 0) {
return const AsyncValue.data(null);
}
// Trim if we're beyond input source
var endPosition = startPosition + windowLength - 1;
startPosition = max(startPosition, 0);
endPosition = max(endPosition, 0);
// Get another input batch further back
final nextWindow = await cubit.loadElementsFromReader(
reader, last + 1, (last + 1) - first);
if (nextWindow == null) {
return const AsyncValue.loading();
try {
Set<int>? offlinePositions;
if (_dhtLog.writer != null) {
offlinePositions = await reader.getOfflinePositions();
}
final messages = await reader.getRangeProtobuf(
proto.Message.fromBuffer, startPosition,
length: endPosition - startPosition + 1);
if (messages == null) {
return const AsyncValue.loading();
}
final elements = messages.indexed
.map((x) => OnlineElementState(
value: x.$2,
isOffline: offlinePositions?.contains(x.$1 + startPosition) ??
false))
.toIList();
final window =
InputWindow(elements: elements, firstPosition: startPosition);
return AsyncValue.data(window);
} on Exception catch (e, st) {
return AsyncValue.error(e, st);
}
_currentWindow =
InputWindow(elements: nextWindow, first: first, last: last);
return const AsyncValue.data(true);
});
////////////////////////////////////////////////////////////////////////////
final DHTLogCubit<proto.Message> cubit;
late InputWindow _currentWindow;
final DHTLog _dhtLog;
}

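The clamping and sliding rules shared by _updateWindow and getWindow are the fiddly part; a standalone sketch of the same arithmetic, assuming 0-based positions where tail is one past the last element (illustrative names only):

import 'dart:math';

// Sketch: keep the window when pos is inside it, otherwise slide so
// pos becomes the last position (moving backward) or the first
// position (moving forward), clamped to [0, tail - 1].
(int, int) slideWindow(int pos, int first, int last, int tail, int maxLen) {
  if (pos >= first && pos <= last) {
    return (first, last); // still inside the current window
  }
  if (pos < first) {
    return (max(pos - maxLen + 1, 0), pos); // slide backward
  }
  return (pos, min(pos + maxLen - 1, tail - 1)); // slide forward
}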
View File

@ -6,6 +6,7 @@ import 'package:sorted_list/sorted_list.dart';
import 'package:veilid_support/veilid_support.dart';
import '../../../proto/proto.dart' as proto;
import '../../../tools/tools.dart';
import 'author_input_queue.dart';
import 'author_input_source.dart';
import 'output_position.dart';
@ -19,96 +20,152 @@ class MessageReconciliation {
////////////////////////////////////////////////////////////////////////////
void reconcileMessages(
TypedKey author,
DHTLogStateData<proto.Message> inputMessagesCubitState,
DHTLogCubit<proto.Message> inputMessagesCubit) {
if (inputMessagesCubitState.window.isEmpty) {
return;
}
void addInputSourceFromDHTLog(TypedKey author, DHTLog inputMessagesDHTLog) {
_inputSources[author] = AuthorInputSource.fromDHTLog(inputMessagesDHTLog);
}
_inputSources[author] = AuthorInputSource.fromCubit(
cubitState: inputMessagesCubitState, cubit: inputMessagesCubit);
void reconcileMessages(TypedKey? author) {
// xxx: can we use 'author' here to optimize _updateAuthorInputQueues?
singleFuture(this, onError: _onError, () async {
// Take entire list of input sources we have currently and process them
final inputSources = _inputSources;
_inputSources = {};
final inputFuts = <Future<AuthorInputQueue?>>[];
for (final kv in inputSources.entries) {
final author = kv.key;
final inputSource = kv.value;
inputFuts
.add(_enqueueAuthorInput(author: author, inputSource: inputSource));
}
final inputQueues = await inputFuts.wait;
// Make this safe to cast by removing inputs that were rejected or empty
inputQueues.removeNulls();
// Update queues
final activeInputQueues = await _updateAuthorInputQueues();
// Process all input queues together
await _outputCubit
.operate((reconciledArray) async => _reconcileInputQueues(
reconciledArray: reconciledArray,
inputQueues: inputQueues.cast<AuthorInputQueue>(),
activeInputQueues: activeInputQueues,
));
});
}
////////////////////////////////////////////////////////////////////////////
// Prepare author input queues by removing dead ones and adding new ones
// Queues that are empty are not added until they have something in them
// Active input queues with a current message are returned in a list
Future<List<AuthorInputQueue>> _updateAuthorInputQueues() async {
// Remove any dead input queues
final deadQueues = <TypedKey>[];
for (final author in _inputQueues.keys) {
if (!_inputSources.containsKey(author)) {
deadQueues.add(author);
}
}
for (final author in deadQueues) {
_inputQueues.remove(author);
_outputPositions.remove(author);
}
await _outputCubit.operate((outputArray) async {
final dws = DelayedWaitSet<void, void>();
for (final kv in _inputSources.entries) {
final author = kv.key;
final inputSource = kv.value;
final iqExisting = _inputQueues[author];
if (iqExisting == null || iqExisting.inputSource != inputSource) {
dws.add((_) async {
try {
await _enqueueAuthorInput(
author: author,
inputSource: inputSource,
outputArray: outputArray);
// Catch everything so we can avoid ParallelWaitError
// ignore: avoid_catches_without_on_clauses
} catch (e, st) {
log.error('Exception updating author input queue: $e:\n$st\n');
_inputQueues.remove(author);
_outputPositions.remove(author);
}
});
}
}
await dws();
});
// Get the active input queues
final activeInputQueues = await _inputQueues.entries
.map((entry) async {
if (await entry.value.getCurrentMessage() != null) {
return entry.value;
} else {
return null;
}
})
.toList()
.wait
..removeNulls();
return activeInputQueues.cast<AuthorInputQueue>();
}
// Set up a single author's message reconciliation
Future<AuthorInputQueue?> _enqueueAuthorInput(
Future<void> _enqueueAuthorInput(
{required TypedKey author,
required AuthorInputSource inputSource}) async {
required AuthorInputSource inputSource,
required TableDBArrayProtobuf<proto.ReconciledMessage>
outputArray}) async {
// Get the position of our most recent reconciled message from this author
final outputPosition = await _findLastOutputPosition(author: author);
final outputPosition =
await _findLastOutputPosition(author: author, outputArray: outputArray);
// Find oldest message we have not yet reconciled
final inputQueue = await AuthorInputQueue.create(
author: author,
inputSource: inputSource,
outputPosition: outputPosition,
previousMessage: outputPosition?.message.content,
onError: _onError,
);
return inputQueue;
if (inputQueue != null) {
_inputQueues[author] = inputQueue;
_outputPositions[author] = outputPosition;
} else {
_inputQueues.remove(author);
_outputPositions.remove(author);
}
}
// Get the position of our most recent reconciled message from this author
// XXX: For a group chat, this should find when the author
// was added to the membership so we don't just go back in time forever
Future<OutputPosition?> _findLastOutputPosition(
{required TypedKey author}) async =>
_outputCubit.operate((arr) async {
var pos = arr.length - 1;
while (pos >= 0) {
final message = await arr.get(pos);
if (message.content.author.toVeilid() == author) {
return OutputPosition(message, pos);
}
pos--;
}
return null;
});
{required TypedKey author,
required TableDBArrayProtobuf<proto.ReconciledMessage>
outputArray}) async {
var pos = outputArray.length - 1;
while (pos >= 0) {
final message = await outputArray.get(pos);
if (message.content.author.toVeilid() == author) {
return OutputPosition(message, pos);
}
pos--;
}
return null;
}
// Process a list of author input queues and insert their messages
// into the output array, performing validation steps along the way
Future<void> _reconcileInputQueues({
required TableDBArrayProtobuf<proto.ReconciledMessage> reconciledArray,
required List<AuthorInputQueue> inputQueues,
required List<AuthorInputQueue> activeInputQueues,
}) async {
// Ensure queues all have something to do
inputQueues.removeWhere((q) => q.isDone);
if (inputQueues.isEmpty) {
// Ensure we have active queues to process
if (activeInputQueues.isEmpty) {
return;
}
// Sort queues from earliest to latest and then by author
// to ensure a deterministic insert order
inputQueues.sort((a, b) {
final acmp = a.outputPosition?.pos ?? -1;
final bcmp = b.outputPosition?.pos ?? -1;
activeInputQueues.sort((a, b) {
final aout = _outputPositions[a.author];
final bout = _outputPositions[b.author];
final acmp = aout?.pos ?? -1;
final bcmp = bout?.pos ?? -1;
if (acmp == bcmp) {
return a.author.toString().compareTo(b.author.toString());
}
@ -116,21 +173,28 @@ class MessageReconciliation {
});
// Start at the earliest position we know about in all the queues
var currentOutputPosition = inputQueues.first.outputPosition;
var currentOutputPosition =
_outputPositions[activeInputQueues.first.author];
final toInsert =
SortedList<proto.Message>(proto.MessageExt.compareTimestamp);
while (inputQueues.isNotEmpty) {
while (activeInputQueues.isNotEmpty) {
// Get up to '_maxReconcileChunk' of the items from the queues
// that we can insert at this location
bool added;
do {
added = false;
var someQueueEmpty = false;
for (final inputQueue in inputQueues) {
final inputCurrent = inputQueue.current!;
final emptyQueues = <AuthorInputQueue>{};
for (final inputQueue in activeInputQueues) {
final inputCurrent = await inputQueue.getCurrentMessage();
if (inputCurrent == null) {
log.error('Active input queue did not have a current message: '
'${inputQueue.author}');
continue;
}
if (currentOutputPosition == null ||
inputCurrent.timestamp <
currentOutputPosition.message.content.timestamp) {
@ -138,16 +202,14 @@ class MessageReconciliation {
added = true;
// Advance this queue
if (!await inputQueue.consume()) {
// Queue is empty now, run a queue purge
someQueueEmpty = true;
if (!await inputQueue.advance()) {
// Mark queue as empty for removal
emptyQueues.add(inputQueue);
}
}
}
// Remove empty queues now that we're done iterating
if (someQueueEmpty) {
inputQueues.removeWhere((q) => q.isDone);
}
// Remove finished queues now that we're done iterating
activeInputQueues.removeWhere(emptyQueues.contains);
if (toInsert.length >= _maxReconcileChunk) {
break;
@ -165,9 +227,27 @@ class MessageReconciliation {
..content = message)
.toList();
await reconciledArray.insertAll(
currentOutputPosition?.pos ?? reconciledArray.length,
reconciledInserts);
// Figure out where to insert the reconciled messages
final insertPos = currentOutputPosition?.pos ?? reconciledArray.length;
// Insert them all at once
await reconciledArray.insertAll(insertPos, reconciledInserts);
// Update output positions for input queues
final updatePositions = _outputPositions.keys.toSet();
var outputPos = insertPos + reconciledInserts.length;
for (final inserted in reconciledInserts.reversed) {
if (updatePositions.isEmpty) {
// Last seen positions already recorded for each active author
break;
}
outputPos--;
final author = inserted.content.author.toVeilid();
if (updatePositions.contains(author)) {
_outputPositions[author] = OutputPosition(inserted, outputPos);
updatePositions.remove(author);
}
}
toInsert.clear();
} else {
@ -187,7 +267,9 @@ class MessageReconciliation {
////////////////////////////////////////////////////////////////////////////
Map<TypedKey, AuthorInputSource> _inputSources = {};
final Map<TypedKey, AuthorInputSource> _inputSources = {};
final Map<TypedKey, AuthorInputQueue> _inputQueues = {};
final Map<TypedKey, OutputPosition?> _outputPositions = {};
final TableDBArrayProtobufCubit<proto.ReconciledMessage> _outputCubit;
final void Function(Object, StackTrace?) _onError;

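Stripped of the integrity checks and chunking, _reconcileInputQueues is a k-way merge of per-author queues ordered by timestamp. A minimal sketch of that core over plain lists (illustrative only; the real queues verify id chains and signatures before a message becomes eligible):

// Sketch: k-way merge of per-author message lists by timestamp.
List<M> mergeByTimestamp<M>(
    List<List<M>> queues, int Function(M) timestampOf) {
  final next = List.filled(queues.length, 0);
  final out = <M>[];
  while (true) {
    int? best;
    for (var q = 0; q < queues.length; q++) {
      if (next[q] >= queues[q].length) {
        continue; // this queue is drained
      }
      if (best == null ||
          timestampOf(queues[q][next[q]]) <
              timestampOf(queues[best][next[best]])) {
        best = q;
      }
    }
    if (best == null) {
      break; // all queues drained
    }
    out.add(queues[best][next[best]]);
    next[best]++;
  }
  return out;
}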
View File

@ -77,8 +77,8 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
await _sentSubscription?.cancel();
await _rcvdSubscription?.cancel();
await _reconciledSubscription?.cancel();
await _sentMessagesCubit?.close();
await _rcvdMessagesCubit?.close();
await _sentMessagesDHTLog?.close();
await _rcvdMessagesDHTLog?.close();
await _reconciledMessagesCubit?.close();
// If the local conversation record is gone, then delete the reconciled
@ -100,8 +100,8 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
key: _remoteConversationRecordKey.toString(),
fromBuffer: proto.Message.fromBuffer,
closure: _processUnsentMessages,
onError: (e, sp) {
log.error('Exception while processing unsent messages: $e\n$sp\n');
onError: (e, st) {
log.error('Exception while processing unsent messages: $e\n$st\n');
});
// Make crypto
@ -111,10 +111,10 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
await _initReconciledMessagesCubit();
// Local messages key
await _initSentMessagesCubit();
await _initSentMessagesDHTLog();
// Remote messages key
await _initRcvdMessagesCubit();
await _initRcvdMessagesDHTLog();
// Command execution background process
_commandRunnerFut = Future.delayed(Duration.zero, _commandRunner);
@ -129,39 +129,40 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
}
// Open local messages key
Future<void> _initSentMessagesCubit() async {
Future<void> _initSentMessagesDHTLog() async {
final writer = _accountInfo.identityWriter;
_sentMessagesCubit = DHTLogCubit(
open: () async => DHTLog.openWrite(_localMessagesRecordKey, writer,
final sentMessagesDHTLog =
await DHTLog.openWrite(_localMessagesRecordKey, writer,
debugName: 'SingleContactMessagesCubit::_initSentMessagesCubit::'
'SentMessages',
parent: _localConversationRecordKey,
crypto: _conversationCrypto),
decodeElement: proto.Message.fromBuffer);
_sentSubscription =
_sentMessagesCubit!.stream.listen(_updateSentMessagesState);
_updateSentMessagesState(_sentMessagesCubit!.state);
crypto: _conversationCrypto);
_sentSubscription = await sentMessagesDHTLog.listen(_updateSentMessages);
_sentMessagesDHTLog = sentMessagesDHTLog;
_reconciliation.addInputSourceFromDHTLog(
_accountInfo.identityTypedPublicKey, sentMessagesDHTLog);
}
// Open remote messages key
Future<void> _initRcvdMessagesCubit() async {
Future<void> _initRcvdMessagesDHTLog() async {
// Don't bother if we don't have a remote messages record key yet
if (_remoteMessagesRecordKey == null) {
return;
}
// Open new cubit if one is desired
_rcvdMessagesCubit = DHTLogCubit(
open: () async => DHTLog.openRead(_remoteMessagesRecordKey!,
debugName: 'SingleContactMessagesCubit::_initRcvdMessagesCubit::'
'RcvdMessages',
parent: _remoteConversationRecordKey,
crypto: _conversationCrypto),
decodeElement: proto.Message.fromBuffer);
_rcvdSubscription =
_rcvdMessagesCubit!.stream.listen(_updateRcvdMessagesState);
_updateRcvdMessagesState(_rcvdMessagesCubit!.state);
final rcvdMessagesDHTLog = await DHTLog.openRead(_remoteMessagesRecordKey!,
debugName: 'SingleContactMessagesCubit::_initRcvdMessagesCubit::'
'RcvdMessages',
parent: _remoteConversationRecordKey,
crypto: _conversationCrypto);
_rcvdSubscription = await rcvdMessagesDHTLog.listen(_updateRcvdMessages);
_rcvdMessagesDHTLog = rcvdMessagesDHTLog;
_reconciliation.addInputSourceFromDHTLog(
_remoteIdentityPublicKey, rcvdMessagesDHTLog);
}
Future<void> updateRemoteMessagesRecordKey(
@ -175,17 +176,17 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
return;
}
// Close existing cubit if we have one
final rcvdMessagesCubit = _rcvdMessagesCubit;
_rcvdMessagesCubit = null;
// Close existing DHTLog if we have one
final rcvdMessagesDHTLog = _rcvdMessagesDHTLog;
_rcvdMessagesDHTLog = null;
_remoteMessagesRecordKey = null;
await _rcvdSubscription?.cancel();
_rcvdSubscription = null;
await rcvdMessagesCubit?.close();
await rcvdMessagesDHTLog?.close();
// Init the new cubit if we should
// Init the new DHTLog if we should
_remoteMessagesRecordKey = remoteMessagesRecordKey;
await _initRcvdMessagesCubit();
await _initRcvdMessagesDHTLog();
});
}
@ -275,30 +276,15 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
////////////////////////////////////////////////////////////////////////////
// Internal implementation
// Called when the sent messages cubit gets a change
// Called when the sent messages DHTLog gets a change
// This will re-render when messages are sent from another machine
void _updateSentMessagesState(DHTLogBusyState<proto.Message> avmessages) {
final sentMessages = avmessages.state.asData?.value;
if (sentMessages == null) {
return;
}
_reconciliation.reconcileMessages(
_accountInfo.identityTypedPublicKey, sentMessages, _sentMessagesCubit!);
// Update the view
_renderState();
void _updateSentMessages(DHTLogUpdate upd) {
_reconciliation.reconcileMessages(_accountInfo.identityTypedPublicKey);
}
// Called when the received messages cubit gets a change
void _updateRcvdMessagesState(DHTLogBusyState<proto.Message> avmessages) {
final rcvdMessages = avmessages.state.asData?.value;
if (rcvdMessages == null) {
return;
}
_reconciliation.reconcileMessages(
_remoteIdentityPublicKey, rcvdMessages, _rcvdMessagesCubit!);
// Called when the received messages DHTLog gets a change
void _updateRcvdMessages(DHTLogUpdate upd) {
_reconciliation.reconcileMessages(_remoteIdentityPublicKey);
}
// Called when the reconciled messages window gets a change
@ -310,14 +296,11 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
Future<void> _processMessageToSend(
proto.Message message, proto.Message? previousMessage) async {
// Get the previous message if we don't have one
previousMessage ??= await _sentMessagesCubit!.operate((r) async =>
r.length == 0
? null
: await r.getProtobuf(proto.Message.fromBuffer, r.length - 1));
message.id =
await _senderMessageIntegrity.generateMessageId(previousMessage);
// It's possible we had a signature from a previous
// operateAppendEventual attempt, so clear it and make a new message id too
message
..clearSignature()
..id = await _senderMessageIntegrity.generateMessageId(previousMessage);
// Now sign it
await _senderMessageIntegrity.signMessage(
@ -326,26 +309,33 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
// Async process to send messages in the background
Future<void> _processUnsentMessages(IList<proto.Message> messages) async {
// Go through and assign ids to all the messages in order
proto.Message? previousMessage;
final processedMessages = messages.toList();
for (final message in processedMessages) {
try {
await _processMessageToSend(message, previousMessage);
previousMessage = message;
} on Exception catch (e) {
log.error('Exception processing unsent message: $e');
}
}
// _sendingMessages = messages;
// _renderState();
try {
await _sentMessagesCubit!.operateAppendEventual((writer) =>
writer.addAll(messages.map((m) => m.writeToBuffer()).toList()));
} on Exception catch (e) {
log.error('Exception appending unsent messages: $e');
await _sentMessagesDHTLog!.operateAppendEventual((writer) async {
// Get the previous message if we have one
var previousMessage = writer.length == 0
? null
: await writer.getProtobuf(
proto.Message.fromBuffer, writer.length - 1);
// Sign all messages
final processedMessages = messages.toList();
for (final message in processedMessages) {
try {
await _processMessageToSend(message, previousMessage);
previousMessage = message;
} on Exception catch (e, st) {
log.error('Exception processing unsent message: $e:\n$st\n');
}
}
final byteMessages = messages.map((m) => m.writeToBuffer()).toList();
return writer.addAll(byteMessages);
});
} on Exception catch (e, st) {
log.error('Exception appending unsent messages: $e:\n$st\n');
}
// _sendingMessages = const IList.empty();
@ -353,16 +343,17 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
// Produce a state for this cubit from the input cubits and queues
void _renderState() {
// Get all reconciled messages
// Get all reconciled messages in the cubit window
final reconciledMessages =
_reconciledMessagesCubit?.state.state.asData?.value;
// Get all sent messages
final sentMessages = _sentMessagesCubit?.state.state.asData?.value;
// Get all sent messages that are still offline
//final sentMessages = _sentMessagesDHTLog.
//Get all items in the unsent queue
//final unsentMessages = _unsentMessagesQueue.queue;
// If we aren't ready to render a state, say we're loading
if (reconciledMessages == null || sentMessages == null) {
if (reconciledMessages == null) {
emit(const AsyncLoading());
return;
}
@ -373,11 +364,11 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
// keyMapper: (x) => x.content.authorUniqueIdString,
// values: reconciledMessages.windowElements,
// );
final sentMessagesMap =
IMap<String, OnlineElementState<proto.Message>>.fromValues(
keyMapper: (x) => x.value.authorUniqueIdString,
values: sentMessages.window,
);
// final sentMessagesMap =
// IMap<String, OnlineElementState<proto.Message>>.fromValues(
// keyMapper: (x) => x.value.authorUniqueIdString,
// values: sentMessages.window,
// );
// final unsentMessagesMap = IMap<String, proto.Message>.fromValues(
// keyMapper: (x) => x.authorUniqueIdString,
// values: unsentMessages,
@ -389,10 +380,12 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
final isLocal =
m.content.author.toVeilid() == _accountInfo.identityTypedPublicKey;
final reconciledTimestamp = Timestamp.fromInt64(m.reconciledTime);
final sm =
isLocal ? sentMessagesMap[m.content.authorUniqueIdString] : null;
final sent = isLocal && sm != null;
final sentOffline = isLocal && sm != null && sm.isOffline;
//final sm =
//isLocal ? sentMessagesMap[m.content.authorUniqueIdString] : null;
//final sent = isLocal && sm != null;
//final sentOffline = isLocal && sm != null && sm.isOffline;
final sent = isLocal;
final sentOffline = false; //
renderedElements.add(RenderStateElement(
message: m.content,
@ -487,16 +480,16 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
late final VeilidCrypto _conversationCrypto;
late final MessageIntegrity _senderMessageIntegrity;
DHTLogCubit<proto.Message>? _sentMessagesCubit;
DHTLogCubit<proto.Message>? _rcvdMessagesCubit;
DHTLog? _sentMessagesDHTLog;
DHTLog? _rcvdMessagesDHTLog;
TableDBArrayProtobufCubit<proto.ReconciledMessage>? _reconciledMessagesCubit;
late final MessageReconciliation _reconciliation;
late final PersistentQueue<proto.Message> _unsentMessagesQueue;
// IList<proto.Message> _sendingMessages = const IList.empty();
StreamSubscription<DHTLogBusyState<proto.Message>>? _sentSubscription;
StreamSubscription<DHTLogBusyState<proto.Message>>? _rcvdSubscription;
StreamSubscription<void>? _sentSubscription;
StreamSubscription<void>? _rcvdSubscription;
StreamSubscription<TableDBArrayProtobufBusyState<proto.ReconciledMessage>>?
_reconciledSubscription;
final StreamController<Future<void> Function()> _commandController;

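Moving the signing loop inside operateAppendEventual keeps the per-author id chain intact across retries: previousMessage is re-read from the writer's tail and each id is re-derived from it, which is why the old signature is cleared first. A hedged sketch of the chaining idea (the real derivation lives in MessageIntegrity and is author-keyed; sha256 here is only an illustration):

import 'dart:convert';
import 'package:crypto/crypto.dart' show sha256;

// Sketch: a hash-chained id commits to the previous message's bytes,
// so dropped or reordered messages fail verification downstream.
// (Assumption: any collision-resistant hash works for the sketch.)
List<int> chainedId(List<int>? previousMessageBytes) =>
    sha256.convert(previousMessageBytes ?? utf8.encode('genesis')).bytes;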
View File

@ -138,7 +138,8 @@ class RouterCubit extends Cubit<RouterState> {
return null;
case '/developer':
return null;
// Otherwise, if there's no account, we need to go to the new account page.
// Otherwise, if there's no account,
// we need to go to the new account page.
default:
return state.hasAnyAccount ? null : '/new_account';
}

View File

@ -65,8 +65,9 @@ class _DeveloperPageState extends State<DeveloperPage> {
}
void _debugOut(String out) {
final sanitizedOut = out.replaceAll('\uFFFD', '');
final pen = AnsiPen()..cyan(bold: true);
final colorOut = pen(out);
final colorOut = pen(sanitizedOut);
debugPrint(colorOut);
globalDebugTerminal.write(colorOut.replaceAll('\n', '\r\n'));
}
@ -124,7 +125,21 @@ class _DeveloperPageState extends State<DeveloperPage> {
_debugOut('DEBUG >>>\n$debugCommand\n');
try {
final out = await Veilid.instance.debug(debugCommand);
var out = await Veilid.instance.debug(debugCommand);
if (debugCommand == 'help') {
out = 'VeilidChat Commands:\n'
' pool allocations - List DHTRecordPool allocations\n'
' pool opened - List opened DHTRecord instances'
' from the pool\n'
' change_log_ignore <layer> <changes> change the log'
' target ignore list for a tracing layer\n'
' targets to add to the ignore list can be separated by'
' a comma.\n'
' to remove a target from the ignore list, prepend it'
' with a minus.\n\n$out';
}
_debugOut('<<< DEBUG\n$out\n');
} on Exception catch (e, st) {
_debugOut('<<< ERROR\n$e\n<<< STACK\n$st');

View File

@ -650,7 +650,7 @@ packages:
path: "../../../../veilid/veilid-flutter"
relative: true
source: path
version: "0.4.3"
version: "0.4.4"
veilid_support:
dependency: "direct main"
description:

View File

@ -7,6 +7,7 @@ import 'package:collection/collection.dart';
import 'package:equatable/equatable.dart';
import 'package:meta/meta.dart';
import '../../../src/veilid_log.dart';
import '../../../veilid_support.dart';
import '../../proto/proto.dart' as proto;
@ -212,7 +213,7 @@ class DHTLog implements DHTDeleteable<DHTLog> {
/// Runs a closure allowing read-only access to the log
Future<T> operate<T>(Future<T> Function(DHTLogReadOperations) closure) async {
if (!isOpen) {
throw StateError('log is not open"');
throw StateError('log is not open');
}
return _spine.operate((spine) async {
@ -229,7 +230,7 @@ class DHTLog implements DHTDeleteable<DHTLog> {
Future<T> operateAppend<T>(
Future<T> Function(DHTLogWriteOperations) closure) async {
if (!isOpen) {
throw StateError('log is not open"');
throw StateError('log is not open');
}
return _spine.operateAppend((spine) async {
@ -248,7 +249,7 @@ class DHTLog implements DHTDeleteable<DHTLog> {
Future<T> Function(DHTLogWriteOperations) closure,
{Duration? timeout}) async {
if (!isOpen) {
throw StateError('log is not open"');
throw StateError('log is not open');
}
return _spine.operateAppendEventual((spine) async {
@ -263,7 +264,7 @@ class DHTLog implements DHTDeleteable<DHTLog> {
void Function(DHTLogUpdate) onChanged,
) {
if (!isOpen) {
throw StateError('log is not open"');
throw StateError('log is not open');
}
return _listenMutex.protect(() async {

View File

@ -112,33 +112,34 @@ class DHTLogCubit<T> extends Cubit<DHTLogBusyState<T>>
Future<void> _refreshInner(void Function(AsyncValue<DHTLogStateData<T>>) emit,
{bool forceRefresh = false}) async {
late final int length;
final window = await _log.operate((reader) async {
final windowElements = await _log.operate((reader) async {
length = reader.length;
return loadElementsFromReader(reader, _windowTail, _windowSize);
return _loadElementsFromReader(reader, _windowTail, _windowSize);
});
if (window == null) {
if (windowElements == null) {
setWantsRefresh();
return;
}
emit(AsyncValue.data(DHTLogStateData(
length: length,
window: window,
windowTail: _windowTail,
windowSize: _windowSize,
window: windowElements.$2,
windowTail: windowElements.$1 + windowElements.$2.length,
windowSize: windowElements.$2.length,
follow: _follow)));
setRefreshed();
}
// Tail is one past the last element to load
Future<IList<OnlineElementState<T>>?> loadElementsFromReader(
Future<(int, IList<OnlineElementState<T>>)?> _loadElementsFromReader(
DHTLogReadOperations reader, int tail, int count,
{bool forceRefresh = false}) async {
final length = reader.length;
if (length == 0) {
return const IList.empty();
}
final end = ((tail - 1) % length) + 1;
final start = (count < end) ? end - count : 0;
if (length == 0) {
return (start, IList<OnlineElementState<T>>.empty());
}
// If this is writeable get the offline positions
Set<int>? offlinePositions;
@ -154,8 +155,11 @@ class DHTLogCubit<T> extends Cubit<DHTLogBusyState<T>>
value: _decodeElement(x.$2),
isOffline: offlinePositions?.contains(x.$1) ?? false))
.toIList();
if (allItems == null) {
return null;
}
return allItems;
return (start, allItems);
}
void _update(DHTLogUpdate upd) {

View File

@ -21,8 +21,14 @@ class _DHTLogRead implements DHTLogReadOperations {
return null;
}
return lookup.scope((sa) =>
sa.operate((read) => read.get(lookup.pos, forceRefresh: forceRefresh)));
return lookup.scope((sa) => sa.operate((read) async {
if (lookup.pos >= read.length) {
veilidLoggy.error('DHTLog shortarray read @ ${lookup.pos}'
' >= length ${read.length}');
return null;
}
return read.get(lookup.pos, forceRefresh: forceRefresh);
}));
}
(int, int) _clampStartLen(int start, int? len) {
@ -47,40 +53,54 @@ class _DHTLogRead implements DHTLogReadOperations {
final chunks = Iterable<int>.generate(length)
.slices(kMaxDHTConcurrency)
.map((chunk) => chunk
.map((pos) async => get(pos + start, forceRefresh: forceRefresh)));
.map((chunk) => chunk.map((pos) async {
try {
return await get(pos + start, forceRefresh: forceRefresh);
// Need some way to debug ParallelWaitError
// ignore: avoid_catches_without_on_clauses
} catch (e, st) {
veilidLoggy.error('$e\n$st\n');
rethrow;
}
}));
for (final chunk in chunks) {
final elems = await chunk.wait;
var elems = await chunk.wait;
// If any element was unavailable, return null
if (elems.contains(null)) {
return null;
// Return only the first contiguous range, anything else is garbage
// due to a representational error in the head or shortarray length
final nullPos = elems.indexOf(null);
if (nullPos != -1) {
elems = elems.sublist(0, nullPos);
}
out.addAll(elems.cast<Uint8List>());
if (nullPos != -1) {
break;
}
}
return out;
}
@override
Future<Set<int>?> getOfflinePositions() async {
Future<Set<int>> getOfflinePositions() async {
final positionOffline = <int>{};
// Iterate positions backward from most recent
for (var pos = _spine.length - 1; pos >= 0; pos--) {
// Get position
final lookup = await _spine.lookupPosition(pos);
// If position doesn't exist then it definitely wasn't written to offline
if (lookup == null) {
return null;
continue;
}
// Check each segment for offline positions
var foundOffline = false;
final success = await lookup.scope((sa) => sa.operate((read) async {
await lookup.scope((sa) => sa.operate((read) async {
final segmentOffline = await read.getOfflinePositions();
if (segmentOffline == null) {
return false;
}
// For each shortarray segment go through their segment positions
// in reverse order and see if they are offline
@ -94,11 +114,7 @@ class _DHTLogRead implements DHTLogReadOperations {
foundOffline = true;
}
}
return true;
}));
if (!success) {
return null;
}
// If we found nothing offline in this segment then we can stop
if (!foundOffline) {
break;

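getRange now tolerates gaps by keeping only the leading contiguous run of available elements instead of failing the whole read; the same rule as a standalone helper (sketch):

// Sketch: keep only the leading run of non-null elements; anything
// after the first gap may be garbage from a stale head or length.
List<T> contiguousPrefix<T>(List<T?> elems) {
  final nullPos = elems.indexOf(null);
  return (nullPos == -1 ? elems : elems.sublist(0, nullPos)).cast<T>();
}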
View File

@ -248,7 +248,12 @@ class _DHTLogSpine {
final headDelta = _ringDistance(newHead, oldHead);
final tailDelta = _ringDistance(newTail, oldTail);
if (headDelta > _positionLimit ~/ 2 || tailDelta > _positionLimit ~/ 2) {
throw const DHTExceptionInvalidData();
throw DHTExceptionInvalidData('_DHTLogSpine::_updateHead '
'_head=$_head _tail=$_tail '
'oldHead=$oldHead oldTail=$oldTail '
'newHead=$newHead newTail=$newTail '
'headDelta=$headDelta tailDelta=$tailDelta '
'_positionLimit=$_positionLimit');
}
}

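The check above rejects any head or tail jump of more than half the ring. A sketch of ring distance, assuming _ringDistance means the shortest distance between two positions wrapping modulo the limit:

import 'dart:math';

// Sketch: shortest distance between two positions on a ring of size
// 'limit' (an assumption about _ringDistance's definition).
int ringDistance(int a, int b, int limit) {
  final d = (a - b).abs() % limit;
  return min(d, limit - d);
}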
View File

@ -17,7 +17,8 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations {
}
final lookup = await _spine.lookupPosition(pos);
if (lookup == null) {
throw const DHTExceptionInvalidData();
throw DHTExceptionInvalidData(
'_DHTLogRead::tryWriteItem pos=$pos _spine.length=${_spine.length}');
}
// Write item to the segment
@ -45,12 +46,14 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations {
}
final aLookup = await _spine.lookupPosition(aPos);
if (aLookup == null) {
throw const DHTExceptionInvalidData();
throw DHTExceptionInvalidData('_DHTLogWrite::swap aPos=$aPos bPos=$bPos '
'_spine.length=${_spine.length}');
}
final bLookup = await _spine.lookupPosition(bPos);
if (bLookup == null) {
await aLookup.close();
throw const DHTExceptionInvalidData();
throw DHTExceptionInvalidData('_DHTLogWrite::swap aPos=$aPos bPos=$bPos '
'_spine.length=${_spine.length}');
}
// Swap items in the segments
@ -65,7 +68,10 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations {
if (bItem.value == null) {
final aItem = await aWrite.get(aLookup.pos);
if (aItem == null) {
throw const DHTExceptionInvalidData();
throw DHTExceptionInvalidData(
'_DHTLogWrite::swap aPos=$aPos bPos=$bPos '
'aLookup.pos=${aLookup.pos} bLookup.pos=${bLookup.pos} '
'_spine.length=${_spine.length}');
}
await sb.operateWriteEventual((bWrite) async {
final success = await bWrite
@ -101,7 +107,7 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations {
await write.clear();
} else if (lookup.pos != write.length) {
// We should always be appending at the length
throw const DHTExceptionInvalidData();
await write.truncate(lookup.pos);
}
return write.add(value);
}));
@ -117,12 +123,16 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations {
final dws = DelayedWaitSet<void, void>();
var success = true;
for (var valueIdx = 0; valueIdx < values.length;) {
for (var valueIdxIter = 0; valueIdxIter < values.length;) {
final valueIdx = valueIdxIter;
final remaining = values.length - valueIdx;
final lookup = await _spine.lookupPosition(insertPos + valueIdx);
if (lookup == null) {
throw const DHTExceptionInvalidData();
throw DHTExceptionInvalidData('_DHTLogWrite::addAll '
'_spine.length=${_spine.length}'
'insertPos=$insertPos valueIdx=$valueIdx '
'values.length=${values.length} ');
}
final sacount = min(remaining, DHTShortArray.maxElements - lookup.pos);
@ -137,16 +147,21 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations {
await write.clear();
} else if (lookup.pos != write.length) {
// We should always be appending at the length
throw const DHTExceptionInvalidData();
await write.truncate(lookup.pos);
}
return write.addAll(sublistValues);
await write.addAll(sublistValues);
success = true;
}));
} on DHTExceptionOutdated {
success = false;
// Need some way to debug ParallelWaitError
// ignore: avoid_catches_without_on_clauses
} catch (e, st) {
veilidLoggy.error('$e\n$st\n');
}
});
valueIdx += sacount;
valueIdxIter += sacount;
}
await dws();

View File

@ -246,11 +246,13 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
/// Print children
String debugChildren(TypedKey recordKey, {List<TypedKey>? allDeps}) {
allDeps ??= _collectChildrenInner(recordKey);
// Debugging
// ignore: avoid_print
var out =
'Parent: $recordKey (${_state.debugNames[recordKey.toString()]})\n';
for (final dep in allDeps) {
if (dep != recordKey) {
// Debugging
// ignore: avoid_print
out += ' Child: $dep (${_state.debugNames[dep.toString()]})\n';
}
@ -270,32 +272,25 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
break;
}
}
} else {
final now = Veilid.instance.now().value;
// Expired, process renewal if desired
for (final entry in _opened.entries) {
final openedKey = entry.key;
final openedRecordInfo = entry.value;
if (openedKey == updateValueChange.key) {
// Renew watch state for each opened record
for (final rec in openedRecordInfo.records) {
// See if the watch had an expiration and if it has expired
// otherwise the renewal will keep the same parameters
final watchState = rec._watchState;
if (watchState != null) {
final exp = watchState.expiration;
if (exp != null && exp.value < now) {
// Has expiration, and it has expired, clear watch state
rec._watchState = null;
}
}
}
openedRecordInfo.shared.needsWatchStateUpdate = true;
break;
}
}
}
// else {
// XXX: should no longer be necessary
// // Remove watch state
//
// for (final entry in _opened.entries) {
// final openedKey = entry.key;
// final openedRecordInfo = entry.value;
// if (openedKey == updateValueChange.key) {
// for (final rec in openedRecordInfo.records) {
// rec._watchState = null;
// }
// openedRecordInfo.shared.needsWatchStateUpdate = true;
// break;
// }
// }
//}
}
/// Log the current record allocations
@ -735,7 +730,6 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
int? totalCount;
Timestamp? maxExpiration;
List<ValueSubkeyRange>? allSubkeys;
Timestamp? earliestRenewalTime;
var noExpiration = false;
var everySubkey = false;
@ -768,15 +762,6 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
} else {
everySubkey = true;
}
final wsRenewalTime = ws.renewalTime;
if (wsRenewalTime != null) {
earliestRenewalTime = earliestRenewalTime == null
? wsRenewalTime
: Timestamp(
value: (wsRenewalTime.value < earliestRenewalTime.value
? wsRenewalTime.value
: earliestRenewalTime.value));
}
}
}
if (noExpiration) {
@ -790,25 +775,10 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
}
return _WatchState(
subkeys: allSubkeys,
expiration: maxExpiration,
count: totalCount,
renewalTime: earliestRenewalTime);
}
static void _updateWatchRealExpirations(Iterable<DHTRecord> records,
Timestamp realExpiration, Timestamp renewalTime) {
for (final rec in records) {
final ws = rec._watchState;
if (ws != null) {
rec._watchState = _WatchState(
subkeys: ws.subkeys,
expiration: ws.expiration,
count: ws.count,
realExpiration: realExpiration,
renewalTime: renewalTime);
}
}
subkeys: allSubkeys,
expiration: maxExpiration,
count: totalCount,
);
}
Future<void> _watchStateChange(
@ -833,9 +803,9 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
// Only try this once, if it doesn't succeed then it can just expire
// on its own.
try {
final cancelled = await dhtctx.cancelDHTWatch(openedRecordKey);
final stillActive = await dhtctx.cancelDHTWatch(openedRecordKey);
log('cancelDHTWatch: key=$openedRecordKey, cancelled=$cancelled, '
log('cancelDHTWatch: key=$openedRecordKey, stillActive=$stillActive, '
'debugNames=${openedRecordInfo.debugNames}');
openedRecordInfo.shared.unionWatchState = null;
@ -858,34 +828,20 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
final subkeys = unionWatchState.subkeys?.toList();
final count = unionWatchState.count;
final expiration = unionWatchState.expiration;
final now = veilid.now();
final realExpiration = await dhtctx.watchDHTValues(openedRecordKey,
final active = await dhtctx.watchDHTValues(openedRecordKey,
subkeys: unionWatchState.subkeys?.toList(),
count: unionWatchState.count,
expiration: unionWatchState.expiration ??
(_defaultWatchDurationSecs == null
? null
: veilid.now().offset(TimestampDuration.fromMillis(
_defaultWatchDurationSecs! * 1000))));
expiration: unionWatchState.expiration);
final expirationDuration = realExpiration.diff(now);
final renewalTime = now.offset(TimestampDuration(
value: expirationDuration.value *
BigInt.from(_watchRenewalNumerator) ~/
BigInt.from(_watchRenewalDenominator)));
log('watchDHTValues: key=$openedRecordKey, subkeys=$subkeys, '
log('watchDHTValues(active=$active): '
'key=$openedRecordKey, subkeys=$subkeys, '
'count=$count, expiration=$expiration, '
'realExpiration=$realExpiration, '
'renewalTime=$renewalTime, '
'debugNames=${openedRecordInfo.debugNames}');
// Update watch states with real expiration
if (realExpiration.value != BigInt.zero) {
if (active) {
openedRecordInfo.shared.unionWatchState = unionWatchState;
_updateWatchRealExpirations(
openedRecordInfo.records, realExpiration, renewalTime);
openedRecordInfo.shared.needsWatchStateUpdate = false;
}
} on VeilidAPIExceptionTimeout {
@ -944,22 +900,13 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
/// Ticker to check watch state change requests
Future<void> tick() async => _mutex.protect(() async {
// See if any opened records need watch state changes
final now = veilid.now();
for (final kv in _opened.entries) {
final openedRecordKey = kv.key;
final openedRecordInfo = kv.value;
var wantsWatchStateUpdate =
final wantsWatchStateUpdate =
openedRecordInfo.shared.needsWatchStateUpdate;
// Check if we have reached renewal time for the watch
if (openedRecordInfo.shared.unionWatchState != null &&
openedRecordInfo.shared.unionWatchState!.renewalTime != null &&
now.value >
openedRecordInfo.shared.unionWatchState!.renewalTime!.value) {
wantsWatchStateUpdate = true;
}
if (wantsWatchStateUpdate) {
// Update union watch state
final unionWatchState =

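With the renewal bookkeeping gone, the union watch state is just a fold of every open record's desired watch into one request per key. A simplified sketch with plain ints, assuming counts sum and expirations take the max as the names totalCount and maxExpiration suggest (the real code uses Timestamp and unions ValueSubkeyRange lists, with 'no expiration' and 'every subkey' as absorbing cases):

import 'dart:math';

// Sketch: fold per-record watch desires into one union watch state.
({int? count, int? expiration}) unionWatch(
    Iterable<({int? count, int? expiration})> watches) {
  int? totalCount;
  int? maxExpiration;
  for (final w in watches) {
    final c = w.count;
    if (c != null) {
      totalCount = (totalCount ?? 0) + c;
    }
    final e = w.expiration;
    if (e != null) {
      maxExpiration = maxExpiration == null ? e : max(e, maxExpiration);
    }
  }
  return (count: totalCount, expiration: maxExpiration);
}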
View File

@ -1,9 +1,5 @@
part of 'dht_record_pool.dart';
const int? _defaultWatchDurationSecs = null; // 600
const int _watchRenewalNumerator = 4;
const int _watchRenewalDenominator = 5;
// DHT crypto domain
const String _cryptoDomainDHT = 'dht';
@ -14,21 +10,17 @@ const _sfListen = 'listen';
/// Watch state
@immutable
class _WatchState extends Equatable {
const _WatchState(
{required this.subkeys,
required this.expiration,
required this.count,
this.realExpiration,
this.renewalTime});
const _WatchState({
required this.subkeys,
required this.expiration,
required this.count,
});
final List<ValueSubkeyRange>? subkeys;
final Timestamp? expiration;
final int? count;
final Timestamp? realExpiration;
final Timestamp? renewalTime;
@override
List<Object?> get props =>
[subkeys, expiration, count, realExpiration, renewalTime];
List<Object?> get props => [subkeys, expiration, count];
}
/// Data shared amongst all DHTRecord instances

View File

@ -4,6 +4,7 @@ import 'dart:typed_data';
import 'package:async_tools/async_tools.dart';
import 'package:collection/collection.dart';
import '../../../src/veilid_log.dart';
import '../../../veilid_support.dart';
import '../../proto/proto.dart' as proto;

View File

@ -62,9 +62,6 @@ class DHTShortArrayCubit<T> extends Cubit<DHTShortArrayCubitState<T>>
Set<int>? offlinePositions;
if (_shortArray.writer != null) {
offlinePositions = await reader.getOfflinePositions();
if (offlinePositions == null) {
return null;
}
}
// Get the items

View File

@ -383,6 +383,24 @@ class _DHTShortArrayHead {
// xxx: free list optimization here?
}
/// Truncate index to a particular length
void truncate(int newLength) {
if (newLength >= _index.length) {
return;
} else if (newLength == 0) {
clearIndex();
return;
} else if (newLength < 0) {
throw StateError('cannot truncate to negative length');
}
final newIndex = _index.sublist(0, newLength);
final freed = _index.sublist(newLength);
_index = newIndex;
_free.addAll(freed);
}
/// Validate the head from the DHT is properly formatted
/// and calculate the free list from it while we're here
List<int> _makeFreeList(

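The new truncate is pure index bookkeeping: entries past newLength move from the live index onto the free list for later reuse, leaving the underlying segments alone. A toy model over plain lists:

// Toy model of _DHTShortArrayHead.truncate (sketch): trailing index
// entries are handed to the free list; segment data is untouched.
void truncateIndex(List<int> index, List<int> free, int newLength) {
  if (newLength < 0) {
    throw StateError('cannot truncate to negative length');
  }
  if (newLength >= index.length) {
    return;
  }
  free.addAll(index.sublist(newLength));
  index.removeRange(newLength, index.length);
}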
View File

@ -60,17 +60,32 @@ class _DHTShortArrayRead implements DHTShortArrayReadOperations {
final chunks = Iterable<int>.generate(length)
.slices(kMaxDHTConcurrency)
.map((chunk) => chunk
.map((pos) async => get(pos + start, forceRefresh: forceRefresh)));
.map((chunk) => chunk.map((pos) async {
try {
return await get(pos + start, forceRefresh: forceRefresh);
// Need some way to debug ParallelWaitError
// ignore: avoid_catches_without_on_clauses
} catch (e, st) {
veilidLoggy.error('$e\n$st\n');
rethrow;
}
}));
for (final chunk in chunks) {
final elems = await chunk.wait;
var elems = await chunk.wait;
// If any element was unavailable, return null
if (elems.contains(null)) {
return null;
// Return only the first contiguous range, anything else is garbage
// due to a representational error in the head or shortarray length
final nullPos = elems.indexOf(null);
if (nullPos != -1) {
elems = elems.sublist(0, nullPos);
}
out.addAll(elems.cast<Uint8List>());
if (nullPos != -1) {
break;
}
}
return out;

View File

@ -9,6 +9,7 @@ abstract class DHTShortArrayWriteOperations
DHTRandomWrite,
DHTInsertRemove,
DHTAdd,
DHTTruncate,
DHTClear {}
class _DHTShortArrayWrite extends _DHTShortArrayRead
@ -72,10 +73,16 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead
final value = values[i];
final outSeqNum = outSeqNums[i];
dws.add((_) async {
final outValue = await lookup.record.tryWriteBytes(value,
subkey: lookup.recordSubkey, outSeqNum: outSeqNum);
if (outValue != null) {
success = false;
try {
final outValue = await lookup.record.tryWriteBytes(value,
subkey: lookup.recordSubkey, outSeqNum: outSeqNum);
if (outValue != null) {
success = false;
}
// Need some way to debug ParallelWaitError
// ignore: avoid_catches_without_on_clauses
} catch (e, st) {
veilidLoggy.error('$e\n$st\n');
}
});
}
@ -142,6 +149,11 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead
_head.clearIndex();
}
@override
Future<void> truncate(int newLength) async {
_head.truncate(newLength);
}
@override
Future<bool> tryWriteItem(int pos, Uint8List newValue,
{Output<Uint8List>? output}) async {

View File

@ -14,19 +14,22 @@ abstract class DHTRandomRead {
/// is specified, the network will always be checked for newer values
/// rather than returning the existing locally stored copy of the elements.
/// Throws an IndexError if the 'pos' is not within the length
/// of the container.
/// of the container. May return null if the item is not available at this
/// time.
Future<Uint8List?> get(int pos, {bool forceRefresh = false});
/// Return a list of a range of items in the DHTArray. If 'forceRefresh'
/// is specified, the network will always be checked for newer values
/// rather than returning the existing locally stored copy of the elements.
/// Throws an IndexError if either 'start' or '(start+length)' is not within
/// the length of the container.
/// the length of the container. May return fewer items than the length
/// expected if the requested items are not available, but will always
/// return a contiguous range starting at 'start'.
Future<List<Uint8List>?> getRange(int start,
{int? length, bool forceRefresh = false});
/// Get a list of the positions that were written offline and not flushed yet
Future<Set<int>?> getOfflinePositions();
Future<Set<int>> getOfflinePositions();
}
extension DHTRandomReadExt on DHTRandomRead {

View File

@ -2,20 +2,32 @@ class DHTExceptionOutdated implements Exception {
const DHTExceptionOutdated(
[this.cause = 'operation failed due to newer dht value']);
final String cause;
@override
String toString() => 'DHTExceptionOutdated: $cause';
}
class DHTExceptionInvalidData implements Exception {
const DHTExceptionInvalidData([this.cause = 'dht data structure is corrupt']);
const DHTExceptionInvalidData(this.cause);
final String cause;
@override
String toString() => 'DHTExceptionInvalidData: $cause';
}
class DHTExceptionCancelled implements Exception {
const DHTExceptionCancelled([this.cause = 'operation was cancelled']);
final String cause;
@override
String toString() => 'DHTExceptionCancelled: $cause';
}
class DHTExceptionNotAvailable implements Exception {
const DHTExceptionNotAvailable(
[this.cause = 'request could not be completed at this time']);
final String cause;
@override
String toString() => 'DHTExceptionNotAvailable: $cause';
}

View File

@ -7,6 +7,7 @@ import 'package:protobuf/protobuf.dart';
import 'config.dart';
import 'table_db.dart';
import 'veilid_log.dart';
class PersistentQueue<T extends GeneratedMessage>
with TableDBBackedFromBuffer<IList<T>> {
@ -46,7 +47,7 @@ class PersistentQueue<T extends GeneratedMessage>
}
}
Future<void> _init(_) async {
Future<void> _init(Completer<void> _) async {
// Start the processor
unawaited(Future.delayed(Duration.zero, () async {
await _initWait();
@ -182,10 +183,28 @@ class PersistentQueue<T extends GeneratedMessage>
@override
IList<T> valueFromBuffer(Uint8List bytes) {
final reader = CodedBufferReader(bytes);
var out = IList<T>();
while (!reader.isAtEnd()) {
out = out.add(_fromBuffer(reader.readBytesAsView()));
try {
final reader = CodedBufferReader(bytes);
while (!reader.isAtEnd()) {
final bytes = reader.readBytesAsView();
try {
final item = _fromBuffer(bytes);
out = out.add(item);
} on Exception catch (e, st) {
veilidLoggy.debug(
'Dropping invalid item from persistent queue: $bytes\n'
'tableName=${tableName()}:tableKeyName=${tableKeyName()}\n',
e,
st);
}
}
} on Exception catch (e, st) {
veilidLoggy.debug(
'Dropping remainder of invalid persistent queue\n'
'tableName=${tableName()}:tableKeyName=${tableKeyName()}\n',
e,
st);
}
return out;
}

View File

@ -9,6 +9,7 @@ import 'package:meta/meta.dart';
import 'package:protobuf/protobuf.dart';
import '../veilid_support.dart';
import 'veilid_log.dart';
@immutable
class TableDBArrayUpdate extends Equatable {
@ -262,7 +263,16 @@ class _TableDBArrayBase {
final dws = DelayedWaitSet<Uint8List, void>();
while (batchLen > 0) {
final entry = await _getIndexEntry(pos);
dws.add((_) async => (await _loadEntry(entry))!);
dws.add((_) async {
try {
return (await _loadEntry(entry))!;
// Need some way to debug ParallelWaitError
// ignore: avoid_catches_without_on_clauses
} catch (e, st) {
veilidLoggy.error('$e\n$st\n');
rethrow;
}
});
pos++;
batchLen--;
}

View File

@ -726,7 +726,7 @@ packages:
path: "../../../veilid/veilid-flutter"
relative: true
source: path
version: "0.4.3"
version: "0.4.4"
vm_service:
dependency: transitive
description: