import 'dart:async';

import 'package:async_tools/async_tools.dart';
import 'package:fast_immutable_collections/fast_immutable_collections.dart';
import 'package:flutter_bloc/flutter_bloc.dart';
import 'package:uuid/uuid.dart';
import 'package:uuid/v4.dart';
import 'package:veilid_support/veilid_support.dart';

import '../../account_manager/account_manager.dart';
import '../../proto/proto.dart' as proto;
import '../../tools/tools.dart';
import '../models/models.dart';
import 'reconciliation/reconciliation.dart';

// Tag used with serialFuture/serialFutureClose to serialize message sends
const _sfSendMessageTag = 'sfSendMessageTag';

// Intermediate, mutable description of one message row, built by
// SingleContactMessagesCubit._renderState before conversion to MessageState
class RenderStateElement {
  RenderStateElement(
      {required this.seqId,
      required this.message,
      required this.isLocal,
      this.reconciledTimestamp,
      this.sent = false,
      this.sentOffline = false});

  // Send state derived from the flags on this element:
  //  * null for messages that are not local
  //  * delivered once a reconciled timestamp is present
  //  * sent / sending for local messages flagged as sent, depending on
  //    whether they are flagged as sent offline
  //  * null for local messages that are not flagged as sent
  MessageSendState? get sendState {
    if (!isLocal) {
      return null;
    }
    if (reconciledTimestamp != null) {
      return MessageSendState.delivered;
    }
    if (sent) {
      return sentOffline ? MessageSendState.sending : MessageSendState.sent;
    }
    return null;
  }

  int seqId;
  proto.Message message;
  bool isLocal;
  Timestamp? reconciledTimestamp;
  bool sent;
  bool sentOffline;
}

typedef SingleContactMessagesState = AsyncValue<WindowState<MessageState>>;

// Cubit that processes single-contact chats
// Builds the reconciled chat record from the local and remote conversation keys
class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
  SingleContactMessagesCubit({
    required AccountInfo accountInfo,
    required TypedKey remoteIdentityPublicKey,
    required TypedKey localConversationRecordKey,
    required TypedKey localMessagesRecordKey,
    required TypedKey remoteConversationRecordKey,
    required TypedKey? remoteMessagesRecordKey,
  })  : _accountInfo = accountInfo,
        _remoteIdentityPublicKey = remoteIdentityPublicKey,
        _localConversationRecordKey = localConversationRecordKey,
        _localMessagesRecordKey = localMessagesRecordKey,
        _remoteConversationRecordKey = remoteConversationRecordKey,
        _remoteMessagesRecordKey = remoteMessagesRecordKey,
        _commandController = StreamController(),
        super(const AsyncValue.loading()) {
    // Async Init
    _initWait.add(_init);
  }

  // Waits for initialization, then shuts down the send queue, the command
  // runner, all subscriptions, the DHT logs and the reconciled messages cubit
  @override
  Future<void> close() async {
    await _initWait();

    await serialFutureClose((this, _sfSendMessageTag));

    await _commandController.close();
    await _commandRunnerFut;
    await _unsentMessagesQueue.close();
    await _sentSubscription?.cancel();
    await _rcvdSubscription?.cancel();
    await _reconciledSubscription?.cancel();
    await _sentMessagesDHTLog?.close();
    await _rcvdMessagesDHTLog?.close();
    await _reconciledMessagesCubit?.close();

    // If the local conversation record is gone, then delete the reconciled
    // messages table as well
    final conversationDead = await DHTRecordPool.instance
        .isDeletedRecordKey(_localConversationRecordKey);
    if (conversationDead) {
      await SingleContactMessagesCubit.cleanupAndDeleteMessages(
          localConversationRecordKey: _localConversationRecordKey);
    }

    await super.close();
  }

  // Initialize everything
  Future<void> _init(Completer<void> _) async {
    _unsentMessagesQueue = PersistentQueue<proto.Message>(
        table: 'SingleContactUnsentMessages',
        key: _remoteConversationRecordKey.toString(),
        fromBuffer: proto.Message.fromBuffer,
        toBuffer: (x) => x.writeToBuffer(),
        closure: _processUnsentMessages,
        onError: (e, st) {
          log.error('Exception while processing unsent messages: $e\n$st\n');
        });

    // Make crypto
    await _initCrypto();

    // Reconciled messages key
    await _initReconciledMessagesCubit();

    // Local messages key
    await _initSentMessagesDHTLog();

    // Remote messages key
    await _initRcvdMessagesDHTLog();

    // Command execution background process
    _commandRunnerFut = Future.delayed(Duration.zero, _commandRunner);

    // Run reconciliation once for all input queues
    _reconciliation.reconcileMessages(null);
  }

  // Make crypto
  Future<void> _initCrypto() async {
    _conversationCrypto =
        await _accountInfo.makeConversationCrypto(_remoteIdentityPublicKey);
    _senderMessageIntegrity = await MessageIntegrity.create(
        author: _accountInfo.identityTypedPublicKey);
  }

  // Open local messages key
  Future<void> _initSentMessagesDHTLog() async {
    final writer = _accountInfo.identityWriter;

    final sentMessagesDHTLog =
        await DHTLog.openWrite(_localMessagesRecordKey, writer,
            debugName: 'SingleContactMessagesCubit::_initSentMessagesCubit::'
                'SentMessages',
            parent: _localConversationRecordKey,
            crypto: _conversationCrypto);
    _sentSubscription = await sentMessagesDHTLog.listen(_updateSentMessages);

    _sentMessagesDHTLog = sentMessagesDHTLog;
    _reconciliation.addInputSourceFromDHTLog(
        _accountInfo.identityTypedPublicKey, sentMessagesDHTLog);
  }

  // Open remote messages key
  Future<void> _initRcvdMessagesDHTLog() async {
    // Don't bother if we don't have a remote messages record key yet.
    // Copy the field to a local so it promotes to non-null below.
    final remoteMessagesRecordKey = _remoteMessagesRecordKey;
    if (remoteMessagesRecordKey == null) {
      return;
    }

    // Open new cubit if one is desired
    final rcvdMessagesDHTLog = await DHTLog.openRead(remoteMessagesRecordKey,
        debugName: 'SingleContactMessagesCubit::_initRcvdMessagesCubit::'
            'RcvdMessages',
        parent: _remoteConversationRecordKey,
        crypto: _conversationCrypto);
    _rcvdSubscription = await rcvdMessagesDHTLog.listen(_updateRcvdMessages);

    _rcvdMessagesDHTLog = rcvdMessagesDHTLog;
    _reconciliation.addInputSourceFromDHTLog(
        _remoteIdentityPublicKey, rcvdMessagesDHTLog);
  }

  // Switch to a new remote messages record key (or to none, when null).
  // The existing received-messages log is closed and, if a key is given,
  // a new one is opened before reconciliation is run again.
  void updateRemoteMessagesRecordKey(TypedKey? remoteMessagesRecordKey) {
    _sspRemoteConversationRecordKey.updateState(remoteMessagesRecordKey,
        (remoteMessagesRecordKey) async {
      await _initWait();

      // Don't bother if nothing is changing
      if (_remoteMessagesRecordKey == remoteMessagesRecordKey) {
        return;
      }

      // Close existing DHTLog if we have one
      final rcvdMessagesDHTLog = _rcvdMessagesDHTLog;
      _rcvdMessagesDHTLog = null;
      _remoteMessagesRecordKey = null;
      await _rcvdSubscription?.cancel();
      _rcvdSubscription = null;
      await rcvdMessagesDHTLog?.close();

      // Init the new DHTLog if we should
      _remoteMessagesRecordKey = remoteMessagesRecordKey;
      await _initRcvdMessagesDHTLog();

      // Run reconciliation once for all input queues
      _reconciliation.reconcileMessages(null);
    });
  }

  // Crypto for the local reconciled messages table, made from the logged-in
  // user's identity secret
  Future<VeilidCrypto> _makeLocalMessagesCrypto() =>
      VeilidCryptoPrivate.fromTypedKey(
          _accountInfo.userLogin!.identitySecret, 'tabledb');

  // Open reconciled chat record key
  Future<void> _initReconciledMessagesCubit() async {
    final tableName =
        _reconciledMessagesTableDBName(_localConversationRecordKey);

    final crypto = await _makeLocalMessagesCrypto();

    final reconciledMessagesCubit =
        TableDBArrayProtobufCubit<proto.ReconciledMessage>(
      open: () => TableDBArrayProtobuf.make(
          table: tableName,
          crypto: crypto,
          fromBuffer: proto.ReconciledMessage.fromBuffer),
    );
    _reconciledMessagesCubit = reconciledMessagesCubit;

    _reconciliation = MessageReconciliation(
        output: reconciledMessagesCubit,
        onError: (e, st) {
          addError(e, st);
          emit(AsyncValue.error(e, st));
        });

    _reconciledSubscription =
        reconciledMessagesCubit.stream.listen(_updateReconciledMessagesState);
    _updateReconciledMessagesState(reconciledMessagesCubit.state);
  }

  ////////////////////////////////////////////////////////////////////////////
  // Public interface

  // Set the tail position of the log for pagination.
  // If tail is 0, the end of the log is used.
  // If tail is negative, the position is subtracted from the current log
  // length.
  // If tail is positive, the position is absolute from the head of the log
  // If follow is enabled, the tail offset will update when the log changes
  Future<void> setWindow(
      {int? tail, int? count, bool? follow, bool forceRefresh = false}) async {
    await _initWait();

    // print('setWindow: tail=$tail count=$count, follow=$follow');

    await _reconciledMessagesCubit!.setWindow(
        tail: tail, count: count, follow: follow, forceRefresh: forceRefresh);
  }

  // Set a user-visible 'text' message with possible attachments
  void sendTextMessage({required proto.Message_Text messageText}) {
    final message = proto.Message()..text = messageText;
    _sendMessage(message: message);
  }

  // Run a chat command
  // Only '/repeat <count> <text>' is handled, and only in debug mode
  void runCommand(String command) {
    final (cmd, rest) = command.splitOnce(' ');

    if (kIsDebugMode) {
      if (cmd == '/repeat' && rest != null) {
        final (countStr, text) = rest.splitOnce(' ');
        final count = int.tryParse(countStr);
        if (count != null) {
          runCommandRepeat(count, text ?? '');
        }
      }
    }
  }

  // Run a repeat command
  // Queues a command that sends 'text' 'count' times, 50ms apart, replacing
  // each '$n' followed by a word boundary with the iteration index
  void runCommandRepeat(int count, String text) {
    _commandController.sink.add(() async {
      for (var i = 0; i < count; i++) {
        final protoMessageText = proto.Message_Text()
          ..text = text.replaceAll(RegExp(r'\$n\b'), i.toString());
        final message = proto.Message()..text = protoMessageText;
        _sendMessage(message: message);
        await Future<void>.delayed(const Duration(milliseconds: 50));
      }
    });
  }

  ////////////////////////////////////////////////////////////////////////////
  // Internal implementation

  // Called when the sent messages DHTLog gets a change
  // This will re-render when messages are sent from another machine
  void _updateSentMessages(DHTLogUpdate upd) {
    _reconciliation.reconcileMessages(_accountInfo.identityTypedPublicKey);
  }

  // Called when the received messages DHTLog gets a change
  void _updateRcvdMessages(DHTLogUpdate upd) {
    _reconciliation.reconcileMessages(_remoteIdentityPublicKey);
  }

  // Called when the reconciled messages window gets a change
  void _updateReconciledMessagesState(
      TableDBArrayProtobufBusyState<proto.ReconciledMessage> avmessages) {
    // Update the view
    _renderState();
  }

  // Assign the real message id (chained from previousMessage) and sign the
  // message in place
  Future<void> _processMessageToSend(
      proto.Message message, proto.Message? previousMessage) async {
    // It's possible we had a signature from a previous
    // operateAppendEventual attempt, so clear it and make a new message id too
    message
      ..clearSignature()
      ..id = await _senderMessageIntegrity.generateMessageId(previousMessage);

    // Now sign it
    await _senderMessageIntegrity.signMessage(
        message, _accountInfo.identitySecretKey);
  }

  // Async process to send messages in the background
  Future<void> _processUnsentMessages(IList<proto.Message> messages) async {
    try {
      await _sentMessagesDHTLog!.operateAppendEventual((writer) async {
        // Get the previous message if we have one
        var previousMessage = writer.length == 0
            ? null
            : await writer.getProtobuf(
                proto.Message.fromBuffer, writer.length - 1);

        // Sign all messages, keeping only the ones that were processed
        // successfully. A message that failed id generation or signing must
        // not be appended to the sent log, since it would be written without
        // a valid id/signature.
        final signedMessages = <proto.Message>[];
        for (final message in messages) {
          try {
            await _processMessageToSend(message, previousMessage);
            previousMessage = message;
            signedMessages.add(message);
          } on Exception catch (e, st) {
            log.error('Exception processing unsent message: $e:\n$st\n');
          }
        }
        final byteMessages =
            signedMessages.map((m) => m.writeToBuffer()).toList();

        return writer.addAll(byteMessages);
      });
    } on Exception catch (e, st) {
      log.error('Exception appending unsent messages: $e:\n$st\n');
    }
  }

  // Produce a state for this cubit from the input cubits and queues
  void _renderState() {
    // Get all reconciled messages in the cubit window
    final reconciledMessages =
        _reconciledMessagesCubit?.state.state.asData?.value;

    // Get all sent messages that are still offline
    //final sentMessages = _sentMessagesDHTLog.

    // Get all items in the unsent queue
    final unsentMessages = _unsentMessagesQueue.queue;

    // If we aren't ready to render a state, say we're loading
    if (reconciledMessages == null) {
      emit(const AsyncLoading());
      return;
    }

    // Generate state for each message
    // final reconciledMessagesMap =
    //     IMap.fromValues(
    //   keyMapper: (x) => x.content.authorUniqueIdString,
    //   values: reconciledMessages.windowElements,
    // );
    // final sentMessagesMap =
    //     IMap.fromValues(
    //   keyMapper: (x) => x.value.authorUniqueIdString,
    //   values: sentMessages.window,
    // );
    // final unsentMessagesMap = IMap.fromValues(
    //   keyMapper: (x) => x.authorUniqueIdString,
    //   values: unsentMessages,
    // );

    // List of all rendered state elements that we will turn into
    // message states
    final renderedElements = <RenderStateElement>[];

    // Keep track of the ids we have rendered
    // because there can be an overlap between the 'unsent messages'
    // and the reconciled messages as the async state catches up
    final renderedIds = <String>{};

    // Sequence id of the first element in the window: the window end
    // (log length when windowTail is 0) minus the number of window elements
    var seqId = (reconciledMessages.windowTail == 0
            ? reconciledMessages.length
            : reconciledMessages.windowTail) -
        reconciledMessages.windowElements.length;
    for (final m in reconciledMessages.windowElements) {
      final isLocal =
          m.content.author.toVeilid() == _accountInfo.identityTypedPublicKey;
      final reconciledTimestamp = Timestamp.fromInt64(m.reconciledTime);
      //final sm =
      //isLocal ? sentMessagesMap[m.content.authorUniqueIdString] : null;
      //final sent = isLocal && sm != null;
      //final sentOffline = isLocal && sm != null && sm.isOffline;
      final sent = isLocal;
      final sentOffline = false; //

      if (renderedIds.contains(m.content.authorUniqueIdString)) {
        seqId++;
        continue;
      }
      renderedElements.add(RenderStateElement(
        seqId: seqId,
        message: m.content,
        isLocal: isLocal,
        reconciledTimestamp: reconciledTimestamp,
        sent: sent,
        sentOffline: sentOffline,
      ));
      renderedIds.add(m.content.authorUniqueIdString);
      seqId++;
    }

    // Render in-flight messages at the bottom
    for (final m in unsentMessages) {
      if (renderedIds.contains(m.authorUniqueIdString)) {
        seqId++;
        continue;
      }
      renderedElements.add(RenderStateElement(
        seqId: seqId,
        message: m,
        isLocal: true,
        sent: true,
        sentOffline: true,
      ));
      renderedIds.add(m.authorUniqueIdString);
      seqId++;
    }

    // Render the state
    final messages = renderedElements
        .map((x) => MessageState(
            seqId: x.seqId,
            content: x.message,
            sentTimestamp: Timestamp.fromInt64(x.message.timestamp),
            reconciledTimestamp: x.reconciledTimestamp,
            sendState: x.sendState))
        .toIList();

    // Emit the rendered state
    emit(AsyncValue.data(WindowState<MessageState>(
        window: messages,
        length: reconciledMessages.length,
        windowTail: reconciledMessages.windowTail,
        windowCount: reconciledMessages.windowCount,
        follow: reconciledMessages.follow)));
  }

  // Fill in the common fields of a message and queue it for sending.
  // Throws FormatException if the serialized message plus 256 bytes
  // exceeds 4096 bytes.
  void _sendMessage({required proto.Message message}) {
    // Add common fields
    // real id and signature will get set by _processMessageToSend
    // temporary id set here is random and not 'valid' in the eyes
    // of reconciliation, noting that reconciled timestamp is not yet set.
    message
      ..author = _accountInfo.identityTypedPublicKey.toProto()
      ..timestamp = Veilid.instance.now().toInt64()
      ..id = Uuid.parse(_uuidGen.generate());

    if ((message.writeToBuffer().lengthInBytes + 256) > 4096) {
      throw const FormatException('message is too long');
    }

    // Put in the queue
    serialFuture((this, _sfSendMessageTag), () async {
      // Add the message to the persistent queue
      await _unsentMessagesQueue.add(message);

      // Update the view
      _renderState();
    });
  }

  // Runs queued commands one at a time until the command stream is closed
  Future<void> _commandRunner() async {
    await for (final command in _commandController.stream) {
      try {
        await command();
      } on Exception catch (e, st) {
        // A failing command (for example a message that is too long) must
        // not end the runner: that would stop all later commands and make
        // close() throw when it awaits this future.
        log.error('Exception while running command: $e\n$st\n');
      }
    }
  }

  /////////////////////////////////////////////////////////////////////////
  // Static utility functions

  // Delete the reconciled messages table for a local conversation record key
  static Future<void> cleanupAndDeleteMessages(
      {required TypedKey localConversationRecordKey}) async {
    final recmsgdbname =
        _reconciledMessagesTableDBName(localConversationRecordKey);
    await Veilid.instance.deleteTableDB(recmsgdbname);
  }

  // Table name for reconciled messages: 'msg_' followed by the record key
  // string with ':' replaced by '_'
  static String _reconciledMessagesTableDBName(
          TypedKey localConversationRecordKey) =>
      'msg_${localConversationRecordKey.toString().replaceAll(':', '_')}';

  /////////////////////////////////////////////////////////////////////////

  final WaitSet<void, void> _initWait = WaitSet();
  final AccountInfo _accountInfo;
  final TypedKey _remoteIdentityPublicKey;
  final TypedKey _localConversationRecordKey;
  final TypedKey _localMessagesRecordKey;
  final TypedKey _remoteConversationRecordKey;
  TypedKey? _remoteMessagesRecordKey;

  late final VeilidCrypto _conversationCrypto;
  late final MessageIntegrity _senderMessageIntegrity;

  DHTLog? _sentMessagesDHTLog;
  DHTLog? _rcvdMessagesDHTLog;
  TableDBArrayProtobufCubit<proto.ReconciledMessage>? _reconciledMessagesCubit;

  late final MessageReconciliation _reconciliation;

  late final PersistentQueue<proto.Message> _unsentMessagesQueue;
  StreamSubscription<void>? _sentSubscription;
  StreamSubscription<void>? _rcvdSubscription;
  StreamSubscription<TableDBArrayProtobufBusyState<proto.ReconciledMessage>>?
      _reconciledSubscription;
  final StreamController<Future<void> Function()> _commandController;
  late final Future<void> _commandRunnerFut;

  final _sspRemoteConversationRecordKey = SingleStateProcessor<TypedKey?>();
  final _uuidGen = const UuidV4();
}