From 152c8bdff439b47cf1752888cf113304c7bdfa24 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Fri, 21 Jun 2024 22:44:35 -0400 Subject: [PATCH] ui cleanup --- .../cubits/single_contact_messages_cubit.dart | 44 +- lib/chat_list/cubits/chat_list_cubit.dart | 73 +- .../cubits/contact_invitation_list_cubit.dart | 3 +- .../views/contact_invitation_display.dart | 8 +- lib/contacts/cubits/contact_list_cubit.dart | 27 +- lib/contacts/views/contact_item_widget.dart | 8 +- ...ve_single_contact_chat_bloc_map_cubit.dart | 1 - .../cubits/conversation_cubit.dart | 54 +- lib/layout/home/drawer_menu/drawer_menu.dart | 38 +- lib/theme/models/scale_scheme.dart | 4 +- lib/theme/views/widget_helpers.dart | 75 +- .../src/dht_record/dht_record.dart | 16 +- .../src/dht_record/dht_record_pool.dart | 1168 ++++++++--------- .../dht_record/dht_record_pool_private.dart | 77 ++ .../lib/src/persistent_queue.dart | 3 +- 15 files changed, 827 insertions(+), 772 deletions(-) create mode 100644 packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool_private.dart diff --git a/lib/chat/cubits/single_contact_messages_cubit.dart b/lib/chat/cubits/single_contact_messages_cubit.dart index 515ed03..9854535 100644 --- a/lib/chat/cubits/single_contact_messages_cubit.dart +++ b/lib/chat/cubits/single_contact_messages_cubit.dart @@ -4,7 +4,6 @@ import 'package:async_tools/async_tools.dart'; import 'package:fast_immutable_collections/fast_immutable_collections.dart'; import 'package:flutter/foundation.dart'; import 'package:flutter_bloc/flutter_bloc.dart'; -import 'package:protobuf/protobuf.dart'; import 'package:veilid_support/veilid_support.dart'; import '../../account_manager/account_manager.dart'; @@ -82,6 +81,16 @@ class SingleContactMessagesCubit extends Cubit { await _sentMessagesCubit?.close(); await _rcvdMessagesCubit?.close(); await _reconciledMessagesCubit?.close(); + + // If the local conversation record is gone, then delete the reconciled + // messages table as well + final conversationDead = await DHTRecordPool.instance + .isDeletedRecordKey(_localConversationRecordKey); + if (conversationDead) { + await SingleContactMessagesCubit.cleanupAndDeleteMessages( + localConversationRecordKey: _localConversationRecordKey); + } + await super.close(); } @@ -292,8 +301,14 @@ class SingleContactMessagesCubit extends Cubit { previousMessage = message; } + // _sendingMessages = messages; + + // _renderState(); + await _sentMessagesCubit!.operateAppendEventual((writer) => writer.addAll(messages.map((m) => m.writeToBuffer()).toList())); + + // _sendingMessages = const IList.empty(); } // Produce a state for this cubit from the input cubits and queues @@ -304,7 +319,7 @@ class SingleContactMessagesCubit extends Cubit { // Get all sent messages final sentMessages = _sentMessagesCubit?.state.state.asData?.value; //Get all items in the unsent queue - final unsentMessages = _unsentMessagesQueue.queue; + //final unsentMessages = _unsentMessagesQueue.queue; // If we aren't ready to render a state, say we're loading if (reconciledMessages == null || sentMessages == null) { @@ -329,7 +344,7 @@ class SingleContactMessagesCubit extends Cubit { // ); final renderedElements = []; - + final renderedIds = {}; for (final m in reconciledMessages.windowElements) { final isLocal = m.content.author.toVeilid() == _accountInfo.identityTypedPublicKey; @@ -346,13 +361,22 @@ class SingleContactMessagesCubit extends Cubit { sent: sent, sentOffline: sentOffline, )); + + renderedIds.add(m.content.authorUniqueIdString); } - for (final m in unsentMessages) { - renderedElements.add(RenderStateElement( - message: (m.deepCopy())..id = m.timestamp.toBytes(), - isLocal: true, - )); - } + + // Render in-flight messages at the bottom + // for (final m in _sendingMessages) { + // if (renderedIds.contains(m.authorUniqueIdString)) { + // continue; + // } + // renderedElements.add(RenderStateElement( + // message: m, + // isLocal: true, + // sent: true, + // sentOffline: true, + // )); + // } // Render the state final messages = renderedElements @@ -426,7 +450,7 @@ class SingleContactMessagesCubit extends Cubit { late final MessageReconciliation _reconciliation; late final PersistentQueue _unsentMessagesQueue; - + // IList _sendingMessages = const IList.empty(); StreamSubscription>? _sentSubscription; StreamSubscription>? _rcvdSubscription; StreamSubscription>? diff --git a/lib/chat_list/cubits/chat_list_cubit.dart b/lib/chat_list/cubits/chat_list_cubit.dart index fa262f6..6bb88c1 100644 --- a/lib/chat_list/cubits/chat_list_cubit.dart +++ b/lib/chat_list/cubits/chat_list_cubit.dart @@ -8,7 +8,6 @@ import 'package:veilid_support/veilid_support.dart'; import '../../account_manager/account_manager.dart'; import '../../chat/chat.dart'; import '../../proto/proto.dart' as proto; -import '../../tools/tools.dart'; ////////////////////////////////////////////////// @@ -58,9 +57,20 @@ class ChatListCubit extends DHTShortArrayCubit final remoteConversationRecordKey = contact.remoteConversationRecordKey.toVeilid(); + // Create 1:1 conversation type Chat + final chatMember = proto.ChatMember() + ..remoteIdentityPublicKey = remoteIdentityPublicKey.toProto() + ..remoteConversationRecordKey = remoteConversationRecordKey.toProto(); + + final directChat = proto.DirectChat() + ..settings = await getDefaultChatSettings(contact) + ..localConversationRecordKey = localConversationRecordKey.toProto() + ..remoteMember = chatMember; + + final chat = proto.Chat()..direct = directChat; + // Add Chat to account's list - // if this fails, don't keep retrying, user can try again later - await operateWrite((writer) async { + await operateWriteEventual((writer) async { // See if we have added this chat already for (var i = 0; i < writer.length; i++) { final cbuf = await writer.get(i); @@ -89,18 +99,6 @@ class ChatListCubit extends DHTShortArrayCubit } } - // Create 1:1 conversation type Chat - final chatMember = proto.ChatMember() - ..remoteIdentityPublicKey = remoteIdentityPublicKey.toProto() - ..remoteConversationRecordKey = remoteConversationRecordKey.toProto(); - - final directChat = proto.DirectChat() - ..settings = await getDefaultChatSettings(contact) - ..localConversationRecordKey = localConversationRecordKey.toProto() - ..remoteMember = chatMember; - - final chat = proto.Chat()..direct = directChat; - // Add chat await writer.add(chat.writeToBuffer()); }); @@ -110,37 +108,22 @@ class ChatListCubit extends DHTShortArrayCubit Future deleteChat( {required TypedKey localConversationRecordKey}) async { // Remove Chat from account's list - // if this fails, don't keep retrying, user can try again later - final deletedItem = - // Ensure followers get their changes before we return - await syncFollowers(() => operateWrite((writer) async { - if (_activeChatCubit.state == localConversationRecordKey) { - _activeChatCubit.setActiveChat(null); - } - for (var i = 0; i < writer.length; i++) { - final c = await writer.getProtobuf(proto.Chat.fromBuffer, i); - if (c == null) { - throw Exception('Failed to get chat'); - } - - if (c.localConversationRecordKey == - localConversationRecordKey) { - await writer.remove(i); - return c; - } - } - return null; - })); - // Since followers are synced, we can safetly remove the reconciled - // chat record now - if (deletedItem != null) { - try { - await SingleContactMessagesCubit.cleanupAndDeleteMessages( - localConversationRecordKey: localConversationRecordKey); - } on Exception catch (e) { - log.debug('error removing reconciled chat table: $e', e); + await operateWriteEventual((writer) async { + if (_activeChatCubit.state == localConversationRecordKey) { + _activeChatCubit.setActiveChat(null); } - } + for (var i = 0; i < writer.length; i++) { + final c = await writer.getProtobuf(proto.Chat.fromBuffer, i); + if (c == null) { + throw Exception('Failed to get chat'); + } + + if (c.localConversationRecordKey == localConversationRecordKey) { + await writer.remove(i); + return; + } + } + }); } /// StateMapFollowable ///////////////////////// diff --git a/lib/contact_invitation/cubits/contact_invitation_list_cubit.dart b/lib/contact_invitation/cubits/contact_invitation_list_cubit.dart index cac3d6c..9bd589e 100644 --- a/lib/contact_invitation/cubits/contact_invitation_list_cubit.dart +++ b/lib/contact_invitation/cubits/contact_invitation_list_cubit.dart @@ -152,8 +152,7 @@ class ContactInvitationListCubit ..message = message; // Add ContactInvitationRecord to account's list - // if this fails, don't keep retrying, user can try again later - await operateWrite((writer) async { + await operateWriteEventual((writer) async { await writer.add(cinvrec.writeToBuffer()); }); }); diff --git a/lib/contact_invitation/views/contact_invitation_display.dart b/lib/contact_invitation/views/contact_invitation_display.dart index 374a309..2e4acad 100644 --- a/lib/contact_invitation/views/contact_invitation_display.dart +++ b/lib/contact_invitation/views/contact_invitation_display.dart @@ -46,7 +46,6 @@ class ContactInvitationDisplayDialog extends StatelessWidget { // ignore: prefer_expression_function_bodies Widget build(BuildContext context) { final theme = Theme.of(context); - //final scale = theme.extension()!; final textTheme = theme.textTheme; final signedContactInvitationBytesV = @@ -58,6 +57,9 @@ class ContactInvitationDisplayDialog extends StatelessWidget { return PopControl( dismissible: !signedContactInvitationBytesV.isLoading, child: Dialog( + shape: RoundedRectangleBorder( + side: const BorderSide(width: 2), + borderRadius: BorderRadius.circular(16)), backgroundColor: Colors.white, child: ConstrainedBox( constraints: BoxConstraints( @@ -90,6 +92,10 @@ class ContactInvitationDisplayDialog extends StatelessWidget { .paddingAll(8), ElevatedButton.icon( icon: const Icon(Icons.copy), + style: ElevatedButton.styleFrom( + foregroundColor: Colors.black, + backgroundColor: Colors.white, + side: const BorderSide()), label: Text(translate( 'create_invitation_dialog.copy_invitation')), onPressed: () async { diff --git a/lib/contacts/cubits/contact_list_cubit.dart b/lib/contacts/cubits/contact_list_cubit.dart index 47433c7..aaecca4 100644 --- a/lib/contacts/cubits/contact_list_cubit.dart +++ b/lib/contacts/cubits/contact_list_cubit.dart @@ -6,7 +6,6 @@ import 'package:protobuf/protobuf.dart'; import 'package:veilid_support/veilid_support.dart'; import '../../account_manager/account_manager.dart'; -import '../../conversation/conversation.dart'; import '../../proto/proto.dart' as proto; import '../../tools/tools.dart'; @@ -17,8 +16,7 @@ class ContactListCubit extends DHTShortArrayCubit { ContactListCubit({ required AccountInfo accountInfo, required OwnedDHTRecordPointer contactListRecordPointer, - }) : _accountInfo = accountInfo, - super( + }) : super( open: () => _open(accountInfo.accountRecordKey, contactListRecordPointer), decodeElement: proto.Contact.fromBuffer); @@ -98,8 +96,7 @@ class ContactListCubit extends DHTShortArrayCubit { ..showAvailability = false; // Add Contact to account's list - // if this fails, don't keep retrying, user can try again later - await operateWrite((writer) async { + await operateWriteEventual((writer) async { await writer.add(contact.writeToBuffer()); }); } @@ -107,7 +104,7 @@ class ContactListCubit extends DHTShortArrayCubit { Future deleteContact( {required TypedKey localConversationRecordKey}) async { // Remove Contact from account's list - final deletedItem = await operateWrite((writer) async { + final deletedItem = await operateWriteEventual((writer) async { for (var i = 0; i < writer.length; i++) { final item = await writer.getProtobuf(proto.Contact.fromBuffer, i); if (item == null) { @@ -124,18 +121,11 @@ class ContactListCubit extends DHTShortArrayCubit { if (deletedItem != null) { try { - // Make a conversation cubit to manipulate the conversation - final conversationCubit = ConversationCubit( - accountInfo: _accountInfo, - remoteIdentityPublicKey: deletedItem.identityPublicKey.toVeilid(), - localConversationRecordKey: - deletedItem.localConversationRecordKey.toVeilid(), - remoteConversationRecordKey: - deletedItem.remoteConversationRecordKey.toVeilid(), - ); - - // Delete the local and remote conversation records - await conversationCubit.delete(); + // Mark the conversation records for deletion + await DHTRecordPool.instance + .deleteRecord(deletedItem.localConversationRecordKey.toVeilid()); + await DHTRecordPool.instance + .deleteRecord(deletedItem.remoteConversationRecordKey.toVeilid()); } on Exception catch (e) { log.debug('error deleting conversation records: $e', e); } @@ -144,5 +134,4 @@ class ContactListCubit extends DHTShortArrayCubit { final _contactProfileUpdateMap = SingleStateProcessorMap(); - final AccountInfo _accountInfo; } diff --git a/lib/contacts/views/contact_item_widget.dart b/lib/contacts/views/contact_item_widget.dart index a7c2836..a7441e9 100644 --- a/lib/contacts/views/contact_item_widget.dart +++ b/lib/contacts/views/contact_item_widget.dart @@ -74,13 +74,13 @@ class ContactItemWidget extends StatelessWidget { final contactListCubit = context.read(); final chatListCubit = context.read(); - // Remove any chats for this contact - await chatListCubit.deleteChat( - localConversationRecordKey: localConversationRecordKey); - // Delete the contact itself await contactListCubit.deleteContact( localConversationRecordKey: localConversationRecordKey); + + // Remove any chats for this contact + await chatListCubit.deleteChat( + localConversationRecordKey: localConversationRecordKey); }) ], ); diff --git a/lib/conversation/cubits/active_single_contact_chat_bloc_map_cubit.dart b/lib/conversation/cubits/active_single_contact_chat_bloc_map_cubit.dart index 6c6d4b8..88860c4 100644 --- a/lib/conversation/cubits/active_single_contact_chat_bloc_map_cubit.dart +++ b/lib/conversation/cubits/active_single_contact_chat_bloc_map_cubit.dart @@ -28,7 +28,6 @@ class _SingleContactChatState extends Equatable { final TypedKey remoteMessagesRecordKey; @override - // TODO: implement props List get props => [ remoteIdentityPublicKey, localConversationRecordKey, diff --git a/lib/conversation/cubits/conversation_cubit.dart b/lib/conversation/cubits/conversation_cubit.dart index 728489f..1947504 100644 --- a/lib/conversation/cubits/conversation_cubit.dart +++ b/lib/conversation/cubits/conversation_cubit.dart @@ -14,7 +14,6 @@ import 'package:veilid_support/veilid_support.dart'; import '../../account_manager/account_manager.dart'; import '../../proto/proto.dart' as proto; -import '../../tools/tools.dart'; const _sfUpdateAccountChange = 'updateAccountChange'; @@ -116,7 +115,7 @@ class ConversationCubit extends Cubit> { final accountRecordKey = _accountInfo.accountRecordKey; final writer = _accountInfo.identityWriter; - // Open with SMPL scheme for identity writer + // Open with SMPL schema for identity writer late final DHTRecord localConversationRecord; if (existingConversationRecordKey != null) { localConversationRecord = await pool.openRecordWrite( @@ -171,57 +170,6 @@ class ConversationCubit extends Cubit> { return out; } - /// Delete the conversation keys associated with this conversation - Future delete() async { - final pool = DHTRecordPool.instance; - - await _initWait(); - final localConversationCubit = _localConversationCubit; - final remoteConversationCubit = _remoteConversationCubit; - - final deleteSet = DelayedWaitSet(); - - if (localConversationCubit != null) { - final data = localConversationCubit.state.asData; - if (data == null) { - log.warning('could not delete local conversation'); - return false; - } - - deleteSet.add(() async { - _localConversationCubit = null; - await localConversationCubit.close(); - final conversation = data.value; - final messagesKey = conversation.messages.toVeilid(); - await pool.deleteRecord(messagesKey); - await pool.deleteRecord(_localConversationRecordKey!); - _localConversationRecordKey = null; - }); - } - - if (remoteConversationCubit != null) { - final data = remoteConversationCubit.state.asData; - if (data == null) { - log.warning('could not delete remote conversation'); - return false; - } - - deleteSet.add(() async { - _remoteConversationCubit = null; - await remoteConversationCubit.close(); - final conversation = data.value; - final messagesKey = conversation.messages.toVeilid(); - await pool.deleteRecord(messagesKey); - await pool.deleteRecord(_remoteConversationRecordKey!); - }); - } - - // Commit the delete futures - await deleteSet(); - - return true; - } - /// Force refresh of conversation keys Future refresh() async { await _initWait(); diff --git a/lib/layout/home/drawer_menu/drawer_menu.dart b/lib/layout/home/drawer_menu/drawer_menu.dart index 1e56c4b..218d7ed 100644 --- a/lib/layout/home/drawer_menu/drawer_menu.dart +++ b/lib/layout/home/drawer_menu/drawer_menu.dart @@ -71,11 +71,22 @@ class _DrawerMenuState extends State { shortname = abbrev; } - final avatar = AvatarImage( - size: 32, - backgroundColor: loggedIn ? scale.primary : scale.elementBackground, - foregroundColor: loggedIn ? scale.primaryText : scale.subtleText, - child: Text(shortname, style: theme.textTheme.titleLarge)); + final avatar = Container( + height: 34, + width: 34, + decoration: BoxDecoration( + shape: BoxShape.circle, + border: Border.all( + color: loggedIn ? scale.border : scale.subtleBorder, + width: 2, + strokeAlign: BorderSide.strokeAlignOutside), + color: Colors.blue, + ), + child: AvatarImage( + //size: 32, + backgroundColor: loggedIn ? scale.primary : scale.elementBackground, + foregroundColor: loggedIn ? scale.primaryText : scale.subtleText, + child: Text(shortname, style: theme.textTheme.titleLarge))); return AnimatedPadding( padding: EdgeInsets.fromLTRB(selected ? 0 : 0, 0, selected ? 0 : 8, 0), @@ -234,6 +245,7 @@ class _DrawerMenuState extends State { Widget build(BuildContext context) { final theme = Theme.of(context); final scale = theme.extension()!; + final scaleConfig = theme.extension()!; //final textTheme = theme.textTheme; final localAccounts = context.watch().state; final perAccountCollectionBlocMapState = @@ -269,13 +281,17 @@ class _DrawerMenuState extends State { fit: BoxFit.scaleDown, child: Row(children: [ SvgPicture.asset( - height: 48, - 'assets/images/icon.svg', - ).paddingLTRB(0, 0, 16, 0), + height: 48, + 'assets/images/icon.svg', + colorFilter: scaleConfig.useVisualIndicators + ? grayColorFilter + : null) + .paddingLTRB(0, 0, 16, 0), SvgPicture.asset( - height: 48, - 'assets/images/title.svg', - ), + height: 48, + 'assets/images/title.svg', + colorFilter: + scaleConfig.useVisualIndicators ? grayColorFilter : null), ])), const Spacer(), _getAccountList( diff --git a/lib/theme/models/scale_scheme.dart b/lib/theme/models/scale_scheme.dart index 990fe1e..642dfee 100644 --- a/lib/theme/models/scale_scheme.dart +++ b/lib/theme/models/scale_scheme.dart @@ -89,11 +89,9 @@ class ScaleScheme extends ThemeExtension { onError: errorScale.primaryText, // errorContainer: errorScale.hoverElementBackground, // onErrorContainer: errorScale.subtleText, - background: grayScale.appBackground, // reviewed - onBackground: grayScale.appText, // reviewed surface: primaryScale.primary, // reviewed onSurface: primaryScale.primaryText, // reviewed - surfaceVariant: secondaryScale.primary, + surfaceContainerHighest: secondaryScale.primary, onSurfaceVariant: secondaryScale.primaryText, // ?? reviewed a little outline: primaryScale.border, outlineVariant: secondaryScale.border, diff --git a/lib/theme/views/widget_helpers.dart b/lib/theme/views/widget_helpers.dart index 52f26ac..e3dfd94 100644 --- a/lib/theme/views/widget_helpers.dart +++ b/lib/theme/views/widget_helpers.dart @@ -5,7 +5,6 @@ import 'package:blurry_modal_progress_hud/blurry_modal_progress_hud.dart'; import 'package:flutter/material.dart'; import 'package:flutter_bloc/flutter_bloc.dart'; import 'package:flutter_spinkit/flutter_spinkit.dart'; -import 'package:flutter_translate/flutter_translate.dart'; import 'package:motion_toast/motion_toast.dart'; import 'package:quickalert/quickalert.dart'; @@ -122,36 +121,45 @@ Future showErrorModal( } void showErrorToast(BuildContext context, String message) { - MotionToast.error( - title: Text(translate('toast.error')), + final theme = Theme.of(context); + final scale = theme.extension()!; + final scaleConfig = theme.extension()!; + + MotionToast( + //title: Text(translate('toast.error')), description: Text(message), + constraints: BoxConstraints.loose(const Size(400, 100)), + contentPadding: const EdgeInsets.all(16), + primaryColor: scale.errorScale.elementBackground, + secondaryColor: scale.errorScale.calloutBackground, + borderRadius: 16, + toastDuration: const Duration(seconds: 4), + animationDuration: const Duration(milliseconds: 1000), + displayBorder: scaleConfig.useVisualIndicators, + icon: Icons.error, ).show(context); } void showInfoToast(BuildContext context, String message) { - MotionToast.info( - title: Text(translate('toast.info')), + final theme = Theme.of(context); + final scale = theme.extension()!; + final scaleConfig = theme.extension()!; + + MotionToast( + //title: Text(translate('toast.info')), description: Text(message), + constraints: BoxConstraints.loose(const Size(400, 100)), + contentPadding: const EdgeInsets.all(16), + primaryColor: scale.tertiaryScale.elementBackground, + secondaryColor: scale.tertiaryScale.calloutBackground, + borderRadius: 16, + toastDuration: const Duration(seconds: 2), + animationDuration: const Duration(milliseconds: 500), + displayBorder: scaleConfig.useVisualIndicators, + icon: Icons.info, ).show(context); } -// Widget insetBorder( -// {required BuildContext context, -// required bool enabled, -// required Color color, -// required Widget child}) { -// if (!enabled) { -// return child; -// } - -// return Stack({ -// children: [] { -// DecoratedBox(decoration: BoxDecoration() -// child, -// } -// }) -// } - Widget styledTitleContainer({ required BuildContext context, required String title, @@ -230,3 +238,26 @@ Widget styledBottomSheet({ bool get isPlatformDark => WidgetsBinding.instance.platformDispatcher.platformBrightness == Brightness.dark; + +const grayColorFilter = ColorFilter.matrix([ + 0.2126, + 0.7152, + 0.0722, + 0, + 0, + 0.2126, + 0.7152, + 0.0722, + 0, + 0, + 0.2126, + 0.7152, + 0.0722, + 0, + 0, + 0, + 0, + 0, + 1, + 0, +]); diff --git a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record.dart b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record.dart index 28fa907..e04af10 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record.dart @@ -1,7 +1,5 @@ part of 'dht_record_pool.dart'; -const _sfListen = 'listen'; - @immutable class DHTRecordWatchChange extends Equatable { const DHTRecordWatchChange( @@ -41,7 +39,7 @@ enum DHTRecordRefreshMode { class DHTRecord implements DHTDeleteable { DHTRecord._( {required VeilidRoutingContext routingContext, - required SharedDHTRecordData sharedDHTRecordData, + required _SharedDHTRecordData sharedDHTRecordData, required int defaultSubkey, required KeyPair? writer, required VeilidCrypto crypto, @@ -241,7 +239,7 @@ class DHTRecord implements DHTDeleteable { // if so, shortcut and don't bother decrypting it if (newValueData.data.equals(encryptedNewValue)) { if (isUpdated) { - DHTRecordPool.instance.processLocalValueChange(key, newValue, subkey); + DHTRecordPool.instance._processLocalValueChange(key, newValue, subkey); } return null; } @@ -251,7 +249,7 @@ class DHTRecord implements DHTDeleteable { await (crypto ?? _crypto).decrypt(newValueData.data); if (isUpdated) { DHTRecordPool.instance - .processLocalValueChange(key, decryptedNewValue, subkey); + ._processLocalValueChange(key, decryptedNewValue, subkey); } return decryptedNewValue; } @@ -298,7 +296,7 @@ class DHTRecord implements DHTDeleteable { final isUpdated = newValueData.seq != lastSeq; if (isUpdated) { - DHTRecordPool.instance.processLocalValueChange(key, newValue, subkey); + DHTRecordPool.instance._processLocalValueChange(key, newValue, subkey); } } @@ -419,7 +417,7 @@ class DHTRecord implements DHTDeleteable { // Set up watch requirements which will get picked up by the next tick final oldWatchState = watchState; watchState = - WatchState(subkeys: subkeys, expiration: expiration, count: count); + _WatchState(subkeys: subkeys, expiration: expiration, count: count); if (oldWatchState != watchState) { _sharedDHTRecordData.needsWatchStateUpdate = true; } @@ -544,7 +542,7 @@ class DHTRecord implements DHTDeleteable { ////////////////////////////////////////////////////////////// - final SharedDHTRecordData _sharedDHTRecordData; + final _SharedDHTRecordData _sharedDHTRecordData; final VeilidRoutingContext _routingContext; final int _defaultSubkey; final KeyPair? _writer; @@ -554,5 +552,5 @@ class DHTRecord implements DHTDeleteable { int _openCount; StreamController? _watchController; @internal - WatchState? watchState; + _WatchState? watchState; } diff --git a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart index 3efd8a7..b80db1f 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart @@ -16,20 +16,11 @@ export 'package:fast_immutable_collections/fast_immutable_collections.dart' part 'dht_record_pool.freezed.dart'; part 'dht_record_pool.g.dart'; part 'dht_record.dart'; - -const int watchBackoffMultiplier = 2; -const int watchBackoffMax = 30; - -const int? defaultWatchDurationSecs = null; // 600 -const int watchRenewalNumerator = 4; -const int watchRenewalDenominator = 5; +part 'dht_record_pool_private.dart'; // Maximum number of concurrent DHT operations to perform on the network const int maxDHTConcurrency = 8; -// DHT crypto domain -const String cryptoDomainDHT = 'dht'; - typedef DHTRecordPoolLogger = void Function(String message); /// Record pool that managed DHTRecords and allows for tagged deletion @@ -62,114 +53,18 @@ class OwnedDHTRecordPointer with _$OwnedDHTRecordPointer { _$OwnedDHTRecordPointerFromJson(json as Map); } -/// Watch state -@immutable -class WatchState extends Equatable { - const WatchState( - {required this.subkeys, - required this.expiration, - required this.count, - this.realExpiration, - this.renewalTime}); - final List? subkeys; - final Timestamp? expiration; - final int? count; - final Timestamp? realExpiration; - final Timestamp? renewalTime; - - @override - List get props => - [subkeys, expiration, count, realExpiration, renewalTime]; -} - -/// Data shared amongst all DHTRecord instances -class SharedDHTRecordData { - SharedDHTRecordData( - {required this.recordDescriptor, - required this.defaultWriter, - required this.defaultRoutingContext}); - DHTRecordDescriptor recordDescriptor; - KeyPair? defaultWriter; - VeilidRoutingContext defaultRoutingContext; - bool needsWatchStateUpdate = false; - WatchState? unionWatchState; -} - -// Per opened record data -class OpenedRecordInfo { - OpenedRecordInfo( - {required DHTRecordDescriptor recordDescriptor, - required KeyPair? defaultWriter, - required VeilidRoutingContext defaultRoutingContext}) - : shared = SharedDHTRecordData( - recordDescriptor: recordDescriptor, - defaultWriter: defaultWriter, - defaultRoutingContext: defaultRoutingContext); - SharedDHTRecordData shared; - Set records = {}; - - String get debugNames { - final r = records.toList() - ..sort((a, b) => a.key.toString().compareTo(b.key.toString())); - return '[${r.map((x) => x.debugName).join(',')}]'; - } - - String get details { - final r = records.toList() - ..sort((a, b) => a.key.toString().compareTo(b.key.toString())); - return '[${r.map((x) => "writer=${x._writer} " - "defaultSubkey=${x._defaultSubkey}").join(',')}]'; - } - - String get sharedDetails => shared.toString(); -} +////////////////////////////////////////////////////////////////////////////// +/// Allocator and management system for DHTRecord class DHTRecordPool with TableDBBackedJson { DHTRecordPool._(Veilid veilid, VeilidRoutingContext routingContext) : _state = const DHTRecordPoolAllocations(), _mutex = Mutex(), - _opened = {}, + _opened = {}, _markedForDelete = {}, _routingContext = routingContext, _veilid = veilid; - // Logger - DHTRecordPoolLogger? _logger; - - // Persistent DHT record list - DHTRecordPoolAllocations _state; - // Create/open Mutex - final Mutex _mutex; - // Which DHT records are currently open - final Map _opened; - // Which DHT records are marked for deletion - final Set _markedForDelete; - // Default routing context to use for new keys - final VeilidRoutingContext _routingContext; - // Convenience accessor - final Veilid _veilid; - // If tick is already running or not - bool _inTick = false; - // Tick counter for backoff - int _tickCount = 0; - // Backoff timer - int _watchBackoffTimer = 1; - - static DHTRecordPool? _singleton; - - ////////////////////////////////////////////////////////////// - /// AsyncTableDBBacked - @override - String tableName() => 'dht_record_pool'; - @override - String tableKeyName() => 'pool_allocations'; - @override - DHTRecordPoolAllocations valueFromJson(Object? obj) => obj != null - ? DHTRecordPoolAllocations.fromJson(obj) - : const DHTRecordPoolAllocations(); - @override - Object? valueToJson(DHTRecordPoolAllocations? val) => val?.toJson(); - ////////////////////////////////////////////////////////////// static DHTRecordPool get instance => _singleton!; @@ -190,337 +85,8 @@ class DHTRecordPool with TableDBBackedJson { } } - Veilid get veilid => _veilid; - - void log(String message) { - _logger?.call(message); - } - - Future _recordCreateInner( - {required String debugName, - required VeilidRoutingContext dhtctx, - required DHTSchema schema, - KeyPair? writer, - TypedKey? parent}) async { - if (!_mutex.isLocked) { - throw StateError('should be locked here'); - } - // Create the record - final recordDescriptor = await dhtctx.createDHTRecord(schema); - - log('createDHTRecord: debugName=$debugName key=${recordDescriptor.key}'); - - // Reopen if a writer is specified to ensure - // we switch the default writer - if (writer != null) { - await dhtctx.openDHTRecord(recordDescriptor.key, writer: writer); - } - final openedRecordInfo = OpenedRecordInfo( - recordDescriptor: recordDescriptor, - defaultWriter: writer ?? recordDescriptor.ownerKeyPair(), - defaultRoutingContext: dhtctx); - _opened[recordDescriptor.key] = openedRecordInfo; - - // Register the dependency - await _addDependencyInner( - parent, - recordDescriptor.key, - debugName: debugName, - ); - - return openedRecordInfo; - } - - Future _recordOpenInner( - {required String debugName, - required VeilidRoutingContext dhtctx, - required TypedKey recordKey, - KeyPair? writer, - TypedKey? parent}) async { - if (!_mutex.isLocked) { - throw StateError('should be locked here'); - } - log('openDHTRecord: debugName=$debugName key=$recordKey'); - - // If we are opening a key that already exists - // make sure we are using the same parent if one was specified - _validateParentInner(parent, recordKey); - - // See if this has been opened yet - final openedRecordInfo = _opened[recordKey]; - if (openedRecordInfo == null) { - // Fresh open, just open the record - final recordDescriptor = - await dhtctx.openDHTRecord(recordKey, writer: writer); - final newOpenedRecordInfo = OpenedRecordInfo( - recordDescriptor: recordDescriptor, - defaultWriter: writer, - defaultRoutingContext: dhtctx); - _opened[recordDescriptor.key] = newOpenedRecordInfo; - - // Register the dependency - await _addDependencyInner( - parent, - recordKey, - debugName: debugName, - ); - - return newOpenedRecordInfo; - } - - // Already opened - - // See if we need to reopen the record with a default writer and possibly - // a different routing context - if (writer != null && openedRecordInfo.shared.defaultWriter == null) { - final newRecordDescriptor = - await dhtctx.openDHTRecord(recordKey, writer: writer); - openedRecordInfo.shared.defaultWriter = writer; - openedRecordInfo.shared.defaultRoutingContext = dhtctx; - if (openedRecordInfo.shared.recordDescriptor.ownerSecret == null) { - openedRecordInfo.shared.recordDescriptor = newRecordDescriptor; - } - } - - // Register the dependency - await _addDependencyInner( - parent, - recordKey, - debugName: debugName, - ); - - return openedRecordInfo; - } - - // Called when a DHTRecord is closed - // Cleans up the opened record housekeeping and processes any late deletions - Future _recordClosed(DHTRecord record) async { - await _mutex.protect(() async { - final key = record.key; - - log('closeDHTRecord: debugName=${record.debugName} key=$key'); - - final openedRecordInfo = _opened[key]; - if (openedRecordInfo == null || - !openedRecordInfo.records.remove(record)) { - throw StateError('record already closed'); - } - if (openedRecordInfo.records.isEmpty) { - await _routingContext.closeDHTRecord(key); - _opened.remove(key); - - await _checkForLateDeletesInner(key); - } - }); - } - - // Check to see if this key can finally be deleted - // If any parents are marked for deletion, try them first - Future _checkForLateDeletesInner(TypedKey key) async { - // Get parent list in bottom up order including our own key - final parents = []; - TypedKey? nextParent = key; - while (nextParent != null) { - parents.add(nextParent); - nextParent = getParentRecordKey(nextParent); - } - - // If any parent is ready to delete all its children do it - for (final parent in parents) { - if (_markedForDelete.contains(parent)) { - final deleted = await _deleteRecordInner(parent); - if (!deleted) { - // If we couldn't delete a child then no 'marked for delete' parents - // above us will be ready to delete either - break; - } - } - } - } - - // Collect all dependencies (including the record itself) - // in reverse (bottom-up/delete order) - List _collectChildrenInner(TypedKey recordKey) { - if (!_mutex.isLocked) { - throw StateError('should be locked here'); - } - final allDeps = []; - final currentDeps = [recordKey]; - while (currentDeps.isNotEmpty) { - final nextDep = currentDeps.removeLast(); - - allDeps.add(nextDep); - final childDeps = - _state.childrenByParent[nextDep.toJson()]?.toList() ?? []; - currentDeps.addAll(childDeps); - } - return allDeps.reversedView; - } - - /// Collect all dependencies (including the record itself) - /// in reverse (bottom-up/delete order) - Future> collectChildren(TypedKey recordKey) => - _mutex.protect(() async => _collectChildrenInner(recordKey)); - - /// Print children - String debugChildren(TypedKey recordKey, {List? allDeps}) { - allDeps ??= _collectChildrenInner(recordKey); - // ignore: avoid_print - var out = - 'Parent: $recordKey (${_state.debugNames[recordKey.toString()]})\n'; - for (final dep in allDeps) { - if (dep != recordKey) { - // ignore: avoid_print - out += ' Child: $dep (${_state.debugNames[dep.toString()]})\n'; - } - } - return out; - } - - // Actual delete function - Future _finalizeDeleteRecordInner(TypedKey recordKey) async { - log('_finalizeDeleteRecordInner: key=$recordKey'); - - // Remove this child from parents - await _removeDependenciesInner([recordKey]); - await _routingContext.deleteDHTRecord(recordKey); - _markedForDelete.remove(recordKey); - } - - // Deep delete mechanism inside mutex - Future _deleteRecordInner(TypedKey recordKey) async { - final toDelete = _readyForDeleteInner(recordKey); - if (toDelete.isNotEmpty) { - // delete now - for (final deleteKey in toDelete) { - await _finalizeDeleteRecordInner(deleteKey); - } - return true; - } - // mark for deletion - _markedForDelete.add(recordKey); - return false; - } - - /// Delete a record and its children if they are all closed - /// otherwise mark that record for deletion eventually - /// Returns true if the deletion was processed immediately - /// Returns false if the deletion was marked for later - Future deleteRecord(TypedKey recordKey) async => - _mutex.protect(() async => _deleteRecordInner(recordKey)); - - // If everything underneath is closed including itself, return the - // list of children (and itself) to finally actually delete - List _readyForDeleteInner(TypedKey recordKey) { - final allDeps = _collectChildrenInner(recordKey); - for (final dep in allDeps) { - if (_opened.containsKey(dep)) { - return []; - } - } - return allDeps; - } - - void _validateParentInner(TypedKey? parent, TypedKey child) { - if (!_mutex.isLocked) { - throw StateError('should be locked here'); - } - - final childJson = child.toJson(); - final existingParent = _state.parentByChild[childJson]; - if (parent == null) { - if (existingParent != null) { - throw StateError('Child is already parented: $child'); - } - } else { - if (_state.rootRecords.contains(child)) { - throw StateError('Child already added as root: $child'); - } - if (existingParent != null && existingParent != parent) { - throw StateError('Child has two parents: $child <- $parent'); - } - } - } - - Future _addDependencyInner(TypedKey? parent, TypedKey child, - {required String debugName}) async { - if (!_mutex.isLocked) { - throw StateError('should be locked here'); - } - if (parent == null) { - if (_state.rootRecords.contains(child)) { - // Dependency already added - return; - } - _state = await store(_state.copyWith( - rootRecords: _state.rootRecords.add(child), - debugNames: _state.debugNames.add(child.toJson(), debugName))); - } else { - final childrenOfParent = - _state.childrenByParent[parent.toJson()] ?? ISet(); - if (childrenOfParent.contains(child)) { - // Dependency already added (consecutive opens, etc) - return; - } - _state = await store(_state.copyWith( - childrenByParent: _state.childrenByParent - .add(parent.toJson(), childrenOfParent.add(child)), - parentByChild: _state.parentByChild.add(child.toJson(), parent), - debugNames: _state.debugNames.add(child.toJson(), debugName))); - } - } - - Future _removeDependenciesInner(List childList) async { - if (!_mutex.isLocked) { - throw StateError('should be locked here'); - } - var state = _state; - - for (final child in childList) { - if (_state.rootRecords.contains(child)) { - state = state.copyWith( - rootRecords: state.rootRecords.remove(child), - debugNames: state.debugNames.remove(child.toJson())); - } else { - final parent = state.parentByChild[child.toJson()]; - if (parent == null) { - continue; - } - final children = state.childrenByParent[parent.toJson()]!.remove(child); - if (children.isEmpty) { - state = state.copyWith( - childrenByParent: state.childrenByParent.remove(parent.toJson()), - parentByChild: state.parentByChild.remove(child.toJson()), - debugNames: state.debugNames.remove(child.toJson())); - } else { - state = state.copyWith( - childrenByParent: - state.childrenByParent.add(parent.toJson(), children), - parentByChild: state.parentByChild.remove(child.toJson()), - debugNames: state.debugNames.remove(child.toJson())); - } - } - } - - if (state != _state) { - _state = await store(state); - } - } - - bool _isValidRecordKeyInner(TypedKey key) { - if (_state.rootRecords.contains(key)) { - return true; - } - if (_state.childrenByParent.containsKey(key.toJson())) { - return true; - } - return false; - } - - Future isValidRecordKey(TypedKey key) => - _mutex.protect(() async => _isValidRecordKeyInner(key)); - - /////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////// + // Public Interface /// Create a root DHTRecord that has no dependent records Future createRecord({ @@ -653,23 +219,52 @@ class DHTRecordPool with TableDBBackedJson { return _state.parentByChild[childJson]; } - /// Handle the DHT record updates coming from internal to this app - void processLocalValueChange(TypedKey key, Uint8List data, int subkey) { - // Change - for (final kv in _opened.entries) { - if (kv.key == key) { - for (final rec in kv.value.records) { - rec._addLocalValueChange(data, subkey); - } - break; + /// Check if record is allocated + Future isValidRecordKey(TypedKey key) => + _mutex.protect(() async => _isValidRecordKeyInner(key)); + + /// Check if record is marked for deletion or already gone + Future isDeletedRecordKey(TypedKey key) => + _mutex.protect(() async => _isDeletedRecordKeyInner(key)); + + /// Delete a record and its children if they are all closed + /// otherwise mark that record for deletion eventually + /// Returns true if the deletion was processed immediately + /// Returns false if the deletion was marked for later + Future deleteRecord(TypedKey recordKey) async => + _mutex.protect(() async => _deleteRecordInner(recordKey)); + + // If everything underneath is closed including itself, return the + // list of children (and itself) to finally actually delete + List _readyForDeleteInner(TypedKey recordKey) { + final allDeps = _collectChildrenInner(recordKey); + for (final dep in allDeps) { + if (_opened.containsKey(dep)) { + return []; } } + return allDeps; } - /// Generate default VeilidCrypto for a writer - static Future privateCryptoFromTypedSecret( - TypedKey typedSecret) async => - VeilidCryptoPrivate.fromTypedKey(typedSecret, cryptoDomainDHT); + /// Collect all dependencies (including the record itself) + /// in reverse (bottom-up/delete order) + Future> collectChildren(TypedKey recordKey) => + _mutex.protect(() async => _collectChildrenInner(recordKey)); + + /// Print children + String debugChildren(TypedKey recordKey, {List? allDeps}) { + allDeps ??= _collectChildrenInner(recordKey); + // ignore: avoid_print + var out = + 'Parent: $recordKey (${_state.debugNames[recordKey.toString()]})\n'; + for (final dep in allDeps) { + if (dep != recordKey) { + // ignore: avoid_print + out += ' Child: $dep (${_state.debugNames[dep.toString()]})\n'; + } + } + return out; + } /// Handle the DHT record updates coming from Veilid void processRemoteValueChange(VeilidUpdateValueChange updateValueChange) { @@ -711,7 +306,360 @@ class DHTRecordPool with TableDBBackedJson { } } - WatchState? _collectUnionWatchState(Iterable records) { + /// Log the current record allocations + void debugPrintAllocations() { + final sortedAllocations = _state.debugNames.entries.asList() + ..sort((a, b) => a.key.compareTo(b.key)); + + log('DHTRecordPool Allocations: (count=${sortedAllocations.length})'); + + for (final entry in sortedAllocations) { + log(' ${entry.key}: ${entry.value}'); + } + } + + /// Log the current opened record details + void debugPrintOpened() { + final sortedOpened = _opened.entries.asList() + ..sort((a, b) => a.key.toString().compareTo(b.key.toString())); + + log('DHTRecordPool Opened Records: (count=${sortedOpened.length})'); + + for (final entry in sortedOpened) { + log(' ${entry.key}: \n' + ' debugNames=${entry.value.debugNames}\n' + ' details=${entry.value.details}\n' + ' sharedDetails=${entry.value.sharedDetails}\n'); + } + } + + /// Public interface to DHTRecordPool logger + void log(String message) { + _logger?.call(message); + } + + /// Generate default VeilidCrypto for a writer + static Future privateCryptoFromTypedSecret( + TypedKey typedSecret) async => + VeilidCryptoPrivate.fromTypedKey(typedSecret, _cryptoDomainDHT); + + //////////////////////////////////////////////////////////////////////////// + // Private Implementation + + Future<_OpenedRecordInfo> _recordCreateInner( + {required String debugName, + required VeilidRoutingContext dhtctx, + required DHTSchema schema, + KeyPair? writer, + TypedKey? parent}) async { + if (!_mutex.isLocked) { + throw StateError('should be locked here'); + } + // Create the record + final recordDescriptor = await dhtctx.createDHTRecord(schema); + + log('createDHTRecord: debugName=$debugName key=${recordDescriptor.key}'); + + // Reopen if a writer is specified to ensure + // we switch the default writer + if (writer != null) { + await dhtctx.openDHTRecord(recordDescriptor.key, writer: writer); + } + final openedRecordInfo = _OpenedRecordInfo( + recordDescriptor: recordDescriptor, + defaultWriter: writer ?? recordDescriptor.ownerKeyPair(), + defaultRoutingContext: dhtctx); + _opened[recordDescriptor.key] = openedRecordInfo; + + // Register the dependency + await _addDependencyInner( + parent, + recordDescriptor.key, + debugName: debugName, + ); + + return openedRecordInfo; + } + + Future<_OpenedRecordInfo> _recordOpenInner( + {required String debugName, + required VeilidRoutingContext dhtctx, + required TypedKey recordKey, + KeyPair? writer, + TypedKey? parent}) async { + if (!_mutex.isLocked) { + throw StateError('should be locked here'); + } + log('openDHTRecord: debugName=$debugName key=$recordKey'); + + // If we are opening a key that already exists + // make sure we are using the same parent if one was specified + _validateParentInner(parent, recordKey); + + // See if this has been opened yet + final openedRecordInfo = _opened[recordKey]; + if (openedRecordInfo == null) { + // Fresh open, just open the record + final recordDescriptor = + await dhtctx.openDHTRecord(recordKey, writer: writer); + final newOpenedRecordInfo = _OpenedRecordInfo( + recordDescriptor: recordDescriptor, + defaultWriter: writer, + defaultRoutingContext: dhtctx); + _opened[recordDescriptor.key] = newOpenedRecordInfo; + + // Register the dependency + await _addDependencyInner( + parent, + recordKey, + debugName: debugName, + ); + + return newOpenedRecordInfo; + } + + // Already opened + + // See if we need to reopen the record with a default writer and possibly + // a different routing context + if (writer != null && openedRecordInfo.shared.defaultWriter == null) { + await dhtctx.openDHTRecord(recordKey, writer: writer); + // New writer if we didn't specify one before + openedRecordInfo.shared.defaultWriter = writer; + // New default routing context if we opened it again + openedRecordInfo.shared.defaultRoutingContext = dhtctx; + } + + // Register the dependency + await _addDependencyInner( + parent, + recordKey, + debugName: debugName, + ); + + return openedRecordInfo; + } + + // Called when a DHTRecord is closed + // Cleans up the opened record housekeeping and processes any late deletions + Future _recordClosed(DHTRecord record) async { + await _mutex.protect(() async { + final key = record.key; + + log('closeDHTRecord: debugName=${record.debugName} key=$key'); + + final openedRecordInfo = _opened[key]; + if (openedRecordInfo == null || + !openedRecordInfo.records.remove(record)) { + throw StateError('record already closed'); + } + if (openedRecordInfo.records.isEmpty) { + await _watchStateProcessors.remove(key); + await _routingContext.closeDHTRecord(key); + _opened.remove(key); + + await _checkForLateDeletesInner(key); + } + }); + } + + // Check to see if this key can finally be deleted + // If any parents are marked for deletion, try them first + Future _checkForLateDeletesInner(TypedKey key) async { + // Get parent list in bottom up order including our own key + final parents = []; + TypedKey? nextParent = key; + while (nextParent != null) { + parents.add(nextParent); + nextParent = getParentRecordKey(nextParent); + } + + // If any parent is ready to delete all its children do it + for (final parent in parents) { + if (_markedForDelete.contains(parent)) { + final deleted = await _deleteRecordInner(parent); + if (!deleted) { + // If we couldn't delete a child then no 'marked for delete' parents + // above us will be ready to delete either + break; + } + } + } + } + + // Collect all dependencies (including the record itself) + // in reverse (bottom-up/delete order) + List _collectChildrenInner(TypedKey recordKey) { + if (!_mutex.isLocked) { + throw StateError('should be locked here'); + } + final allDeps = []; + final currentDeps = [recordKey]; + while (currentDeps.isNotEmpty) { + final nextDep = currentDeps.removeLast(); + + allDeps.add(nextDep); + final childDeps = + _state.childrenByParent[nextDep.toJson()]?.toList() ?? []; + currentDeps.addAll(childDeps); + } + return allDeps.reversedView; + } + + // Actual delete function + Future _finalizeDeleteRecordInner(TypedKey recordKey) async { + log('_finalizeDeleteRecordInner: key=$recordKey'); + + // Remove this child from parents + await _removeDependenciesInner([recordKey]); + await _routingContext.deleteDHTRecord(recordKey); + _markedForDelete.remove(recordKey); + } + + // Deep delete mechanism inside mutex + Future _deleteRecordInner(TypedKey recordKey) async { + final toDelete = _readyForDeleteInner(recordKey); + if (toDelete.isNotEmpty) { + // delete now + for (final deleteKey in toDelete) { + await _finalizeDeleteRecordInner(deleteKey); + } + return true; + } + // mark for deletion + _markedForDelete.add(recordKey); + return false; + } + + void _validateParentInner(TypedKey? parent, TypedKey child) { + if (!_mutex.isLocked) { + throw StateError('should be locked here'); + } + + final childJson = child.toJson(); + final existingParent = _state.parentByChild[childJson]; + if (parent == null) { + if (existingParent != null) { + throw StateError('Child is already parented: $child'); + } + } else { + if (_state.rootRecords.contains(child)) { + throw StateError('Child already added as root: $child'); + } + if (existingParent != null && existingParent != parent) { + throw StateError('Child has two parents: $child <- $parent'); + } + } + } + + Future _addDependencyInner(TypedKey? parent, TypedKey child, + {required String debugName}) async { + if (!_mutex.isLocked) { + throw StateError('should be locked here'); + } + if (parent == null) { + if (_state.rootRecords.contains(child)) { + // Dependency already added + return; + } + _state = await store(_state.copyWith( + rootRecords: _state.rootRecords.add(child), + debugNames: _state.debugNames.add(child.toJson(), debugName))); + } else { + final childrenOfParent = + _state.childrenByParent[parent.toJson()] ?? ISet(); + if (childrenOfParent.contains(child)) { + // Dependency already added (consecutive opens, etc) + return; + } + _state = await store(_state.copyWith( + childrenByParent: _state.childrenByParent + .add(parent.toJson(), childrenOfParent.add(child)), + parentByChild: _state.parentByChild.add(child.toJson(), parent), + debugNames: _state.debugNames.add(child.toJson(), debugName))); + } + } + + Future _removeDependenciesInner(List childList) async { + if (!_mutex.isLocked) { + throw StateError('should be locked here'); + } + var state = _state; + + for (final child in childList) { + if (_state.rootRecords.contains(child)) { + state = state.copyWith( + rootRecords: state.rootRecords.remove(child), + debugNames: state.debugNames.remove(child.toJson())); + } else { + final parent = state.parentByChild[child.toJson()]; + if (parent == null) { + continue; + } + final children = state.childrenByParent[parent.toJson()]!.remove(child); + if (children.isEmpty) { + state = state.copyWith( + childrenByParent: state.childrenByParent.remove(parent.toJson()), + parentByChild: state.parentByChild.remove(child.toJson()), + debugNames: state.debugNames.remove(child.toJson())); + } else { + state = state.copyWith( + childrenByParent: + state.childrenByParent.add(parent.toJson(), children), + parentByChild: state.parentByChild.remove(child.toJson()), + debugNames: state.debugNames.remove(child.toJson())); + } + } + } + + if (state != _state) { + _state = await store(state); + } + } + + bool _isValidRecordKeyInner(TypedKey key) { + if (_state.rootRecords.contains(key)) { + return true; + } + if (_state.childrenByParent.containsKey(key.toJson())) { + return true; + } + return false; + } + + bool _isDeletedRecordKeyInner(TypedKey key) { + // Is this key gone? + if (!_isValidRecordKeyInner(key)) { + return true; + } + + // Is this key on its way out because it or one of its parents + // is scheduled to delete everything underneath it? + TypedKey? nextParent = key; + while (nextParent != null) { + if (_markedForDelete.contains(nextParent)) { + return true; + } + nextParent = getParentRecordKey(nextParent); + } + + return false; + } + + /// Handle the DHT record updates coming from internal to this app + void _processLocalValueChange(TypedKey key, Uint8List data, int subkey) { + // Change + for (final kv in _opened.entries) { + if (kv.key == key) { + for (final rec in kv.value.records) { + rec._addLocalValueChange(data, subkey); + } + break; + } + } + } + + static _WatchState? _collectUnionWatchState(Iterable records) { // Collect union of opened record watch states int? totalCount; Timestamp? maxExpiration; @@ -770,19 +718,19 @@ class DHTRecordPool with TableDBBackedJson { return null; } - return WatchState( + return _WatchState( subkeys: allSubkeys, expiration: maxExpiration, count: totalCount, renewalTime: earliestRenewalTime); } - void _updateWatchRealExpirations(Iterable records, + static void _updateWatchRealExpirations(Iterable records, Timestamp realExpiration, Timestamp renewalTime) { for (final rec in records) { final ws = rec.watchState; if (ws != null) { - rec.watchState = WatchState( + rec.watchState = _WatchState( subkeys: ws.subkeys, expiration: ws.expiration, count: ws.count, @@ -792,154 +740,194 @@ class DHTRecordPool with TableDBBackedJson { } } + Future _watchStateChange( + TypedKey openedRecordKey, _WatchState? unionWatchState) async { + // Get the current state for this watch + final openedRecordInfo = _opened[openedRecordKey]; + if (openedRecordInfo == null) { + // Record is gone, nothing to do + return; + } + final currentWatchState = openedRecordInfo.shared.unionWatchState; + final dhtctx = openedRecordInfo.shared.defaultRoutingContext; + + // If it's the same as our desired state there is nothing to do here + if (currentWatchState == unionWatchState) { + return; + } + + // Apply watch changes for record + if (unionWatchState == null) { + // Record needs watch cancel + // Only try this once, if it doesn't succeed then it can just expire + // on its own. + try { + final cancelled = await dhtctx.cancelDHTWatch(openedRecordKey); + + log('cancelDHTWatch: key=$openedRecordKey, cancelled=$cancelled, ' + 'debugNames=${openedRecordInfo.debugNames}'); + + openedRecordInfo.shared.unionWatchState = null; + openedRecordInfo.shared.needsWatchStateUpdate = false; + } on VeilidAPIException catch (e) { + // Failed to cancel DHT watch, try again next tick + log('Exception in watch cancel: $e'); + } + return; + } + + // Record needs new watch + try { + final subkeys = unionWatchState.subkeys?.toList(); + final count = unionWatchState.count; + final expiration = unionWatchState.expiration; + final now = veilid.now(); + + final realExpiration = await dhtctx.watchDHTValues(openedRecordKey, + subkeys: unionWatchState.subkeys?.toList(), + count: unionWatchState.count, + expiration: unionWatchState.expiration ?? + (_defaultWatchDurationSecs == null + ? null + : veilid.now().offset(TimestampDuration.fromMillis( + _defaultWatchDurationSecs! * 1000)))); + + final expirationDuration = realExpiration.diff(now); + final renewalTime = now.offset(TimestampDuration( + value: expirationDuration.value * + BigInt.from(_watchRenewalNumerator) ~/ + BigInt.from(_watchRenewalDenominator))); + + log('watchDHTValues: key=$openedRecordKey, subkeys=$subkeys, ' + 'count=$count, expiration=$expiration, ' + 'realExpiration=$realExpiration, ' + 'renewalTime=$renewalTime, ' + 'debugNames=${openedRecordInfo.debugNames}'); + + // Update watch states with real expiration + if (realExpiration.value != BigInt.zero) { + openedRecordInfo.shared.unionWatchState = unionWatchState; + _updateWatchRealExpirations( + openedRecordInfo.records, realExpiration, renewalTime); + openedRecordInfo.shared.needsWatchStateUpdate = false; + } + } on VeilidAPIException catch (e) { + // Failed to cancel DHT watch, try again next tick + log('Exception in watch update: $e'); + } + } + + void _pollWatch(TypedKey openedRecordKey, _OpenedRecordInfo openedRecordInfo, + _WatchState unionWatchState) { + singleFuture((this, _sfPollWatch, openedRecordKey), () async { + final dhtctx = openedRecordInfo.shared.defaultRoutingContext; + + // Get single subkey to poll + // XXX: veilid api limits this for now until everyone supports + // inspectDHTRecord + final pollSubkey = unionWatchState.subkeys?.firstSubkey; + if (pollSubkey == null) { + return; + } + final pollSubkeys = [ValueSubkeyRange.single(pollSubkey)]; + + final currentReport = + await dhtctx.inspectDHTRecord(openedRecordKey, subkeys: pollSubkeys); + final currentSeq = currentReport.localSeqs.firstOrNull ?? -1; + + final valueData = await dhtctx.getDHTValue(openedRecordKey, pollSubkey, + forceRefresh: true); + if (valueData == null) { + return; + } + if (valueData.seq > currentSeq) { + processRemoteValueChange(VeilidUpdateValueChange( + key: openedRecordKey, + subkeys: pollSubkeys, + count: 0xFFFFFFFF, + value: valueData)); + } + }); + } + /// Ticker to check watch state change requests Future tick() async { - if (_tickCount < _watchBackoffTimer) { - _tickCount++; - return; - } - if (_inTick) { - return; - } - _inTick = true; - _tickCount = 0; final now = veilid.now(); - try { - final allSuccess = await _mutex.protect(() async { - // See if any opened records need watch state changes - final unord = Function()>[]; + await _mutex.protect(() async { + // See if any opened records need watch state changes + for (final kv in _opened.entries) { + final openedRecordKey = kv.key; + final openedRecordInfo = kv.value; - for (final kv in _opened.entries) { - final openedRecordKey = kv.key; - final openedRecordInfo = kv.value; - final dhtctx = openedRecordInfo.shared.defaultRoutingContext; + var wantsWatchStateUpdate = + openedRecordInfo.shared.needsWatchStateUpdate; - var wantsWatchStateUpdate = - openedRecordInfo.shared.needsWatchStateUpdate; - - // Check if we have reached renewal time for the watch - if (openedRecordInfo.shared.unionWatchState != null && - openedRecordInfo.shared.unionWatchState!.renewalTime != null && - now.value > - openedRecordInfo.shared.unionWatchState!.renewalTime!.value) { - wantsWatchStateUpdate = true; - } - - if (wantsWatchStateUpdate) { - // Update union watch state - final unionWatchState = openedRecordInfo.shared.unionWatchState = - _collectUnionWatchState(openedRecordInfo.records); - - // Apply watch changes for record - if (unionWatchState == null) { - unord.add(() async { - // Record needs watch cancel - var success = false; - try { - success = await dhtctx.cancelDHTWatch(openedRecordKey); - - log('cancelDHTWatch: key=$openedRecordKey, success=$success, ' - 'debugNames=${openedRecordInfo.debugNames}'); - - openedRecordInfo.shared.needsWatchStateUpdate = false; - } on VeilidAPIException catch (e) { - // Failed to cancel DHT watch, try again next tick - log('Exception in watch cancel: $e'); - } - return success; - }); - } else { - unord.add(() async { - // Record needs new watch - var success = false; - try { - final subkeys = unionWatchState.subkeys?.toList(); - final count = unionWatchState.count; - final expiration = unionWatchState.expiration; - final now = veilid.now(); - - final realExpiration = await dhtctx.watchDHTValues( - openedRecordKey, - subkeys: unionWatchState.subkeys?.toList(), - count: unionWatchState.count, - expiration: unionWatchState.expiration ?? - (defaultWatchDurationSecs == null - ? null - : veilid.now().offset( - TimestampDuration.fromMillis( - defaultWatchDurationSecs! * 1000)))); - - final expirationDuration = realExpiration.diff(now); - final renewalTime = now.offset(TimestampDuration( - value: expirationDuration.value * - BigInt.from(watchRenewalNumerator) ~/ - BigInt.from(watchRenewalDenominator))); - - log('watchDHTValues: key=$openedRecordKey, subkeys=$subkeys, ' - 'count=$count, expiration=$expiration, ' - 'realExpiration=$realExpiration, ' - 'renewalTime=$renewalTime, ' - 'debugNames=${openedRecordInfo.debugNames}'); - - // Update watch states with real expiration - if (realExpiration.value != BigInt.zero) { - openedRecordInfo.shared.needsWatchStateUpdate = false; - _updateWatchRealExpirations( - openedRecordInfo.records, realExpiration, renewalTime); - success = true; - } - } on VeilidAPIException catch (e) { - // Failed to cancel DHT watch, try again next tick - log('Exception in watch update: $e'); - } - return success; - }); - } - } + // Check if we have reached renewal time for the watch + if (openedRecordInfo.shared.unionWatchState != null && + openedRecordInfo.shared.unionWatchState!.renewalTime != null && + now.value > + openedRecordInfo.shared.unionWatchState!.renewalTime!.value) { + wantsWatchStateUpdate = true; } - // Process all watch changes - return unord.isEmpty || - (await unord.map((f) => f()).wait).reduce((a, b) => a && b); - }); + if (wantsWatchStateUpdate) { + // Update union watch state + final unionWatchState = + _collectUnionWatchState(openedRecordInfo.records); - // If any watched did not success, back off the attempts to - // update the watches for a bit + final processed = _watchStateProcessors.updateState( + openedRecordKey, + unionWatchState, + (newState) => + _watchStateChange(openedRecordKey, unionWatchState)); - if (!allSuccess) { - _watchBackoffTimer *= watchBackoffMultiplier; - _watchBackoffTimer = min(_watchBackoffTimer, watchBackoffMax); - } else { - _watchBackoffTimer = 1; + // In lieu of a completed watch, set off a polling operation + // on the first value of the watched range, which, due to current + // veilid limitations can only be one subkey at a time right now + if (!processed && unionWatchState != null) { + _pollWatch(openedRecordKey, openedRecordInfo, unionWatchState); + } + } } - } finally { - _inTick = false; - } + }); } - void debugPrintAllocations() { - final sortedAllocations = _state.debugNames.entries.asList() - ..sort((a, b) => a.key.compareTo(b.key)); + ////////////////////////////////////////////////////////////// + // AsyncTableDBBacked + @override + String tableName() => 'dht_record_pool'; + @override + String tableKeyName() => 'pool_allocations'; + @override + DHTRecordPoolAllocations valueFromJson(Object? obj) => obj != null + ? DHTRecordPoolAllocations.fromJson(obj) + : const DHTRecordPoolAllocations(); + @override + Object? valueToJson(DHTRecordPoolAllocations? val) => val?.toJson(); - log('DHTRecordPool Allocations: (count=${sortedAllocations.length})'); + //////////////////////////////////////////////////////////////////////////// + // Fields - for (final entry in sortedAllocations) { - log(' ${entry.key}: ${entry.value}'); - } - } + // Logger + DHTRecordPoolLogger? _logger; - void debugPrintOpened() { - final sortedOpened = _opened.entries.asList() - ..sort((a, b) => a.key.toString().compareTo(b.key.toString())); + // Persistent DHT record list + DHTRecordPoolAllocations _state; + // Create/open Mutex + final Mutex _mutex; + // Which DHT records are currently open + final Map _opened; + // Which DHT records are marked for deletion + final Set _markedForDelete; + // Default routing context to use for new keys + final VeilidRoutingContext _routingContext; + // Convenience accessor + final Veilid _veilid; + Veilid get veilid => _veilid; + // Watch state processors + final _watchStateProcessors = + SingleStateProcessorMap(); - log('DHTRecordPool Opened Records: (count=${sortedOpened.length})'); - - for (final entry in sortedOpened) { - log(' ${entry.key}: \n' - ' debugNames=${entry.value.debugNames}\n' - ' details=${entry.value.details}\n' - ' sharedDetails=${entry.value.sharedDetails}\n'); - } - } + static DHTRecordPool? _singleton; } diff --git a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool_private.dart b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool_private.dart new file mode 100644 index 0000000..b7cbba8 --- /dev/null +++ b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool_private.dart @@ -0,0 +1,77 @@ +part of 'dht_record_pool.dart'; + +const int _watchBackoffMultiplier = 2; +const int _watchBackoffMax = 30; + +const int? _defaultWatchDurationSecs = null; // 600 +const int _watchRenewalNumerator = 4; +const int _watchRenewalDenominator = 5; + +// DHT crypto domain +const String _cryptoDomainDHT = 'dht'; + +// Singlefuture keys +const _sfPollWatch = '_pollWatch'; +const _sfListen = 'listen'; + +/// Watch state +@immutable +class _WatchState extends Equatable { + const _WatchState( + {required this.subkeys, + required this.expiration, + required this.count, + this.realExpiration, + this.renewalTime}); + final List? subkeys; + final Timestamp? expiration; + final int? count; + final Timestamp? realExpiration; + final Timestamp? renewalTime; + + @override + List get props => + [subkeys, expiration, count, realExpiration, renewalTime]; +} + +/// Data shared amongst all DHTRecord instances +class _SharedDHTRecordData { + _SharedDHTRecordData( + {required this.recordDescriptor, + required this.defaultWriter, + required this.defaultRoutingContext}); + DHTRecordDescriptor recordDescriptor; + KeyPair? defaultWriter; + VeilidRoutingContext defaultRoutingContext; + bool needsWatchStateUpdate = false; + _WatchState? unionWatchState; +} + +// Per opened record data +class _OpenedRecordInfo { + _OpenedRecordInfo( + {required DHTRecordDescriptor recordDescriptor, + required KeyPair? defaultWriter, + required VeilidRoutingContext defaultRoutingContext}) + : shared = _SharedDHTRecordData( + recordDescriptor: recordDescriptor, + defaultWriter: defaultWriter, + defaultRoutingContext: defaultRoutingContext); + _SharedDHTRecordData shared; + Set records = {}; + + String get debugNames { + final r = records.toList() + ..sort((a, b) => a.key.toString().compareTo(b.key.toString())); + return '[${r.map((x) => x.debugName).join(',')}]'; + } + + String get details { + final r = records.toList() + ..sort((a, b) => a.key.toString().compareTo(b.key.toString())); + return '[${r.map((x) => "writer=${x._writer} " + "defaultSubkey=${x._defaultSubkey}").join(',')}]'; + } + + String get sharedDetails => shared.toString(); +} diff --git a/packages/veilid_support/lib/src/persistent_queue.dart b/packages/veilid_support/lib/src/persistent_queue.dart index f0cf17a..c7abe97 100644 --- a/packages/veilid_support/lib/src/persistent_queue.dart +++ b/packages/veilid_support/lib/src/persistent_queue.dart @@ -8,8 +8,7 @@ import 'package:protobuf/protobuf.dart'; import 'table_db.dart'; class PersistentQueue - /*extends Cubit>>*/ with - TableDBBackedFromBuffer> { + with TableDBBackedFromBuffer> { // PersistentQueue( {required String table,