more state follower

This commit is contained in:
Christien Rioux 2024-02-20 20:07:35 -05:00
parent 450bdf9c7c
commit c4c7b264aa
8 changed files with 146 additions and 133 deletions

View file

@ -0,0 +1,68 @@
import 'dart:async';
import 'package:async_tools/async_tools.dart';
import 'package:fast_immutable_collections/fast_immutable_collections.dart';
import 'package:veilid_support/veilid_support.dart';
import '../../account_manager/account_manager.dart';
import '../../chat/chat.dart';
import '../../proto/proto.dart' as proto;
import '../../tools/tools.dart';
import 'active_conversations_bloc_map_cubit.dart';
// Map of remoteConversationRecordKey to MessagesCubit
// Wraps a MessagesCubit to stream the latest messages to the state
// Automatically follows the state of a ActiveConversationsBlocMapCubit.
class ActiveConversationMessagesBlocMapCubit extends BlocMapCubit<TypedKey,
AsyncValue<IList<proto.Message>>, MessagesCubit>
with
StateFollower<ActiveConversationsBlocMapState, TypedKey,
AsyncValue<ActiveConversationState>> {
ActiveConversationMessagesBlocMapCubit({
required ActiveAccountInfo activeAccountInfo,
}) : _activeAccountInfo = activeAccountInfo;
Future<void> _addConversationMessages(
{required proto.Contact contact,
required proto.Conversation localConversation,
required proto.Conversation remoteConversation}) async =>
add(() => MapEntry(
contact.remoteConversationRecordKey.toVeilid(),
MessagesCubit(
activeAccountInfo: _activeAccountInfo,
remoteIdentityPublicKey: contact.identityPublicKey.toVeilid(),
localConversationRecordKey:
contact.localConversationRecordKey.toVeilid(),
remoteConversationRecordKey:
contact.remoteConversationRecordKey.toVeilid(),
localMessagesRecordKey: localConversation.messages.toVeilid(),
remoteMessagesRecordKey:
remoteConversation.messages.toVeilid())));
/// StateFollower /////////////////////////
@override
IMap<TypedKey, AsyncValue<ActiveConversationState>> getStateMap(
ActiveConversationsBlocMapState state) =>
state;
@override
Future<void> removeFromState(TypedKey key) => remove(key);
@override
Future<void> updateState(
TypedKey key, AsyncValue<ActiveConversationState> value) async {
await value.when(
data: (state) => _addConversationMessages(
contact: state.contact,
localConversation: state.localConversation,
remoteConversation: state.remoteConversation),
loading: () => addState(key, const AsyncValue.loading()),
error: (error, stackTrace) =>
addState(key, AsyncValue.error(error, stackTrace)));
}
////
final ActiveAccountInfo _activeAccountInfo;
}

View file

@ -1,113 +0,0 @@
import 'dart:async';
import 'package:async_tools/async_tools.dart';
import 'package:fast_immutable_collections/fast_immutable_collections.dart';
import 'package:veilid_support/veilid_support.dart';
import '../../account_manager/account_manager.dart';
import '../../chat/chat.dart';
import '../../proto/proto.dart' as proto;
import '../../tools/tools.dart';
import 'active_conversations_bloc_map_cubit.dart';
class ActiveConversationMessagesCubit extends BlocMapCubit<TypedKey,
AsyncValue<IList<proto.Message>>, MessagesCubit> {
ActiveConversationMessagesCubit({
required ActiveAccountInfo activeAccountInfo,
required Stream<ActiveConversationsBlocMapState> stream,
}) : _activeAccountInfo = activeAccountInfo {
//
_subscription = stream.listen(updateMessageCubits);
}
@override
Future<void> close() async {
await _subscription.cancel();
await super.close();
}
// Determine which conversations have been added, deleted, or changed
// and update this cubit's state appropriately
void updateMessageCubits(ActiveConversationsBlocMapState newInputState) {
// Use a singlefuture here to ensure we get dont lose any updates
// If the ActiveConversations stream gives us an update while we are
// still processing the last update, the most recent input state will
// be saved and processed eventually.
singleFuture(this, () async {
var newActiveConversationsState = newInputState;
var done = false;
while (!done) {
// Build lists of changes to conversations
final deleted = _lastActiveConversationsState.keys
.where((k) => !newActiveConversationsState.containsKey(k));
final added = newActiveConversationsState.keys
.where((k) => !_lastActiveConversationsState.containsKey(k));
final changed = _lastActiveConversationsState.where((k, v) {
final nv = newActiveConversationsState[k];
if (nv == null) {
return false;
}
return nv != v;
}).keys;
// Process all deleted conversations
for (final d in deleted) {
await remove(d);
}
// Process all added and changed conversations
for (final a in [...added, ...changed]) {
final av = newActiveConversationsState[a]!;
await av.when(
data: (state) => _addConversationMessages(
contact: state.contact,
localConversation: state.localConversation,
remoteConversation: state.remoteConversation),
loading: () => addState(a, const AsyncValue.loading()),
error: (error, stackTrace) =>
addState(a, AsyncValue.error(error, stackTrace)));
}
// Keep this state for the next time
_lastActiveConversationsState = newActiveConversationsState;
// See if there's another state change to process
final next = _nextActiveConversationsState;
_nextActiveConversationsState = null;
if (next != null) {
newActiveConversationsState = next;
} else {
done = true;
}
}
}, onBusy: () {
// Keep this state until we process again
_nextActiveConversationsState = newInputState;
});
}
Future<void> _addConversationMessages(
{required proto.Contact contact,
required proto.Conversation localConversation,
required proto.Conversation remoteConversation}) async =>
add(() => MapEntry(
contact.remoteConversationRecordKey.toVeilid(),
MessagesCubit(
activeAccountInfo: _activeAccountInfo,
remoteIdentityPublicKey: contact.identityPublicKey.toVeilid(),
localConversationRecordKey:
contact.localConversationRecordKey.toVeilid(),
remoteConversationRecordKey:
contact.remoteConversationRecordKey.toVeilid(),
localMessagesRecordKey: localConversation.messages.toVeilid(),
remoteMessagesRecordKey:
remoteConversation.messages.toVeilid())));
////
final ActiveAccountInfo _activeAccountInfo;
ActiveConversationsBlocMapState _lastActiveConversationsState =
ActiveConversationsBlocMapState();
ActiveConversationsBlocMapState? _nextActiveConversationsState;
late final StreamSubscription<ActiveConversationsBlocMapState> _subscription;
}

View file

@ -1,5 +1,6 @@
import 'package:async_tools/async_tools.dart';
import 'package:equatable/equatable.dart';
import 'package:fast_immutable_collections/fast_immutable_collections.dart';
import 'package:meta/meta.dart';
import 'package:veilid_support/veilid_support.dart';
@ -32,11 +33,15 @@ typedef ActiveConversationsBlocMapState
// Map of remoteConversationRecordKey to ActiveConversationCubit
// Wraps a conversation cubit to only expose completely built conversations
// Automatically follows the state of a ChatListCubit.
class ActiveConversationsBlocMapCubit extends BlocMapCubit<TypedKey,
AsyncValue<ActiveConversationState>, ActiveConversationCubit> {
AsyncValue<ActiveConversationState>, ActiveConversationCubit>
with StateFollower<AsyncValue<IList<proto.Chat>>, TypedKey, proto.Chat> {
ActiveConversationsBlocMapCubit(
{required ActiveAccountInfo activeAccountInfo})
: _activeAccountInfo = activeAccountInfo;
{required ActiveAccountInfo activeAccountInfo,
required ContactListCubit contactListCubit})
: _activeAccountInfo = activeAccountInfo,
_contactListCubit = contactListCubit;
// Add an active conversation to be tracked for changes
Future<void> addConversation({required proto.Contact contact}) async =>
@ -65,5 +70,41 @@ class ActiveConversationsBlocMapCubit extends BlocMapCubit<TypedKey,
loading: AsyncValue.loading,
error: AsyncValue.error))));
/// StateFollower /////////////////////////
@override
IMap<TypedKey, proto.Chat> getStateMap(AsyncValue<IList<proto.Chat>> state) {
final stateValue = state.data?.value;
if (stateValue == null) {
return IMap();
}
return IMap.fromIterable(stateValue,
keyMapper: (e) => e.remoteConversationKey.toVeilid(),
valueMapper: (e) => e);
}
@override
Future<void> removeFromState(TypedKey key) => remove(key);
@override
Future<void> updateState(TypedKey key, proto.Chat value) async {
final contactList = _contactListCubit.state.data?.value;
if (contactList == null) {
await addState(key, const AsyncValue.loading());
return;
}
final contactIndex = contactList
.indexWhere((c) => c.remoteConversationRecordKey.toVeilid() == key);
if (contactIndex == -1) {
await addState(key, AsyncValue.error('Contact not found for chat'));
return;
}
final contact = contactList[contactIndex];
await addConversation(contact: contact);
}
////
final ActiveAccountInfo _activeAccountInfo;
final ContactListCubit _contactListCubit;
}

View file

@ -1,3 +1,3 @@
export 'active_conversation_messages_cubit.dart';
export 'active_conversation_messages_bloc_map_cubit.dart';
export 'active_conversations_bloc_map_cubit.dart';
export 'chat_list_cubit.dart';

View file

@ -68,11 +68,7 @@ class ChatSingleContactItemWidget extends StatelessWidget {
// component is not dragged.
child: ListTile(
onTap: () {
final activeConversationsCubit =
context.read<ActiveConversationsBlocMapCubit>();
singleFuture(activeChatCubit, () async {
await activeConversationsCubit.addConversation(
contact: _contact);
activeChatCubit.setActiveChat(remoteConversationRecordKey);
});
},