init failure retry work

This commit is contained in:
Christien Rioux 2024-07-17 16:53:47 -04:00
parent 519571628f
commit ba191d3903
31 changed files with 438 additions and 218 deletions

View File

@ -44,7 +44,7 @@ class PerAccountCollectionCubit extends Cubit<PerAccountCollectionState> {
await super.close();
}
Future<void> _init() async {
Future<void> _init(Completer<void> _cancel) async {
// subscribe to accountInfo changes
_processor.follow(accountInfoCubit.stream, accountInfoCubit.state,
_followAccountInfoState);
@ -235,7 +235,7 @@ class PerAccountCollectionCubit extends Cubit<PerAccountCollectionState> {
final Locator _locator;
final _processor = SingleStateProcessor<AccountInfo>();
final _initWait = WaitSet<void>();
final _initWait = WaitSet<void, void>();
// Per-account cubits regardless of login state
final AccountInfoCubit accountInfoCubit;

View File

@ -68,7 +68,7 @@ class ChatComponentCubit extends Cubit<ChatComponentState> {
messagesCubit: messagesCubit,
);
Future<void> _init() async {
Future<void> _init(Completer<void> _cancel) async {
// Get local user info and account record cubit
_localUserIdentityKey = _accountInfo.identityTypedPublicKey;
@ -420,7 +420,7 @@ class ChatComponentCubit extends Cubit<ChatComponentState> {
////////////////////////////////////////////////////////////////////////////
final _initWait = WaitSet<void>();
final _initWait = WaitSet<void, void>();
final AccountInfo _accountInfo;
final AccountRecordCubit _accountRecordCubit;
final ContactListCubit _contactListCubit;

View File

@ -95,7 +95,7 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
}
// Initialize everything
Future<void> _init() async {
Future<void> _init(Completer<void> _cancel) async {
_unsentMessagesQueue = PersistentQueue<proto.Message>(
table: 'SingleContactUnsentMessages',
key: _remoteConversationRecordKey.toString(),
@ -445,7 +445,7 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
/////////////////////////////////////////////////////////////////////////
final WaitSet<void> _initWait = WaitSet();
final WaitSet<void, void> _initWait = WaitSet();
late final AccountInfo _accountInfo;
final TypedKey _remoteIdentityPublicKey;
final TypedKey _localConversationRecordKey;

View File

@ -71,7 +71,7 @@ class ContactListCubit extends DHTShortArrayCubit<proto.Contact> {
final updated = await writer.tryWriteItemProtobuf(
proto.Contact.fromBuffer, pos, newContact);
if (!updated) {
throw DHTExceptionTryAgain();
throw DHTExceptionOutdated();
}
break;
}

View File

@ -46,12 +46,13 @@ class ConversationCubit extends Cubit<AsyncValue<ConversationState>> {
_identityWriter = _accountInfo.identityWriter;
if (_localConversationRecordKey != null) {
_initWait.add(() async {
_initWait.add((_) async {
await _setLocalConversation(() async {
// Open local record key if it is specified
final pool = DHTRecordPool.instance;
final crypto = await _cachedConversationCrypto();
final writer = _identityWriter;
final record = await pool.openRecordWrite(
_localConversationRecordKey!, writer,
debugName: 'ConversationCubit::LocalConversation',
@ -64,16 +65,18 @@ class ConversationCubit extends Cubit<AsyncValue<ConversationState>> {
}
if (_remoteConversationRecordKey != null) {
_initWait.add(() async {
_initWait.add((cancel) async {
await _setRemoteConversation(() async {
// Open remote record key if it is specified
final pool = DHTRecordPool.instance;
final crypto = await _cachedConversationCrypto();
final record = await pool.openRecordRead(_remoteConversationRecordKey,
debugName: 'ConversationCubit::RemoteConversation',
parent: pool.getParentRecordKey(_remoteConversationRecordKey) ??
accountInfo.accountRecordKey,
crypto: crypto);
return record;
});
});
@ -326,5 +329,5 @@ class ConversationCubit extends Cubit<AsyncValue<ConversationState>> {
ConversationState _incrementalState = const ConversationState(
localConversation: null, remoteConversation: null);
VeilidCrypto? _conversationCrypto;
final WaitSet<void> _initWait = WaitSet();
final WaitSet<void, void> _initWait = WaitSet();
}

View File

@ -0,0 +1 @@

View File

@ -1,4 +1,5 @@
export 'animations.dart';
export 'exceptions.dart';
export 'loggy.dart';
export 'misc.dart';
export 'package_info.dart';

View File

@ -18,6 +18,7 @@ import 'package:xterm/xterm.dart';
import '../../layout/layout.dart';
import '../../theme/theme.dart';
import '../../tools/tools.dart';
import 'history_text_editing_controller.dart';
final globalDebugTerminal = Terminal(
maxLines: 50000,
@ -36,17 +37,12 @@ class DeveloperPage extends StatefulWidget {
}
class _DeveloperPageState extends State<DeveloperPage> {
final _terminalController = TerminalController();
final _debugCommandController = TextEditingController();
final _logLevelController = DropdownController(duration: 250.ms);
final List<CoolDropdownItem<LogLevel>> _logLevelDropdownItems = [];
var _logLevelDropDown = log.level.logLevel;
var _showEllet = false;
@override
void initState() {
super.initState();
_historyController = HistoryTextEditingController(setState: setState);
_terminalController.addListener(() {
setState(() {});
});
@ -66,42 +62,71 @@ class _DeveloperPageState extends State<DeveloperPage> {
globalDebugTerminal.write(colorOut.replaceAll('\n', '\r\n'));
}
Future<void> _sendDebugCommand(String debugCommand) async {
if (debugCommand == 'pool allocations') {
DHTRecordPool.instance.debugPrintAllocations();
return;
}
if (debugCommand == 'pool opened') {
DHTRecordPool.instance.debugPrintOpened();
return;
}
if (debugCommand.startsWith('change_log_ignore ')) {
final args = debugCommand.split(' ');
if (args.length < 3) {
_debugOut('Incorrect number of arguments');
return;
}
final layer = args[1];
final changes = args[2].split(',');
Veilid.instance.changeLogIgnore(layer, changes);
return;
}
if (debugCommand == 'ellet') {
setState(() {
_showEllet = !_showEllet;
});
return;
}
_debugOut('DEBUG >>>\n$debugCommand\n');
Future<bool> _sendDebugCommand(String debugCommand) async {
try {
final out = await Veilid.instance.debug(debugCommand);
_debugOut('<<< DEBUG\n$out\n');
} on Exception catch (e, st) {
_debugOut('<<< ERROR\n$e\n<<< STACK\n$st');
setState(() {
_busy = true;
});
if (debugCommand == 'pool allocations') {
try {
DHTRecordPool.instance.debugPrintAllocations();
} on Exception catch (e, st) {
_debugOut('<<< ERROR\n$e\n<<< STACK\n$st');
return false;
}
return true;
}
if (debugCommand == 'pool opened') {
try {
DHTRecordPool.instance.debugPrintOpened();
} on Exception catch (e, st) {
_debugOut('<<< ERROR\n$e\n<<< STACK\n$st');
return false;
}
return true;
}
if (debugCommand.startsWith('change_log_ignore ')) {
final args = debugCommand.split(' ');
if (args.length < 3) {
_debugOut('Incorrect number of arguments');
return false;
}
final layer = args[1];
final changes = args[2].split(',');
try {
Veilid.instance.changeLogIgnore(layer, changes);
} on Exception catch (e, st) {
_debugOut('<<< ERROR\n$e\n<<< STACK\n$st');
return false;
}
return true;
}
if (debugCommand == 'ellet') {
setState(() {
_showEllet = !_showEllet;
});
return true;
}
_debugOut('DEBUG >>>\n$debugCommand\n');
try {
final out = await Veilid.instance.debug(debugCommand);
_debugOut('<<< DEBUG\n$out\n');
} on Exception catch (e, st) {
_debugOut('<<< ERROR\n$e\n<<< STACK\n$st');
return false;
}
return true;
} finally {
setState(() {
_busy = false;
});
}
}
@ -284,7 +309,9 @@ class _DeveloperPageState extends State<DeveloperPage> {
})
]).expanded(),
TextField(
controller: _debugCommandController,
enabled: !_busy,
controller: _historyController.controller,
focusNode: _historyController.focusNode,
onTapOutside: (event) {
FocusManager.instance.primaryFocus?.unfocus();
},
@ -303,28 +330,54 @@ class _DeveloperPageState extends State<DeveloperPage> {
hintText: translate('developer.command'),
suffixIcon: IconButton(
icon: Icon(Icons.send,
color: _debugCommandController.text.isEmpty
color: _historyController.controller.text.isEmpty
? scale.primaryScale.primary.withAlpha(0x3F)
: scale.primaryScale.primary),
onPressed: _debugCommandController.text.isEmpty
? null
: () async {
final debugCommand = _debugCommandController.text;
_debugCommandController.clear();
await _sendDebugCommand(debugCommand);
},
onPressed:
(_historyController.controller.text.isEmpty || _busy)
? null
: () async {
final debugCommand =
_historyController.controller.text;
_historyController.controller.clear();
await _sendDebugCommand(debugCommand);
},
)),
onChanged: (_) {
setState(() => {});
},
onEditingComplete: () {
// part of the default action if onEditingComplete is null
_historyController.controller.clearComposing();
// don't give up focus though
},
onSubmitted: (debugCommand) async {
_debugCommandController.clear();
await _sendDebugCommand(debugCommand);
if (debugCommand.isEmpty) {
return;
}
final ok = await _sendDebugCommand(debugCommand);
if (ok) {
setState(() {
_historyController.submit(debugCommand);
});
}
},
).paddingAll(4)
]))));
}
////////////////////////////////////////////////////////////////////////////
final _terminalController = TerminalController();
late final HistoryTextEditingController _historyController;
final _logLevelController = DropdownController(duration: 250.ms);
final List<CoolDropdownItem<LogLevel>> _logLevelDropdownItems = [];
var _logLevelDropDown = log.level.logLevel;
var _showEllet = false;
var _busy = false;
@override
void debugFillProperties(DiagnosticPropertiesBuilder properties) {
super.debugFillProperties(properties);

View File

@ -0,0 +1,69 @@
import 'package:flutter/material.dart';
import 'package:flutter/services.dart';
/// TextField History Controller
class HistoryTextEditingController {
HistoryTextEditingController(
{required this.setState, TextEditingController? controller})
: _controller = controller ?? TextEditingController() {
_historyFocusNode = FocusNode(onKeyEvent: (_node, event) {
if (event.runtimeType == KeyDownEvent &&
event.logicalKey == LogicalKeyboardKey.arrowUp) {
if (_historyPosition > 0) {
if (_historyPosition == _history.length) {
_historyCurrentEdit = _controller.text;
}
_historyPosition -= 1;
setState(() {
_controller.text = _history[_historyPosition];
});
}
return KeyEventResult.handled;
} else if (event.runtimeType == KeyDownEvent &&
event.logicalKey == LogicalKeyboardKey.arrowDown) {
if (_historyPosition < _history.length) {
_historyPosition += 1;
setState(() {
if (_historyPosition == _history.length) {
_controller.text = _historyCurrentEdit;
} else {
_controller.text = _history[_historyPosition];
}
});
}
return KeyEventResult.handled;
} else if (event.runtimeType == KeyDownEvent) {
_historyPosition = _history.length;
_historyCurrentEdit = _controller.text;
}
return KeyEventResult.ignored;
});
}
void submit(String v) {
// add to history
if (_history.isEmpty || _history.last != v) {
_history.add(v);
if (_history.length > 100) {
_history.removeAt(0);
}
}
_historyPosition = _history.length;
setState(() {
_controller.text = '';
});
}
FocusNode get focusNode => _historyFocusNode;
TextEditingController get controller => _controller;
////////////////////////////////////////////////////////////////////////////
late void Function(void Function()) setState;
final TextEditingController _controller;
late final FocusNode _historyFocusNode;
final List<String> _history = [];
int _historyPosition = 0;
String _historyCurrentEdit = '';
}

View File

@ -37,10 +37,10 @@ packages:
dependency: "direct dev"
description:
name: async_tools
sha256: f35f5590711f1422c7eff990351e879c5433486f9b5be5df30818521bf6ab8d6
sha256: "375da1e5b51974a9e84469b7630f36d1361fb53d3fcc818a5a0121ab51fe0343"
url: "https://pub.dev"
source: hosted
version: "0.1.3"
version: "0.1.4"
bloc:
dependency: transitive
description:
@ -53,10 +53,10 @@ packages:
dependency: transitive
description:
name: bloc_advanced_tools
sha256: "11f534f9e89561302de3bd07ab2dfe1c2dacaa8db9794ccdb57c55cfc02dffc6"
sha256: "0599d860eb096c5b12457c60bdf7f66bfcb7171bc94ccf2c7c752b6a716a79f9"
url: "https://pub.dev"
source: hosted
version: "0.1.3"
version: "0.1.4"
boolean_selector:
dependency: transitive
description:
@ -650,7 +650,7 @@ packages:
path: "../../../../veilid/veilid-flutter"
relative: true
source: path
version: "0.3.2"
version: "0.3.3"
veilid_support:
dependency: "direct main"
description:

View File

@ -14,7 +14,7 @@ dependencies:
path: ../
dev_dependencies:
async_tools: ^0.1.3
async_tools: ^0.1.4
integration_test:
sdk: flutter
lint_hard: ^4.0.0

View File

@ -43,13 +43,26 @@ class DHTLogCubit<T> extends Cubit<DHTLogBusyState<T>>
required T Function(List<int> data) decodeElement,
}) : _decodeElement = decodeElement,
super(const BlocBusyState(AsyncValue.loading())) {
_initWait.add(() async {
// Open DHT record
_log = await open();
_wantsCloseRecord = true;
_initWait.add((cancel) async {
try {
// Do record open/create
while (!cancel.isCompleted) {
try {
// Open DHT record
_log = await open();
_wantsCloseRecord = true;
break;
} on VeilidAPIExceptionTryAgain {
// Wait for a bit
await asyncSleep();
}
}
} on Exception catch (e, st) {
emit(DHTLogBusyState(AsyncValue.error(e, st)));
return;
}
// Make initial state update
await _refreshNoWait();
_initialUpdate();
_subscription = await _log.listen(_update);
});
}
@ -156,7 +169,7 @@ class DHTLogCubit<T> extends Cubit<DHTLogBusyState<T>>
// Run at most one background update process
// Because this is async, we could get an update while we're
// still processing the last one. Only called after init future has run
// so we dont have to wait for that here.
// or during it, so we dont have to wait for that here.
// Accumulate head and tail deltas
_headDelta += upd.headDelta;
@ -188,9 +201,15 @@ class DHTLogCubit<T> extends Cubit<DHTLogBusyState<T>>
});
}
void _initialUpdate() {
_sspUpdate.busyUpdate<T, DHTLogState<T>>(busy, (emit) async {
await _refreshInner(emit);
});
}
@override
Future<void> close() async {
await _initWait();
await _initWait(cancelValue: true);
await _subscription?.cancel();
_subscription = null;
if (_wantsCloseRecord) {
@ -217,7 +236,7 @@ class DHTLogCubit<T> extends Cubit<DHTLogBusyState<T>>
return _log.operateAppendEventual(closure, timeout: timeout);
}
final WaitSet<void> _initWait = WaitSet();
final WaitSet<void, bool> _initWait = WaitSet();
late final DHTLog _log;
final T Function(List<int> data) _decodeElement;
StreamSubscription<void>? _subscription;

View File

@ -45,8 +45,9 @@ class _DHTLogRead implements DHTLogReadOperations {
final out = <Uint8List>[];
(start, length) = _clampStartLen(start, length);
final chunks = Iterable<int>.generate(length).slices(maxDHTConcurrency).map(
(chunk) =>
final chunks = Iterable<int>.generate(length)
.slices(kMaxDHTConcurrency)
.map((chunk) =>
chunk.map((pos) => get(pos + start, forceRefresh: forceRefresh)));
for (final chunk in chunks) {

View File

@ -147,7 +147,7 @@ class _DHTLogSpine {
if (!await writeSpineHead(old: (oldHead, oldTail))) {
// Failed to write head means head got overwritten so write should
// be considered failed
throw DHTExceptionTryAgain();
throw DHTExceptionOutdated();
}
return out;
} on Exception {
@ -187,7 +187,7 @@ class _DHTLogSpine {
try {
out = await closure(this);
break;
} on DHTExceptionTryAgain {
} on DHTExceptionOutdated {
// Failed to write in closure resets state
_head = oldHead;
_tail = oldTail;

View File

@ -26,10 +26,10 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations {
final success =
await write.tryWriteItem(lookup.pos, newValue, output: output);
if (!success) {
throw DHTExceptionTryAgain();
throw DHTExceptionOutdated();
}
}));
} on DHTExceptionTryAgain {
} on DHTExceptionOutdated {
return false;
}
return true;
@ -71,14 +71,14 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations {
final success = await bWrite
.tryWriteItem(bLookup.pos, aItem, output: bItem);
if (!success) {
throw DHTExceptionTryAgain();
throw DHTExceptionOutdated();
}
});
}
final success =
await aWrite.tryWriteItem(aLookup.pos, bItem.value!);
if (!success) {
throw DHTExceptionTryAgain();
throw DHTExceptionOutdated();
}
})));
}
@ -114,7 +114,7 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations {
_spine.allocateTail(values.length);
// Look up the first position and shortarray
final dws = DelayedWaitSet<void>();
final dws = DelayedWaitSet<void, void>();
var success = true;
for (var valueIdx = 0; valueIdx < values.length;) {
@ -128,7 +128,7 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations {
final sacount = min(remaining, DHTShortArray.maxElements - lookup.pos);
final sublistValues = values.sublist(valueIdx, valueIdx + sacount);
dws.add(() async {
dws.add((_) async {
try {
await lookup.scope((sa) async => sa.operateWrite((write) async {
// If this a new segment, then clear it in
@ -141,7 +141,7 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations {
}
return write.addAll(sublistValues);
}));
} on DHTExceptionTryAgain {
} on DHTExceptionOutdated {
success = false;
}
});
@ -152,7 +152,7 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations {
await dws();
if (!success) {
throw DHTExceptionTryAgain();
throw DHTExceptionOutdated();
}
}

View File

@ -12,7 +12,7 @@ typedef StateFunction<T> = Future<T?> Function(
DHTRecord, List<ValueSubkeyRange>, Uint8List?);
typedef WatchFunction = Future<void> Function(DHTRecord);
class DHTRecordCubit<T> extends Cubit<AsyncValue<T>> {
abstract class DHTRecordCubit<T> extends Cubit<AsyncValue<T>> {
DHTRecordCubit({
required Future<DHTRecord> Function() open,
required InitialStateFunction<T> initialStateFunction,
@ -21,10 +21,24 @@ class DHTRecordCubit<T> extends Cubit<AsyncValue<T>> {
}) : _wantsCloseRecord = false,
_stateFunction = stateFunction,
super(const AsyncValue.loading()) {
initWait.add(() async {
// Do record open/create
_record = await open();
_wantsCloseRecord = true;
initWait.add((cancel) async {
try {
// Do record open/create
while (!cancel.isCompleted) {
try {
_record = await open();
_wantsCloseRecord = true;
break;
} on VeilidAPIExceptionKeyNotFound {
} on VeilidAPIExceptionTryAgain {
// Wait for a bit
await asyncSleep();
}
}
} on Exception catch (e, st) {
emit(AsyncValue.error(e, st));
return;
}
await _init(initialStateFunction, stateFunction, watchFunction);
});
}
@ -60,7 +74,7 @@ class DHTRecordCubit<T> extends Cubit<AsyncValue<T>> {
@override
Future<void> close() async {
await initWait();
await initWait(cancelValue: true);
await _record.cancelWatch();
await _subscription?.cancel();
_subscription = null;
@ -98,7 +112,7 @@ class DHTRecordCubit<T> extends Cubit<AsyncValue<T>> {
DHTRecord get record => _record;
@protected
final WaitSet<void> initWait = WaitSet();
final WaitSet<void, bool> initWait = WaitSet();
StreamSubscription<DHTRecordWatchChange>? _subscription;
late DHTRecord _record;

View File

@ -18,8 +18,11 @@ part 'dht_record_pool.g.dart';
part 'dht_record.dart';
part 'dht_record_pool_private.dart';
// Maximum number of concurrent DHT operations to perform on the network
const int maxDHTConcurrency = 8;
/// Maximum number of concurrent DHT operations to perform on the network
const int kMaxDHTConcurrency = 8;
/// Number of times to retry a 'key not found'
const int kDHTKeyNotFoundRetry = 3;
typedef DHTRecordPoolLogger = void Function(String message);
@ -60,6 +63,7 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
DHTRecordPool._(Veilid veilid, VeilidRoutingContext routingContext)
: _state = const DHTRecordPoolAllocations(),
_mutex = Mutex(),
_recordTagLock = AsyncTagLock(),
_opened = <TypedKey, _OpenedRecordInfo>{},
_markedForDelete = <TypedKey>{},
_routingContext = routingContext,
@ -132,24 +136,17 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
TypedKey? parent,
int defaultSubkey = 0,
VeilidCrypto? crypto}) async =>
_mutex.protect(() async {
_recordTagLock.protect(recordKey, closure: () async {
final dhtctx = routingContext ?? _routingContext;
final openedRecordInfo = await _recordOpenInner(
final rec = await _recordOpenCommon(
debugName: debugName,
dhtctx: dhtctx,
recordKey: recordKey,
parent: parent);
final rec = DHTRecord._(
debugName: debugName,
routingContext: dhtctx,
defaultSubkey: defaultSubkey,
sharedDHTRecordData: openedRecordInfo.shared,
crypto: crypto ?? const VeilidCryptoPublic(),
writer: null,
crypto: crypto ?? const VeilidCryptoPublic());
openedRecordInfo.records.add(rec);
parent: parent,
defaultSubkey: defaultSubkey);
return rec;
});
@ -164,27 +161,20 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
int defaultSubkey = 0,
VeilidCrypto? crypto,
}) async =>
_mutex.protect(() async {
_recordTagLock.protect(recordKey, closure: () async {
final dhtctx = routingContext ?? _routingContext;
final openedRecordInfo = await _recordOpenInner(
debugName: debugName,
dhtctx: dhtctx,
recordKey: recordKey,
parent: parent,
writer: writer);
final rec = DHTRecord._(
debugName: debugName,
routingContext: dhtctx,
defaultSubkey: defaultSubkey,
writer: writer,
sharedDHTRecordData: openedRecordInfo.shared,
crypto: crypto ??
await privateCryptoFromTypedSecret(
TypedKey(kind: recordKey.kind, value: writer.secret)));
openedRecordInfo.records.add(rec);
final rec = await _recordOpenCommon(
debugName: debugName,
dhtctx: dhtctx,
recordKey: recordKey,
crypto: crypto ??
await privateCryptoFromTypedSecret(
TypedKey(kind: recordKey.kind, value: writer.secret)),
writer: writer,
parent: parent,
defaultSubkey: defaultSubkey,
);
return rec;
});
@ -381,41 +371,72 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
return openedRecordInfo;
}
Future<_OpenedRecordInfo> _recordOpenInner(
Future<DHTRecord> _recordOpenCommon(
{required String debugName,
required VeilidRoutingContext dhtctx,
required TypedKey recordKey,
KeyPair? writer,
TypedKey? parent}) async {
if (!_mutex.isLocked) {
throw StateError('should be locked here');
}
required VeilidCrypto crypto,
required KeyPair? writer,
required TypedKey? parent,
required int defaultSubkey}) async {
log('openDHTRecord: debugName=$debugName key=$recordKey');
// If we are opening a key that already exists
// make sure we are using the same parent if one was specified
_validateParentInner(parent, recordKey);
// See if this has been opened yet
final openedRecordInfo = _opened[recordKey];
final openedRecordInfo = await _mutex.protect(() async {
// If we are opening a key that already exists
// make sure we are using the same parent if one was specified
_validateParentInner(parent, recordKey);
return _opened[recordKey];
});
if (openedRecordInfo == null) {
// Fresh open, just open the record
final recordDescriptor =
await dhtctx.openDHTRecord(recordKey, writer: writer);
var retry = kDHTKeyNotFoundRetry;
late final DHTRecordDescriptor recordDescriptor;
while (true) {
try {
recordDescriptor =
await dhtctx.openDHTRecord(recordKey, writer: writer);
break;
} on VeilidAPIExceptionKeyNotFound {
await asyncSleep();
retry--;
if (retry == 0) {
rethrow;
}
}
}
final newOpenedRecordInfo = _OpenedRecordInfo(
recordDescriptor: recordDescriptor,
defaultWriter: writer,
defaultRoutingContext: dhtctx);
_opened[recordDescriptor.key] = newOpenedRecordInfo;
// Register the dependency
await _addDependencyInner(
parent,
recordKey,
debugName: debugName,
);
final rec = DHTRecord._(
debugName: debugName,
routingContext: dhtctx,
defaultSubkey: defaultSubkey,
sharedDHTRecordData: newOpenedRecordInfo.shared,
writer: writer,
crypto: crypto);
return newOpenedRecordInfo;
await _mutex.protect(() async {
// Register the opened record
_opened[recordDescriptor.key] = newOpenedRecordInfo;
// Register the dependency
await _addDependencyInner(
parent,
recordKey,
debugName: debugName,
);
// Register the newly opened record
newOpenedRecordInfo.records.add(rec);
});
return rec;
}
// Already opened
@ -430,37 +451,50 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
openedRecordInfo.shared.defaultRoutingContext = dhtctx;
}
// Register the dependency
await _addDependencyInner(
parent,
recordKey,
debugName: debugName,
);
final rec = DHTRecord._(
debugName: debugName,
routingContext: dhtctx,
defaultSubkey: defaultSubkey,
sharedDHTRecordData: openedRecordInfo.shared,
writer: writer,
crypto: crypto);
return openedRecordInfo;
await _mutex.protect(() async {
// Register the dependency
await _addDependencyInner(
parent,
recordKey,
debugName: debugName,
);
openedRecordInfo.records.add(rec);
});
return rec;
}
// Called when a DHTRecord is closed
// Cleans up the opened record housekeeping and processes any late deletions
Future<void> _recordClosed(DHTRecord record) async {
await _mutex.protect(() async {
final key = record.key;
await _recordTagLock.protect(record.key,
closure: () => _mutex.protect(() async {
final key = record.key;
log('closeDHTRecord: debugName=${record.debugName} key=$key');
log('closeDHTRecord: debugName=${record.debugName} key=$key');
final openedRecordInfo = _opened[key];
if (openedRecordInfo == null ||
!openedRecordInfo.records.remove(record)) {
throw StateError('record already closed');
}
if (openedRecordInfo.records.isEmpty) {
await _watchStateProcessors.remove(key);
await _routingContext.closeDHTRecord(key);
_opened.remove(key);
final openedRecordInfo = _opened[key];
if (openedRecordInfo == null ||
!openedRecordInfo.records.remove(record)) {
throw StateError('record already closed');
}
if (openedRecordInfo.records.isEmpty) {
await _watchStateProcessors.remove(key);
await _routingContext.closeDHTRecord(key);
_opened.remove(key);
await _checkForLateDeletesInner(key);
}
});
await _checkForLateDeletesInner(key);
}
}));
}
// Check to see if this key can finally be deleted
@ -916,6 +950,8 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
DHTRecordPoolAllocations _state;
// Create/open Mutex
final Mutex _mutex;
// Record key tag lock
final AsyncTagLock<TypedKey> _recordTagLock;
// Which DHT records are currently open
final Map<TypedKey, _OpenedRecordInfo> _opened;
// Which DHT records are marked for deletion

View File

@ -30,13 +30,29 @@ class DHTShortArrayCubit<T> extends Cubit<DHTShortArrayBusyState<T>>
required T Function(List<int> data) decodeElement,
}) : _decodeElement = decodeElement,
super(const BlocBusyState(AsyncValue.loading())) {
_initWait.add(() async {
// Open DHT record
_shortArray = await open();
_wantsCloseRecord = true;
_initWait.add((cancel) async {
try {
// Do record open/create
while (!cancel.isCompleted) {
try {
// Open DHT record
_shortArray = await open();
_wantsCloseRecord = true;
break;
} on VeilidAPIExceptionTryAgain {
// Wait for a bit
await asyncSleep();
}
}
} on Exception catch (e, st) {
emit(DHTShortArrayBusyState<T>(AsyncValue.error(e, st)));
return;
}
// Make initial state update
await _refreshNoWait();
// Kick off initial update
_update();
// Subscribe to changes
_subscription = await _shortArray.listen(_update);
});
}
@ -82,7 +98,8 @@ class DHTShortArrayCubit<T> extends Cubit<DHTShortArrayBusyState<T>>
void _update() {
// Run at most one background update process
// Because this is async, we could get an update while we're
// still processing the last one. Only called after init future has run
// still processing the last one.
// Only called after init future has run, or during it
// so we dont have to wait for that here.
_sspUpdate.busyUpdate<T, DHTShortArrayState<T>>(
busy, (emit) async => _refreshInner(emit));
@ -90,7 +107,7 @@ class DHTShortArrayCubit<T> extends Cubit<DHTShortArrayBusyState<T>>
@override
Future<void> close() async {
await _initWait();
await _initWait(cancelValue: true);
await _subscription?.cancel();
_subscription = null;
if (_wantsCloseRecord) {
@ -118,7 +135,7 @@ class DHTShortArrayCubit<T> extends Cubit<DHTShortArrayBusyState<T>>
return _shortArray.operateWriteEventual(closure, timeout: timeout);
}
final WaitSet<void> _initWait = WaitSet();
final WaitSet<void, bool> _initWait = WaitSet();
late final DHTShortArray _shortArray;
final T Function(List<int> data) _decodeElement;
StreamSubscription<void>? _subscription;

View File

@ -91,7 +91,7 @@ class _DHTShortArrayHead {
if (!await _writeHead()) {
// Failed to write head means head got overwritten so write should
// be considered failed
throw DHTExceptionTryAgain();
throw DHTExceptionOutdated();
}
onUpdatedHead?.call();
@ -143,7 +143,7 @@ class _DHTShortArrayHead {
try {
out = await closure(this);
break;
} on DHTExceptionTryAgain {
} on DHTExceptionOutdated {
// Failed to write in closure resets state
_linkedRecords = List.of(oldLinkedRecords);
_index = List.of(oldIndex);

View File

@ -54,8 +54,9 @@ class _DHTShortArrayRead implements DHTShortArrayReadOperations {
final out = <Uint8List>[];
(start, length) = _clampStartLen(start, length);
final chunks = Iterable<int>.generate(length).slices(maxDHTConcurrency).map(
(chunk) =>
final chunks = Iterable<int>.generate(length)
.slices(kMaxDHTConcurrency)
.map((chunk) =>
chunk.map((pos) => get(pos + start, forceRefresh: forceRefresh)));
for (final chunk in chunks) {

View File

@ -40,7 +40,7 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead
}
}
if (!success) {
throw DHTExceptionTryAgain();
throw DHTExceptionOutdated();
}
}
@ -66,12 +66,12 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead
}
// Write items in parallel
final dws = DelayedWaitSet<void>();
final dws = DelayedWaitSet<void, void>();
for (var i = 0; i < values.length; i++) {
final lookup = lookups[i];
final value = values[i];
final outSeqNum = outSeqNums[i];
dws.add(() async {
dws.add((_) async {
final outValue = await lookup.record.tryWriteBytes(value,
subkey: lookup.recordSubkey, outSeqNum: outSeqNum);
if (outValue != null) {
@ -80,7 +80,7 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead
});
}
await dws(chunkSize: maxDHTConcurrency, onChunkDone: (_) => success);
await dws(chunkSize: kMaxDHTConcurrency, onChunkDone: (_) => success);
} finally {
// Update sequence numbers
for (var i = 0; i < values.length; i++) {
@ -97,7 +97,7 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead
}
}
if (!success) {
throw DHTExceptionTryAgain();
throw DHTExceptionOutdated();
}
}

View File

@ -1,5 +1,5 @@
class DHTExceptionTryAgain implements Exception {
DHTExceptionTryAgain(
class DHTExceptionOutdated implements Exception {
DHTExceptionOutdated(
[this.cause = 'operation failed due to newer dht value']);
String cause;
}
@ -8,3 +8,8 @@ class DHTExceptionInvalidData implements Exception {
DHTExceptionInvalidData([this.cause = 'dht data structure is corrupt']);
String cause;
}
class DHTExceptionCancelled implements Exception {
DHTExceptionCancelled([this.cause = 'operation was cancelled']);
String cause;
}

View File

@ -22,7 +22,7 @@ abstract class AsyncTableDBBackedCubit<T> extends Cubit<AsyncValue<T?>>
await super.close();
}
Future<void> _build() async {
Future<void> _build(_) async {
try {
await _mutex.protect(() async {
emit(AsyncValue.data(await load()));
@ -42,6 +42,6 @@ abstract class AsyncTableDBBackedCubit<T> extends Cubit<AsyncValue<T?>>
}
}
final WaitSet<void> _initWait = WaitSet();
final WaitSet<void, void> _initWait = WaitSet();
final Mutex _mutex = Mutex();
}

View File

@ -45,7 +45,7 @@ class PersistentQueue<T extends GeneratedMessage>
}
}
Future<void> _init() async {
Future<void> _init(_) async {
// Start the processor
unawaited(Future.delayed(Duration.zero, () async {
await _initWait();
@ -202,7 +202,7 @@ class PersistentQueue<T extends GeneratedMessage>
final String _key;
final T Function(Uint8List) _fromBuffer;
final bool _deleteOnClose;
final WaitSet<void> _initWait = WaitSet();
final WaitSet<void, void> _initWait = WaitSet();
final Mutex _queueMutex = Mutex();
IList<T> _queue = IList<T>.empty();
final StreamController<Iterable<T>> _syncAddController = StreamController();

View File

@ -134,7 +134,7 @@ class TableDBValue<T> extends TableDBBackedJson<T> {
_tableKeyName = tableKeyName,
_makeInitialValue = makeInitialValue,
_streamController = StreamController<T>.broadcast() {
_initWait.add(() async {
_initWait.add((_) async {
await get();
});
}
@ -172,7 +172,7 @@ class TableDBValue<T> extends TableDBBackedJson<T> {
final T? Function(Object? obj) _valueFromJson;
final Object? Function(T? obj) _valueToJson;
final StreamController<T> _streamController;
final WaitSet<void> _initWait = WaitSet();
final WaitSet<void, void> _initWait = WaitSet();
//////////////////////////////////////////////////////////////
/// AsyncTableDBBacked

View File

@ -45,7 +45,7 @@ class _TableDBArrayBase {
await _initWait();
}
Future<void> _init() async {
Future<void> _init(_) async {
// Load the array details
await _mutex.protect(() async {
_tableDB = await Veilid.instance.openTableDB(_table, 1);
@ -259,10 +259,10 @@ class _TableDBArrayBase {
for (var pos = start; pos < end;) {
var batchLen = min(batchSize, end - pos);
final dws = DelayedWaitSet<Uint8List>();
final dws = DelayedWaitSet<Uint8List, void>();
while (batchLen > 0) {
final entry = await _getIndexEntry(pos);
dws.add(() async => (await _loadEntry(entry))!);
dws.add((_) async => (await _loadEntry(entry))!);
pos++;
batchLen--;
}
@ -613,7 +613,7 @@ class _TableDBArrayBase {
var _open = true;
var _initDone = false;
final VeilidCrypto _crypto;
final WaitSet<void> _initWait = WaitSet();
final WaitSet<void, void> _initWait = WaitSet();
final Mutex _mutex = Mutex();
// Change tracking

View File

@ -46,7 +46,7 @@ class TableDBArrayProtobufCubit<T extends GeneratedMessage>
TableDBArrayProtobufCubit({
required Future<TableDBArrayProtobuf<T>> Function() open,
}) : super(const BlocBusyState(AsyncValue.loading())) {
_initWait.add(() async {
_initWait.add((_) async {
// Open table db array
_array = await open();
_wantsCloseArray = true;
@ -180,7 +180,7 @@ class TableDBArrayProtobufCubit<T extends GeneratedMessage>
return closure(_array);
}
final WaitSet<void> _initWait = WaitSet();
final WaitSet<void, void> _initWait = WaitSet();
late final TableDBArrayProtobuf<T> _array;
StreamSubscription<void>? _subscription;
bool _wantsCloseArray = false;

View File

@ -37,10 +37,10 @@ packages:
dependency: "direct main"
description:
name: async_tools
sha256: f35f5590711f1422c7eff990351e879c5433486f9b5be5df30818521bf6ab8d6
sha256: "375da1e5b51974a9e84469b7630f36d1361fb53d3fcc818a5a0121ab51fe0343"
url: "https://pub.dev"
source: hosted
version: "0.1.3"
version: "0.1.4"
bloc:
dependency: "direct main"
description:
@ -53,10 +53,10 @@ packages:
dependency: "direct main"
description:
name: bloc_advanced_tools
sha256: "11f534f9e89561302de3bd07ab2dfe1c2dacaa8db9794ccdb57c55cfc02dffc6"
sha256: "0599d860eb096c5b12457c60bdf7f66bfcb7171bc94ccf2c7c752b6a716a79f9"
url: "https://pub.dev"
source: hosted
version: "0.1.3"
version: "0.1.4"
boolean_selector:
dependency: transitive
description:
@ -718,7 +718,7 @@ packages:
path: "../../../veilid/veilid-flutter"
relative: true
source: path
version: "0.3.2"
version: "0.3.3"
vm_service:
dependency: transitive
description:

View File

@ -7,9 +7,9 @@ environment:
sdk: '>=3.2.0 <4.0.0'
dependencies:
async_tools: ^0.1.3
async_tools: ^0.1.4
bloc: ^8.1.4
bloc_advanced_tools: ^0.1.3
bloc_advanced_tools: ^0.1.4
charcode: ^1.3.1
collection: ^1.18.0
equatable: ^2.0.5

View File

@ -85,10 +85,10 @@ packages:
dependency: "direct main"
description:
name: async_tools
sha256: f35f5590711f1422c7eff990351e879c5433486f9b5be5df30818521bf6ab8d6
sha256: "375da1e5b51974a9e84469b7630f36d1361fb53d3fcc818a5a0121ab51fe0343"
url: "https://pub.dev"
source: hosted
version: "0.1.3"
version: "0.1.4"
awesome_extensions:
dependency: "direct main"
description:
@ -141,10 +141,10 @@ packages:
dependency: "direct main"
description:
name: bloc_advanced_tools
sha256: "11f534f9e89561302de3bd07ab2dfe1c2dacaa8db9794ccdb57c55cfc02dffc6"
sha256: "0599d860eb096c5b12457c60bdf7f66bfcb7171bc94ccf2c7c752b6a716a79f9"
url: "https://pub.dev"
source: hosted
version: "0.1.3"
version: "0.1.4"
blurry_modal_progress_hud:
dependency: "direct main"
description:

View File

@ -14,12 +14,12 @@ dependencies:
animated_theme_switcher: ^2.0.10
ansicolor: ^2.0.2
archive: ^3.6.1
async_tools: ^0.1.3
async_tools: ^0.1.4
awesome_extensions: ^2.0.16
badges: ^3.1.2
basic_utils: ^5.7.0
bloc: ^8.1.4
bloc_advanced_tools: ^0.1.3
bloc_advanced_tools: ^0.1.4
blurry_modal_progress_hud: ^1.1.1
change_case: ^2.1.0
charcode: ^1.3.1