incremental chat state work

This commit is contained in:
Christien Rioux 2024-08-03 21:55:20 -05:00
parent 83880d79ba
commit 47287ba8d4
7 changed files with 70 additions and 30 deletions

View File

@ -233,7 +233,8 @@ class ChatComponentCubit extends Cubit<ChatComponentState> {
return types.User( return types.User(
id: remoteIdentityPublicKey.toString(), id: remoteIdentityPublicKey.toString(),
firstName: activeConversationState.remoteConversation.profile.name, firstName: activeConversationState.remoteConversation?.profile.name ??
'<unnamed>',
metadata: {metadataKeyIdentityPublicKey: remoteIdentityPublicKey}); metadata: {metadataKeyIdentityPublicKey: remoteIdentityPublicKey});
} }

View File

@ -55,7 +55,7 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
required TypedKey localConversationRecordKey, required TypedKey localConversationRecordKey,
required TypedKey localMessagesRecordKey, required TypedKey localMessagesRecordKey,
required TypedKey remoteConversationRecordKey, required TypedKey remoteConversationRecordKey,
required TypedKey remoteMessagesRecordKey, required TypedKey? remoteMessagesRecordKey,
}) : _accountInfo = accountInfo, }) : _accountInfo = accountInfo,
_remoteIdentityPublicKey = remoteIdentityPublicKey, _remoteIdentityPublicKey = remoteIdentityPublicKey,
_localConversationRecordKey = localConversationRecordKey, _localConversationRecordKey = localConversationRecordKey,
@ -147,8 +147,14 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
// Open remote messages key // Open remote messages key
Future<void> _initRcvdMessagesCubit() async { Future<void> _initRcvdMessagesCubit() async {
// Don't bother if we don't have a remote messages record key yet
if (_remoteMessagesRecordKey == null) {
return;
}
// Open new cubit if one is desired
_rcvdMessagesCubit = DHTLogCubit( _rcvdMessagesCubit = DHTLogCubit(
open: () async => DHTLog.openRead(_remoteMessagesRecordKey, open: () async => DHTLog.openRead(_remoteMessagesRecordKey!,
debugName: 'SingleContactMessagesCubit::_initRcvdMessagesCubit::' debugName: 'SingleContactMessagesCubit::_initRcvdMessagesCubit::'
'RcvdMessages', 'RcvdMessages',
parent: _remoteConversationRecordKey, parent: _remoteConversationRecordKey,
@ -159,6 +165,31 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
_updateRcvdMessagesState(_rcvdMessagesCubit!.state); _updateRcvdMessagesState(_rcvdMessagesCubit!.state);
} }
Future<void> updateRemoteMessagesRecordKey(
TypedKey? remoteMessagesRecordKey) async {
await _initWait();
_sspRemoteConversationRecordKey.updateState(remoteMessagesRecordKey,
(remoteMessagesRecordKey) async {
// Don't bother if nothing is changing
if (_remoteMessagesRecordKey == remoteMessagesRecordKey) {
return;
}
// Close existing cubit if we have one
final rcvdMessagesCubit = _rcvdMessagesCubit;
_rcvdMessagesCubit = null;
_remoteMessagesRecordKey = null;
await _rcvdSubscription?.cancel();
_rcvdSubscription = null;
await rcvdMessagesCubit?.close();
// Init the new cubit if we should
_remoteMessagesRecordKey = remoteMessagesRecordKey;
await _initRcvdMessagesCubit();
});
}
Future<VeilidCrypto> _makeLocalMessagesCrypto() async => Future<VeilidCrypto> _makeLocalMessagesCrypto() async =>
VeilidCryptoPrivate.fromTypedKey( VeilidCryptoPrivate.fromTypedKey(
_accountInfo.userLogin!.identitySecret, 'tabledb'); _accountInfo.userLogin!.identitySecret, 'tabledb');
@ -452,7 +483,7 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
final TypedKey _localConversationRecordKey; final TypedKey _localConversationRecordKey;
final TypedKey _localMessagesRecordKey; final TypedKey _localMessagesRecordKey;
final TypedKey _remoteConversationRecordKey; final TypedKey _remoteConversationRecordKey;
final TypedKey _remoteMessagesRecordKey; TypedKey? _remoteMessagesRecordKey;
late final VeilidCrypto _conversationCrypto; late final VeilidCrypto _conversationCrypto;
late final MessageIntegrity _senderMessageIntegrity; late final MessageIntegrity _senderMessageIntegrity;
@ -471,4 +502,6 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
_reconciledSubscription; _reconciledSubscription;
final StreamController<Future<void> Function()> _commandController; final StreamController<Future<void> Function()> _commandController;
late final Future<void> _commandRunnerFut; late final Future<void> _commandRunnerFut;
final _sspRemoteConversationRecordKey = SingleStateProcessor<TypedKey?>();
} }

View File

@ -24,7 +24,7 @@ class ActiveConversationState extends Equatable {
final TypedKey localConversationRecordKey; final TypedKey localConversationRecordKey;
final TypedKey remoteConversationRecordKey; final TypedKey remoteConversationRecordKey;
final proto.Conversation localConversation; final proto.Conversation localConversation;
final proto.Conversation remoteConversation; final proto.Conversation? remoteConversation;
@override @override
List<Object?> get props => [ List<Object?> get props => [
@ -102,7 +102,8 @@ class ActiveConversationsBlocMapCubit extends BlocMapCubit<TypedKey,
conversationCubit.watchAccountChanges( conversationCubit.watchAccountChanges(
_accountRecordCubit.stream, _accountRecordCubit.state); _accountRecordCubit.stream, _accountRecordCubit.state);
// Transformer that only passes through completed/active conversations // Transformer that only passes through conversations where the local
// portion is not loading
// along with the contact that corresponds to the completed // along with the contact that corresponds to the completed
// conversation // conversation
final transformedCubit = TransformerCubit< final transformedCubit = TransformerCubit<
@ -110,12 +111,11 @@ class ActiveConversationsBlocMapCubit extends BlocMapCubit<TypedKey,
AsyncValue<ConversationState>, AsyncValue<ConversationState>,
ConversationCubit>(conversationCubit, ConversationCubit>(conversationCubit,
transform: (avstate) => avstate.when( transform: (avstate) => avstate.when(
data: (data) => (data.localConversation == null || data: (data) => (data.localConversation == null)
data.remoteConversation == null)
? const AsyncValue.loading() ? const AsyncValue.loading()
: AsyncValue.data(ActiveConversationState( : AsyncValue.data(ActiveConversationState(
localConversation: data.localConversation!, localConversation: data.localConversation!,
remoteConversation: data.remoteConversation!, remoteConversation: data.remoteConversation,
remoteIdentityPublicKey: remoteIdentityPublicKey, remoteIdentityPublicKey: remoteIdentityPublicKey,
localConversationRecordKey: localConversationRecordKey, localConversationRecordKey: localConversationRecordKey,
remoteConversationRecordKey: remoteConversationRecordKey:

View File

@ -25,7 +25,7 @@ class _SingleContactChatState extends Equatable {
final TypedKey localConversationRecordKey; final TypedKey localConversationRecordKey;
final TypedKey remoteConversationRecordKey; final TypedKey remoteConversationRecordKey;
final TypedKey localMessagesRecordKey; final TypedKey localMessagesRecordKey;
final TypedKey remoteMessagesRecordKey; final TypedKey? remoteMessagesRecordKey;
@override @override
List<Object?> get props => [ List<Object?> get props => [
@ -53,8 +53,16 @@ class ActiveSingleContactChatBlocMapCubit extends BlocMapCubit<TypedKey,
follow(activeConversationsBlocMapCubit); follow(activeConversationsBlocMapCubit);
} }
Future<void> _addConversationMessages(_SingleContactChatState state) async => Future<void> _addConversationMessages(_SingleContactChatState state) async {
add(() => MapEntry( // xxx could use atomic update() function
final cubit = await tryOperateAsync<SingleContactMessagesCubit>(
state.localConversationRecordKey, closure: (cubit) async {
await cubit.updateRemoteMessagesRecordKey(state.remoteMessagesRecordKey);
return cubit;
});
if (cubit == null) {
await add(() => MapEntry(
state.localConversationRecordKey, state.localConversationRecordKey,
SingleContactMessagesCubit( SingleContactMessagesCubit(
accountInfo: _accountInfo, accountInfo: _accountInfo,
@ -64,6 +72,8 @@ class ActiveSingleContactChatBlocMapCubit extends BlocMapCubit<TypedKey,
localMessagesRecordKey: state.localMessagesRecordKey, localMessagesRecordKey: state.localMessagesRecordKey,
remoteMessagesRecordKey: state.remoteMessagesRecordKey, remoteMessagesRecordKey: state.remoteMessagesRecordKey,
))); )));
}
}
_SingleContactChatState? _mapStateValue( _SingleContactChatState? _mapStateValue(
AsyncValue<ActiveConversationState> avInputState) { AsyncValue<ActiveConversationState> avInputState) {
@ -78,7 +88,7 @@ class ActiveSingleContactChatBlocMapCubit extends BlocMapCubit<TypedKey,
localMessagesRecordKey: localMessagesRecordKey:
inputState.localConversation.messages.toVeilid(), inputState.localConversation.messages.toVeilid(),
remoteMessagesRecordKey: remoteMessagesRecordKey:
inputState.remoteConversation.messages.toVeilid()); inputState.remoteConversation?.messages.toVeilid());
} }
/// StateFollower ///////////////////////// /// StateFollower /////////////////////////

View File

@ -226,13 +226,11 @@ class ConversationCubit extends Cubit<AsyncValue<ConversationState>> {
localConversation: conv, localConversation: conv,
remoteConversation: _incrementalState.remoteConversation); remoteConversation: _incrementalState.remoteConversation);
// return loading still if state isn't complete // return loading still if state isn't complete
if ((_localConversationRecordKey != null && if (_localConversationRecordKey != null &&
_incrementalState.localConversation == null) || _incrementalState.localConversation == null) {
(_remoteConversationRecordKey != null &&
_incrementalState.remoteConversation == null)) {
return const AsyncValue<ConversationState>.loading(); return const AsyncValue<ConversationState>.loading();
} }
// state is complete, all required keys are open // local state is complete, all remote state is emitted incrementally
return AsyncValue.data(_incrementalState); return AsyncValue.data(_incrementalState);
}, },
loading: AsyncValue<ConversationState>.loading, loading: AsyncValue<ConversationState>.loading,
@ -247,14 +245,12 @@ class ConversationCubit extends Cubit<AsyncValue<ConversationState>> {
_incrementalState = ConversationState( _incrementalState = ConversationState(
localConversation: _incrementalState.localConversation, localConversation: _incrementalState.localConversation,
remoteConversation: conv); remoteConversation: conv);
// return loading still if state isn't complete // return loading still if the local state isn't complete
if ((_localConversationRecordKey != null && if (_localConversationRecordKey != null &&
_incrementalState.localConversation == null) || _incrementalState.localConversation == null) {
(_remoteConversationRecordKey != null &&
_incrementalState.remoteConversation == null)) {
return const AsyncValue<ConversationState>.loading(); return const AsyncValue<ConversationState>.loading();
} }
// state is complete, all required keys are open // local state is complete, all remote state is emitted incrementally
return AsyncValue.data(_incrementalState); return AsyncValue.data(_incrementalState);
}, },
loading: AsyncValue<ConversationState>.loading, loading: AsyncValue<ConversationState>.loading,

View File

@ -112,9 +112,9 @@ class CallbackPrinter extends LoggyPrinter {
@override @override
void onLog(LogRecord record) { void onLog(LogRecord record) {
final out = record.pretty(); final out = record.pretty();
if (isDesktop) { //if (isDesktop) {
debugPrintSynchronously(out); debugPrintSynchronously(out);
} //}
globalDebugTerminal.write('$out\n'.replaceAll('\n', '\r\n')); globalDebugTerminal.write('$out\n'.replaceAll('\n', '\r\n'));
callback?.call(record); callback?.call(record);
} }

View File

@ -4,12 +4,12 @@ import 'loggy.dart';
const Map<String, LogLevel> _blocChangeLogLevels = { const Map<String, LogLevel> _blocChangeLogLevels = {
'ConnectionStateCubit': LogLevel.off, 'ConnectionStateCubit': LogLevel.off,
'ActiveSingleContactChatBlocMapCubit': LogLevel.off, //'ActiveSingleContactChatBlocMapCubit': LogLevel.off,
'ActiveConversationsBlocMapCubit': LogLevel.off, //'ActiveConversationsBlocMapCubit': LogLevel.off,
'PersistentQueueCubit<Message>': LogLevel.off, 'PersistentQueueCubit<Message>': LogLevel.off,
'TableDBArrayProtobufCubit<ReconciledMessage>': LogLevel.off, 'TableDBArrayProtobufCubit<ReconciledMessage>': LogLevel.off,
'DHTLogCubit<Message>': LogLevel.off, 'DHTLogCubit<Message>': LogLevel.off,
'SingleContactMessagesCubit': LogLevel.off, //'SingleContactMessagesCubit': LogLevel.off,
'ChatComponentCubit': LogLevel.off, 'ChatComponentCubit': LogLevel.off,
}; };