diff --git a/lib/chat/views/chat_component.dart b/lib/chat/views/chat_component.dart index 8e80e15..6e16276 100644 --- a/lib/chat/views/chat_component.dart +++ b/lib/chat/views/chat_component.dart @@ -79,7 +79,8 @@ class ChatComponent extends StatelessWidget { // Get the messages to display // and ensure it is safe to operate() on the MessageCubit for this chat - final avmessages = context.select>?>( (x) => x.state[remoteConversationRecordKey]); if (avmessages == null) { @@ -117,7 +118,7 @@ class ChatComponent extends StatelessWidget { if (message.text.isEmpty) { return; } - await context.read().operate( + await context.read().operate( _remoteConversationRecordKey, closure: (messagesCubit) => messagesCubit.addMessage(message: message)); } diff --git a/lib/chat_list/cubits/active_conversation_messages_bloc_map_cubit.dart b/lib/chat_list/cubits/active_conversation_messages_bloc_map_cubit.dart new file mode 100644 index 0000000..a906bfc --- /dev/null +++ b/lib/chat_list/cubits/active_conversation_messages_bloc_map_cubit.dart @@ -0,0 +1,68 @@ +import 'dart:async'; + +import 'package:async_tools/async_tools.dart'; +import 'package:fast_immutable_collections/fast_immutable_collections.dart'; +import 'package:veilid_support/veilid_support.dart'; + +import '../../account_manager/account_manager.dart'; +import '../../chat/chat.dart'; +import '../../proto/proto.dart' as proto; +import '../../tools/tools.dart'; +import 'active_conversations_bloc_map_cubit.dart'; + +// Map of remoteConversationRecordKey to MessagesCubit +// Wraps a MessagesCubit to stream the latest messages to the state +// Automatically follows the state of a ActiveConversationsBlocMapCubit. +class ActiveConversationMessagesBlocMapCubit extends BlocMapCubit>, MessagesCubit> + with + StateFollower> { + ActiveConversationMessagesBlocMapCubit({ + required ActiveAccountInfo activeAccountInfo, + }) : _activeAccountInfo = activeAccountInfo; + + Future _addConversationMessages( + {required proto.Contact contact, + required proto.Conversation localConversation, + required proto.Conversation remoteConversation}) async => + add(() => MapEntry( + contact.remoteConversationRecordKey.toVeilid(), + MessagesCubit( + activeAccountInfo: _activeAccountInfo, + remoteIdentityPublicKey: contact.identityPublicKey.toVeilid(), + localConversationRecordKey: + contact.localConversationRecordKey.toVeilid(), + remoteConversationRecordKey: + contact.remoteConversationRecordKey.toVeilid(), + localMessagesRecordKey: localConversation.messages.toVeilid(), + remoteMessagesRecordKey: + remoteConversation.messages.toVeilid()))); + + /// StateFollower ///////////////////////// + + @override + IMap> getStateMap( + ActiveConversationsBlocMapState state) => + state; + + @override + Future removeFromState(TypedKey key) => remove(key); + + @override + Future updateState( + TypedKey key, AsyncValue value) async { + await value.when( + data: (state) => _addConversationMessages( + contact: state.contact, + localConversation: state.localConversation, + remoteConversation: state.remoteConversation), + loading: () => addState(key, const AsyncValue.loading()), + error: (error, stackTrace) => + addState(key, AsyncValue.error(error, stackTrace))); + } + + //// + + final ActiveAccountInfo _activeAccountInfo; +} diff --git a/lib/chat_list/cubits/active_conversation_messages_cubit.dart b/lib/chat_list/cubits/active_conversation_messages_cubit.dart deleted file mode 100644 index aad822d..0000000 --- a/lib/chat_list/cubits/active_conversation_messages_cubit.dart +++ /dev/null @@ -1,113 +0,0 @@ -import 'dart:async'; - -import 'package:async_tools/async_tools.dart'; -import 'package:fast_immutable_collections/fast_immutable_collections.dart'; -import 'package:veilid_support/veilid_support.dart'; - -import '../../account_manager/account_manager.dart'; -import '../../chat/chat.dart'; -import '../../proto/proto.dart' as proto; -import '../../tools/tools.dart'; -import 'active_conversations_bloc_map_cubit.dart'; - -class ActiveConversationMessagesCubit extends BlocMapCubit>, MessagesCubit> { - ActiveConversationMessagesCubit({ - required ActiveAccountInfo activeAccountInfo, - required Stream stream, - }) : _activeAccountInfo = activeAccountInfo { - // - _subscription = stream.listen(updateMessageCubits); - } - - @override - Future close() async { - await _subscription.cancel(); - await super.close(); - } - - // Determine which conversations have been added, deleted, or changed - // and update this cubit's state appropriately - void updateMessageCubits(ActiveConversationsBlocMapState newInputState) { - // Use a singlefuture here to ensure we get dont lose any updates - // If the ActiveConversations stream gives us an update while we are - // still processing the last update, the most recent input state will - // be saved and processed eventually. - singleFuture(this, () async { - var newActiveConversationsState = newInputState; - var done = false; - while (!done) { - // Build lists of changes to conversations - final deleted = _lastActiveConversationsState.keys - .where((k) => !newActiveConversationsState.containsKey(k)); - final added = newActiveConversationsState.keys - .where((k) => !_lastActiveConversationsState.containsKey(k)); - final changed = _lastActiveConversationsState.where((k, v) { - final nv = newActiveConversationsState[k]; - if (nv == null) { - return false; - } - return nv != v; - }).keys; - - // Process all deleted conversations - for (final d in deleted) { - await remove(d); - } - - // Process all added and changed conversations - for (final a in [...added, ...changed]) { - final av = newActiveConversationsState[a]!; - await av.when( - data: (state) => _addConversationMessages( - contact: state.contact, - localConversation: state.localConversation, - remoteConversation: state.remoteConversation), - loading: () => addState(a, const AsyncValue.loading()), - error: (error, stackTrace) => - addState(a, AsyncValue.error(error, stackTrace))); - } - - // Keep this state for the next time - _lastActiveConversationsState = newActiveConversationsState; - - // See if there's another state change to process - final next = _nextActiveConversationsState; - _nextActiveConversationsState = null; - if (next != null) { - newActiveConversationsState = next; - } else { - done = true; - } - } - }, onBusy: () { - // Keep this state until we process again - _nextActiveConversationsState = newInputState; - }); - } - - Future _addConversationMessages( - {required proto.Contact contact, - required proto.Conversation localConversation, - required proto.Conversation remoteConversation}) async => - add(() => MapEntry( - contact.remoteConversationRecordKey.toVeilid(), - MessagesCubit( - activeAccountInfo: _activeAccountInfo, - remoteIdentityPublicKey: contact.identityPublicKey.toVeilid(), - localConversationRecordKey: - contact.localConversationRecordKey.toVeilid(), - remoteConversationRecordKey: - contact.remoteConversationRecordKey.toVeilid(), - localMessagesRecordKey: localConversation.messages.toVeilid(), - remoteMessagesRecordKey: - remoteConversation.messages.toVeilid()))); - - //// - - final ActiveAccountInfo _activeAccountInfo; - ActiveConversationsBlocMapState _lastActiveConversationsState = - ActiveConversationsBlocMapState(); - ActiveConversationsBlocMapState? _nextActiveConversationsState; - late final StreamSubscription _subscription; -} diff --git a/lib/chat_list/cubits/active_conversations_bloc_map_cubit.dart b/lib/chat_list/cubits/active_conversations_bloc_map_cubit.dart index 946ec99..78bf8ff 100644 --- a/lib/chat_list/cubits/active_conversations_bloc_map_cubit.dart +++ b/lib/chat_list/cubits/active_conversations_bloc_map_cubit.dart @@ -1,5 +1,6 @@ import 'package:async_tools/async_tools.dart'; import 'package:equatable/equatable.dart'; +import 'package:fast_immutable_collections/fast_immutable_collections.dart'; import 'package:meta/meta.dart'; import 'package:veilid_support/veilid_support.dart'; @@ -32,11 +33,15 @@ typedef ActiveConversationsBlocMapState // Map of remoteConversationRecordKey to ActiveConversationCubit // Wraps a conversation cubit to only expose completely built conversations +// Automatically follows the state of a ChatListCubit. class ActiveConversationsBlocMapCubit extends BlocMapCubit, ActiveConversationCubit> { + AsyncValue, ActiveConversationCubit> + with StateFollower>, TypedKey, proto.Chat> { ActiveConversationsBlocMapCubit( - {required ActiveAccountInfo activeAccountInfo}) - : _activeAccountInfo = activeAccountInfo; + {required ActiveAccountInfo activeAccountInfo, + required ContactListCubit contactListCubit}) + : _activeAccountInfo = activeAccountInfo, + _contactListCubit = contactListCubit; // Add an active conversation to be tracked for changes Future addConversation({required proto.Contact contact}) async => @@ -65,5 +70,41 @@ class ActiveConversationsBlocMapCubit extends BlocMapCubit getStateMap(AsyncValue> state) { + final stateValue = state.data?.value; + if (stateValue == null) { + return IMap(); + } + return IMap.fromIterable(stateValue, + keyMapper: (e) => e.remoteConversationKey.toVeilid(), + valueMapper: (e) => e); + } + + @override + Future removeFromState(TypedKey key) => remove(key); + + @override + Future updateState(TypedKey key, proto.Chat value) async { + final contactList = _contactListCubit.state.data?.value; + if (contactList == null) { + await addState(key, const AsyncValue.loading()); + return; + } + final contactIndex = contactList + .indexWhere((c) => c.remoteConversationRecordKey.toVeilid() == key); + if (contactIndex == -1) { + await addState(key, AsyncValue.error('Contact not found for chat')); + return; + } + final contact = contactList[contactIndex]; + await addConversation(contact: contact); + } + + //// + final ActiveAccountInfo _activeAccountInfo; + final ContactListCubit _contactListCubit; } diff --git a/lib/chat_list/cubits/cubits.dart b/lib/chat_list/cubits/cubits.dart index 7ff0db1..0f099ca 100644 --- a/lib/chat_list/cubits/cubits.dart +++ b/lib/chat_list/cubits/cubits.dart @@ -1,3 +1,3 @@ -export 'active_conversation_messages_cubit.dart'; +export 'active_conversation_messages_bloc_map_cubit.dart'; export 'active_conversations_bloc_map_cubit.dart'; export 'chat_list_cubit.dart'; diff --git a/lib/chat_list/views/chat_single_contact_item_widget.dart b/lib/chat_list/views/chat_single_contact_item_widget.dart index 5bb39ee..7d64e43 100644 --- a/lib/chat_list/views/chat_single_contact_item_widget.dart +++ b/lib/chat_list/views/chat_single_contact_item_widget.dart @@ -68,11 +68,7 @@ class ChatSingleContactItemWidget extends StatelessWidget { // component is not dragged. child: ListTile( onTap: () { - final activeConversationsCubit = - context.read(); singleFuture(activeChatCubit, () async { - await activeConversationsCubit.addConversation( - contact: _contact); activeChatCubit.setActiveChat(remoteConversationRecordKey); }); }, diff --git a/lib/contact_invitation/cubits/waiting_invitations_bloc_map_cubit.dart b/lib/contact_invitation/cubits/waiting_invitations_bloc_map_cubit.dart index 812537e..988c23b 100644 --- a/lib/contact_invitation/cubits/waiting_invitations_bloc_map_cubit.dart +++ b/lib/contact_invitation/cubits/waiting_invitations_bloc_map_cubit.dart @@ -12,7 +12,7 @@ typedef WaitingInvitationsBlocMapState // Map of contactInvitationListRecordKey to WaitingInvitationCubit // Wraps a contact invitation cubit to watch for accept/reject -// Automatically follows the state of a ContactInvitiationListCubit. +// Automatically follows the state of a ContactInvitationListCubit. class WaitingInvitationsBlocMapCubit extends BlocMapCubit, WaitingInvitationCubit> with @@ -33,18 +33,15 @@ class WaitingInvitationsBlocMapCubit extends BlocMapCubit getStateMap( - AsyncValue> avstate) { - final state = avstate.data?.value; - if (state == null) { + AsyncValue> state) { + final stateValue = state.data?.value; + if (stateValue == null) { return IMap(); } - return IMap.fromIterable(state, + return IMap.fromIterable(stateValue, keyMapper: (e) => e.contactRequestInbox.recordKey.toVeilid(), valueMapper: (e) => e); } @@ -55,4 +52,8 @@ class WaitingInvitationsBlocMapCubit extends BlocMapCubit updateState(TypedKey key, proto.ContactInvitationRecord value) => addWaitingInvitation(contactInvitationRecord: value); + + //// + final ActiveAccountInfo activeAccountInfo; + final proto.Account account; } diff --git a/lib/layout/home/home_account_ready/home_account_ready_shell.dart b/lib/layout/home/home_account_ready/home_account_ready_shell.dart index 2434003..fceea75 100644 --- a/lib/layout/home/home_account_ready/home_account_ready_shell.dart +++ b/lib/layout/home/home_account_ready/home_account_ready_shell.dart @@ -1,3 +1,5 @@ +import 'package:async_tools/async_tools.dart'; +import 'package:fast_immutable_collections/fast_immutable_collections.dart'; import 'package:flutter/material.dart'; import 'package:flutter_bloc/flutter_bloc.dart'; import 'package:provider/provider.dart'; @@ -62,7 +64,19 @@ class HomeAccountReadyShellState extends State { account: account)), BlocProvider( create: (context) => ActiveConversationsBlocMapCubit( - activeAccountInfo: activeAccountInfo)), + activeAccountInfo: activeAccountInfo, + contactListCubit: context.read()) + ..follow( + initialInputState: const AsyncValue.loading(), + stream: context.read().stream)), + BlocProvider( + create: (context) => ActiveConversationMessagesBlocMapCubit( + activeAccountInfo: activeAccountInfo, + )..follow( + initialInputState: IMap(), + stream: context + .read() + .stream)), BlocProvider( create: (context) => ActiveChatCubit(null) ..withStateListen((event) { @@ -70,7 +84,12 @@ class HomeAccountReadyShellState extends State { })), BlocProvider( create: (context) => WaitingInvitationsBlocMapCubit( - activeAccountInfo: activeAccountInfo, account: account)) + activeAccountInfo: activeAccountInfo, account: account) + ..follow( + initialInputState: const AsyncValue.loading(), + stream: context + .read() + .stream)) ], child: widget.child); }))); }