clean up a bunch of exceptions

Christien Rioux 2025-04-17 18:55:43 -04:00
parent c077a0290f
commit bf38c2c44d
21 changed files with 244 additions and 166 deletions

View file

@@ -1,2 +1,3 @@
 extensions:
   - provider: true
+  - shared_preferences: true

View file

@@ -168,7 +168,7 @@ SPEC CHECKSUMS:
   sqflite_darwin: 20b2a3a3b70e43edae938624ce550a3cbf66a3d0
   system_info_plus: 555ce7047fbbf29154726db942ae785c29211740
   url_launcher_ios: 694010445543906933d732453a59da0a173ae33d
-  veilid: b3b9418ae6b083e662396bfa2c635fb115c8510e
+  veilid: 3ce560a4f2b568a77a9fd5e23090f2fa97581019

 PODFILE CHECKSUM: c8bf5b16c34712d5790b0b8d2472cc66ac0a8487

View file

@@ -6,6 +6,7 @@ import 'package:sorted_list/sorted_list.dart';
 import 'package:veilid_support/veilid_support.dart';

 import '../../../proto/proto.dart' as proto;
+import '../../../tools/tools.dart';
 import 'author_input_queue.dart';
 import 'author_input_source.dart';
 import 'output_position.dart';
@@ -62,6 +63,7 @@ class MessageReconciliation {
   Future<AuthorInputQueue?> _enqueueAuthorInput(
       {required TypedKey author,
       required AuthorInputSource inputSource}) async {
+    try {
       // Get the position of our most recent reconciled message from this author
       final outputPosition = await _findLastOutputPosition(author: author);
@@ -73,6 +75,12 @@ class MessageReconciliation {
         onError: _onError,
       );
       return inputQueue;
+      // Catch everything so we can avoid ParallelWaitError
+      // ignore: avoid_catches_without_on_clauses
+    } catch (e, st) {
+      log.error('Exception enqueing author input: $e:\n$st\n');
+      return null;
+    }
   }

   // Get the position of our most recent reconciled message from this author
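A minimal standalone sketch of the pattern this hunk adopts (names here are illustrative, not from the codebase): when futures are awaited together with Dart 3's `.wait`, any failure surfaces as a ParallelWaitError that hides the individual stack trace, so each future catches, logs, and returns null so the parallel wait can complete normally.

import 'dart:async';

Future<int?> guardedWork(int i) async {
  try {
    if (i.isOdd) {
      throw StateError('simulated failure for input $i');
    }
    return i * 2;
    // Catch everything so we can avoid ParallelWaitError
    // ignore: avoid_catches_without_on_clauses
  } catch (e, st) {
    print('error for $i: $e\n$st');
    return null; // the parallel wait still completes with the other results
  }
}

Future<void> main() async {
  final results = await [for (var i = 0; i < 4; i++) guardedWork(i)].wait;
  print(results); // prints [0, null, 4, null]; no ParallelWaitError escapes
}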

View file

@@ -100,8 +100,8 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
         key: _remoteConversationRecordKey.toString(),
         fromBuffer: proto.Message.fromBuffer,
         closure: _processUnsentMessages,
-        onError: (e, sp) {
-          log.error('Exception while processing unsent messages: $e\n$sp\n');
+        onError: (e, st) {
+          log.error('Exception while processing unsent messages: $e\n$st\n');
         });

     // Make crypto
@@ -310,14 +310,11 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
   Future<void> _processMessageToSend(
       proto.Message message, proto.Message? previousMessage) async {
-    // Get the previous message if we don't have one
-    previousMessage ??= await _sentMessagesCubit!.operate((r) async =>
-        r.length == 0
-            ? null
-            : await r.getProtobuf(proto.Message.fromBuffer, r.length - 1));
-
-    message.id =
-        await _senderMessageIntegrity.generateMessageId(previousMessage);
+    // It's possible we had a signature from a previous
+    // operateAppendEventual attempt, so clear it and make a new message id too
+    message
+      ..clearSignature()
+      ..id = await _senderMessageIntegrity.generateMessageId(previousMessage);

     // Now sign it
     await _senderMessageIntegrity.signMessage(
@@ -326,26 +323,33 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
   // Async process to send messages in the background
   Future<void> _processUnsentMessages(IList<proto.Message> messages) async {
-    // Go through and assign ids to all the messages in order
-    proto.Message? previousMessage;
-    final processedMessages = messages.toList();
-    for (final message in processedMessages) {
-      try {
-        await _processMessageToSend(message, previousMessage);
-        previousMessage = message;
-      } on Exception catch (e) {
-        log.error('Exception processing unsent message: $e');
-      }
-    }
-
     // _sendingMessages = messages;

     // _renderState();

     try {
-      await _sentMessagesCubit!.operateAppendEventual((writer) =>
-          writer.addAll(messages.map((m) => m.writeToBuffer()).toList()));
-    } on Exception catch (e) {
-      log.error('Exception appending unsent messages: $e');
+      await _sentMessagesCubit!.operateAppendEventual((writer) async {
+        // Get the previous message if we have one
+        var previousMessage = writer.length == 0
+            ? null
+            : await writer.getProtobuf(
+                proto.Message.fromBuffer, writer.length - 1);
+
+        // Sign all messages
+        final processedMessages = messages.toList();
+        for (final message in processedMessages) {
+          try {
+            await _processMessageToSend(message, previousMessage);
+            previousMessage = message;
+          } on Exception catch (e, st) {
+            log.error('Exception processing unsent message: $e:\n$st\n');
+          }
+        }
+
+        final byteMessages = messages.map((m) => m.writeToBuffer()).toList();
+
+        return writer.addAll(byteMessages);
+      });
+    } on Exception catch (e, st) {
+      log.error('Exception appending unsent messages: $e:\n$st\n');
     }

     // _sendingMessages = const IList.empty();
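The previous-message lookup moves inside the operateAppendEventual closure because, per the new comment above, the closure can run more than once before the append lands; anything derived per attempt (previous message, ids, signatures) must be recomputed each time. An illustrative sketch of that retry shape (not the real DHTLog API):

import 'dart:math';

// Retry the closure until the append commits; everything the closure
// derives must come from the state it sees on that attempt.
Future<void> appendEventual(Future<bool> Function() attempt) async {
  while (!await attempt()) {
    // State changed underneath us; re-run the whole closure.
  }
}

Future<void> main() async {
  final rng = Random();
  var tries = 0;
  await appendEventual(() async {
    tries++;
    // Recompute per-attempt state here on every pass.
    return rng.nextInt(3) == 0; // simulate a contended commit
  });
  print('committed after $tries attempt(s)');
}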

View file

@@ -138,7 +138,8 @@ class RouterCubit extends Cubit<RouterState> {
         return null;
       case '/developer':
         return null;
-      // Otherwise, if there's no account, we need to go to the new account page.
+      // Otherwise, if there's no account,
+      // we need to go to the new account page.
       default:
         return state.hasAnyAccount ? null : '/new_account';
     }

View file

@@ -124,7 +124,21 @@ class _DeveloperPageState extends State<DeveloperPage> {
     _debugOut('DEBUG >>>\n$debugCommand\n');
     try {
-      final out = await Veilid.instance.debug(debugCommand);
+      var out = await Veilid.instance.debug(debugCommand);
+      if (debugCommand == 'help') {
+        out = 'VeilidChat Commands:\n'
+            '  pool allocations - List DHTRecordPool allocations\n'
+            '  pool opened - List opened DHTRecord instances'
+            ' from the pool\n'
+            '  change_log_ignore <layer> <changes> change the log'
+            ' target ignore list for a tracing layer\n'
+            '    targets to add to the ignore list can be separated by'
+            ' a comma.\n'
+            '    to remove a target from the ignore list, prepend it'
+            ' with a minus.\n\n$out';
+      }
       _debugOut('<<< DEBUG\n$out\n');
     } on Exception catch (e, st) {
       _debugOut('<<< ERROR\n$e\n<<< STACK\n$st');

View file

@@ -650,7 +650,7 @@ packages:
     path: "../../../../veilid/veilid-flutter"
     relative: true
     source: path
-    version: "0.4.3"
+    version: "0.4.4"
   veilid_support:
     dependency: "direct main"
     description:

View file

@@ -7,6 +7,7 @@ import 'package:collection/collection.dart';
 import 'package:equatable/equatable.dart';
 import 'package:meta/meta.dart';

+import '../../../src/veilid_log.dart';
 import '../../../veilid_support.dart';
 import '../../proto/proto.dart' as proto;

View file

@@ -47,8 +47,16 @@ class _DHTLogRead implements DHTLogReadOperations {
     final chunks = Iterable<int>.generate(length)
         .slices(kMaxDHTConcurrency)
-        .map((chunk) => chunk
-            .map((pos) async => get(pos + start, forceRefresh: forceRefresh)));
+        .map((chunk) => chunk.map((pos) async {
+              try {
+                return get(pos + start, forceRefresh: forceRefresh);
+                // Need some way to debug ParallelWaitError
+                // ignore: avoid_catches_without_on_clauses
+              } catch (e, st) {
+                veilidLoggy.error('$e\n$st\n');
+                rethrow;
+              }
+            }));

     for (final chunk in chunks) {
       final elems = await chunk.wait;
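For reference, a self-contained sketch of the chunked read pattern used here (`maxConcurrency` stands in for kMaxDHTConcurrency and `fetch` for `get`; both are assumptions for illustration): slices() bounds how many futures are in flight per `.wait`.

import 'dart:async';
import 'package:collection/collection.dart';

Future<List<String>> readAll(
    int length, Future<String> Function(int pos) fetch) async {
  const maxConcurrency = 8; // stand-in for kMaxDHTConcurrency
  final out = <String>[];
  for (final chunk in Iterable<int>.generate(length).slices(maxConcurrency)) {
    // Each chunk runs in parallel; failures inside fetch should be caught
    // there, or they surface as a ParallelWaitError from .wait.
    out.addAll(await chunk.map(fetch).wait);
  }
  return out;
}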

View file

@@ -248,7 +248,12 @@ class _DHTLogSpine {
       final headDelta = _ringDistance(newHead, oldHead);
       final tailDelta = _ringDistance(newTail, oldTail);
       if (headDelta > _positionLimit ~/ 2 || tailDelta > _positionLimit ~/ 2) {
-        throw const DHTExceptionInvalidData();
+        throw DHTExceptionInvalidData('_DHTLogSpine::_updateHead '
+            '_head=$_head _tail=$_tail '
+            'oldHead=$oldHead oldTail=$oldTail '
+            'newHead=$newHead newTail=$newTail '
+            'headDelta=$headDelta tailDelta=$tailDelta '
+            '_positionLimit=$_positionLimit');
       }
     }

View file

@@ -17,7 +17,8 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations {
     }
     final lookup = await _spine.lookupPosition(pos);
     if (lookup == null) {
-      throw const DHTExceptionInvalidData();
+      throw DHTExceptionInvalidData(
+          '_DHTLogRead::tryWriteItem pos=$pos _spine.length=${_spine.length}');
     }

     // Write item to the segment
@@ -45,12 +46,14 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations {
     }
     final aLookup = await _spine.lookupPosition(aPos);
     if (aLookup == null) {
-      throw const DHTExceptionInvalidData();
+      throw DHTExceptionInvalidData('_DHTLogWrite::swap aPos=$aPos bPos=$bPos '
+          '_spine.length=${_spine.length}');
     }
     final bLookup = await _spine.lookupPosition(bPos);
     if (bLookup == null) {
       await aLookup.close();
-      throw const DHTExceptionInvalidData();
+      throw DHTExceptionInvalidData('_DHTLogWrite::swap aPos=$aPos bPos=$bPos '
+          '_spine.length=${_spine.length}');
     }

     // Swap items in the segments
@@ -65,7 +68,10 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations {
         if (bItem.value == null) {
           final aItem = await aWrite.get(aLookup.pos);
           if (aItem == null) {
-            throw const DHTExceptionInvalidData();
+            throw DHTExceptionInvalidData(
+                '_DHTLogWrite::swap aPos=$aPos bPos=$bPos '
+                'aLookup.pos=${aLookup.pos} bLookup.pos=${bLookup.pos} '
+                '_spine.length=${_spine.length}');
           }
           await sb.operateWriteEventual((bWrite) async {
             final success = await bWrite
@@ -101,7 +107,9 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations {
             await write.clear();
           } else if (lookup.pos != write.length) {
             // We should always be appending at the length
-            throw const DHTExceptionInvalidData();
+            throw DHTExceptionInvalidData(
+                '_DHTLogWrite::add lookup.pos=${lookup.pos} '
+                'write.length=${write.length}');
           }
           return write.add(value);
         }));
@@ -117,12 +125,16 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations {
     final dws = DelayedWaitSet<void, void>();

     var success = true;
-    for (var valueIdx = 0; valueIdx < values.length;) {
+    for (var valueIdxIter = 0; valueIdxIter < values.length;) {
+      final valueIdx = valueIdxIter;
       final remaining = values.length - valueIdx;

       final lookup = await _spine.lookupPosition(insertPos + valueIdx);
       if (lookup == null) {
-        throw const DHTExceptionInvalidData();
+        throw DHTExceptionInvalidData('_DHTLogWrite::addAll '
+            '_spine.length=${_spine.length}'
+            'insertPos=$insertPos valueIdx=$valueIdx '
+            'values.length=${values.length} ');
       }

       final sacount = min(remaining, DHTShortArray.maxElements - lookup.pos);
@@ -137,16 +149,21 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations {
             await write.clear();
           } else if (lookup.pos != write.length) {
             // We should always be appending at the length
-            throw const DHTExceptionInvalidData();
+            await write.truncate(lookup.pos);
           }
-          return write.addAll(sublistValues);
+          await write.addAll(sublistValues);
+          success = true;
         }));
       } on DHTExceptionOutdated {
         success = false;
+        // Need some way to debug ParallelWaitError
+        // ignore: avoid_catches_without_on_clauses
+      } catch (e, st) {
+        veilidLoggy.error('$e\n$st\n');
       }
     });

-      valueIdx += sacount;
+      valueIdxIter += sacount;
     }

     await dws();

View file

@@ -246,11 +246,13 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
   /// Print children
   String debugChildren(TypedKey recordKey, {List<TypedKey>? allDeps}) {
     allDeps ??= _collectChildrenInner(recordKey);
+    // Debugging
     // ignore: avoid_print
     var out =
         'Parent: $recordKey (${_state.debugNames[recordKey.toString()]})\n';
     for (final dep in allDeps) {
       if (dep != recordKey) {
+        // Debugging
         // ignore: avoid_print
         out += ' Child: $dep (${_state.debugNames[dep.toString()]})\n';
       }
@@ -270,32 +272,25 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
             break;
           }
         }
-      } else {
-        final now = Veilid.instance.now().value;
-        // Expired, process renewal if desired
-        for (final entry in _opened.entries) {
-          final openedKey = entry.key;
-          final openedRecordInfo = entry.value;
-          if (openedKey == updateValueChange.key) {
-            // Renew watch state for each opened record
-            for (final rec in openedRecordInfo.records) {
-              // See if the watch had an expiration and if it has expired
-              // otherwise the renewal will keep the same parameters
-              final watchState = rec._watchState;
-              if (watchState != null) {
-                final exp = watchState.expiration;
-                if (exp != null && exp.value < now) {
-                  // Has expiration, and it has expired, clear watch state
-                  rec._watchState = null;
-                }
-              }
-            }
-            openedRecordInfo.shared.needsWatchStateUpdate = true;
-            break;
-          }
-        }
-      }
+      }
+      // else {
+      //   // XXX: should no longer be necessary
+      //   // Remove watch state
+      //   for (final entry in _opened.entries) {
+      //     final openedKey = entry.key;
+      //     final openedRecordInfo = entry.value;
+      //     if (openedKey == updateValueChange.key) {
+      //       for (final rec in openedRecordInfo.records) {
+      //         rec._watchState = null;
+      //       }
+      //       openedRecordInfo.shared.needsWatchStateUpdate = true;
+      //       break;
+      //     }
+      //   }
+      // }
     }

   /// Log the current record allocations
@@ -735,7 +730,6 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
     int? totalCount;
     Timestamp? maxExpiration;
     List<ValueSubkeyRange>? allSubkeys;
-    Timestamp? earliestRenewalTime;

     var noExpiration = false;
     var everySubkey = false;
@@ -768,15 +762,6 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
       } else {
         everySubkey = true;
       }
-      final wsRenewalTime = ws.renewalTime;
-      if (wsRenewalTime != null) {
-        earliestRenewalTime = earliestRenewalTime == null
-            ? wsRenewalTime
-            : Timestamp(
-                value: (wsRenewalTime.value < earliestRenewalTime.value
-                    ? wsRenewalTime.value
-                    : earliestRenewalTime.value));
-      }
     }
   }
   if (noExpiration) {
@@ -793,22 +778,7 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
         subkeys: allSubkeys,
         expiration: maxExpiration,
         count: totalCount,
-        renewalTime: earliestRenewalTime);
-  }
-
-  static void _updateWatchRealExpirations(Iterable<DHTRecord> records,
-      Timestamp realExpiration, Timestamp renewalTime) {
-    for (final rec in records) {
-      final ws = rec._watchState;
-      if (ws != null) {
-        rec._watchState = _WatchState(
-            subkeys: ws.subkeys,
-            expiration: ws.expiration,
-            count: ws.count,
-            realExpiration: realExpiration,
-            renewalTime: renewalTime);
-      }
-    }
+        );
   }

   Future<void> _watchStateChange(
@@ -833,9 +803,9 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
       // Only try this once, if it doesn't succeed then it can just expire
       // on its own.
       try {
-        final cancelled = await dhtctx.cancelDHTWatch(openedRecordKey);
+        final stillActive = await dhtctx.cancelDHTWatch(openedRecordKey);

-        log('cancelDHTWatch: key=$openedRecordKey, cancelled=$cancelled, '
+        log('cancelDHTWatch: key=$openedRecordKey, stillActive=$stillActive, '
             'debugNames=${openedRecordInfo.debugNames}');

         openedRecordInfo.shared.unionWatchState = null;
@@ -858,34 +828,20 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
       final subkeys = unionWatchState.subkeys?.toList();
       final count = unionWatchState.count;
       final expiration = unionWatchState.expiration;
-      final now = veilid.now();

-      final realExpiration = await dhtctx.watchDHTValues(openedRecordKey,
+      final active = await dhtctx.watchDHTValues(openedRecordKey,
           subkeys: unionWatchState.subkeys?.toList(),
           count: unionWatchState.count,
-          expiration: unionWatchState.expiration ??
-              (_defaultWatchDurationSecs == null
-                  ? null
-                  : veilid.now().offset(TimestampDuration.fromMillis(
-                      _defaultWatchDurationSecs! * 1000))));
+          expiration: unionWatchState.expiration);

-      final expirationDuration = realExpiration.diff(now);
-      final renewalTime = now.offset(TimestampDuration(
-          value: expirationDuration.value *
-              BigInt.from(_watchRenewalNumerator) ~/
-              BigInt.from(_watchRenewalDenominator)));
-
-      log('watchDHTValues: key=$openedRecordKey, subkeys=$subkeys, '
+      log('watchDHTValues(active=$active): '
+          'key=$openedRecordKey, subkeys=$subkeys, '
           'count=$count, expiration=$expiration, '
-          'realExpiration=$realExpiration, '
-          'renewalTime=$renewalTime, '
           'debugNames=${openedRecordInfo.debugNames}');

       // Update watch states with real expiration
-      if (realExpiration.value != BigInt.zero) {
+      if (active) {
         openedRecordInfo.shared.unionWatchState = unionWatchState;
-        _updateWatchRealExpirations(
-            openedRecordInfo.records, realExpiration, renewalTime);
         openedRecordInfo.shared.needsWatchStateUpdate = false;
       }
     } on VeilidAPIExceptionTimeout {
@@ -944,22 +900,13 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
   /// Ticker to check watch state change requests
   Future<void> tick() async => _mutex.protect(() async {
         // See if any opened records need watch state changes
-        final now = veilid.now();
         for (final kv in _opened.entries) {
           final openedRecordKey = kv.key;
           final openedRecordInfo = kv.value;

-          var wantsWatchStateUpdate =
+          final wantsWatchStateUpdate =
               openedRecordInfo.shared.needsWatchStateUpdate;

-          // Check if we have reached renewal time for the watch
-          if (openedRecordInfo.shared.unionWatchState != null &&
-              openedRecordInfo.shared.unionWatchState!.renewalTime != null &&
-              now.value >
-                  openedRecordInfo.shared.unionWatchState!.renewalTime!.value) {
-            wantsWatchStateUpdate = true;
-          }
-
           if (wantsWatchStateUpdate) {
             // Update union watch state
             final unionWatchState =

View file

@@ -1,9 +1,5 @@
 part of 'dht_record_pool.dart';

-const int? _defaultWatchDurationSecs = null; // 600
-const int _watchRenewalNumerator = 4;
-const int _watchRenewalDenominator = 5;
-
 // DHT crypto domain
 const String _cryptoDomainDHT = 'dht';
@@ -14,21 +10,17 @@ const _sfListen = 'listen';
 /// Watch state
 @immutable
 class _WatchState extends Equatable {
-  const _WatchState(
-      {required this.subkeys,
-      required this.expiration,
-      required this.count,
-      this.realExpiration,
-      this.renewalTime});
+  const _WatchState({
+    required this.subkeys,
+    required this.expiration,
+    required this.count,
+  });

   final List<ValueSubkeyRange>? subkeys;
   final Timestamp? expiration;
   final int? count;
-  final Timestamp? realExpiration;
-  final Timestamp? renewalTime;

   @override
-  List<Object?> get props =>
-      [subkeys, expiration, count, realExpiration, renewalTime];
+  List<Object?> get props => [subkeys, expiration, count];
 }

 /// Data shared amongst all DHTRecord instances

View file

@@ -4,6 +4,7 @@ import 'dart:typed_data';
 import 'package:async_tools/async_tools.dart';
 import 'package:collection/collection.dart';

+import '../../../src/veilid_log.dart';
 import '../../../veilid_support.dart';
 import '../../proto/proto.dart' as proto;

View file

@@ -383,6 +383,24 @@ class _DHTShortArrayHead {
     // xxx: free list optimization here?
   }

+  /// Truncate index to a particular length
+  void truncate(int newLength) {
+    if (newLength >= _index.length) {
+      return;
+    } else if (newLength == 0) {
+      clearIndex();
+      return;
+    } else if (newLength < 0) {
+      throw StateError('can not truncate to negative length');
+    }
+
+    final newIndex = _index.sublist(0, newLength);
+    final freed = _index.sublist(newLength);
+
+    _index = newIndex;
+    _free.addAll(freed);
+  }
+
   /// Validate the head from the DHT is properly formatted
   /// and calculate the free list from it while we're here
   List<int> _makeFreeList(

View file

@@ -60,8 +60,16 @@ class _DHTShortArrayRead implements DHTShortArrayReadOperations {
     final chunks = Iterable<int>.generate(length)
         .slices(kMaxDHTConcurrency)
-        .map((chunk) => chunk
-            .map((pos) async => get(pos + start, forceRefresh: forceRefresh)));
+        .map((chunk) => chunk.map((pos) async {
+              try {
+                return get(pos + start, forceRefresh: forceRefresh);
+                // Need some way to debug ParallelWaitError
+                // ignore: avoid_catches_without_on_clauses
+              } catch (e, st) {
+                veilidLoggy.error('$e\n$st\n');
+                rethrow;
+              }
+            }));

     for (final chunk in chunks) {
       final elems = await chunk.wait;

View file

@@ -9,6 +9,7 @@ abstract class DHTShortArrayWriteOperations
     DHTRandomWrite,
     DHTInsertRemove,
     DHTAdd,
+    DHTTruncate,
     DHTClear {}

 class _DHTShortArrayWrite extends _DHTShortArrayRead
@@ -72,11 +73,17 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead
       final value = values[i];
       final outSeqNum = outSeqNums[i];
       dws.add((_) async {
-        final outValue = await lookup.record.tryWriteBytes(value,
-            subkey: lookup.recordSubkey, outSeqNum: outSeqNum);
-        if (outValue != null) {
-          success = false;
-        }
+        try {
+          final outValue = await lookup.record.tryWriteBytes(value,
+              subkey: lookup.recordSubkey, outSeqNum: outSeqNum);
+          if (outValue != null) {
+            success = false;
+          }
+          // Need some way to debug ParallelWaitError
+          // ignore: avoid_catches_without_on_clauses
+        } catch (e, st) {
+          veilidLoggy.error('$e\n$st\n');
+        }
       });
     }
@@ -142,6 +149,11 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead
     _head.clearIndex();
   }

+  @override
+  Future<void> truncate(int newLength) async {
+    _head.truncate(newLength);
+  }
+
   @override
   Future<bool> tryWriteItem(int pos, Uint8List newValue,
       {Output<Uint8List>? output}) async {
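A hedged usage sketch for the new truncate operation (it assumes an opened DHTShortArray named `shortArray`, an `operateWrite` entry point like the other write operations, and a `length` getter on the writer; those names are illustrative): truncation rewrites only the head's index, and the freed element slots go back on the free list for later adds to reuse.

await shortArray.operateWrite((w) async {
  if (w.length > 100) {
    await w.truncate(100); // keep the first 100 elements, free the rest
  }
});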

View file

@@ -2,20 +2,32 @@ class DHTExceptionOutdated implements Exception {
   const DHTExceptionOutdated(
       [this.cause = 'operation failed due to newer dht value']);
   final String cause;
+
+  @override
+  String toString() => 'DHTExceptionOutdated: $cause';
 }

 class DHTExceptionInvalidData implements Exception {
-  const DHTExceptionInvalidData([this.cause = 'dht data structure is corrupt']);
+  const DHTExceptionInvalidData(this.cause);
   final String cause;
+
+  @override
+  String toString() => 'DHTExceptionInvalidData: $cause';
 }

 class DHTExceptionCancelled implements Exception {
   const DHTExceptionCancelled([this.cause = 'operation was cancelled']);
   final String cause;
+
+  @override
+  String toString() => 'DHTExceptionCancelled: $cause';
 }

 class DHTExceptionNotAvailable implements Exception {
   const DHTExceptionNotAvailable(
       [this.cause = 'request could not be completed at this time']);
   final String cause;
+
+  @override
+  String toString() => 'DHTExceptionNotAvailable: $cause';
 }
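With toString() overridden, any catch-and-log site now prints the cause instead of 'Instance of DHTExceptionOutdated'. A small sketch:

void main() {
  try {
    throw const DHTExceptionOutdated();
  } on DHTExceptionOutdated catch (e) {
    // Prints: DHTExceptionOutdated: operation failed due to newer dht value
    print(e);
  }
}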

View file

@@ -7,6 +7,7 @@ import 'package:protobuf/protobuf.dart';

 import 'config.dart';
 import 'table_db.dart';
+import 'veilid_log.dart';

 class PersistentQueue<T extends GeneratedMessage>
     with TableDBBackedFromBuffer<IList<T>> {
@@ -46,7 +47,7 @@ class PersistentQueue<T extends GeneratedMessage>
     }
   }

-  Future<void> _init(_) async {
+  Future<void> _init(Completer<void> _) async {
     // Start the processor
     unawaited(Future.delayed(Duration.zero, () async {
       await _initWait();
@@ -182,10 +183,28 @@ class PersistentQueue<T extends GeneratedMessage>
   @override
   IList<T> valueFromBuffer(Uint8List bytes) {
-    final reader = CodedBufferReader(bytes);
     var out = IList<T>();
-    while (!reader.isAtEnd()) {
-      out = out.add(_fromBuffer(reader.readBytesAsView()));
+    try {
+      final reader = CodedBufferReader(bytes);
+      while (!reader.isAtEnd()) {
+        final bytes = reader.readBytesAsView();
+        try {
+          final item = _fromBuffer(bytes);
+          out = out.add(item);
+        } on Exception catch (e, st) {
+          veilidLoggy.debug(
+              'Dropping invalid item from persistent queue: $bytes\n'
+              'tableName=${tableName()}:tableKeyName=${tableKeyName()}\n',
+              e,
+              st);
+        }
+      }
+    } on Exception catch (e, st) {
+      veilidLoggy.debug(
+          'Dropping remainder of invalid persistent queue\n'
+          'tableName=${tableName()}:tableKeyName=${tableKeyName()}\n',
+          e,
+          st);
     }
     return out;
   }

View file

@@ -9,6 +9,7 @@ import 'package:meta/meta.dart';
 import 'package:protobuf/protobuf.dart';

 import '../veilid_support.dart';
+import 'veilid_log.dart';

 @immutable
 class TableDBArrayUpdate extends Equatable {
@@ -262,7 +263,16 @@ class _TableDBArrayBase {
     final dws = DelayedWaitSet<Uint8List, void>();
     while (batchLen > 0) {
       final entry = await _getIndexEntry(pos);
-      dws.add((_) async => (await _loadEntry(entry))!);
+      dws.add((_) async {
+        try {
+          return (await _loadEntry(entry))!;
+          // Need some way to debug ParallelWaitError
+          // ignore: avoid_catches_without_on_clauses
+        } catch (e, st) {
+          veilidLoggy.error('$e\n$st\n');
+          rethrow;
+        }
+      });
       pos++;
       batchLen--;
     }

View file

@@ -726,7 +726,7 @@ packages:
     path: "../../../veilid/veilid-flutter"
     relative: true
     source: path
-    version: "0.4.3"
+    version: "0.4.4"
   vm_service:
     dependency: transitive
     description: