From 450bdf9c7c872beec0c6df57f82c59af5faa6877 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Tue, 20 Feb 2024 17:57:05 -0500 Subject: [PATCH] state follower --- lib/chat/cubits/active_chat_cubit.dart | 9 +- lib/chat/views/chat_component.dart | 2 +- .../active_conversation_messages_cubit.dart | 2 +- ... active_conversations_bloc_map_cubit.dart} | 5 +- lib/chat_list/cubits/cubits.dart | 2 +- .../chat_single_contact_item_widget.dart | 2 +- .../cubits/contact_invitation_list_cubit.dart | 188 +++++++-------- .../cubits/contact_request_inbox_cubit.dart | 151 ++++++++++++ lib/contact_invitation/cubits/cubits.dart | 3 + .../cubits/waiting_invitation_cubit.dart | 221 ++++++++++++++++++ .../waiting_invitations_bloc_map_cubit.dart | 58 +++++ .../models/accepted_contact.dart | 11 +- lib/contacts/views/contact_item_widget.dart | 15 -- .../home_account_ready_shell.dart | 11 +- lib/tools/async_transformer_cubit.dart | 62 +++++ lib/tools/bloc_map_cubit.dart | 8 + lib/tools/bloc_tools.dart | 12 + lib/tools/state_follower.dart | 78 +++++++ lib/tools/tools.dart | 3 + .../veilid_support/lib/src/memory_tools.dart | 72 ++++++ 20 files changed, 787 insertions(+), 128 deletions(-) rename lib/chat_list/cubits/{active_conversations_cubit.dart => active_conversations_bloc_map_cubit.dart} (94%) create mode 100644 lib/contact_invitation/cubits/contact_request_inbox_cubit.dart create mode 100644 lib/contact_invitation/cubits/waiting_invitation_cubit.dart create mode 100644 lib/contact_invitation/cubits/waiting_invitations_bloc_map_cubit.dart create mode 100644 lib/tools/async_transformer_cubit.dart create mode 100644 lib/tools/bloc_tools.dart create mode 100644 lib/tools/state_follower.dart create mode 100644 packages/veilid_support/lib/src/memory_tools.dart diff --git a/lib/chat/cubits/active_chat_cubit.dart b/lib/chat/cubits/active_chat_cubit.dart index 6215098..fa88d56 100644 --- a/lib/chat/cubits/active_chat_cubit.dart +++ b/lib/chat/cubits/active_chat_cubit.dart @@ -1,13 +1,12 @@ import 'package:flutter_bloc/flutter_bloc.dart'; import 'package:veilid_support/veilid_support.dart'; -class ActiveChatCubit extends Cubit { - ActiveChatCubit(super.initialState, this.setHasActiveChat); +import '../../tools/tools.dart'; + +class ActiveChatCubit extends Cubit with BlocTools { + ActiveChatCubit(super.initialState); void setActiveChat(TypedKey? activeChatRemoteConversationRecordKey) { - setHasActiveChat(activeChatRemoteConversationRecordKey != null); emit(activeChatRemoteConversationRecordKey); } - - void Function(bool) setHasActiveChat; } diff --git a/lib/chat/views/chat_component.dart b/lib/chat/views/chat_component.dart index ba866c4..8e80e15 100644 --- a/lib/chat/views/chat_component.dart +++ b/lib/chat/views/chat_component.dart @@ -52,7 +52,7 @@ class ChatComponent extends StatelessWidget { if (contactList == null) { return debugPage('should always have a contact list here'); } - final avconversation = context.select?>( (x) => x.state[remoteConversationRecordKey]); if (avconversation == null) { diff --git a/lib/chat_list/cubits/active_conversation_messages_cubit.dart b/lib/chat_list/cubits/active_conversation_messages_cubit.dart index bb6209c..aad822d 100644 --- a/lib/chat_list/cubits/active_conversation_messages_cubit.dart +++ b/lib/chat_list/cubits/active_conversation_messages_cubit.dart @@ -8,7 +8,7 @@ import '../../account_manager/account_manager.dart'; import '../../chat/chat.dart'; import '../../proto/proto.dart' as proto; import '../../tools/tools.dart'; -import 'active_conversations_cubit.dart'; +import 'active_conversations_bloc_map_cubit.dart'; class ActiveConversationMessagesCubit extends BlocMapCubit>, MessagesCubit> { diff --git a/lib/chat_list/cubits/active_conversations_cubit.dart b/lib/chat_list/cubits/active_conversations_bloc_map_cubit.dart similarity index 94% rename from lib/chat_list/cubits/active_conversations_cubit.dart rename to lib/chat_list/cubits/active_conversations_bloc_map_cubit.dart index dccd9c9..946ec99 100644 --- a/lib/chat_list/cubits/active_conversations_cubit.dart +++ b/lib/chat_list/cubits/active_conversations_bloc_map_cubit.dart @@ -32,9 +32,10 @@ typedef ActiveConversationsBlocMapState // Map of remoteConversationRecordKey to ActiveConversationCubit // Wraps a conversation cubit to only expose completely built conversations -class ActiveConversationsCubit extends BlocMapCubit, ActiveConversationCubit> { - ActiveConversationsCubit({required ActiveAccountInfo activeAccountInfo}) + ActiveConversationsBlocMapCubit( + {required ActiveAccountInfo activeAccountInfo}) : _activeAccountInfo = activeAccountInfo; // Add an active conversation to be tracked for changes diff --git a/lib/chat_list/cubits/cubits.dart b/lib/chat_list/cubits/cubits.dart index 474f5cb..7ff0db1 100644 --- a/lib/chat_list/cubits/cubits.dart +++ b/lib/chat_list/cubits/cubits.dart @@ -1,3 +1,3 @@ export 'active_conversation_messages_cubit.dart'; -export 'active_conversations_cubit.dart'; +export 'active_conversations_bloc_map_cubit.dart'; export 'chat_list_cubit.dart'; diff --git a/lib/chat_list/views/chat_single_contact_item_widget.dart b/lib/chat_list/views/chat_single_contact_item_widget.dart index 64f5a1f..5bb39ee 100644 --- a/lib/chat_list/views/chat_single_contact_item_widget.dart +++ b/lib/chat_list/views/chat_single_contact_item_widget.dart @@ -69,7 +69,7 @@ class ChatSingleContactItemWidget extends StatelessWidget { child: ListTile( onTap: () { final activeConversationsCubit = - context.read(); + context.read(); singleFuture(activeChatCubit, () async { await activeConversationsCubit.addConversation( contact: _contact); diff --git a/lib/contact_invitation/cubits/contact_invitation_list_cubit.dart b/lib/contact_invitation/cubits/contact_invitation_list_cubit.dart index 9c484a7..429be53 100644 --- a/lib/contact_invitation/cubits/contact_invitation_list_cubit.dart +++ b/lib/contact_invitation/cubits/contact_invitation_list_cubit.dart @@ -5,9 +5,7 @@ import 'package:flutter/foundation.dart'; import 'package:veilid_support/veilid_support.dart'; import '../../account_manager/account_manager.dart'; -import '../../contacts/contacts.dart'; import '../../proto/proto.dart' as proto; -import '../../tools/tools.dart'; import '../models/models.dart'; ////////////////////////////////////////////////// @@ -22,12 +20,6 @@ typedef GetEncryptionKeyCallback = Future Function( EncryptionKeyType encryptionKeyType, Uint8List encryptedSecret); -@immutable -class InvitationStatus { - const InvitationStatus({required this.acceptedContact}); - final AcceptedContact? acceptedContact; -} - ////////////////////////////////////////////////// ////////////////////////////////////////////////// @@ -271,109 +263,109 @@ class ContactInvitationListCubit return out; } - Future checkInvitationStatus( - {required proto.ContactInvitationRecord contactInvitationRecord}) async { - // Open the contact request inbox - try { - final pool = DHTRecordPool.instance; - final accountRecordKey = _activeAccountInfo - .userLogin.accountRecordInfo.accountRecord.recordKey; - final writerKey = contactInvitationRecord.writerKey.toVeilid(); - final writerSecret = contactInvitationRecord.writerSecret.toVeilid(); - final recordKey = - contactInvitationRecord.contactRequestInbox.recordKey.toVeilid(); - final writer = TypedKeyPair( - kind: recordKey.kind, key: writerKey, secret: writerSecret); - final acceptReject = await (await pool.openRead(recordKey, - crypto: await DHTRecordCryptoPrivate.fromTypedKeyPair(writer), - parent: accountRecordKey, - defaultSubkey: 1)) - .scope((contactRequestInbox) async { - // - final signedContactResponse = await contactRequestInbox.getProtobuf( - proto.SignedContactResponse.fromBuffer, - forceRefresh: true); - if (signedContactResponse == null) { - return null; - } + // Future checkInvitationStatus( + // {required proto.ContactInvitationRecord contactInvitationRecord}) async { + // // Open the contact request inbox + // try { + // final pool = DHTRecordPool.instance; + // final accountRecordKey = _activeAccountInfo + // .userLogin.accountRecordInfo.accountRecord.recordKey; + // final writerKey = contactInvitationRecord.writerKey.toVeilid(); + // final writerSecret = contactInvitationRecord.writerSecret.toVeilid(); + // final recordKey = + // contactInvitationRecord.contactRequestInbox.recordKey.toVeilid(); + // final writer = TypedKeyPair( + // kind: recordKey.kind, key: writerKey, secret: writerSecret); + // final acceptReject = await (await pool.openRead(recordKey, + // crypto: await DHTRecordCryptoPrivate.fromTypedKeyPair(writer), + // parent: accountRecordKey, + // defaultSubkey: 1)) + // .scope((contactRequestInbox) async { + // // + // final signedContactResponse = await contactRequestInbox.getProtobuf( + // proto.SignedContactResponse.fromBuffer, + // forceRefresh: true); + // if (signedContactResponse == null) { + // return null; + // } - final contactResponseBytes = - Uint8List.fromList(signedContactResponse.contactResponse); - final contactResponse = - proto.ContactResponse.fromBuffer(contactResponseBytes); - final contactIdentityMasterRecordKey = - contactResponse.identityMasterRecordKey.toVeilid(); - final cs = await pool.veilid.getCryptoSystem(recordKey.kind); + // final contactResponseBytes = + // Uint8List.fromList(signedContactResponse.contactResponse); + // final contactResponse = + // proto.ContactResponse.fromBuffer(contactResponseBytes); + // final contactIdentityMasterRecordKey = + // contactResponse.identityMasterRecordKey.toVeilid(); + // final cs = await pool.veilid.getCryptoSystem(recordKey.kind); - // Fetch the remote contact's account master - final contactIdentityMaster = await openIdentityMaster( - identityMasterRecordKey: contactIdentityMasterRecordKey); + // // Fetch the remote contact's account master + // final contactIdentityMaster = await openIdentityMaster( + // identityMasterRecordKey: contactIdentityMasterRecordKey); - // Verify - final signature = signedContactResponse.identitySignature.toVeilid(); - await cs.verify(contactIdentityMaster.identityPublicKey, - contactResponseBytes, signature); + // // Verify + // final signature = signedContactResponse.identitySignature.toVeilid(); + // await cs.verify(contactIdentityMaster.identityPublicKey, + // contactResponseBytes, signature); - // Check for rejection - if (!contactResponse.accept) { - return const InvitationStatus(acceptedContact: null); - } + // // Check for rejection + // if (!contactResponse.accept) { + // return const InvitationStatus(acceptedContact: null); + // } - // Pull profile from remote conversation key - final remoteConversationRecordKey = - contactResponse.remoteConversationRecordKey.toVeilid(); + // // Pull profile from remote conversation key + // final remoteConversationRecordKey = + // contactResponse.remoteConversationRecordKey.toVeilid(); - final conversation = ConversationCubit( - activeAccountInfo: _activeAccountInfo, - remoteIdentityPublicKey: - contactIdentityMaster.identityPublicTypedKey(), - remoteConversationRecordKey: remoteConversationRecordKey); - await conversation.refresh(); + // final conversation = ConversationCubit( + // activeAccountInfo: _activeAccountInfo, + // remoteIdentityPublicKey: + // contactIdentityMaster.identityPublicTypedKey(), + // remoteConversationRecordKey: remoteConversationRecordKey); + // await conversation.refresh(); - final remoteConversation = - conversation.state.data?.value.remoteConversation; - if (remoteConversation == null) { - log.info('Remote conversation could not be read. Waiting...'); - return null; - } + // final remoteConversation = + // conversation.state.data?.value.remoteConversation; + // if (remoteConversation == null) { + // log.info('Remote conversation could not be read. Waiting...'); + // return null; + // } - // Complete the local conversation now that we have the remote profile - final localConversationRecordKey = - contactInvitationRecord.localConversationRecordKey.toVeilid(); - return conversation.initLocalConversation( - existingConversationRecordKey: localConversationRecordKey, - profile: _account.profile, - // ignore: prefer_expression_function_bodies - callback: (localConversation) async { - return InvitationStatus( - acceptedContact: AcceptedContact( - remoteProfile: remoteConversation.profile, - remoteIdentity: contactIdentityMaster, - remoteConversationRecordKey: remoteConversationRecordKey, - localConversationRecordKey: localConversationRecordKey)); - }); - }); + // // Complete the local conversation now that we have the remote profile + // final localConversationRecordKey = + // contactInvitationRecord.localConversationRecordKey.toVeilid(); + // return conversation.initLocalConversation( + // existingConversationRecordKey: localConversationRecordKey, + // profile: _account.profile, + // // ignore: prefer_expression_function_bodies + // callback: (localConversation) async { + // return InvitationStatus( + // acceptedContact: AcceptedContact( + // remoteProfile: remoteConversation.profile, + // remoteIdentity: contactIdentityMaster, + // remoteConversationRecordKey: remoteConversationRecordKey, + // localConversationRecordKey: localConversationRecordKey)); + // }); + // }); - if (acceptReject == null) { - return null; - } + // if (acceptReject == null) { + // return null; + // } - // Delete invitation and return the accepted or rejected contact - await deleteInvitation( - accepted: acceptReject.acceptedContact != null, - contactInvitationRecord: contactInvitationRecord); + // // Delete invitation and return the accepted or rejected contact + // await deleteInvitation( + // accepted: acceptReject.acceptedContact != null, + // contactInvitationRecord: contactInvitationRecord); - return acceptReject; - } on Exception catch (e) { - log.error('Exception in checkAcceptRejectContact: $e', e); + // return acceptReject; + // } on Exception catch (e) { + // log.error('Exception in checkInvitationStatus: $e', e); - // Attempt to clean up. All this needs better lifetime management - await deleteInvitation( - accepted: false, contactInvitationRecord: contactInvitationRecord); + // // Attempt to clean up. All this needs better lifetime management + // await deleteInvitation( + // accepted: false, contactInvitationRecord: contactInvitationRecord); - rethrow; - } - } + // rethrow; + // } + // } // final ActiveAccountInfo _activeAccountInfo; diff --git a/lib/contact_invitation/cubits/contact_request_inbox_cubit.dart b/lib/contact_invitation/cubits/contact_request_inbox_cubit.dart new file mode 100644 index 0000000..eea29ec --- /dev/null +++ b/lib/contact_invitation/cubits/contact_request_inbox_cubit.dart @@ -0,0 +1,151 @@ +import 'package:veilid_support/veilid_support.dart'; + +import '../../account_manager/account_manager.dart'; +import '../../proto/proto.dart' as proto; + +class ContactRequestInboxCubit + extends DefaultDHTRecordCubit { + ContactRequestInboxCubit( + {required this.activeAccountInfo, required this.contactInvitationRecord}) + : super( + open: () => _open( + activeAccountInfo: activeAccountInfo, + contactInvitationRecord: contactInvitationRecord), + decodeState: proto.SignedContactResponse.fromBuffer); + + ContactRequestInboxCubit.value( + {required super.record, + required this.activeAccountInfo, + required this.contactInvitationRecord}) + : super.value(decodeState: proto.SignedContactResponse.fromBuffer); + + static Future _open( + {required ActiveAccountInfo activeAccountInfo, + required proto.ContactInvitationRecord contactInvitationRecord}) async { + final pool = DHTRecordPool.instance; + final accountRecordKey = + activeAccountInfo.userLogin.accountRecordInfo.accountRecord.recordKey; + final writerKey = contactInvitationRecord.writerKey.toVeilid(); + final writerSecret = contactInvitationRecord.writerSecret.toVeilid(); + final recordKey = + contactInvitationRecord.contactRequestInbox.recordKey.toVeilid(); + final writer = TypedKeyPair( + kind: recordKey.kind, key: writerKey, secret: writerSecret); + return pool.openRead(recordKey, + crypto: await DHTRecordCryptoPrivate.fromTypedKeyPair(writer), + parent: accountRecordKey, + defaultSubkey: 1); + } + + final ActiveAccountInfo activeAccountInfo; + final proto.ContactInvitationRecord contactInvitationRecord; +} + // Future checkInvitationStatus( + // {}) async { + // // Open the contact request inbox + // try { + // final pool = DHTRecordPool.instance; + // final accountRecordKey = _activeAccountInfo + // .userLogin.accountRecordInfo.accountRecord.recordKey; + // final writerKey = contactInvitationRecord.writerKey.toVeilid(); + // final writerSecret = contactInvitationRecord.writerSecret.toVeilid(); + // final recordKey = + // contactInvitationRecord.contactRequestInbox.recordKey.toVeilid(); + // final writer = TypedKeyPair( + // kind: recordKey.kind, key: writerKey, secret: writerSecret); + // final acceptReject = await (await pool.openRead(recordKey, + // crypto: await DHTRecordCryptoPrivate.fromTypedKeyPair(writer), + // parent: accountRecordKey, + // defaultSubkey: 1)) + // .scope((contactRequestInbox) async { + // // + // final signedContactResponse = await contactRequestInbox.getProtobuf( + // proto.SignedContactResponse.fromBuffer, + // forceRefresh: true); + // if (signedContactResponse == null) { + // return null; + // } + + // final contactResponseBytes = + // Uint8List.fromList(signedContactResponse.contactResponse); + // final contactResponse = + // proto.ContactResponse.fromBuffer(contactResponseBytes); + // final contactIdentityMasterRecordKey = + // contactResponse.identityMasterRecordKey.toVeilid(); + // final cs = await pool.veilid.getCryptoSystem(recordKey.kind); + + // // Fetch the remote contact's account master + // final contactIdentityMaster = await openIdentityMaster( + // identityMasterRecordKey: contactIdentityMasterRecordKey); + + // // Verify + // final signature = signedContactResponse.identitySignature.toVeilid(); + // await cs.verify(contactIdentityMaster.identityPublicKey, + // contactResponseBytes, signature); + + // // Check for rejection + // if (!contactResponse.accept) { + // return const InvitationStatus(acceptedContact: null); + // } + + // // Pull profile from remote conversation key + // final remoteConversationRecordKey = + // contactResponse.remoteConversationRecordKey.toVeilid(); + + // final conversation = ConversationCubit( + // activeAccountInfo: _activeAccountInfo, + // remoteIdentityPublicKey: + // contactIdentityMaster.identityPublicTypedKey(), + // remoteConversationRecordKey: remoteConversationRecordKey); + // await conversation.refresh(); + + // final remoteConversation = + // conversation.state.data?.value.remoteConversation; + // if (remoteConversation == null) { + // log.info('Remote conversation could not be read. Waiting...'); + // return null; + // } + + // // Complete the local conversation now that we have the remote profile + // final localConversationRecordKey = + // contactInvitationRecord.localConversationRecordKey.toVeilid(); + // return conversation.initLocalConversation( + // existingConversationRecordKey: localConversationRecordKey, + // profile: _account.profile, + // // ignore: prefer_expression_function_bodies + // callback: (localConversation) async { + // return InvitationStatus( + // acceptedContact: AcceptedContact( + // remoteProfile: remoteConversation.profile, + // remoteIdentity: contactIdentityMaster, + // remoteConversationRecordKey: remoteConversationRecordKey, + // localConversationRecordKey: localConversationRecordKey)); + // }); + // }); + + // if (acceptReject == null) { + // return null; + // } + + // // Delete invitation and return the accepted or rejected contact + // await deleteInvitation( + // accepted: acceptReject.acceptedContact != null, + // contactInvitationRecord: contactInvitationRecord); + + // return acceptReject; + // } on Exception catch (e) { + // log.error('Exception in checkInvitationStatus: $e', e); + + // // Attempt to clean up. All this needs better lifetime management + // await deleteInvitation( + // accepted: false, contactInvitationRecord: contactInvitationRecord); + + // rethrow; + // } + + + + + + + diff --git a/lib/contact_invitation/cubits/cubits.dart b/lib/contact_invitation/cubits/cubits.dart index c16213a..c55e119 100644 --- a/lib/contact_invitation/cubits/cubits.dart +++ b/lib/contact_invitation/cubits/cubits.dart @@ -1 +1,4 @@ export 'contact_invitation_list_cubit.dart'; +export 'contact_request_inbox_cubit.dart'; +export 'waiting_invitation_cubit.dart'; +export 'waiting_invitations_bloc_map_cubit.dart'; diff --git a/lib/contact_invitation/cubits/waiting_invitation_cubit.dart b/lib/contact_invitation/cubits/waiting_invitation_cubit.dart new file mode 100644 index 0000000..44e0f36 --- /dev/null +++ b/lib/contact_invitation/cubits/waiting_invitation_cubit.dart @@ -0,0 +1,221 @@ +import 'dart:typed_data'; + +import 'package:async_tools/async_tools.dart'; +import 'package:equatable/equatable.dart'; +import 'package:meta/meta.dart'; +import 'package:veilid_support/veilid_support.dart'; + +import '../../account_manager/account_manager.dart'; +import '../../contacts/contacts.dart'; +import '../../proto/proto.dart' as proto; +import '../../tools/tools.dart'; +import '../models/accepted_contact.dart'; +import 'contact_request_inbox_cubit.dart'; + +@immutable +class InvitationStatus extends Equatable { + const InvitationStatus({required this.acceptedContact}); + final AcceptedContact? acceptedContact; + + @override + List get props => [acceptedContact]; +} + +class WaitingInvitationCubit extends AsyncTransformerCubit { + WaitingInvitationCubit(ContactRequestInboxCubit super.input, + {required ActiveAccountInfo activeAccountInfo, + required proto.Account account, + required proto.ContactInvitationRecord contactInvitationRecord}) + : super( + transform: (signedContactResponse) => _transform( + signedContactResponse, + activeAccountInfo: activeAccountInfo, + account: account, + contactInvitationRecord: contactInvitationRecord)); + + static Future> _transform( + proto.SignedContactResponse signedContactResponse, + {required ActiveAccountInfo activeAccountInfo, + required proto.Account account, + required proto.ContactInvitationRecord contactInvitationRecord}) async { + final pool = DHTRecordPool.instance; + final contactResponseBytes = + Uint8List.fromList(signedContactResponse.contactResponse); + final contactResponse = + proto.ContactResponse.fromBuffer(contactResponseBytes); + final contactIdentityMasterRecordKey = + contactResponse.identityMasterRecordKey.toVeilid(); + final cs = + await pool.veilid.getCryptoSystem(contactIdentityMasterRecordKey.kind); + + // Fetch the remote contact's account master + final contactIdentityMaster = await openIdentityMaster( + identityMasterRecordKey: contactIdentityMasterRecordKey); + + // Verify + final signature = signedContactResponse.identitySignature.toVeilid(); + await cs.verify(contactIdentityMaster.identityPublicKey, + contactResponseBytes, signature); + + // Check for rejection + if (!contactResponse.accept) { + // Rejection + return const AsyncValue.data(InvitationStatus(acceptedContact: null)); + } + + // Pull profile from remote conversation key + final remoteConversationRecordKey = + contactResponse.remoteConversationRecordKey.toVeilid(); + + final conversation = ConversationCubit( + activeAccountInfo: activeAccountInfo, + remoteIdentityPublicKey: contactIdentityMaster.identityPublicTypedKey(), + remoteConversationRecordKey: remoteConversationRecordKey); + + // wait for remote conversation for up to 20 seconds + proto.Conversation? remoteConversation; + var retryCount = 20; + do { + await conversation.refresh(); + remoteConversation = conversation.state.data?.value.remoteConversation; + if (remoteConversation != null) { + break; + } + log.info('Remote conversation could not be read. Waiting...'); + await Future.delayed(const Duration(seconds: 1)); + retryCount--; + } while (retryCount > 0); + if (remoteConversation == null) { + return AsyncValue.error('Invitation accept timed out.'); + } + + // Complete the local conversation now that we have the remote profile + final remoteProfile = remoteConversation.profile; + final localConversationRecordKey = + contactInvitationRecord.localConversationRecordKey.toVeilid(); + return conversation.initLocalConversation( + existingConversationRecordKey: localConversationRecordKey, + profile: account.profile, + // ignore: prefer_expression_function_bodies + callback: (localConversation) async { + return AsyncValue.data(InvitationStatus( + acceptedContact: AcceptedContact( + remoteProfile: remoteProfile, + remoteIdentity: contactIdentityMaster, + remoteConversationRecordKey: remoteConversationRecordKey, + localConversationRecordKey: localConversationRecordKey))); + }); + } +} + + + // Future checkInvitationStatus( + // {}) async { + // // Open the contact request inbox + // try { + // final pool = DHTRecordPool.instance; + // final accountRecordKey = _activeAccountInfo + // .userLogin.accountRecordInfo.accountRecord.recordKey; + // final writerKey = contactInvitationRecord.writerKey.toVeilid(); + // final writerSecret = contactInvitationRecord.writerSecret.toVeilid(); + // final recordKey = + // contactInvitationRecord.contactRequestInbox.recordKey.toVeilid(); + // final writer = TypedKeyPair( + // kind: recordKey.kind, key: writerKey, secret: writerSecret); + // final acceptReject = await (await pool.openRead(recordKey, + // crypto: await DHTRecordCryptoPrivate.fromTypedKeyPair(writer), + // parent: accountRecordKey, + // defaultSubkey: 1)) + // .scope((contactRequestInbox) async { + // // + // final signedContactResponse = await contactRequestInbox.getProtobuf( + // proto.SignedContactResponse.fromBuffer, + // forceRefresh: true); + // if (signedContactResponse == null) { + // return null; + // } + + // final contactResponseBytes = + // Uint8List.fromList(signedContactResponse.contactResponse); + // final contactResponse = + // proto.ContactResponse.fromBuffer(contactResponseBytes); + // final contactIdentityMasterRecordKey = + // contactResponse.identityMasterRecordKey.toVeilid(); + // final cs = await pool.veilid.getCryptoSystem(recordKey.kind); + + // // Fetch the remote contact's account master + // final contactIdentityMaster = await openIdentityMaster( + // identityMasterRecordKey: contactIdentityMasterRecordKey); + + // // Verify + // final signature = signedContactResponse.identitySignature.toVeilid(); + // await cs.verify(contactIdentityMaster.identityPublicKey, + // contactResponseBytes, signature); + + // // Check for rejection + // if (!contactResponse.accept) { + // return const InvitationStatus(acceptedContact: null); + // } + + // // Pull profile from remote conversation key + // final remoteConversationRecordKey = + // contactResponse.remoteConversationRecordKey.toVeilid(); + + // final conversation = ConversationCubit( + // activeAccountInfo: _activeAccountInfo, + // remoteIdentityPublicKey: + // contactIdentityMaster.identityPublicTypedKey(), + // remoteConversationRecordKey: remoteConversationRecordKey); + // await conversation.refresh(); + + // final remoteConversation = + // conversation.state.data?.value.remoteConversation; + // if (remoteConversation == null) { + // log.info('Remote conversation could not be read. Waiting...'); + // return null; + // } + + // // Complete the local conversation now that we have the remote profile + // final localConversationRecordKey = + // contactInvitationRecord.localConversationRecordKey.toVeilid(); + // return conversation.initLocalConversation( + // existingConversationRecordKey: localConversationRecordKey, + // profile: _account.profile, + // // ignore: prefer_expression_function_bodies + // callback: (localConversation) async { + // return InvitationStatus( + // acceptedContact: AcceptedContact( + // remoteProfile: remoteConversation.profile, + // remoteIdentity: contactIdentityMaster, + // remoteConversationRecordKey: remoteConversationRecordKey, + // localConversationRecordKey: localConversationRecordKey)); + // }); + // }); + + // if (acceptReject == null) { + // return null; + // } + + // // Delete invitation and return the accepted or rejected contact + // await deleteInvitation( + // accepted: acceptReject.acceptedContact != null, + // contactInvitationRecord: contactInvitationRecord); + + // return acceptReject; + // } on Exception catch (e) { + // log.error('Exception in checkInvitationStatus: $e', e); + + // // Attempt to clean up. All this needs better lifetime management + // await deleteInvitation( + // accepted: false, contactInvitationRecord: contactInvitationRecord); + + // rethrow; + // } + + + + + + + diff --git a/lib/contact_invitation/cubits/waiting_invitations_bloc_map_cubit.dart b/lib/contact_invitation/cubits/waiting_invitations_bloc_map_cubit.dart new file mode 100644 index 0000000..812537e --- /dev/null +++ b/lib/contact_invitation/cubits/waiting_invitations_bloc_map_cubit.dart @@ -0,0 +1,58 @@ +import 'package:async_tools/async_tools.dart'; +import 'package:fast_immutable_collections/fast_immutable_collections.dart'; +import 'package:veilid_support/veilid_support.dart'; + +import '../../account_manager/account_manager.dart'; +import '../../proto/proto.dart' as proto; +import '../../tools/tools.dart'; +import 'cubits.dart'; + +typedef WaitingInvitationsBlocMapState + = BlocMapState>; + +// Map of contactInvitationListRecordKey to WaitingInvitationCubit +// Wraps a contact invitation cubit to watch for accept/reject +// Automatically follows the state of a ContactInvitiationListCubit. +class WaitingInvitationsBlocMapCubit extends BlocMapCubit, WaitingInvitationCubit> + with + StateFollower>, + TypedKey, proto.ContactInvitationRecord> { + WaitingInvitationsBlocMapCubit( + {required this.activeAccountInfo, required this.account}); + Future addWaitingInvitation( + {required proto.ContactInvitationRecord + contactInvitationRecord}) async => + add(() => MapEntry( + contactInvitationRecord.contactRequestInbox.recordKey.toVeilid(), + WaitingInvitationCubit( + ContactRequestInboxCubit( + activeAccountInfo: activeAccountInfo, + contactInvitationRecord: contactInvitationRecord), + activeAccountInfo: activeAccountInfo, + account: account, + contactInvitationRecord: contactInvitationRecord))); + + final ActiveAccountInfo activeAccountInfo; + final proto.Account account; + + /// StateFollower ///////////////////////// + @override + IMap getStateMap( + AsyncValue> avstate) { + final state = avstate.data?.value; + if (state == null) { + return IMap(); + } + return IMap.fromIterable(state, + keyMapper: (e) => e.contactRequestInbox.recordKey.toVeilid(), + valueMapper: (e) => e); + } + + @override + Future removeFromState(TypedKey key) => remove(key); + + @override + Future updateState(TypedKey key, proto.ContactInvitationRecord value) => + addWaitingInvitation(contactInvitationRecord: value); +} diff --git a/lib/contact_invitation/models/accepted_contact.dart b/lib/contact_invitation/models/accepted_contact.dart index 4623b60..ac8edc2 100644 --- a/lib/contact_invitation/models/accepted_contact.dart +++ b/lib/contact_invitation/models/accepted_contact.dart @@ -1,10 +1,11 @@ +import 'package:equatable/equatable.dart'; import 'package:meta/meta.dart'; import 'package:veilid_support/veilid_support.dart'; import '../../proto/proto.dart' as proto; @immutable -class AcceptedContact { +class AcceptedContact extends Equatable { const AcceptedContact({ required this.remoteProfile, required this.remoteIdentity, @@ -16,4 +17,12 @@ class AcceptedContact { final IdentityMaster remoteIdentity; final TypedKey remoteConversationRecordKey; final TypedKey localConversationRecordKey; + + @override + List get props => [ + remoteProfile, + remoteIdentity, + remoteConversationRecordKey, + localConversationRecordKey + ]; } diff --git a/lib/contacts/views/contact_item_widget.dart b/lib/contacts/views/contact_item_widget.dart index 8823150..864b9ab 100644 --- a/lib/contacts/views/contact_item_widget.dart +++ b/lib/contacts/views/contact_item_widget.dart @@ -80,20 +80,6 @@ class ContactItemWidget extends StatelessWidget { await MainPager.of(context)?.pageController.animateToPage(1, duration: 250.ms, curve: Curves.easeInOut); } - - // // ignore: use_build_context_synchronously - // if (!context.mounted) { - // return; - // } - // await showDialog( - // context: context, - // builder: (context) => ContactInvitationDisplayDialog( - // name: activeAccountInfo.localAccount.name, - // message: contactInvitationRecord.message, - // generator: Uint8List.fromList( - // contactInvitationRecord.invitation), - // )); - // } }, title: Text(contact.editedProfile.name), subtitle: (contact.editedProfile.pronouns.isNotEmpty) @@ -101,7 +87,6 @@ class ContactItemWidget extends StatelessWidget { : null, iconColor: scale.tertiaryScale.background, textColor: scale.tertiaryScale.text, - //Text(Timestamp.fromInt64(contactInvitationRecord.expiration) / ), leading: const Icon(Icons.person)))); } diff --git a/lib/layout/home/home_account_ready/home_account_ready_shell.dart b/lib/layout/home/home_account_ready/home_account_ready_shell.dart index 03fb82e..2434003 100644 --- a/lib/layout/home/home_account_ready/home_account_ready_shell.dart +++ b/lib/layout/home/home_account_ready/home_account_ready_shell.dart @@ -61,11 +61,16 @@ class HomeAccountReadyShellState extends State { activeAccountInfo: activeAccountInfo, account: account)), BlocProvider( - create: (context) => ActiveConversationsCubit( + create: (context) => ActiveConversationsBlocMapCubit( activeAccountInfo: activeAccountInfo)), BlocProvider( - create: (context) => - ActiveChatCubit(null, routerCubit.setHasActiveChat)) + create: (context) => ActiveChatCubit(null) + ..withStateListen((event) { + routerCubit.setHasActiveChat(event != null); + })), + BlocProvider( + create: (context) => WaitingInvitationsBlocMapCubit( + activeAccountInfo: activeAccountInfo, account: account)) ], child: widget.child); }))); } diff --git a/lib/tools/async_transformer_cubit.dart b/lib/tools/async_transformer_cubit.dart new file mode 100644 index 0000000..9ce4bf1 --- /dev/null +++ b/lib/tools/async_transformer_cubit.dart @@ -0,0 +1,62 @@ +import 'dart:async'; + +import 'package:async_tools/async_tools.dart'; +import 'package:bloc/bloc.dart'; + +class AsyncTransformerCubit extends Cubit> { + AsyncTransformerCubit(this.input, {required this.transform}) + : super(const AsyncValue.loading()) { + _asyncTransform(input.state); + _subscription = input.stream.listen(_asyncTransform); + } + void _asyncTransform(AsyncValue newInputState) { + // Use a singlefuture here to ensure we get dont lose any updates + // If the input stream gives us an update while we are + // still processing the last update, the most recent input state will + // be saved and processed eventually. + singleFuture(this, () async { + var newState = newInputState; + var done = false; + while (!done) { + // Emit the transformed state + try { + if (newState is AsyncLoading) { + return AsyncValue.loading(); + } + if (newState is AsyncError) { + final newStateError = newState as AsyncError; + return AsyncValue.error( + newStateError.error, newStateError.stackTrace); + } + final transformedState = await transform(newState.data!.value); + emit(transformedState); + } on Exception catch (e, st) { + emit(AsyncValue.error(e, st)); + } + // See if there's another state change to process + final next = _nextInputState; + _nextInputState = null; + if (next != null) { + newState = next; + } else { + done = true; + } + } + }, onBusy: () { + // Keep this state until we process again + _nextInputState = newInputState; + }); + } + + @override + Future close() async { + await _subscription.cancel(); + await input.close(); + await super.close(); + } + + Cubit> input; + AsyncValue? _nextInputState; + Future> Function(S) transform; + late final StreamSubscription> _subscription; +} diff --git a/lib/tools/bloc_map_cubit.dart b/lib/tools/bloc_map_cubit.dart index 04d7693..2553c66 100644 --- a/lib/tools/bloc_map_cubit.dart +++ b/lib/tools/bloc_map_cubit.dart @@ -12,6 +12,14 @@ class _ItemEntry { final StreamSubscription subscription; } +// Streaming container cubit that is a map from some immutable key +// to a some other cubit's output state. Output state for this container +// cubit is an immutable map of the key to the output state of the contained +// cubits. +// +// K = Key type for the bloc map, used to look up some mapped cubit +// S = State type for the value, keys will look up values of this type +// B = Bloc/cubit type for the value, output states of type S abstract class BlocMapCubit> extends Cubit> { BlocMapCubit() diff --git a/lib/tools/bloc_tools.dart b/lib/tools/bloc_tools.dart new file mode 100644 index 0000000..44940da --- /dev/null +++ b/lib/tools/bloc_tools.dart @@ -0,0 +1,12 @@ +import 'package:bloc/bloc.dart'; + +mixin BlocTools on BlocBase { + void withStateListen(void Function(State event)? onData, + {Function? onError, void Function()? onDone, bool? cancelOnError}) { + if (onData != null) { + onData(state); + } + stream.listen(onData, + onError: onError, onDone: onDone, cancelOnError: cancelOnError); + } +} diff --git a/lib/tools/state_follower.dart b/lib/tools/state_follower.dart new file mode 100644 index 0000000..cf1ab9a --- /dev/null +++ b/lib/tools/state_follower.dart @@ -0,0 +1,78 @@ +import 'dart:async'; + +import 'package:async_tools/async_tools.dart'; +import 'package:fast_immutable_collections/fast_immutable_collections.dart'; + +// Mixin that automatically keeps two blocs/cubits in sync with each other +// Useful for having a BlocMapCubit 'follow' the state of another input cubit. +// As the state of the input cubit changes, the BlocMapCubit can add/remove +// mapped Cubits that automatically process the input state reactively. +// +// S = Input state type +// K = Key derived from elements of input state +// V = Value derived from elements of input state +abstract mixin class StateFollower { + void follow({ + required S initialInputState, + required Stream stream, + }) { + // + _lastInputStateMap = getStateMap(initialInputState); + _subscription = stream.listen(_updateFollow); + } + + Future close() async { + await _subscription.cancel(); + } + + IMap getStateMap(S state); + Future removeFromState(K key); + Future updateState(K key, V value); + + void _updateFollow(S newInputState) { + // Use a singlefuture here to ensure we get dont lose any updates + // If the input stream gives us an update while we are + // still processing the last update, the most recent input state will + // be saved and processed eventually. + final newInputStateMap = getStateMap(newInputState); + + singleFuture(this, () async { + var newStateMap = newInputStateMap; + var done = false; + while (!done) { + for (final k in _lastInputStateMap.keys) { + if (!newStateMap.containsKey(k)) { + // deleted + await removeFromState(k); + } + } + for (final newEntry in newStateMap.entries) { + final v = _lastInputStateMap.get(newEntry.key); + if (v == null || v != newEntry.value) { + // added or changed + await updateState(newEntry.key, newEntry.value); + } + } + + // Keep this state map for the next time + _lastInputStateMap = newStateMap; + + // See if there's another state change to process + final next = _nextInputStateMap; + _nextInputStateMap = null; + if (next != null) { + newStateMap = next; + } else { + done = true; + } + } + }, onBusy: () { + // Keep this state until we process again + _nextInputStateMap = newInputStateMap; + }); + } + + late IMap _lastInputStateMap; + IMap? _nextInputStateMap; + late final StreamSubscription _subscription; +} diff --git a/lib/tools/tools.dart b/lib/tools/tools.dart index 35792b8..4c9cf07 100644 --- a/lib/tools/tools.dart +++ b/lib/tools/tools.dart @@ -1,5 +1,7 @@ export 'animations.dart'; +export 'async_transformer_cubit.dart'; export 'bloc_map_cubit.dart'; +export 'bloc_tools.dart'; export 'enter_password.dart'; export 'enter_pin.dart'; export 'future_cubit.dart'; @@ -8,6 +10,7 @@ export 'phono_byte.dart'; export 'responsive.dart'; export 'scanner_error_widget.dart'; export 'shared_preferences.dart'; +export 'state_follower.dart'; export 'state_logger.dart'; export 'stream_listenable.dart'; export 'stream_wrapper_cubit.dart'; diff --git a/packages/veilid_support/lib/src/memory_tools.dart b/packages/veilid_support/lib/src/memory_tools.dart new file mode 100644 index 0000000..08aa8dc --- /dev/null +++ b/packages/veilid_support/lib/src/memory_tools.dart @@ -0,0 +1,72 @@ +import 'dart:math'; +import 'dart:typed_data'; + +/// Compares two [Uint8List] contents for equality by comparing words at a time. +/// Returns true if this == other +extension Uint8ListCompare on Uint8List { + bool equals(Uint8List other) { + if (identical(this, other)) { + return true; + } + if (length != other.length) { + return false; + } + + final words = buffer.asUint32List(); + final otherwords = other.buffer.asUint32List(); + final wordLen = words.length; + + var i = 0; + for (; i < wordLen; i++) { + if (words[i] != otherwords[i]) { + break; + } + } + i <<= 2; + for (; i < length; i++) { + if (this[i] != other[i]) { + return false; + } + } + return true; + } + + /// Compares two [Uint8List] contents for + /// numeric ordering by comparing words at a time. + /// Returns -1 for this < other, 1 for this > other, and 0 for this == other. + int compare(Uint8List other) { + if (identical(this, other)) { + return 0; + } + + final words = buffer.asUint32List(); + final otherwords = other.buffer.asUint32List(); + final minWordLen = min(words.length, otherwords.length); + + var i = 0; + for (; i < minWordLen; i++) { + if (words[i] != otherwords[i]) { + break; + } + } + i <<= 2; + final minLen = min(length, other.length); + for (; i < minLen; i++) { + final a = this[i]; + final b = other[i]; + if (a < b) { + return -1; + } + if (a > b) { + return 1; + } + } + if (length < other.length) { + return -1; + } + if (length > other.length) { + return 1; + } + return 0; + } +}