bug cleanup

This commit is contained in:
Christien Rioux 2024-06-02 12:53:52 -04:00
parent 0e4606f35e
commit 4082d1dd76
5 changed files with 41 additions and 30 deletions

View File

@ -109,7 +109,7 @@ class AuthorInputQueue {
// Internal implementation
// Walk backward from the tail of the input queue to find the first
// message newer than our last reconcicled message from this author
// message newer than our last reconciled message from this author
// Returns false if no work is needed
Future<bool> _findStartOfWork() async {
// Iterate windows over the inputSource
@ -121,10 +121,14 @@ class AuthorInputQueue {
i--, _currentPosition--) {
final elem = _inputSource.currentWindow.elements[i];
// If we've found an input element that is older than our last
// reconciled message for this author, then we stop
// If we've found an input element that is older or same time as our
// last reconciled message for this author, or we find the message
// itself then we stop
if (_lastMessage != null) {
if (elem.value.timestamp < _lastMessage!.timestamp) {
if (elem.value.authorUniqueIdBytes
.compare(_lastMessage!.authorUniqueIdBytes) ==
0 ||
elem.value.timestamp <= _lastMessage!.timestamp) {
break outer;
}
}

View File

@ -75,8 +75,7 @@ class MessageReconciliation {
return inputQueue;
}
// Get the position of our most recent
// reconciled message from this author
// Get the position of our most recent reconciled message from this author
// XXX: For a group chat, this should find when the author
// was added to the membership so we don't just go back in time forever
Future<OutputPosition?> _findLastOutputPosition(
@ -85,9 +84,6 @@ class MessageReconciliation {
var pos = arr.length - 1;
while (pos >= 0) {
final message = await arr.get(pos);
if (message == null) {
throw StateError('should have gotten last message');
}
if (message.content.author.toVeilid() == author) {
return OutputPosition(message, pos);
}
@ -120,13 +116,7 @@ class MessageReconciliation {
});
// Start at the earliest position we know about in all the queues
final firstOutputPos = inputQueues.first.outputPosition?.pos;
// Get the timestamp for this output position
var currentOutputMessage = firstOutputPos == null
? null
: await reconciledArray.get(firstOutputPos);
var currentOutputPos = firstOutputPos ?? 0;
var currentOutputPosition = inputQueues.first.outputPosition;
final toInsert =
SortedList<proto.Message>(proto.MessageExt.compareTimestamp);
@ -141,8 +131,9 @@ class MessageReconciliation {
var someQueueEmpty = false;
for (final inputQueue in inputQueues) {
final inputCurrent = inputQueue.current!;
if (currentOutputMessage == null ||
inputCurrent.timestamp < currentOutputMessage.content.timestamp) {
if (currentOutputPosition == null ||
inputCurrent.timestamp <
currentOutputPosition.message.content.timestamp) {
toInsert.add(inputCurrent);
added = true;
@ -174,15 +165,22 @@ class MessageReconciliation {
..content = message)
.toList();
await reconciledArray.insertAll(currentOutputPos, reconciledInserts);
await reconciledArray.insertAll(
currentOutputPosition?.pos ?? reconciledArray.length,
reconciledInserts);
toInsert.clear();
} else {
// If there's nothing to insert at this position move to the next one
currentOutputPos++;
currentOutputMessage = (currentOutputPos == reconciledArray.length)
? null
: await reconciledArray.get(currentOutputPos);
final nextOutputPos = (currentOutputPosition != null)
? currentOutputPosition.pos + 1
: reconciledArray.length;
if (nextOutputPos == reconciledArray.length) {
currentOutputPosition = null;
} else {
currentOutputPosition = OutputPosition(
await reconciledArray.get(nextOutputPos), nextOutputPos);
}
}
}
}

View File

@ -200,8 +200,12 @@ class _DHTLogSpine {
throw TimeoutException('timeout reached');
}
}
if (await closure(this)) {
break;
try {
if (await closure(this)) {
break;
}
} on DHTExceptionTryAgain {
//
}
// Failed to write in closure resets state
_head = oldHead;

View File

@ -139,9 +139,14 @@ class _DHTShortArrayHead {
throw TimeoutException('timeout reached');
}
}
if (await closure(this)) {
break;
try {
if (await closure(this)) {
break;
}
} on DHTExceptionTryAgain {
//
}
// Failed to write in closure resets state
_linkedRecords = List.of(oldLinkedRecords);
_index = List.of(oldIndex);

View File

@ -710,10 +710,10 @@ class TableDBArrayJson<T> extends _TableDBArrayBase {
Future<void> insertAll(int pos, List<T> values) async =>
_insertAll(pos, values.map(jsonEncodeBytes).toList());
Future<T?> get(
Future<T> get(
int pos,
) =>
_get(pos).then((out) => jsonDecodeOptBytes(_fromJson, out));
_get(pos).then((out) => jsonDecodeBytes(_fromJson, out));
Future<List<T>> getRange(int start, [int? end]) =>
_getRange(start, end).then((out) => out.map(_fromJson).toList());
@ -773,7 +773,7 @@ class TableDBArrayProtobuf<T extends GeneratedMessage>
Future<void> insertAll(int pos, List<T> values) async =>
_insertAll(pos, values.map((x) => x.writeToBuffer()).toList());
Future<T?> get(
Future<T> get(
int pos,
) =>
_get(pos).then(_fromBuffer);