diff --git a/lib/account_manager/cubits/per_account_collection_cubit.dart b/lib/account_manager/cubits/per_account_collection_cubit.dart index 6cb8d5d..5f208fb 100644 --- a/lib/account_manager/cubits/per_account_collection_cubit.dart +++ b/lib/account_manager/cubits/per_account_collection_cubit.dart @@ -44,7 +44,7 @@ class PerAccountCollectionCubit extends Cubit { await super.close(); } - Future _init() async { + Future _init(Completer _cancel) async { // subscribe to accountInfo changes _processor.follow(accountInfoCubit.stream, accountInfoCubit.state, _followAccountInfoState); @@ -235,7 +235,7 @@ class PerAccountCollectionCubit extends Cubit { final Locator _locator; final _processor = SingleStateProcessor(); - final _initWait = WaitSet(); + final _initWait = WaitSet(); // Per-account cubits regardless of login state final AccountInfoCubit accountInfoCubit; diff --git a/lib/chat/cubits/chat_component_cubit.dart b/lib/chat/cubits/chat_component_cubit.dart index 12ed135..2f2a2d0 100644 --- a/lib/chat/cubits/chat_component_cubit.dart +++ b/lib/chat/cubits/chat_component_cubit.dart @@ -68,7 +68,7 @@ class ChatComponentCubit extends Cubit { messagesCubit: messagesCubit, ); - Future _init() async { + Future _init(Completer _cancel) async { // Get local user info and account record cubit _localUserIdentityKey = _accountInfo.identityTypedPublicKey; @@ -420,7 +420,7 @@ class ChatComponentCubit extends Cubit { //////////////////////////////////////////////////////////////////////////// - final _initWait = WaitSet(); + final _initWait = WaitSet(); final AccountInfo _accountInfo; final AccountRecordCubit _accountRecordCubit; final ContactListCubit _contactListCubit; diff --git a/lib/chat/cubits/single_contact_messages_cubit.dart b/lib/chat/cubits/single_contact_messages_cubit.dart index ce4368a..97e474f 100644 --- a/lib/chat/cubits/single_contact_messages_cubit.dart +++ b/lib/chat/cubits/single_contact_messages_cubit.dart @@ -95,7 +95,7 @@ class SingleContactMessagesCubit extends Cubit { } // Initialize everything - Future _init() async { + Future _init(Completer _cancel) async { _unsentMessagesQueue = PersistentQueue( table: 'SingleContactUnsentMessages', key: _remoteConversationRecordKey.toString(), @@ -445,7 +445,7 @@ class SingleContactMessagesCubit extends Cubit { ///////////////////////////////////////////////////////////////////////// - final WaitSet _initWait = WaitSet(); + final WaitSet _initWait = WaitSet(); late final AccountInfo _accountInfo; final TypedKey _remoteIdentityPublicKey; final TypedKey _localConversationRecordKey; diff --git a/lib/contacts/cubits/contact_list_cubit.dart b/lib/contacts/cubits/contact_list_cubit.dart index aaecca4..76079b3 100644 --- a/lib/contacts/cubits/contact_list_cubit.dart +++ b/lib/contacts/cubits/contact_list_cubit.dart @@ -71,7 +71,7 @@ class ContactListCubit extends DHTShortArrayCubit { final updated = await writer.tryWriteItemProtobuf( proto.Contact.fromBuffer, pos, newContact); if (!updated) { - throw DHTExceptionTryAgain(); + throw DHTExceptionOutdated(); } break; } diff --git a/lib/conversation/cubits/conversation_cubit.dart b/lib/conversation/cubits/conversation_cubit.dart index 1947504..11ba12f 100644 --- a/lib/conversation/cubits/conversation_cubit.dart +++ b/lib/conversation/cubits/conversation_cubit.dart @@ -46,12 +46,13 @@ class ConversationCubit extends Cubit> { _identityWriter = _accountInfo.identityWriter; if (_localConversationRecordKey != null) { - _initWait.add(() async { + _initWait.add((_) async { await _setLocalConversation(() async { // Open local record key if it is specified final pool = DHTRecordPool.instance; final crypto = await _cachedConversationCrypto(); final writer = _identityWriter; + final record = await pool.openRecordWrite( _localConversationRecordKey!, writer, debugName: 'ConversationCubit::LocalConversation', @@ -64,16 +65,18 @@ class ConversationCubit extends Cubit> { } if (_remoteConversationRecordKey != null) { - _initWait.add(() async { + _initWait.add((cancel) async { await _setRemoteConversation(() async { // Open remote record key if it is specified final pool = DHTRecordPool.instance; final crypto = await _cachedConversationCrypto(); + final record = await pool.openRecordRead(_remoteConversationRecordKey, debugName: 'ConversationCubit::RemoteConversation', parent: pool.getParentRecordKey(_remoteConversationRecordKey) ?? accountInfo.accountRecordKey, crypto: crypto); + return record; }); }); @@ -326,5 +329,5 @@ class ConversationCubit extends Cubit> { ConversationState _incrementalState = const ConversationState( localConversation: null, remoteConversation: null); VeilidCrypto? _conversationCrypto; - final WaitSet _initWait = WaitSet(); + final WaitSet _initWait = WaitSet(); } diff --git a/lib/tools/exceptions.dart b/lib/tools/exceptions.dart new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/lib/tools/exceptions.dart @@ -0,0 +1 @@ + diff --git a/lib/tools/tools.dart b/lib/tools/tools.dart index 2963df0..470b648 100644 --- a/lib/tools/tools.dart +++ b/lib/tools/tools.dart @@ -1,4 +1,5 @@ export 'animations.dart'; +export 'exceptions.dart'; export 'loggy.dart'; export 'misc.dart'; export 'package_info.dart'; diff --git a/lib/veilid_processor/views/developer.dart b/lib/veilid_processor/views/developer.dart index bb0b256..0078935 100644 --- a/lib/veilid_processor/views/developer.dart +++ b/lib/veilid_processor/views/developer.dart @@ -18,6 +18,7 @@ import 'package:xterm/xterm.dart'; import '../../layout/layout.dart'; import '../../theme/theme.dart'; import '../../tools/tools.dart'; +import 'history_text_editing_controller.dart'; final globalDebugTerminal = Terminal( maxLines: 50000, @@ -36,17 +37,12 @@ class DeveloperPage extends StatefulWidget { } class _DeveloperPageState extends State { - final _terminalController = TerminalController(); - final _debugCommandController = TextEditingController(); - final _logLevelController = DropdownController(duration: 250.ms); - final List> _logLevelDropdownItems = []; - var _logLevelDropDown = log.level.logLevel; - var _showEllet = false; - @override void initState() { super.initState(); + _historyController = HistoryTextEditingController(setState: setState); + _terminalController.addListener(() { setState(() {}); }); @@ -66,42 +62,71 @@ class _DeveloperPageState extends State { globalDebugTerminal.write(colorOut.replaceAll('\n', '\r\n')); } - Future _sendDebugCommand(String debugCommand) async { - if (debugCommand == 'pool allocations') { - DHTRecordPool.instance.debugPrintAllocations(); - return; - } - - if (debugCommand == 'pool opened') { - DHTRecordPool.instance.debugPrintOpened(); - return; - } - - if (debugCommand.startsWith('change_log_ignore ')) { - final args = debugCommand.split(' '); - if (args.length < 3) { - _debugOut('Incorrect number of arguments'); - return; - } - final layer = args[1]; - final changes = args[2].split(','); - Veilid.instance.changeLogIgnore(layer, changes); - return; - } - - if (debugCommand == 'ellet') { - setState(() { - _showEllet = !_showEllet; - }); - return; - } - - _debugOut('DEBUG >>>\n$debugCommand\n'); + Future _sendDebugCommand(String debugCommand) async { try { - final out = await Veilid.instance.debug(debugCommand); - _debugOut('<<< DEBUG\n$out\n'); - } on Exception catch (e, st) { - _debugOut('<<< ERROR\n$e\n<<< STACK\n$st'); + setState(() { + _busy = true; + }); + + if (debugCommand == 'pool allocations') { + try { + DHTRecordPool.instance.debugPrintAllocations(); + } on Exception catch (e, st) { + _debugOut('<<< ERROR\n$e\n<<< STACK\n$st'); + return false; + } + return true; + } + + if (debugCommand == 'pool opened') { + try { + DHTRecordPool.instance.debugPrintOpened(); + } on Exception catch (e, st) { + _debugOut('<<< ERROR\n$e\n<<< STACK\n$st'); + return false; + } + return true; + } + + if (debugCommand.startsWith('change_log_ignore ')) { + final args = debugCommand.split(' '); + if (args.length < 3) { + _debugOut('Incorrect number of arguments'); + return false; + } + final layer = args[1]; + final changes = args[2].split(','); + try { + Veilid.instance.changeLogIgnore(layer, changes); + } on Exception catch (e, st) { + _debugOut('<<< ERROR\n$e\n<<< STACK\n$st'); + return false; + } + + return true; + } + + if (debugCommand == 'ellet') { + setState(() { + _showEllet = !_showEllet; + }); + return true; + } + + _debugOut('DEBUG >>>\n$debugCommand\n'); + try { + final out = await Veilid.instance.debug(debugCommand); + _debugOut('<<< DEBUG\n$out\n'); + } on Exception catch (e, st) { + _debugOut('<<< ERROR\n$e\n<<< STACK\n$st'); + return false; + } + + return true; + } finally { + setState(() { + _busy = false; + }); } } @@ -284,7 +309,9 @@ class _DeveloperPageState extends State { }) ]).expanded(), TextField( - controller: _debugCommandController, + enabled: !_busy, + controller: _historyController.controller, + focusNode: _historyController.focusNode, onTapOutside: (event) { FocusManager.instance.primaryFocus?.unfocus(); }, @@ -303,28 +330,54 @@ class _DeveloperPageState extends State { hintText: translate('developer.command'), suffixIcon: IconButton( icon: Icon(Icons.send, - color: _debugCommandController.text.isEmpty + color: _historyController.controller.text.isEmpty ? scale.primaryScale.primary.withAlpha(0x3F) : scale.primaryScale.primary), - onPressed: _debugCommandController.text.isEmpty - ? null - : () async { - final debugCommand = _debugCommandController.text; - _debugCommandController.clear(); - await _sendDebugCommand(debugCommand); - }, + onPressed: + (_historyController.controller.text.isEmpty || _busy) + ? null + : () async { + final debugCommand = + _historyController.controller.text; + _historyController.controller.clear(); + await _sendDebugCommand(debugCommand); + }, )), onChanged: (_) { setState(() => {}); }, + onEditingComplete: () { + // part of the default action if onEditingComplete is null + _historyController.controller.clearComposing(); + // don't give up focus though + }, onSubmitted: (debugCommand) async { - _debugCommandController.clear(); - await _sendDebugCommand(debugCommand); + if (debugCommand.isEmpty) { + return; + } + + final ok = await _sendDebugCommand(debugCommand); + if (ok) { + setState(() { + _historyController.submit(debugCommand); + }); + } }, ).paddingAll(4) ])))); } + //////////////////////////////////////////////////////////////////////////// + + final _terminalController = TerminalController(); + late final HistoryTextEditingController _historyController; + + final _logLevelController = DropdownController(duration: 250.ms); + final List> _logLevelDropdownItems = []; + var _logLevelDropDown = log.level.logLevel; + var _showEllet = false; + var _busy = false; + @override void debugFillProperties(DiagnosticPropertiesBuilder properties) { super.debugFillProperties(properties); diff --git a/lib/veilid_processor/views/history_text_editing_controller.dart b/lib/veilid_processor/views/history_text_editing_controller.dart new file mode 100644 index 0000000..f9646e5 --- /dev/null +++ b/lib/veilid_processor/views/history_text_editing_controller.dart @@ -0,0 +1,69 @@ +import 'package:flutter/material.dart'; +import 'package:flutter/services.dart'; + +/// TextField History Controller +class HistoryTextEditingController { + HistoryTextEditingController( + {required this.setState, TextEditingController? controller}) + : _controller = controller ?? TextEditingController() { + _historyFocusNode = FocusNode(onKeyEvent: (_node, event) { + if (event.runtimeType == KeyDownEvent && + event.logicalKey == LogicalKeyboardKey.arrowUp) { + if (_historyPosition > 0) { + if (_historyPosition == _history.length) { + _historyCurrentEdit = _controller.text; + } + _historyPosition -= 1; + setState(() { + _controller.text = _history[_historyPosition]; + }); + } + return KeyEventResult.handled; + } else if (event.runtimeType == KeyDownEvent && + event.logicalKey == LogicalKeyboardKey.arrowDown) { + if (_historyPosition < _history.length) { + _historyPosition += 1; + setState(() { + if (_historyPosition == _history.length) { + _controller.text = _historyCurrentEdit; + } else { + _controller.text = _history[_historyPosition]; + } + }); + } + return KeyEventResult.handled; + } else if (event.runtimeType == KeyDownEvent) { + _historyPosition = _history.length; + _historyCurrentEdit = _controller.text; + } + return KeyEventResult.ignored; + }); + } + + void submit(String v) { + // add to history + if (_history.isEmpty || _history.last != v) { + _history.add(v); + if (_history.length > 100) { + _history.removeAt(0); + } + } + _historyPosition = _history.length; + setState(() { + _controller.text = ''; + }); + } + + FocusNode get focusNode => _historyFocusNode; + TextEditingController get controller => _controller; + + //////////////////////////////////////////////////////////////////////////// + + late void Function(void Function()) setState; + final TextEditingController _controller; + late final FocusNode _historyFocusNode; + + final List _history = []; + int _historyPosition = 0; + String _historyCurrentEdit = ''; +} diff --git a/packages/veilid_support/example/pubspec.lock b/packages/veilid_support/example/pubspec.lock index 304670e..2c2b1ad 100644 --- a/packages/veilid_support/example/pubspec.lock +++ b/packages/veilid_support/example/pubspec.lock @@ -37,10 +37,10 @@ packages: dependency: "direct dev" description: name: async_tools - sha256: f35f5590711f1422c7eff990351e879c5433486f9b5be5df30818521bf6ab8d6 + sha256: "375da1e5b51974a9e84469b7630f36d1361fb53d3fcc818a5a0121ab51fe0343" url: "https://pub.dev" source: hosted - version: "0.1.3" + version: "0.1.4" bloc: dependency: transitive description: @@ -53,10 +53,10 @@ packages: dependency: transitive description: name: bloc_advanced_tools - sha256: "11f534f9e89561302de3bd07ab2dfe1c2dacaa8db9794ccdb57c55cfc02dffc6" + sha256: "0599d860eb096c5b12457c60bdf7f66bfcb7171bc94ccf2c7c752b6a716a79f9" url: "https://pub.dev" source: hosted - version: "0.1.3" + version: "0.1.4" boolean_selector: dependency: transitive description: @@ -650,7 +650,7 @@ packages: path: "../../../../veilid/veilid-flutter" relative: true source: path - version: "0.3.2" + version: "0.3.3" veilid_support: dependency: "direct main" description: diff --git a/packages/veilid_support/example/pubspec.yaml b/packages/veilid_support/example/pubspec.yaml index 8f76235..a885f94 100644 --- a/packages/veilid_support/example/pubspec.yaml +++ b/packages/veilid_support/example/pubspec.yaml @@ -14,7 +14,7 @@ dependencies: path: ../ dev_dependencies: - async_tools: ^0.1.3 + async_tools: ^0.1.4 integration_test: sdk: flutter lint_hard: ^4.0.0 diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_cubit.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_cubit.dart index 570474f..8fd565b 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_cubit.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_cubit.dart @@ -43,13 +43,26 @@ class DHTLogCubit extends Cubit> required T Function(List data) decodeElement, }) : _decodeElement = decodeElement, super(const BlocBusyState(AsyncValue.loading())) { - _initWait.add(() async { - // Open DHT record - _log = await open(); - _wantsCloseRecord = true; - + _initWait.add((cancel) async { + try { + // Do record open/create + while (!cancel.isCompleted) { + try { + // Open DHT record + _log = await open(); + _wantsCloseRecord = true; + break; + } on VeilidAPIExceptionTryAgain { + // Wait for a bit + await asyncSleep(); + } + } + } on Exception catch (e, st) { + emit(DHTLogBusyState(AsyncValue.error(e, st))); + return; + } // Make initial state update - await _refreshNoWait(); + _initialUpdate(); _subscription = await _log.listen(_update); }); } @@ -156,7 +169,7 @@ class DHTLogCubit extends Cubit> // Run at most one background update process // Because this is async, we could get an update while we're // still processing the last one. Only called after init future has run - // so we dont have to wait for that here. + // or during it, so we dont have to wait for that here. // Accumulate head and tail deltas _headDelta += upd.headDelta; @@ -188,9 +201,15 @@ class DHTLogCubit extends Cubit> }); } + void _initialUpdate() { + _sspUpdate.busyUpdate>(busy, (emit) async { + await _refreshInner(emit); + }); + } + @override Future close() async { - await _initWait(); + await _initWait(cancelValue: true); await _subscription?.cancel(); _subscription = null; if (_wantsCloseRecord) { @@ -217,7 +236,7 @@ class DHTLogCubit extends Cubit> return _log.operateAppendEventual(closure, timeout: timeout); } - final WaitSet _initWait = WaitSet(); + final WaitSet _initWait = WaitSet(); late final DHTLog _log; final T Function(List data) _decodeElement; StreamSubscription? _subscription; diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_read.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_read.dart index d7b3541..90b8428 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_read.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_read.dart @@ -45,8 +45,9 @@ class _DHTLogRead implements DHTLogReadOperations { final out = []; (start, length) = _clampStartLen(start, length); - final chunks = Iterable.generate(length).slices(maxDHTConcurrency).map( - (chunk) => + final chunks = Iterable.generate(length) + .slices(kMaxDHTConcurrency) + .map((chunk) => chunk.map((pos) => get(pos + start, forceRefresh: forceRefresh))); for (final chunk in chunks) { diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart index ca0074f..070c494 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart @@ -147,7 +147,7 @@ class _DHTLogSpine { if (!await writeSpineHead(old: (oldHead, oldTail))) { // Failed to write head means head got overwritten so write should // be considered failed - throw DHTExceptionTryAgain(); + throw DHTExceptionOutdated(); } return out; } on Exception { @@ -187,7 +187,7 @@ class _DHTLogSpine { try { out = await closure(this); break; - } on DHTExceptionTryAgain { + } on DHTExceptionOutdated { // Failed to write in closure resets state _head = oldHead; _tail = oldTail; diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_write.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_write.dart index ca47e00..7f4d9ce 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_write.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_write.dart @@ -26,10 +26,10 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations { final success = await write.tryWriteItem(lookup.pos, newValue, output: output); if (!success) { - throw DHTExceptionTryAgain(); + throw DHTExceptionOutdated(); } })); - } on DHTExceptionTryAgain { + } on DHTExceptionOutdated { return false; } return true; @@ -71,14 +71,14 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations { final success = await bWrite .tryWriteItem(bLookup.pos, aItem, output: bItem); if (!success) { - throw DHTExceptionTryAgain(); + throw DHTExceptionOutdated(); } }); } final success = await aWrite.tryWriteItem(aLookup.pos, bItem.value!); if (!success) { - throw DHTExceptionTryAgain(); + throw DHTExceptionOutdated(); } }))); } @@ -114,7 +114,7 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations { _spine.allocateTail(values.length); // Look up the first position and shortarray - final dws = DelayedWaitSet(); + final dws = DelayedWaitSet(); var success = true; for (var valueIdx = 0; valueIdx < values.length;) { @@ -128,7 +128,7 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations { final sacount = min(remaining, DHTShortArray.maxElements - lookup.pos); final sublistValues = values.sublist(valueIdx, valueIdx + sacount); - dws.add(() async { + dws.add((_) async { try { await lookup.scope((sa) async => sa.operateWrite((write) async { // If this a new segment, then clear it in @@ -141,7 +141,7 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations { } return write.addAll(sublistValues); })); - } on DHTExceptionTryAgain { + } on DHTExceptionOutdated { success = false; } }); @@ -152,7 +152,7 @@ class _DHTLogWrite extends _DHTLogRead implements DHTLogWriteOperations { await dws(); if (!success) { - throw DHTExceptionTryAgain(); + throw DHTExceptionOutdated(); } } diff --git a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_cubit.dart b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_cubit.dart index 54d1dec..da75e06 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_cubit.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_cubit.dart @@ -12,7 +12,7 @@ typedef StateFunction = Future Function( DHTRecord, List, Uint8List?); typedef WatchFunction = Future Function(DHTRecord); -class DHTRecordCubit extends Cubit> { +abstract class DHTRecordCubit extends Cubit> { DHTRecordCubit({ required Future Function() open, required InitialStateFunction initialStateFunction, @@ -21,10 +21,24 @@ class DHTRecordCubit extends Cubit> { }) : _wantsCloseRecord = false, _stateFunction = stateFunction, super(const AsyncValue.loading()) { - initWait.add(() async { - // Do record open/create - _record = await open(); - _wantsCloseRecord = true; + initWait.add((cancel) async { + try { + // Do record open/create + while (!cancel.isCompleted) { + try { + _record = await open(); + _wantsCloseRecord = true; + break; + } on VeilidAPIExceptionKeyNotFound { + } on VeilidAPIExceptionTryAgain { + // Wait for a bit + await asyncSleep(); + } + } + } on Exception catch (e, st) { + emit(AsyncValue.error(e, st)); + return; + } await _init(initialStateFunction, stateFunction, watchFunction); }); } @@ -60,7 +74,7 @@ class DHTRecordCubit extends Cubit> { @override Future close() async { - await initWait(); + await initWait(cancelValue: true); await _record.cancelWatch(); await _subscription?.cancel(); _subscription = null; @@ -98,7 +112,7 @@ class DHTRecordCubit extends Cubit> { DHTRecord get record => _record; @protected - final WaitSet initWait = WaitSet(); + final WaitSet initWait = WaitSet(); StreamSubscription? _subscription; late DHTRecord _record; diff --git a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart index b80db1f..5aa7915 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart @@ -18,8 +18,11 @@ part 'dht_record_pool.g.dart'; part 'dht_record.dart'; part 'dht_record_pool_private.dart'; -// Maximum number of concurrent DHT operations to perform on the network -const int maxDHTConcurrency = 8; +/// Maximum number of concurrent DHT operations to perform on the network +const int kMaxDHTConcurrency = 8; + +/// Number of times to retry a 'key not found' +const int kDHTKeyNotFoundRetry = 3; typedef DHTRecordPoolLogger = void Function(String message); @@ -60,6 +63,7 @@ class DHTRecordPool with TableDBBackedJson { DHTRecordPool._(Veilid veilid, VeilidRoutingContext routingContext) : _state = const DHTRecordPoolAllocations(), _mutex = Mutex(), + _recordTagLock = AsyncTagLock(), _opened = {}, _markedForDelete = {}, _routingContext = routingContext, @@ -132,24 +136,17 @@ class DHTRecordPool with TableDBBackedJson { TypedKey? parent, int defaultSubkey = 0, VeilidCrypto? crypto}) async => - _mutex.protect(() async { + _recordTagLock.protect(recordKey, closure: () async { final dhtctx = routingContext ?? _routingContext; - final openedRecordInfo = await _recordOpenInner( + final rec = await _recordOpenCommon( debugName: debugName, dhtctx: dhtctx, recordKey: recordKey, - parent: parent); - - final rec = DHTRecord._( - debugName: debugName, - routingContext: dhtctx, - defaultSubkey: defaultSubkey, - sharedDHTRecordData: openedRecordInfo.shared, + crypto: crypto ?? const VeilidCryptoPublic(), writer: null, - crypto: crypto ?? const VeilidCryptoPublic()); - - openedRecordInfo.records.add(rec); + parent: parent, + defaultSubkey: defaultSubkey); return rec; }); @@ -164,27 +161,20 @@ class DHTRecordPool with TableDBBackedJson { int defaultSubkey = 0, VeilidCrypto? crypto, }) async => - _mutex.protect(() async { + _recordTagLock.protect(recordKey, closure: () async { final dhtctx = routingContext ?? _routingContext; - final openedRecordInfo = await _recordOpenInner( - debugName: debugName, - dhtctx: dhtctx, - recordKey: recordKey, - parent: parent, - writer: writer); - - final rec = DHTRecord._( - debugName: debugName, - routingContext: dhtctx, - defaultSubkey: defaultSubkey, - writer: writer, - sharedDHTRecordData: openedRecordInfo.shared, - crypto: crypto ?? - await privateCryptoFromTypedSecret( - TypedKey(kind: recordKey.kind, value: writer.secret))); - - openedRecordInfo.records.add(rec); + final rec = await _recordOpenCommon( + debugName: debugName, + dhtctx: dhtctx, + recordKey: recordKey, + crypto: crypto ?? + await privateCryptoFromTypedSecret( + TypedKey(kind: recordKey.kind, value: writer.secret)), + writer: writer, + parent: parent, + defaultSubkey: defaultSubkey, + ); return rec; }); @@ -381,41 +371,72 @@ class DHTRecordPool with TableDBBackedJson { return openedRecordInfo; } - Future<_OpenedRecordInfo> _recordOpenInner( + Future _recordOpenCommon( {required String debugName, required VeilidRoutingContext dhtctx, required TypedKey recordKey, - KeyPair? writer, - TypedKey? parent}) async { - if (!_mutex.isLocked) { - throw StateError('should be locked here'); - } + required VeilidCrypto crypto, + required KeyPair? writer, + required TypedKey? parent, + required int defaultSubkey}) async { log('openDHTRecord: debugName=$debugName key=$recordKey'); - // If we are opening a key that already exists - // make sure we are using the same parent if one was specified - _validateParentInner(parent, recordKey); - // See if this has been opened yet - final openedRecordInfo = _opened[recordKey]; + final openedRecordInfo = await _mutex.protect(() async { + // If we are opening a key that already exists + // make sure we are using the same parent if one was specified + _validateParentInner(parent, recordKey); + + return _opened[recordKey]; + }); + if (openedRecordInfo == null) { // Fresh open, just open the record - final recordDescriptor = - await dhtctx.openDHTRecord(recordKey, writer: writer); + var retry = kDHTKeyNotFoundRetry; + late final DHTRecordDescriptor recordDescriptor; + while (true) { + try { + recordDescriptor = + await dhtctx.openDHTRecord(recordKey, writer: writer); + break; + } on VeilidAPIExceptionKeyNotFound { + await asyncSleep(); + retry--; + if (retry == 0) { + rethrow; + } + } + } + final newOpenedRecordInfo = _OpenedRecordInfo( recordDescriptor: recordDescriptor, defaultWriter: writer, defaultRoutingContext: dhtctx); - _opened[recordDescriptor.key] = newOpenedRecordInfo; - // Register the dependency - await _addDependencyInner( - parent, - recordKey, - debugName: debugName, - ); + final rec = DHTRecord._( + debugName: debugName, + routingContext: dhtctx, + defaultSubkey: defaultSubkey, + sharedDHTRecordData: newOpenedRecordInfo.shared, + writer: writer, + crypto: crypto); - return newOpenedRecordInfo; + await _mutex.protect(() async { + // Register the opened record + _opened[recordDescriptor.key] = newOpenedRecordInfo; + + // Register the dependency + await _addDependencyInner( + parent, + recordKey, + debugName: debugName, + ); + + // Register the newly opened record + newOpenedRecordInfo.records.add(rec); + }); + + return rec; } // Already opened @@ -430,37 +451,50 @@ class DHTRecordPool with TableDBBackedJson { openedRecordInfo.shared.defaultRoutingContext = dhtctx; } - // Register the dependency - await _addDependencyInner( - parent, - recordKey, - debugName: debugName, - ); + final rec = DHTRecord._( + debugName: debugName, + routingContext: dhtctx, + defaultSubkey: defaultSubkey, + sharedDHTRecordData: openedRecordInfo.shared, + writer: writer, + crypto: crypto); - return openedRecordInfo; + await _mutex.protect(() async { + // Register the dependency + await _addDependencyInner( + parent, + recordKey, + debugName: debugName, + ); + + openedRecordInfo.records.add(rec); + }); + + return rec; } // Called when a DHTRecord is closed // Cleans up the opened record housekeeping and processes any late deletions Future _recordClosed(DHTRecord record) async { - await _mutex.protect(() async { - final key = record.key; + await _recordTagLock.protect(record.key, + closure: () => _mutex.protect(() async { + final key = record.key; - log('closeDHTRecord: debugName=${record.debugName} key=$key'); + log('closeDHTRecord: debugName=${record.debugName} key=$key'); - final openedRecordInfo = _opened[key]; - if (openedRecordInfo == null || - !openedRecordInfo.records.remove(record)) { - throw StateError('record already closed'); - } - if (openedRecordInfo.records.isEmpty) { - await _watchStateProcessors.remove(key); - await _routingContext.closeDHTRecord(key); - _opened.remove(key); + final openedRecordInfo = _opened[key]; + if (openedRecordInfo == null || + !openedRecordInfo.records.remove(record)) { + throw StateError('record already closed'); + } + if (openedRecordInfo.records.isEmpty) { + await _watchStateProcessors.remove(key); + await _routingContext.closeDHTRecord(key); + _opened.remove(key); - await _checkForLateDeletesInner(key); - } - }); + await _checkForLateDeletesInner(key); + } + })); } // Check to see if this key can finally be deleted @@ -916,6 +950,8 @@ class DHTRecordPool with TableDBBackedJson { DHTRecordPoolAllocations _state; // Create/open Mutex final Mutex _mutex; + // Record key tag lock + final AsyncTagLock _recordTagLock; // Which DHT records are currently open final Map _opened; // Which DHT records are marked for deletion diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_cubit.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_cubit.dart index d9e1e57..97bf1d9 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_cubit.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_cubit.dart @@ -30,13 +30,29 @@ class DHTShortArrayCubit extends Cubit> required T Function(List data) decodeElement, }) : _decodeElement = decodeElement, super(const BlocBusyState(AsyncValue.loading())) { - _initWait.add(() async { - // Open DHT record - _shortArray = await open(); - _wantsCloseRecord = true; + _initWait.add((cancel) async { + try { + // Do record open/create + while (!cancel.isCompleted) { + try { + // Open DHT record + _shortArray = await open(); + _wantsCloseRecord = true; + break; + } on VeilidAPIExceptionTryAgain { + // Wait for a bit + await asyncSleep(); + } + } + } on Exception catch (e, st) { + emit(DHTShortArrayBusyState(AsyncValue.error(e, st))); + return; + } - // Make initial state update - await _refreshNoWait(); + // Kick off initial update + _update(); + + // Subscribe to changes _subscription = await _shortArray.listen(_update); }); } @@ -82,7 +98,8 @@ class DHTShortArrayCubit extends Cubit> void _update() { // Run at most one background update process // Because this is async, we could get an update while we're - // still processing the last one. Only called after init future has run + // still processing the last one. + // Only called after init future has run, or during it // so we dont have to wait for that here. _sspUpdate.busyUpdate>( busy, (emit) async => _refreshInner(emit)); @@ -90,7 +107,7 @@ class DHTShortArrayCubit extends Cubit> @override Future close() async { - await _initWait(); + await _initWait(cancelValue: true); await _subscription?.cancel(); _subscription = null; if (_wantsCloseRecord) { @@ -118,7 +135,7 @@ class DHTShortArrayCubit extends Cubit> return _shortArray.operateWriteEventual(closure, timeout: timeout); } - final WaitSet _initWait = WaitSet(); + final WaitSet _initWait = WaitSet(); late final DHTShortArray _shortArray; final T Function(List data) _decodeElement; StreamSubscription? _subscription; diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart index 45c4e71..d2aa84a 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart @@ -91,7 +91,7 @@ class _DHTShortArrayHead { if (!await _writeHead()) { // Failed to write head means head got overwritten so write should // be considered failed - throw DHTExceptionTryAgain(); + throw DHTExceptionOutdated(); } onUpdatedHead?.call(); @@ -143,7 +143,7 @@ class _DHTShortArrayHead { try { out = await closure(this); break; - } on DHTExceptionTryAgain { + } on DHTExceptionOutdated { // Failed to write in closure resets state _linkedRecords = List.of(oldLinkedRecords); _index = List.of(oldIndex); diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_read.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_read.dart index abe7198..94d58b8 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_read.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_read.dart @@ -54,8 +54,9 @@ class _DHTShortArrayRead implements DHTShortArrayReadOperations { final out = []; (start, length) = _clampStartLen(start, length); - final chunks = Iterable.generate(length).slices(maxDHTConcurrency).map( - (chunk) => + final chunks = Iterable.generate(length) + .slices(kMaxDHTConcurrency) + .map((chunk) => chunk.map((pos) => get(pos + start, forceRefresh: forceRefresh))); for (final chunk in chunks) { diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_write.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_write.dart index 665ea00..c52a7b2 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_write.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_write.dart @@ -40,7 +40,7 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead } } if (!success) { - throw DHTExceptionTryAgain(); + throw DHTExceptionOutdated(); } } @@ -66,12 +66,12 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead } // Write items in parallel - final dws = DelayedWaitSet(); + final dws = DelayedWaitSet(); for (var i = 0; i < values.length; i++) { final lookup = lookups[i]; final value = values[i]; final outSeqNum = outSeqNums[i]; - dws.add(() async { + dws.add((_) async { final outValue = await lookup.record.tryWriteBytes(value, subkey: lookup.recordSubkey, outSeqNum: outSeqNum); if (outValue != null) { @@ -80,7 +80,7 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead }); } - await dws(chunkSize: maxDHTConcurrency, onChunkDone: (_) => success); + await dws(chunkSize: kMaxDHTConcurrency, onChunkDone: (_) => success); } finally { // Update sequence numbers for (var i = 0; i < values.length; i++) { @@ -97,7 +97,7 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead } } if (!success) { - throw DHTExceptionTryAgain(); + throw DHTExceptionOutdated(); } } diff --git a/packages/veilid_support/lib/dht_support/src/interfaces/exceptions.dart b/packages/veilid_support/lib/dht_support/src/interfaces/exceptions.dart index 529c308..bced3dd 100644 --- a/packages/veilid_support/lib/dht_support/src/interfaces/exceptions.dart +++ b/packages/veilid_support/lib/dht_support/src/interfaces/exceptions.dart @@ -1,5 +1,5 @@ -class DHTExceptionTryAgain implements Exception { - DHTExceptionTryAgain( +class DHTExceptionOutdated implements Exception { + DHTExceptionOutdated( [this.cause = 'operation failed due to newer dht value']); String cause; } @@ -8,3 +8,8 @@ class DHTExceptionInvalidData implements Exception { DHTExceptionInvalidData([this.cause = 'dht data structure is corrupt']); String cause; } + +class DHTExceptionCancelled implements Exception { + DHTExceptionCancelled([this.cause = 'operation was cancelled']); + String cause; +} diff --git a/packages/veilid_support/lib/src/async_table_db_backed_cubit.dart b/packages/veilid_support/lib/src/async_table_db_backed_cubit.dart index 710eec4..d637ee1 100644 --- a/packages/veilid_support/lib/src/async_table_db_backed_cubit.dart +++ b/packages/veilid_support/lib/src/async_table_db_backed_cubit.dart @@ -22,7 +22,7 @@ abstract class AsyncTableDBBackedCubit extends Cubit> await super.close(); } - Future _build() async { + Future _build(_) async { try { await _mutex.protect(() async { emit(AsyncValue.data(await load())); @@ -42,6 +42,6 @@ abstract class AsyncTableDBBackedCubit extends Cubit> } } - final WaitSet _initWait = WaitSet(); + final WaitSet _initWait = WaitSet(); final Mutex _mutex = Mutex(); } diff --git a/packages/veilid_support/lib/src/persistent_queue.dart b/packages/veilid_support/lib/src/persistent_queue.dart index 598d8a7..e59a470 100644 --- a/packages/veilid_support/lib/src/persistent_queue.dart +++ b/packages/veilid_support/lib/src/persistent_queue.dart @@ -45,7 +45,7 @@ class PersistentQueue } } - Future _init() async { + Future _init(_) async { // Start the processor unawaited(Future.delayed(Duration.zero, () async { await _initWait(); @@ -202,7 +202,7 @@ class PersistentQueue final String _key; final T Function(Uint8List) _fromBuffer; final bool _deleteOnClose; - final WaitSet _initWait = WaitSet(); + final WaitSet _initWait = WaitSet(); final Mutex _queueMutex = Mutex(); IList _queue = IList.empty(); final StreamController> _syncAddController = StreamController(); diff --git a/packages/veilid_support/lib/src/table_db.dart b/packages/veilid_support/lib/src/table_db.dart index 773309b..522a837 100644 --- a/packages/veilid_support/lib/src/table_db.dart +++ b/packages/veilid_support/lib/src/table_db.dart @@ -134,7 +134,7 @@ class TableDBValue extends TableDBBackedJson { _tableKeyName = tableKeyName, _makeInitialValue = makeInitialValue, _streamController = StreamController.broadcast() { - _initWait.add(() async { + _initWait.add((_) async { await get(); }); } @@ -172,7 +172,7 @@ class TableDBValue extends TableDBBackedJson { final T? Function(Object? obj) _valueFromJson; final Object? Function(T? obj) _valueToJson; final StreamController _streamController; - final WaitSet _initWait = WaitSet(); + final WaitSet _initWait = WaitSet(); ////////////////////////////////////////////////////////////// /// AsyncTableDBBacked diff --git a/packages/veilid_support/lib/src/table_db_array.dart b/packages/veilid_support/lib/src/table_db_array.dart index ad4c586..7c3d4d4 100644 --- a/packages/veilid_support/lib/src/table_db_array.dart +++ b/packages/veilid_support/lib/src/table_db_array.dart @@ -45,7 +45,7 @@ class _TableDBArrayBase { await _initWait(); } - Future _init() async { + Future _init(_) async { // Load the array details await _mutex.protect(() async { _tableDB = await Veilid.instance.openTableDB(_table, 1); @@ -259,10 +259,10 @@ class _TableDBArrayBase { for (var pos = start; pos < end;) { var batchLen = min(batchSize, end - pos); - final dws = DelayedWaitSet(); + final dws = DelayedWaitSet(); while (batchLen > 0) { final entry = await _getIndexEntry(pos); - dws.add(() async => (await _loadEntry(entry))!); + dws.add((_) async => (await _loadEntry(entry))!); pos++; batchLen--; } @@ -613,7 +613,7 @@ class _TableDBArrayBase { var _open = true; var _initDone = false; final VeilidCrypto _crypto; - final WaitSet _initWait = WaitSet(); + final WaitSet _initWait = WaitSet(); final Mutex _mutex = Mutex(); // Change tracking diff --git a/packages/veilid_support/lib/src/table_db_array_protobuf_cubit.dart b/packages/veilid_support/lib/src/table_db_array_protobuf_cubit.dart index 606ded5..89408ac 100644 --- a/packages/veilid_support/lib/src/table_db_array_protobuf_cubit.dart +++ b/packages/veilid_support/lib/src/table_db_array_protobuf_cubit.dart @@ -46,7 +46,7 @@ class TableDBArrayProtobufCubit TableDBArrayProtobufCubit({ required Future> Function() open, }) : super(const BlocBusyState(AsyncValue.loading())) { - _initWait.add(() async { + _initWait.add((_) async { // Open table db array _array = await open(); _wantsCloseArray = true; @@ -180,7 +180,7 @@ class TableDBArrayProtobufCubit return closure(_array); } - final WaitSet _initWait = WaitSet(); + final WaitSet _initWait = WaitSet(); late final TableDBArrayProtobuf _array; StreamSubscription? _subscription; bool _wantsCloseArray = false; diff --git a/packages/veilid_support/pubspec.lock b/packages/veilid_support/pubspec.lock index 2f71a50..7f08359 100644 --- a/packages/veilid_support/pubspec.lock +++ b/packages/veilid_support/pubspec.lock @@ -37,10 +37,10 @@ packages: dependency: "direct main" description: name: async_tools - sha256: f35f5590711f1422c7eff990351e879c5433486f9b5be5df30818521bf6ab8d6 + sha256: "375da1e5b51974a9e84469b7630f36d1361fb53d3fcc818a5a0121ab51fe0343" url: "https://pub.dev" source: hosted - version: "0.1.3" + version: "0.1.4" bloc: dependency: "direct main" description: @@ -53,10 +53,10 @@ packages: dependency: "direct main" description: name: bloc_advanced_tools - sha256: "11f534f9e89561302de3bd07ab2dfe1c2dacaa8db9794ccdb57c55cfc02dffc6" + sha256: "0599d860eb096c5b12457c60bdf7f66bfcb7171bc94ccf2c7c752b6a716a79f9" url: "https://pub.dev" source: hosted - version: "0.1.3" + version: "0.1.4" boolean_selector: dependency: transitive description: @@ -718,7 +718,7 @@ packages: path: "../../../veilid/veilid-flutter" relative: true source: path - version: "0.3.2" + version: "0.3.3" vm_service: dependency: transitive description: diff --git a/packages/veilid_support/pubspec.yaml b/packages/veilid_support/pubspec.yaml index 5fb9af9..90d7ad8 100644 --- a/packages/veilid_support/pubspec.yaml +++ b/packages/veilid_support/pubspec.yaml @@ -7,9 +7,9 @@ environment: sdk: '>=3.2.0 <4.0.0' dependencies: - async_tools: ^0.1.3 + async_tools: ^0.1.4 bloc: ^8.1.4 - bloc_advanced_tools: ^0.1.3 + bloc_advanced_tools: ^0.1.4 charcode: ^1.3.1 collection: ^1.18.0 equatable: ^2.0.5 diff --git a/pubspec.lock b/pubspec.lock index 6bde90f..42ac495 100644 --- a/pubspec.lock +++ b/pubspec.lock @@ -85,10 +85,10 @@ packages: dependency: "direct main" description: name: async_tools - sha256: f35f5590711f1422c7eff990351e879c5433486f9b5be5df30818521bf6ab8d6 + sha256: "375da1e5b51974a9e84469b7630f36d1361fb53d3fcc818a5a0121ab51fe0343" url: "https://pub.dev" source: hosted - version: "0.1.3" + version: "0.1.4" awesome_extensions: dependency: "direct main" description: @@ -141,10 +141,10 @@ packages: dependency: "direct main" description: name: bloc_advanced_tools - sha256: "11f534f9e89561302de3bd07ab2dfe1c2dacaa8db9794ccdb57c55cfc02dffc6" + sha256: "0599d860eb096c5b12457c60bdf7f66bfcb7171bc94ccf2c7c752b6a716a79f9" url: "https://pub.dev" source: hosted - version: "0.1.3" + version: "0.1.4" blurry_modal_progress_hud: dependency: "direct main" description: diff --git a/pubspec.yaml b/pubspec.yaml index 0187a89..ca47d9d 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -14,12 +14,12 @@ dependencies: animated_theme_switcher: ^2.0.10 ansicolor: ^2.0.2 archive: ^3.6.1 - async_tools: ^0.1.3 + async_tools: ^0.1.4 awesome_extensions: ^2.0.16 badges: ^3.1.2 basic_utils: ^5.7.0 bloc: ^8.1.4 - bloc_advanced_tools: ^0.1.3 + bloc_advanced_tools: ^0.1.4 blurry_modal_progress_hud: ^1.1.1 change_case: ^2.1.0 charcode: ^1.3.1