debugging

This commit is contained in:
Christien Rioux 2024-06-02 11:04:19 -04:00
parent f780a60d69
commit 0e4606f35e
20 changed files with 521 additions and 321 deletions

View file

@ -18,7 +18,7 @@ class AuthorInputQueue {
_onError = onError,
_inputSource = inputSource,
_outputPosition = outputPosition,
_lastMessage = outputPosition?.message,
_lastMessage = outputPosition?.message.content,
_messageIntegrity = messageIntegrity,
_currentPosition = inputSource.currentWindow.last;
@ -43,8 +43,8 @@ class AuthorInputQueue {
////////////////////////////////////////////////////////////////////////////
// Public interface
// Check if there are no messages in this queue to reconcile
bool get isEmpty => _currentMessage == null;
// Check if there are no messages left in this queue to reconcile
bool get isDone => _isDone;
// Get the current message that needs reconciliation
proto.Message? get current => _currentMessage;
@ -58,6 +58,9 @@ class AuthorInputQueue {
// Remove a reconciled message and move to the next message
// Returns true if there is more work to do
Future<bool> consume() async {
if (_isDone) {
return false;
}
while (true) {
_lastMessage = _currentMessage;
@ -66,6 +69,7 @@ class AuthorInputQueue {
// Get more window if we need to
if (!await _updateWindow()) {
// Window is not available so this queue can't work right now
_isDone = true;
return false;
}
final nextMessage = _inputSource.currentWindow
@ -73,9 +77,9 @@ class AuthorInputQueue {
// Drop the 'offline' elements because we don't reconcile
// anything until it has been confirmed to be committed to the DHT
if (nextMessage.isOffline) {
continue;
}
// if (nextMessage.isOffline) {
// continue;
// }
if (_lastMessage != null) {
// Ensure the timestamp is not moving backward
@ -112,7 +116,7 @@ class AuthorInputQueue {
outer:
while (true) {
// Iterate through current window backward
for (var i = _inputSource.currentWindow.elements.length;
for (var i = _inputSource.currentWindow.elements.length - 1;
i >= 0 && _currentPosition >= 0;
i--, _currentPosition--) {
final elem = _inputSource.currentWindow.elements[i];
@ -134,13 +138,24 @@ class AuthorInputQueue {
if (!await _updateWindow()) {
// Window is not available or things are empty so this
// queue can't work right now
_isDone = true;
return false;
}
}
// The current position should be equal to the first message to process
// and the current window to process should not be empty
return _inputSource.currentWindow.elements.isNotEmpty;
// _currentPosition points to either before the input source starts
// or the position of the previous element. We still need to set the
// _currentMessage to the previous element so consume() can compare
// against it if we can.
if (_currentPosition >= 0) {
_currentMessage = _inputSource.currentWindow
.elements[_currentPosition - _inputSource.currentWindow.first].value;
}
// After this consume(), the currentPosition and _currentMessage should
// be equal to the first message to process and the current window to
// process should not be empty
return consume();
}
// Slide the window toward the current position and load the batch around it
@ -186,6 +201,9 @@ class AuthorInputQueue {
int _currentPosition;
// The current message we're looking at
proto.Message? _currentMessage;
// If we have reached the end
bool _isDone = false;
// Desired maximum window length
static const int _maxWindowLength = 256;
}

View file

@ -21,9 +21,10 @@ class AuthorInputSource {
{required DHTLogStateData<proto.Message> cubitState,
required this.cubit}) {
_currentWindow = InputWindow(
elements: cubitState.elements,
first: cubitState.tail - cubitState.elements.length,
last: cubitState.tail - 1);
elements: cubitState.window,
first: (cubitState.windowTail - cubitState.window.length) %
cubitState.length,
last: (cubitState.windowTail - 1) % cubitState.length);
}
////////////////////////////////////////////////////////////////////////////

View file

@ -12,7 +12,7 @@ import 'output_position.dart';
class MessageReconciliation {
MessageReconciliation(
{required TableDBArrayCubit<proto.ReconciledMessage> output,
{required TableDBArrayProtobufCubit<proto.ReconciledMessage> output,
required void Function(Object, StackTrace?) onError})
: _outputCubit = output,
_onError = onError;
@ -23,7 +23,7 @@ class MessageReconciliation {
TypedKey author,
DHTLogStateData<proto.Message> inputMessagesCubitState,
DHTLogCubit<proto.Message> inputMessagesCubit) {
if (inputMessagesCubitState.elements.isEmpty) {
if (inputMessagesCubitState.window.isEmpty) {
return;
}
@ -84,11 +84,11 @@ class MessageReconciliation {
_outputCubit.operate((arr) async {
var pos = arr.length - 1;
while (pos >= 0) {
final message = await arr.getProtobuf(proto.Message.fromBuffer, pos);
final message = await arr.get(pos);
if (message == null) {
throw StateError('should have gotten last message');
}
if (message.author.toVeilid() == author) {
if (message.content.author.toVeilid() == author) {
return OutputPosition(message, pos);
}
pos--;
@ -99,11 +99,11 @@ class MessageReconciliation {
// Process a list of author input queues and insert their messages
// into the output array, performing validation steps along the way
Future<void> _reconcileInputQueues({
required TableDBArray reconciledArray,
required TableDBArrayProtobuf<proto.ReconciledMessage> reconciledArray,
required List<AuthorInputQueue> inputQueues,
}) async {
// Ensure queues all have something to do
inputQueues.removeWhere((q) => q.isEmpty);
inputQueues.removeWhere((q) => q.isDone);
if (inputQueues.isEmpty) {
return;
}
@ -124,8 +124,7 @@ class MessageReconciliation {
// Get the timestamp for this output position
var currentOutputMessage = firstOutputPos == null
? null
: await reconciledArray.getProtobuf(
proto.Message.fromBuffer, firstOutputPos);
: await reconciledArray.get(firstOutputPos);
var currentOutputPos = firstOutputPos ?? 0;
@ -143,7 +142,7 @@ class MessageReconciliation {
for (final inputQueue in inputQueues) {
final inputCurrent = inputQueue.current!;
if (currentOutputMessage == null ||
inputCurrent.timestamp <= currentOutputMessage.timestamp) {
inputCurrent.timestamp < currentOutputMessage.content.timestamp) {
toInsert.add(inputCurrent);
added = true;
@ -156,7 +155,7 @@ class MessageReconciliation {
}
// Remove empty queues now that we're done iterating
if (someQueueEmpty) {
inputQueues.removeWhere((q) => q.isEmpty);
inputQueues.removeWhere((q) => q.isDone);
}
if (toInsert.length >= _maxReconcileChunk) {
@ -166,13 +165,24 @@ class MessageReconciliation {
// Perform insertions in bulk
if (toInsert.isNotEmpty) {
await reconciledArray.insertAllProtobuf(currentOutputPos, toInsert);
final reconciledTime = Veilid.instance.now().toInt64();
// Add reconciled timestamps
final reconciledInserts = toInsert
.map((message) => proto.ReconciledMessage()
..reconciledTime = reconciledTime
..content = message)
.toList();
await reconciledArray.insertAll(currentOutputPos, reconciledInserts);
toInsert.clear();
} else {
// If there's nothing to insert at this position move to the next one
currentOutputPos++;
currentOutputMessage = await reconciledArray.getProtobuf(
proto.Message.fromBuffer, currentOutputPos);
currentOutputMessage = (currentOutputPos == reconciledArray.length)
? null
: await reconciledArray.get(currentOutputPos);
}
}
}
@ -180,7 +190,7 @@ class MessageReconciliation {
////////////////////////////////////////////////////////////////////////////
Map<TypedKey, AuthorInputSource> _inputSources = {};
final TableDBArrayCubit<proto.ReconciledMessage> _outputCubit;
final TableDBArrayProtobufCubit<proto.ReconciledMessage> _outputCubit;
final void Function(Object, StackTrace?) _onError;
static const int _maxReconcileChunk = 65536;

View file

@ -6,7 +6,7 @@ import '../../../proto/proto.dart' as proto;
@immutable
class OutputPosition extends Equatable {
const OutputPosition(this.message, this.pos);
final proto.Message message;
final proto.ReconciledMessage message;
final int pos;
@override
List<Object?> get props => [message, pos];