From 809f6d69bf41d6a106ba2d90fd2ce5c05f0cb84a Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Wed, 17 Apr 2024 21:31:26 -0400 Subject: [PATCH] better message status support --- .../account_repository.dart | 27 +- lib/chat/chat.dart | 1 + .../cubits/single_contact_messages_cubit.dart | 330 ++++++++++++++---- lib/chat/models/message_state.dart | 34 ++ lib/chat/models/message_state.freezed.dart | 229 ++++++++++++ lib/chat/models/message_state.g.dart | 25 ++ lib/chat/models/models.dart | 1 + lib/chat/views/chat_component.dart | 49 +-- .../active_conversations_bloc_map_cubit.dart | 6 +- ...ve_single_contact_chat_bloc_map_cubit.dart | 15 +- lib/chat_list/cubits/chat_list_cubit.dart | 11 +- .../chat_single_contact_list_widget.dart | 8 +- .../cubits/contact_invitation_list_cubit.dart | 14 +- .../waiting_invitations_bloc_map_cubit.dart | 7 +- lib/contacts/cubits/conversation_cubit.dart | 8 +- .../main_pager/account_page.dart | 7 +- lib/proto/veilidchat.proto | 3 - lib/router/cubit/router_cubit.dart | 11 +- lib/router/cubit/router_state.dart | 11 - packages/bloc_tools/lib/src/future_cubit.dart | 20 +- .../src/dht_record/dht_record.dart | 2 +- .../src/dht_record/dht_record_pool.dart | 14 +- .../src/dht_short_array/dht_short_array.dart | 6 +- .../dht_short_array_cubit.dart | 31 +- .../dht_short_array/dht_short_array_read.dart | 37 ++ .../dht_short_array_write.dart | 28 +- .../lib/src/async_table_db_backed_cubit.dart | 49 ++- packages/veilid_support/lib/src/identity.dart | 5 +- .../lib/src/persistent_queue_cubit.dart | 194 ++++++++++ packages/veilid_support/lib/src/table_db.dart | 110 +++++- .../veilid_support/lib/veilid_support.dart | 1 + 31 files changed, 1046 insertions(+), 248 deletions(-) create mode 100644 lib/chat/models/message_state.dart create mode 100644 lib/chat/models/message_state.freezed.dart create mode 100644 lib/chat/models/message_state.g.dart create mode 100644 lib/chat/models/models.dart delete mode 100644 lib/router/cubit/router_state.dart create mode 100644 packages/veilid_support/lib/src/persistent_queue_cubit.dart diff --git a/lib/account_manager/repository/account_repository/account_repository.dart b/lib/account_manager/repository/account_repository/account_repository.dart index 056fd5f..777c79e 100644 --- a/lib/account_manager/repository/account_repository/account_repository.dart +++ b/lib/account_manager/repository/account_repository/account_repository.dart @@ -26,7 +26,8 @@ class AccountRepository { ? IList.fromJson( obj, genericFromJson(LocalAccount.fromJson)) : IList(), - valueToJson: (val) => val.toJson((la) => la.toJson())); + valueToJson: (val) => val?.toJson((la) => la.toJson()), + makeInitialValue: IList.empty); static TableDBValue> _initUserLogins() => TableDBValue( tableName: 'local_account_manager', @@ -34,13 +35,15 @@ class AccountRepository { valueFromJson: (obj) => obj != null ? IList.fromJson(obj, genericFromJson(UserLogin.fromJson)) : IList(), - valueToJson: (val) => val.toJson((la) => la.toJson())); + valueToJson: (val) => val?.toJson((la) => la.toJson()), + makeInitialValue: IList.empty); static TableDBValue _initActiveAccount() => TableDBValue( tableName: 'local_account_manager', tableKeyName: 'active_local_account', valueFromJson: (obj) => obj == null ? null : TypedKey.fromJson(obj), - valueToJson: (val) => val?.toJson()); + valueToJson: (val) => val?.toJson(), + makeInitialValue: () => null); ////////////////////////////////////////////////////////////// /// Fields @@ -62,7 +65,9 @@ class AccountRepository { } Future close() async { - // ??? + await _localAccounts.close(); + await _userLogins.close(); + await _activeLocalAccount.close(); } ////////////////////////////////////////////////////////////// @@ -72,18 +77,18 @@ class AccountRepository { ////////////////////////////////////////////////////////////// /// Selectors - IList getLocalAccounts() => _localAccounts.requireValue; - TypedKey? getActiveLocalAccount() => _activeLocalAccount.requireValue; - IList getUserLogins() => _userLogins.requireValue; + IList getLocalAccounts() => _localAccounts.value; + TypedKey? getActiveLocalAccount() => _activeLocalAccount.value; + IList getUserLogins() => _userLogins.value; UserLogin? getActiveUserLogin() { - final activeLocalAccount = _activeLocalAccount.requireValue; + final activeLocalAccount = _activeLocalAccount.value; return activeLocalAccount == null ? null : fetchUserLogin(activeLocalAccount); } LocalAccount? fetchLocalAccount(TypedKey accountMasterRecordKey) { - final localAccounts = _localAccounts.requireValue; + final localAccounts = _localAccounts.value; final idx = localAccounts.indexWhere( (e) => e.identityMaster.masterRecordKey == accountMasterRecordKey); if (idx == -1) { @@ -93,7 +98,7 @@ class AccountRepository { } UserLogin? fetchUserLogin(TypedKey accountMasterRecordKey) { - final userLogins = _userLogins.requireValue; + final userLogins = _userLogins.value; final idx = userLogins .indexWhere((e) => e.accountMasterRecordKey == accountMasterRecordKey); if (idx == -1) { @@ -295,7 +300,7 @@ class AccountRepository { if (accountMasterRecordKey != null) { // Assert the specified record key can be found, will throw if not - final _ = _userLogins.requireValue.firstWhere( + final _ = _userLogins.value.firstWhere( (ul) => ul.accountMasterRecordKey == accountMasterRecordKey); } await _activeLocalAccount.set(accountMasterRecordKey); diff --git a/lib/chat/chat.dart b/lib/chat/chat.dart index 6acdd43..08ae2e7 100644 --- a/lib/chat/chat.dart +++ b/lib/chat/chat.dart @@ -1,2 +1,3 @@ export 'cubits/cubits.dart'; +export 'models/models.dart'; export 'views/views.dart'; diff --git a/lib/chat/cubits/single_contact_messages_cubit.dart b/lib/chat/cubits/single_contact_messages_cubit.dart index 1a2a604..85eb9d6 100644 --- a/lib/chat/cubits/single_contact_messages_cubit.dart +++ b/lib/chat/cubits/single_contact_messages_cubit.dart @@ -1,20 +1,49 @@ import 'dart:async'; import 'package:async_tools/async_tools.dart'; -import 'package:bloc_tools/bloc_tools.dart'; import 'package:fast_immutable_collections/fast_immutable_collections.dart'; +import 'package:fixnum/fixnum.dart'; import 'package:flutter_bloc/flutter_bloc.dart'; import 'package:veilid_support/veilid_support.dart'; import '../../account_manager/account_manager.dart'; import '../../proto/proto.dart' as proto; +import '../models/models.dart'; -class _SingleContactMessageQueueEntry { - _SingleContactMessageQueueEntry({this.remoteMessages}); - IList? remoteMessages; +class RenderStateElement { + RenderStateElement( + {required this.message, + required this.isLocal, + this.reconciled = false, + this.reconciledOffline = false, + this.sent = false, + this.sentOffline = false}); + + MessageSendState? get sendState { + if (!isLocal) { + return null; + } + if (reconciled && sent) { + if (!reconciledOffline && !sentOffline) { + return MessageSendState.delivered; + } + return MessageSendState.sent; + } + if (sent && !sentOffline) { + return MessageSendState.sent; + } + return MessageSendState.sending; + } + + proto.Message message; + bool isLocal; + bool reconciled; + bool reconciledOffline; + bool sent; + bool sentOffline; } -typedef SingleContactMessagesState = AsyncValue>; +typedef SingleContactMessagesState = AsyncValue>; // Cubit that processes single-contact chats // Builds the reconciled chat record from the local and remote conversation keys @@ -34,7 +63,14 @@ class SingleContactMessagesCubit extends Cubit { _remoteConversationRecordKey = remoteConversationRecordKey, _remoteMessagesRecordKey = remoteMessagesRecordKey, _reconciledChatRecord = reconciledChatRecord, - _messagesUpdateQueue = StreamController(), + _unreconciledMessagesQueue = PersistentQueueCubit( + table: 'SingleContactUnreconciledMessages', + key: remoteConversationRecordKey.toString(), + fromBuffer: proto.Message.fromBuffer), + _sendingMessagesQueue = PersistentQueueCubit( + table: 'SingleContactSendingMessages', + key: remoteConversationRecordKey.toString(), + fromBuffer: proto.Message.fromBuffer), super(const AsyncValue.loading()) { // Async Init _initWait.add(_init); @@ -44,13 +80,14 @@ class SingleContactMessagesCubit extends Cubit { Future close() async { await _initWait(); - await _messagesUpdateQueue.close(); - await _localSubscription?.cancel(); - await _remoteSubscription?.cancel(); - await _reconciledChatSubscription?.cancel(); - await _localMessagesCubit?.close(); - await _remoteMessagesCubit?.close(); - await _reconciledChatMessagesCubit?.close(); + await _unreconciledMessagesQueue.close(); + await _sendingMessagesQueue.close(); + await _sentSubscription?.cancel(); + await _rcvdSubscription?.cancel(); + await _reconciledSubscription?.cancel(); + await _sentMessagesCubit?.close(); + await _rcvdMessagesCubit?.close(); + await _reconciledMessagesCubit?.close(); await super.close(); } @@ -60,95 +97,137 @@ class SingleContactMessagesCubit extends Cubit { await _initMessagesCrypto(); // Reconciled messages key - await _initReconciledChatMessages(); + await _initReconciledMessagesCubit(); // Local messages key - await _initLocalMessages(); + await _initSentMessagesCubit(); // Remote messages key - await _initRemoteMessages(); + await _initRcvdMessagesCubit(); - // Messages listener + // Unreconciled messages processing queue listener Future.delayed(Duration.zero, () async { - await for (final entry in _messagesUpdateQueue.stream) { - await _updateMessagesStateAsync(entry); + await for (final entry in _unreconciledMessagesQueue.stream) { + final data = entry.asData; + if (data != null && data.value.isNotEmpty) { + // Process data using recoverable processing mechanism + await _unreconciledMessagesQueue.process((messages) async { + await _processUnreconciledMessages(data.value); + }); + } + } + }); + + // Sending messages processing queue listener + Future.delayed(Duration.zero, () async { + await for (final entry in _sendingMessagesQueue.stream) { + final data = entry.asData; + if (data != null && data.value.isNotEmpty) { + // Process data using recoverable processing mechanism + await _sendingMessagesQueue.process((messages) async { + await _processSendingMessages(data.value); + }); + } } }); } // Make crypto - Future _initMessagesCrypto() async { _messagesCrypto = await _activeAccountInfo .makeConversationCrypto(_remoteIdentityPublicKey); } // Open local messages key - Future _initLocalMessages() async { + Future _initSentMessagesCubit() async { final writer = _activeAccountInfo.conversationWriter; - _localMessagesCubit = DHTShortArrayCubit( + _sentMessagesCubit = DHTShortArrayCubit( open: () async => DHTShortArray.openWrite( _localMessagesRecordKey, writer, debugName: - 'SingleContactMessagesCubit::_initLocalMessages::LocalMessages', + 'SingleContactMessagesCubit::_initSentMessagesCubit::SentMessages', parent: _localConversationRecordKey, crypto: _messagesCrypto), decodeElement: proto.Message.fromBuffer); + _sentSubscription = + _sentMessagesCubit!.stream.listen(_updateSentMessagesState); + _updateSentMessagesState(_sentMessagesCubit!.state); } // Open remote messages key - Future _initRemoteMessages() async { - _remoteMessagesCubit = DHTShortArrayCubit( + Future _initRcvdMessagesCubit() async { + _rcvdMessagesCubit = DHTShortArrayCubit( open: () async => DHTShortArray.openRead(_remoteMessagesRecordKey, - debugName: 'SingleContactMessagesCubit::_initRemoteMessages::' - 'RemoteMessages', + debugName: 'SingleContactMessagesCubit::_initRcvdMessagesCubit::' + 'RcvdMessages', parent: _remoteConversationRecordKey, crypto: _messagesCrypto), decodeElement: proto.Message.fromBuffer); - _remoteSubscription = - _remoteMessagesCubit!.stream.listen(_updateRemoteMessagesState); - _updateRemoteMessagesState(_remoteMessagesCubit!.state); + _rcvdSubscription = + _rcvdMessagesCubit!.stream.listen(_updateRcvdMessagesState); + _updateRcvdMessagesState(_rcvdMessagesCubit!.state); } // Open reconciled chat record key - Future _initReconciledChatMessages() async { + Future _initReconciledMessagesCubit() async { final accountRecordKey = _activeAccountInfo.userLogin.accountRecordInfo.accountRecord.recordKey; - _reconciledChatMessagesCubit = DHTShortArrayCubit( + _reconciledMessagesCubit = DHTShortArrayCubit( open: () async => DHTShortArray.openOwned(_reconciledChatRecord, - debugName: - 'SingleContactMessagesCubit::_initReconciledChatMessages::' - 'ReconciledChat', + debugName: 'SingleContactMessagesCubit::_initReconciledMessages::' + 'ReconciledMessages', parent: accountRecordKey), decodeElement: proto.Message.fromBuffer); - _reconciledChatSubscription = - _reconciledChatMessagesCubit!.stream.listen(_updateReconciledChatState); - _updateReconciledChatState(_reconciledChatMessagesCubit!.state); + _reconciledSubscription = + _reconciledMessagesCubit!.stream.listen(_updateReconciledMessagesState); + _updateReconciledMessagesState(_reconciledMessagesCubit!.state); } // Called when the remote messages list gets a change - void _updateRemoteMessagesState( - BlocBusyState>> avmessages) { + void _updateRcvdMessagesState( + DHTShortArrayBusyState avmessages) { final remoteMessages = avmessages.state.asData?.value; if (remoteMessages == null) { return; } + // Add remote messages updates to queue to process asynchronously - _messagesUpdateQueue - .add(_SingleContactMessageQueueEntry(remoteMessages: remoteMessages)); + // Ignore offline state because remote messages are always fully delivered + // This may happen once per client but should be idempotent + _unreconciledMessagesQueue + .addAllSync(remoteMessages.map((x) => x.value).toIList()); + + // Update the view + _renderState(); + } + + // Called when the send messages list gets a change + // This will re-render when messages are sent from another machine + void _updateSentMessagesState( + DHTShortArrayBusyState avmessages) { + final remoteMessages = avmessages.state.asData?.value; + if (remoteMessages == null) { + return; + } + // Don't reconcile, the sending machine will have already added + // to the reconciliation queue on that machine + + // Update the view + _renderState(); } // Called when the reconciled messages list gets a change - void _updateReconciledChatState( - BlocBusyState>> avmessages) { - // When reconciled messages are updated, pass this - // directly to the messages cubit state - emit(avmessages.state); + // This can happen when multiple clients for the same identity are + // reading and reconciling the same remote chat + void _updateReconciledMessagesState( + DHTShortArrayBusyState avmessages) { + // Update the view + _renderState(); } - Future _mergeMessagesInner( + Future _reconcileMessagesInner( {required DHTShortArrayWrite reconciledMessagesWriter, required IList messages}) async { // Ensure remoteMessages is sorted by timestamp @@ -209,29 +288,129 @@ class SingleContactMessagesCubit extends Cubit { } } - Future _updateMessagesStateAsync( - _SingleContactMessageQueueEntry entry) async { - final reconciledChatMessagesCubit = _reconciledChatMessagesCubit!; - - // Merge remote and local messages into the reconciled chat log - await reconciledChatMessagesCubit + // Async process to reconcile messages sent or received in the background + Future _processUnreconciledMessages( + IList messages) async { + await _reconciledMessagesCubit! .operateWrite((reconciledMessagesWriter) async { - if (entry.remoteMessages != null) { - await _mergeMessagesInner( - reconciledMessagesWriter: reconciledMessagesWriter, - messages: entry.remoteMessages!); - } + await _reconcileMessagesInner( + reconciledMessagesWriter: reconciledMessagesWriter, + messages: messages); }); } - Future addMessage({required proto.Message message}) async { - await _initWait(); + // Async process to send messages in the background + Future _processSendingMessages(IList messages) async { + for (final message in messages) { + await _sentMessagesCubit!.operateWriteEventual( + (writer) => writer.tryAddItem(message.writeToBuffer())); + } + } - await _reconciledChatMessagesCubit!.operateWrite((writer) => - _mergeMessagesInner( - reconciledMessagesWriter: writer, messages: [message].toIList())); - await _localMessagesCubit! - .operateWrite((writer) => writer.tryAddItem(message.writeToBuffer())); + // Produce a state for this cubit from the input cubits and queues + void _renderState() { + // Get all reconciled messages + final reconciledMessages = + _reconciledMessagesCubit?.state.state.asData?.value; + // Get all sent messages + final sentMessages = _sentMessagesCubit?.state.state.asData?.value; + // Get all items in the unreconciled queue + final unreconciledMessages = _unreconciledMessagesQueue.state.asData?.value; + // Get all items in the unsent queue + final sendingMessages = _sendingMessagesQueue.state.asData?.value; + + // If we aren't ready to render a state, say we're loading + if (reconciledMessages == null || + sentMessages == null || + unreconciledMessages == null || + sendingMessages == null) { + emit(const AsyncLoading()); + return; + } + + // Generate state for each message + final sentMessagesMap = + IMap>.fromValues( + keyMapper: (x) => x.value.timestamp, + values: sentMessages, + ); + final reconciledMessagesMap = + IMap>.fromValues( + keyMapper: (x) => x.value.timestamp, + values: reconciledMessages, + ); + final sendingMessagesMap = IMap.fromValues( + keyMapper: (x) => x.timestamp, + values: sendingMessages, + ); + final unreconciledMessagesMap = IMap.fromValues( + keyMapper: (x) => x.timestamp, + values: unreconciledMessages, + ); + + final renderedElements = {}; + + for (final m in reconciledMessagesMap.entries) { + renderedElements[m.key] = RenderStateElement( + message: m.value.value, + isLocal: m.value.value.author.toVeilid() != _remoteIdentityPublicKey, + reconciled: true, + reconciledOffline: m.value.isOffline); + } + for (final m in sentMessagesMap.entries) { + renderedElements.putIfAbsent( + m.key, + () => RenderStateElement( + message: m.value.value, + isLocal: true, + )) + ..sent = true + ..sentOffline = m.value.isOffline; + } + for (final m in unreconciledMessagesMap.entries) { + renderedElements + .putIfAbsent( + m.key, + () => RenderStateElement( + message: m.value, + isLocal: + m.value.author.toVeilid() != _remoteIdentityPublicKey, + )) + .reconciled = false; + } + for (final m in sendingMessagesMap.entries) { + renderedElements + .putIfAbsent( + m.key, + () => RenderStateElement( + message: m.value, + isLocal: true, + )) + .sent = false; + } + + // Render the state + final messageKeys = renderedElements.entries + .toIList() + .sort((x, y) => x.key.compareTo(y.key)); + final renderedState = messageKeys + .map((x) => MessageState( + author: x.value.message.author.toVeilid(), + timestamp: Timestamp.fromInt64(x.key), + text: x.value.message.text, + sendState: x.value.sendState)) + .toIList(); + + // Emit the rendered state + emit(AsyncValue.data(renderedState)); + } + + void addMessage({required proto.Message message}) { + _unreconciledMessagesQueue.addSync(message); + _sendingMessagesQueue.addSync(message); + + // Update the view + _renderState(); } final WaitSet _initWait = WaitSet(); @@ -245,16 +424,15 @@ class SingleContactMessagesCubit extends Cubit { late final DHTRecordCrypto _messagesCrypto; - DHTShortArrayCubit? _localMessagesCubit; - DHTShortArrayCubit? _remoteMessagesCubit; - DHTShortArrayCubit? _reconciledChatMessagesCubit; + DHTShortArrayCubit? _sentMessagesCubit; + DHTShortArrayCubit? _rcvdMessagesCubit; + DHTShortArrayCubit? _reconciledMessagesCubit; - final StreamController<_SingleContactMessageQueueEntry> _messagesUpdateQueue; + final PersistentQueueCubit _unreconciledMessagesQueue; + final PersistentQueueCubit _sendingMessagesQueue; - StreamSubscription>>>? - _localSubscription; - StreamSubscription>>>? - _remoteSubscription; - StreamSubscription>>>? - _reconciledChatSubscription; + StreamSubscription>? _sentSubscription; + StreamSubscription>? _rcvdSubscription; + StreamSubscription>? + _reconciledSubscription; } diff --git a/lib/chat/models/message_state.dart b/lib/chat/models/message_state.dart new file mode 100644 index 0000000..c4c6ca5 --- /dev/null +++ b/lib/chat/models/message_state.dart @@ -0,0 +1,34 @@ +import 'package:change_case/change_case.dart'; +import 'package:flutter/foundation.dart'; +import 'package:freezed_annotation/freezed_annotation.dart'; +import 'package:veilid_support/veilid_support.dart'; + +part 'message_state.freezed.dart'; +part 'message_state.g.dart'; + +// Whether or not a message has been fully sent +enum MessageSendState { + // message is still being sent + sending, + // message issued has not reached the network + sent, + // message was sent and has reached the network + delivered; + + factory MessageSendState.fromJson(dynamic j) => + MessageSendState.values.byName((j as String).toCamelCase()); + String toJson() => name.toPascalCase(); +} + +@freezed +class MessageState with _$MessageState { + const factory MessageState({ + required TypedKey author, + required Timestamp timestamp, + required String text, + required MessageSendState? sendState, + }) = _MessageState; + + factory MessageState.fromJson(dynamic json) => + _$MessageStateFromJson(json as Map); +} diff --git a/lib/chat/models/message_state.freezed.dart b/lib/chat/models/message_state.freezed.dart new file mode 100644 index 0000000..3d76551 --- /dev/null +++ b/lib/chat/models/message_state.freezed.dart @@ -0,0 +1,229 @@ +// coverage:ignore-file +// GENERATED CODE - DO NOT MODIFY BY HAND +// ignore_for_file: type=lint +// ignore_for_file: unused_element, deprecated_member_use, deprecated_member_use_from_same_package, use_function_type_syntax_for_parameters, unnecessary_const, avoid_init_to_null, invalid_override_different_default_values_named, prefer_expression_function_bodies, annotate_overrides, invalid_annotation_target, unnecessary_question_mark + +part of 'message_state.dart'; + +// ************************************************************************** +// FreezedGenerator +// ************************************************************************** + +T _$identity(T value) => value; + +final _privateConstructorUsedError = UnsupportedError( + 'It seems like you constructed your class using `MyClass._()`. This constructor is only meant to be used by freezed and you are not supposed to need it nor use it.\nPlease check the documentation here for more information: https://github.com/rrousselGit/freezed#adding-getters-and-methods-to-our-models'); + +MessageState _$MessageStateFromJson(Map json) { + return _MessageState.fromJson(json); +} + +/// @nodoc +mixin _$MessageState { + Typed get author => throw _privateConstructorUsedError; + Timestamp get timestamp => throw _privateConstructorUsedError; + String get text => throw _privateConstructorUsedError; + MessageSendState? get sendState => throw _privateConstructorUsedError; + + Map toJson() => throw _privateConstructorUsedError; + @JsonKey(ignore: true) + $MessageStateCopyWith get copyWith => + throw _privateConstructorUsedError; +} + +/// @nodoc +abstract class $MessageStateCopyWith<$Res> { + factory $MessageStateCopyWith( + MessageState value, $Res Function(MessageState) then) = + _$MessageStateCopyWithImpl<$Res, MessageState>; + @useResult + $Res call( + {Typed author, + Timestamp timestamp, + String text, + MessageSendState? sendState}); +} + +/// @nodoc +class _$MessageStateCopyWithImpl<$Res, $Val extends MessageState> + implements $MessageStateCopyWith<$Res> { + _$MessageStateCopyWithImpl(this._value, this._then); + + // ignore: unused_field + final $Val _value; + // ignore: unused_field + final $Res Function($Val) _then; + + @pragma('vm:prefer-inline') + @override + $Res call({ + Object? author = null, + Object? timestamp = null, + Object? text = null, + Object? sendState = freezed, + }) { + return _then(_value.copyWith( + author: null == author + ? _value.author + : author // ignore: cast_nullable_to_non_nullable + as Typed, + timestamp: null == timestamp + ? _value.timestamp + : timestamp // ignore: cast_nullable_to_non_nullable + as Timestamp, + text: null == text + ? _value.text + : text // ignore: cast_nullable_to_non_nullable + as String, + sendState: freezed == sendState + ? _value.sendState + : sendState // ignore: cast_nullable_to_non_nullable + as MessageSendState?, + ) as $Val); + } +} + +/// @nodoc +abstract class _$$MessageStateImplCopyWith<$Res> + implements $MessageStateCopyWith<$Res> { + factory _$$MessageStateImplCopyWith( + _$MessageStateImpl value, $Res Function(_$MessageStateImpl) then) = + __$$MessageStateImplCopyWithImpl<$Res>; + @override + @useResult + $Res call( + {Typed author, + Timestamp timestamp, + String text, + MessageSendState? sendState}); +} + +/// @nodoc +class __$$MessageStateImplCopyWithImpl<$Res> + extends _$MessageStateCopyWithImpl<$Res, _$MessageStateImpl> + implements _$$MessageStateImplCopyWith<$Res> { + __$$MessageStateImplCopyWithImpl( + _$MessageStateImpl _value, $Res Function(_$MessageStateImpl) _then) + : super(_value, _then); + + @pragma('vm:prefer-inline') + @override + $Res call({ + Object? author = null, + Object? timestamp = null, + Object? text = null, + Object? sendState = freezed, + }) { + return _then(_$MessageStateImpl( + author: null == author + ? _value.author + : author // ignore: cast_nullable_to_non_nullable + as Typed, + timestamp: null == timestamp + ? _value.timestamp + : timestamp // ignore: cast_nullable_to_non_nullable + as Timestamp, + text: null == text + ? _value.text + : text // ignore: cast_nullable_to_non_nullable + as String, + sendState: freezed == sendState + ? _value.sendState + : sendState // ignore: cast_nullable_to_non_nullable + as MessageSendState?, + )); + } +} + +/// @nodoc +@JsonSerializable() +class _$MessageStateImpl with DiagnosticableTreeMixin implements _MessageState { + const _$MessageStateImpl( + {required this.author, + required this.timestamp, + required this.text, + required this.sendState}); + + factory _$MessageStateImpl.fromJson(Map json) => + _$$MessageStateImplFromJson(json); + + @override + final Typed author; + @override + final Timestamp timestamp; + @override + final String text; + @override + final MessageSendState? sendState; + + @override + String toString({DiagnosticLevel minLevel = DiagnosticLevel.info}) { + return 'MessageState(author: $author, timestamp: $timestamp, text: $text, sendState: $sendState)'; + } + + @override + void debugFillProperties(DiagnosticPropertiesBuilder properties) { + super.debugFillProperties(properties); + properties + ..add(DiagnosticsProperty('type', 'MessageState')) + ..add(DiagnosticsProperty('author', author)) + ..add(DiagnosticsProperty('timestamp', timestamp)) + ..add(DiagnosticsProperty('text', text)) + ..add(DiagnosticsProperty('sendState', sendState)); + } + + @override + bool operator ==(Object other) { + return identical(this, other) || + (other.runtimeType == runtimeType && + other is _$MessageStateImpl && + (identical(other.author, author) || other.author == author) && + (identical(other.timestamp, timestamp) || + other.timestamp == timestamp) && + (identical(other.text, text) || other.text == text) && + (identical(other.sendState, sendState) || + other.sendState == sendState)); + } + + @JsonKey(ignore: true) + @override + int get hashCode => + Object.hash(runtimeType, author, timestamp, text, sendState); + + @JsonKey(ignore: true) + @override + @pragma('vm:prefer-inline') + _$$MessageStateImplCopyWith<_$MessageStateImpl> get copyWith => + __$$MessageStateImplCopyWithImpl<_$MessageStateImpl>(this, _$identity); + + @override + Map toJson() { + return _$$MessageStateImplToJson( + this, + ); + } +} + +abstract class _MessageState implements MessageState { + const factory _MessageState( + {required final Typed author, + required final Timestamp timestamp, + required final String text, + required final MessageSendState? sendState}) = _$MessageStateImpl; + + factory _MessageState.fromJson(Map json) = + _$MessageStateImpl.fromJson; + + @override + Typed get author; + @override + Timestamp get timestamp; + @override + String get text; + @override + MessageSendState? get sendState; + @override + @JsonKey(ignore: true) + _$$MessageStateImplCopyWith<_$MessageStateImpl> get copyWith => + throw _privateConstructorUsedError; +} diff --git a/lib/chat/models/message_state.g.dart b/lib/chat/models/message_state.g.dart new file mode 100644 index 0000000..5324b93 --- /dev/null +++ b/lib/chat/models/message_state.g.dart @@ -0,0 +1,25 @@ +// GENERATED CODE - DO NOT MODIFY BY HAND + +part of 'message_state.dart'; + +// ************************************************************************** +// JsonSerializableGenerator +// ************************************************************************** + +_$MessageStateImpl _$$MessageStateImplFromJson(Map json) => + _$MessageStateImpl( + author: Typed.fromJson(json['author']), + timestamp: Timestamp.fromJson(json['timestamp']), + text: json['text'] as String, + sendState: json['send_state'] == null + ? null + : MessageSendState.fromJson(json['send_state']), + ); + +Map _$$MessageStateImplToJson(_$MessageStateImpl instance) => + { + 'author': instance.author.toJson(), + 'timestamp': instance.timestamp.toJson(), + 'text': instance.text, + 'send_state': instance.sendState?.toJson(), + }; diff --git a/lib/chat/models/models.dart b/lib/chat/models/models.dart new file mode 100644 index 0000000..2d92e01 --- /dev/null +++ b/lib/chat/models/models.dart @@ -0,0 +1 @@ +export 'message_state.dart'; diff --git a/lib/chat/views/chat_component.dart b/lib/chat/views/chat_component.dart index 35a2987..d0d3f39 100644 --- a/lib/chat/views/chat_component.dart +++ b/lib/chat/views/chat_component.dart @@ -1,5 +1,3 @@ -import 'dart:async'; - import 'package:async_tools/async_tools.dart'; import 'package:awesome_extensions/awesome_extensions.dart'; import 'package:flutter/material.dart'; @@ -99,38 +97,52 @@ class ChatComponent extends StatelessWidget { ///////////////////////////////////////////////////////////////////// - types.Message messageToChatMessage(proto.Message message) { - final isLocal = message.author == _localUserIdentityKey.toProto(); + types.Message messageToChatMessage(MessageState message) { + final isLocal = message.author == _localUserIdentityKey; + + types.Status? status; + if (message.sendState != null) { + assert(isLocal, 'send state should only be on sent messages'); + switch (message.sendState!) { + case MessageSendState.sending: + status = types.Status.sending; + case MessageSendState.sent: + status = types.Status.sent; + case MessageSendState.delivered: + status = types.Status.delivered; + } + } final textMessage = types.TextMessage( - author: isLocal ? _localUser : _remoteUser, - createdAt: (message.timestamp ~/ 1000).toInt(), - id: message.timestamp.toString(), - text: message.text, - ); + author: isLocal ? _localUser : _remoteUser, + createdAt: (message.timestamp.value ~/ BigInt.from(1000)).toInt(), + id: message.timestamp.toString(), + text: message.text, + showStatus: status != null, + status: status); return textMessage; } - Future _addMessage(proto.Message message) async { + void _addMessage(proto.Message message) { if (message.text.isEmpty) { return; } - await _messagesCubit.addMessage(message: message); + _messagesCubit.addMessage(message: message); } - Future _handleSendPressed(types.PartialText message) async { + void _handleSendPressed(types.PartialText message) { final protoMessage = proto.Message() ..author = _localUserIdentityKey.toProto() ..timestamp = Veilid.instance.now().toInt64() ..text = message.text; //..signature = signature; - await _addMessage(protoMessage); + _addMessage(protoMessage); } - Future _handleAttachmentPressed() async { - // - } + // void _handleAttachmentPressed() async { + // // + // } @override Widget build(BuildContext context) { @@ -195,10 +207,7 @@ class ChatComponent extends StatelessWidget { //onAttachmentPressed: _handleAttachmentPressed, //onMessageTap: _handleMessageTap, //onPreviewDataFetched: _handlePreviewDataFetched, - onSendPressed: (message) { - singleFuture( - this, () async => _handleSendPressed(message)); - }, + onSendPressed: _handleSendPressed, //showUserAvatars: false, //showUserNames: true, user: _localUser, diff --git a/lib/chat_list/cubits/active_conversations_bloc_map_cubit.dart b/lib/chat_list/cubits/active_conversations_bloc_map_cubit.dart index b79191b..ff903ca 100644 --- a/lib/chat_list/cubits/active_conversations_bloc_map_cubit.dart +++ b/lib/chat_list/cubits/active_conversations_bloc_map_cubit.dart @@ -85,14 +85,14 @@ class ActiveConversationsBlocMapCubit extends BlocMapCubit c.remoteConversationRecordKey.toVeilid() == key); + final contactIndex = contactList.indexWhere( + (c) => c.value.remoteConversationRecordKey.toVeilid() == key); if (contactIndex == -1) { await addState(key, AsyncValue.error('Contact not found')); return; } final contact = contactList[contactIndex]; - await _addConversation(contact: contact); + await _addConversation(contact: contact.value); } //// diff --git a/lib/chat_list/cubits/active_single_contact_chat_bloc_map_cubit.dart b/lib/chat_list/cubits/active_single_contact_chat_bloc_map_cubit.dart index 7ebbdb7..19ad150 100644 --- a/lib/chat_list/cubits/active_single_contact_chat_bloc_map_cubit.dart +++ b/lib/chat_list/cubits/active_single_contact_chat_bloc_map_cubit.dart @@ -2,7 +2,6 @@ import 'dart:async'; import 'package:async_tools/async_tools.dart'; import 'package:bloc_tools/bloc_tools.dart'; -import 'package:fast_immutable_collections/fast_immutable_collections.dart'; import 'package:veilid_support/veilid_support.dart'; import '../../account_manager/account_manager.dart'; @@ -16,7 +15,7 @@ import 'chat_list_cubit.dart'; // Wraps a MessagesCubit to stream the latest messages to the state // Automatically follows the state of a ActiveConversationsBlocMapCubit. class ActiveSingleContactChatBlocMapCubit extends BlocMapCubit>, SingleContactMessagesCubit> + SingleContactMessagesState, SingleContactMessagesCubit> with StateMapFollower> { @@ -61,14 +60,14 @@ class ActiveSingleContactChatBlocMapCubit extends BlocMapCubit c.remoteConversationRecordKey.toVeilid() == key); + final contactIndex = contactList.indexWhere( + (c) => c.value.remoteConversationRecordKey.toVeilid() == key); if (contactIndex == -1) { await addState( key, AsyncValue.error('Contact not found for conversation')); return; } - final contact = contactList[contactIndex]; + final contact = contactList[contactIndex].value; // Get the chat object for this single contact chat final chatList = _chatListCubit.state.state.asData?.value; @@ -76,13 +75,13 @@ class ActiveSingleContactChatBlocMapCubit extends BlocMapCubit c.remoteConversationRecordKey.toVeilid() == key); + final chatIndex = chatList.indexWhere( + (c) => c.value.remoteConversationRecordKey.toVeilid() == key); if (contactIndex == -1) { await addState(key, AsyncValue.error('Chat not found for conversation')); return; } - final chat = chatList[chatIndex]; + final chat = chatList[chatIndex].value; await value.when( data: (state) => _addConversationMessages( diff --git a/lib/chat_list/cubits/chat_list_cubit.dart b/lib/chat_list/cubits/chat_list_cubit.dart index 990c0d1..4a0818a 100644 --- a/lib/chat_list/cubits/chat_list_cubit.dart +++ b/lib/chat_list/cubits/chat_list_cubit.dart @@ -1,6 +1,5 @@ import 'dart:async'; -import 'package:async_tools/async_tools.dart'; import 'package:bloc_tools/bloc_tools.dart'; import 'package:fast_immutable_collections/fast_immutable_collections.dart'; import 'package:veilid_support/veilid_support.dart'; @@ -14,7 +13,7 @@ import '../../tools/tools.dart'; ////////////////////////////////////////////////// // Mutable state for per-account chat list -typedef ChatListCubitState = BlocBusyState>>; +typedef ChatListCubitState = DHTShortArrayBusyState; class ChatListCubit extends DHTShortArrayCubit with StateMapFollowable { @@ -119,8 +118,8 @@ class ChatListCubit extends DHTShortArrayCubit // chat record now if (success && deletedItem != null) { try { - await DHTRecordPool.instance - .delete(deletedItem.reconciledChatRecord.toVeilid().recordKey); + await DHTRecordPool.instance.deleteRecord( + deletedItem.reconciledChatRecord.toVeilid().recordKey); } on Exception catch (e) { log.debug('error removing reconciled chat record: $e', e); } @@ -135,8 +134,8 @@ class ChatListCubit extends DHTShortArrayCubit return IMap(); } return IMap.fromIterable(stateValue, - keyMapper: (e) => e.remoteConversationRecordKey.toVeilid(), - valueMapper: (e) => e); + keyMapper: (e) => e.value.remoteConversationRecordKey.toVeilid(), + valueMapper: (e) => e.value); } final ActiveChatCubit activeChatCubit; diff --git a/lib/chat_list/views/chat_single_contact_list_widget.dart b/lib/chat_list/views/chat_single_contact_list_widget.dart index c1f54d8..a671011 100644 --- a/lib/chat_list/views/chat_single_contact_list_widget.dart +++ b/lib/chat_list/views/chat_single_contact_list_widget.dart @@ -20,8 +20,8 @@ class ChatSingleContactListWidget extends StatelessWidget { return contactListV.builder((context, contactList) { final contactMap = IMap.fromIterable(contactList, - keyMapper: (c) => c.remoteConversationRecordKey, - valueMapper: (c) => c); + keyMapper: (c) => c.value.remoteConversationRecordKey, + valueMapper: (c) => c.value); final chatListV = context.watch().state; return chatListV @@ -33,7 +33,7 @@ class ChatSingleContactListWidget extends StatelessWidget { child: (chatList.isEmpty) ? const EmptyChatListWidget() : SearchableList( - initialList: chatList.toList(), + initialList: chatList.map((x) => x.value).toList(), builder: (l, i, c) { final contact = contactMap[c.remoteConversationRecordKey]; @@ -47,7 +47,7 @@ class ChatSingleContactListWidget extends StatelessWidget { }, filter: (value) { final lowerValue = value.toLowerCase(); - return chatList.where((c) { + return chatList.map((x) => x.value).where((c) { final contact = contactMap[c.remoteConversationRecordKey]; if (contact == null) { diff --git a/lib/contact_invitation/cubits/contact_invitation_list_cubit.dart b/lib/contact_invitation/cubits/contact_invitation_list_cubit.dart index 03df901..a24005a 100644 --- a/lib/contact_invitation/cubits/contact_invitation_list_cubit.dart +++ b/lib/contact_invitation/cubits/contact_invitation_list_cubit.dart @@ -1,6 +1,5 @@ import 'dart:async'; -import 'package:async_tools/async_tools.dart'; import 'package:bloc_tools/bloc_tools.dart'; import 'package:fast_immutable_collections/fast_immutable_collections.dart'; import 'package:fixnum/fixnum.dart'; @@ -27,7 +26,7 @@ typedef GetEncryptionKeyCallback = Future Function( ////////////////////////////////////////////////// typedef ContactInvitiationListState - = BlocBusyState>>; + = DHTShortArrayBusyState; ////////////////////////////////////////////////// // Mutable state for per-account contact invitations @@ -208,13 +207,14 @@ class ContactInvitationListCubit await contactRequestInbox.tryWriteBytes(Uint8List(0)); }); try { - await pool.delete(contactRequestInbox.recordKey); + await pool.deleteRecord(contactRequestInbox.recordKey); } on Exception catch (e) { log.debug('error removing contact request inbox: $e', e); } if (!accepted) { try { - await pool.delete(deletedItem.localConversationRecordKey.toVeilid()); + await pool + .deleteRecord(deletedItem.localConversationRecordKey.toVeilid()); } on Exception catch (e) { log.debug('error removing local conversation record: $e', e); } @@ -246,7 +246,7 @@ class ContactInvitationListCubit // If we're chatting to ourselves, // we are validating an invitation we have created final isSelf = state.state.asData!.value.indexWhere((cir) => - cir.contactRequestInbox.recordKey.toVeilid() == + cir.value.contactRequestInbox.recordKey.toVeilid() == contactRequestInboxKey) != -1; @@ -315,8 +315,8 @@ class ContactInvitationListCubit return IMap(); } return IMap.fromIterable(stateValue, - keyMapper: (e) => e.contactRequestInbox.recordKey.toVeilid(), - valueMapper: (e) => e); + keyMapper: (e) => e.value.contactRequestInbox.recordKey.toVeilid(), + valueMapper: (e) => e.value); } // diff --git a/lib/contact_invitation/cubits/waiting_invitations_bloc_map_cubit.dart b/lib/contact_invitation/cubits/waiting_invitations_bloc_map_cubit.dart index bf81b4e..7c06bf7 100644 --- a/lib/contact_invitation/cubits/waiting_invitations_bloc_map_cubit.dart +++ b/lib/contact_invitation/cubits/waiting_invitations_bloc_map_cubit.dart @@ -1,6 +1,5 @@ import 'package:async_tools/async_tools.dart'; import 'package:bloc_tools/bloc_tools.dart'; -import 'package:fast_immutable_collections/fast_immutable_collections.dart'; import 'package:veilid_support/veilid_support.dart'; import '../../account_manager/account_manager.dart'; @@ -16,10 +15,8 @@ typedef WaitingInvitationsBlocMapState class WaitingInvitationsBlocMapCubit extends BlocMapCubit, WaitingInvitationCubit> with - StateMapFollower< - BlocBusyState>>, - TypedKey, - proto.ContactInvitationRecord> { + StateMapFollower, + TypedKey, proto.ContactInvitationRecord> { WaitingInvitationsBlocMapCubit( {required this.activeAccountInfo, required this.account}); diff --git a/lib/contacts/cubits/conversation_cubit.dart b/lib/contacts/cubits/conversation_cubit.dart index 7b2dde2..7e23a99 100644 --- a/lib/contacts/cubits/conversation_cubit.dart +++ b/lib/contacts/cubits/conversation_cubit.dart @@ -173,8 +173,8 @@ class ConversationCubit extends Cubit> { await localConversationCubit.close(); final conversation = data.value; final messagesKey = conversation.messages.toVeilid(); - await pool.delete(messagesKey); - await pool.delete(_localConversationRecordKey!); + await pool.deleteRecord(messagesKey); + await pool.deleteRecord(_localConversationRecordKey!); _localConversationRecordKey = null; }); } @@ -191,8 +191,8 @@ class ConversationCubit extends Cubit> { await remoteConversationCubit.close(); final conversation = data.value; final messagesKey = conversation.messages.toVeilid(); - await pool.delete(messagesKey); - await pool.delete(_remoteConversationRecordKey!); + await pool.deleteRecord(messagesKey); + await pool.deleteRecord(_remoteConversationRecordKey!); }); } diff --git a/lib/layout/home/home_account_ready/main_pager/account_page.dart b/lib/layout/home/home_account_ready/main_pager/account_page.dart index 304d534..0d8650e 100644 --- a/lib/layout/home/home_account_ready/main_pager/account_page.dart +++ b/lib/layout/home/home_account_ready/main_pager/account_page.dart @@ -38,11 +38,14 @@ class AccountPageState extends State { final cilState = context.watch().state; final cilBusy = cilState.busy; final contactInvitationRecordList = - cilState.state.asData?.value ?? const IListConst([]); + cilState.state.asData?.value.map((x) => x.value).toIList() ?? + const IListConst([]); final ciState = context.watch().state; final ciBusy = ciState.busy; - final contactList = ciState.state.asData?.value ?? const IListConst([]); + final contactList = + ciState.state.asData?.value.map((x) => x.value).toIList() ?? + const IListConst([]); return SizedBox( child: Column(children: [ diff --git a/lib/proto/veilidchat.proto b/lib/proto/veilidchat.proto index 1727ee1..42692ac 100644 --- a/lib/proto/veilidchat.proto +++ b/lib/proto/veilidchat.proto @@ -27,8 +27,6 @@ message Attachment { } // A single message as part of a series of messages -// Messages are stored in a DHTLog -// DHT Schema: SMPL(0,1,[identityPublicKey]) message Message { // Author of the message veilid.TypedKey author = 1; @@ -53,7 +51,6 @@ message Message { // DHT Key (UnicastOutbox): localConversation // DHT Secret: None // Encryption: DH(IdentityA, IdentityB) - message Conversation { // Profile to publish to friend Profile profile = 1; diff --git a/lib/router/cubit/router_cubit.dart b/lib/router/cubit/router_cubit.dart index 83bc477..f30a617 100644 --- a/lib/router/cubit/router_cubit.dart +++ b/lib/router/cubit/router_cubit.dart @@ -16,11 +16,20 @@ import '../../veilid_processor/views/developer.dart'; part 'router_cubit.freezed.dart'; part 'router_cubit.g.dart'; -part 'router_state.dart'; final _rootNavKey = GlobalKey(debugLabel: 'rootNavKey'); final _homeNavKey = GlobalKey(debugLabel: 'homeNavKey'); +@freezed +class RouterState with _$RouterState { + const factory RouterState( + {required bool hasAnyAccount, + required bool hasActiveChat}) = _RouterState; + + factory RouterState.fromJson(dynamic json) => + _$RouterStateFromJson(json as Map); +} + class RouterCubit extends Cubit { RouterCubit(AccountRepository accountRepository) : super(RouterState( diff --git a/lib/router/cubit/router_state.dart b/lib/router/cubit/router_state.dart deleted file mode 100644 index ac60c39..0000000 --- a/lib/router/cubit/router_state.dart +++ /dev/null @@ -1,11 +0,0 @@ -part of 'router_cubit.dart'; - -@freezed -class RouterState with _$RouterState { - const factory RouterState( - {required bool hasAnyAccount, - required bool hasActiveChat}) = _RouterState; - - factory RouterState.fromJson(dynamic json) => - _$RouterStateFromJson(json as Map); -} diff --git a/packages/bloc_tools/lib/src/future_cubit.dart b/packages/bloc_tools/lib/src/future_cubit.dart index b14ac72..39be126 100644 --- a/packages/bloc_tools/lib/src/future_cubit.dart +++ b/packages/bloc_tools/lib/src/future_cubit.dart @@ -5,12 +5,20 @@ import 'package:bloc/bloc.dart'; abstract class FutureCubit extends Cubit> { FutureCubit(Future fut) : super(const AsyncValue.loading()) { - unawaited(fut.then((value) { - emit(AsyncValue.data(value)); - // ignore: avoid_types_on_closure_parameters - }, onError: (Object e, StackTrace stackTrace) { - emit(AsyncValue.error(e, stackTrace)); - })); + _initWait.add(() async => fut.then((value) { + emit(AsyncValue.data(value)); + // ignore: avoid_types_on_closure_parameters + }, onError: (Object e, StackTrace stackTrace) { + emit(AsyncValue.error(e, stackTrace)); + })); } FutureCubit.value(State state) : super(AsyncValue.data(state)); + + @override + Future close() async { + await _initWait(); + await super.close(); + } + + final WaitSet _initWait = WaitSet(); } diff --git a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record.dart b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record.dart index 0e23c5a..af16842 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record.dart @@ -86,7 +86,7 @@ class DHTRecord { if (_open) { await close(); } - await DHTRecordPool.instance.delete(key); + await DHTRecordPool.instance.deleteRecord(key); rethrow; } } diff --git a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart index bcdbc42..333558e 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart @@ -109,7 +109,7 @@ class OpenedRecordInfo { String get sharedDetails => shared.toString(); } -class DHTRecordPool with TableDBBacked { +class DHTRecordPool with TableDBBackedJson { DHTRecordPool._(Veilid veilid, VeilidRoutingContext routingContext) : _state = const DHTRecordPoolAllocations(), _mutex = Mutex(), @@ -150,7 +150,7 @@ class DHTRecordPool with TableDBBacked { ? DHTRecordPoolAllocations.fromJson(obj) : const DHTRecordPoolAllocations(); @override - Object? valueToJson(DHTRecordPoolAllocations val) => val.toJson(); + Object? valueToJson(DHTRecordPoolAllocations? val) => val?.toJson(); ////////////////////////////////////////////////////////////// @@ -161,7 +161,7 @@ class DHTRecordPool with TableDBBacked { final globalPool = DHTRecordPool._(Veilid.instance, routingContext); globalPool .._logger = logger - .._state = await globalPool.load(); + .._state = await globalPool.load() ?? const DHTRecordPoolAllocations(); _singleton = globalPool; } @@ -279,7 +279,7 @@ class DHTRecordPool with TableDBBacked { if (openedRecordInfo.records.isEmpty) { await _routingContext.closeDHTRecord(key); if (openedRecordInfo.shared.deleteOnClose) { - await _deleteInner(key); + await _deleteRecordInner(key); } _opened.remove(key); } @@ -316,7 +316,7 @@ class DHTRecordPool with TableDBBacked { } } - Future _deleteInner(TypedKey recordKey) async { + Future _deleteRecordInner(TypedKey recordKey) async { log('deleteDHTRecord: key=$recordKey'); // Remove this child from parents @@ -324,7 +324,7 @@ class DHTRecordPool with TableDBBacked { await _routingContext.deleteDHTRecord(recordKey); } - Future delete(TypedKey recordKey) async { + Future deleteRecord(TypedKey recordKey) async { await _mutex.protect(() async { final allDeps = _collectChildrenInner(recordKey); @@ -339,7 +339,7 @@ class DHTRecordPool with TableDBBacked { ori.shared.deleteOnClose = true; } else { // delete now - await _deleteInner(recordKey); + await _deleteRecordInner(recordKey); } }); } diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array.dart index dcf5cb4..5a4210f 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array.dart @@ -69,7 +69,7 @@ class DHTShortArray { return dhtShortArray; } on Exception catch (_) { await dhtRecord.close(); - await pool.delete(dhtRecord.key); + await pool.deleteRecord(dhtRecord.key); rethrow; } } @@ -152,7 +152,7 @@ class DHTShortArray { /// Free all resources for the DHTShortArray and delete it from the DHT Future delete() async { await close(); - await DHTRecordPool.instance.delete(recordKey); + await DHTRecordPool.instance.deleteRecord(recordKey); } /// Runs a closure that guarantees the DHTShortArray @@ -212,6 +212,8 @@ class DHTShortArray { return closure(writer); }, timeout: timeout); + /// Listen to and any all changes to the structure of this short array + /// regardless of where the changes are coming from Future> listen( void Function() onChanged, ) => diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_cubit.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_cubit.dart index 52214ba..cdce828 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_cubit.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_cubit.dart @@ -3,11 +3,24 @@ import 'dart:async'; import 'package:async_tools/async_tools.dart'; import 'package:bloc/bloc.dart'; import 'package:bloc_tools/bloc_tools.dart'; +import 'package:equatable/equatable.dart'; import 'package:fast_immutable_collections/fast_immutable_collections.dart'; +import 'package:meta/meta.dart'; import '../../../veilid_support.dart'; -typedef DHTShortArrayState = AsyncValue>; +@immutable +class DHTShortArrayElementState extends Equatable { + const DHTShortArrayElementState( + {required this.value, required this.isOffline}); + final T value; + final bool isOffline; + + @override + List get props => [value, isOffline]; +} + +typedef DHTShortArrayState = AsyncValue>>; typedef DHTShortArrayBusyState = BlocBusyState>; class DHTShortArrayCubit extends Cubit> @@ -49,13 +62,19 @@ class DHTShortArrayCubit extends Cubit> Future _refreshNoWait({bool forceRefresh = false}) async => busy((emit) async => _refreshInner(emit, forceRefresh: forceRefresh)); - Future _refreshInner(void Function(AsyncValue>) emit, + Future _refreshInner(void Function(DHTShortArrayState) emit, {bool forceRefresh = false}) async { try { - final newState = (await _shortArray.operate( - (reader) => reader.getAllItems(forceRefresh: forceRefresh))) - ?.map(_decodeElement) - .toIList(); + final newState = await _shortArray.operate((reader) async { + final offlinePositions = await reader.getOfflinePositions(); + final allItems = (await reader.getAllItems(forceRefresh: forceRefresh)) + ?.indexed + .map((x) => DHTShortArrayElementState( + value: _decodeElement(x.$2), + isOffline: offlinePositions.contains(x.$1))) + .toIList(); + return allItems; + }); if (newState != null) { emit(AsyncValue.data(newState)); } diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_read.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_read.dart index 3151e6e..44e565d 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_read.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_read.dart @@ -15,6 +15,9 @@ abstract class DHTShortArrayRead { /// is specified, the network will always be checked for newer values /// rather than returning the existing locally stored copy of the elements. Future?> getAllItems({bool forceRefresh = false}); + + /// Get a list of the positions that were written offline and not flushed yet + Future> getOfflinePositions(); } extension DHTShortArrayReadExt on DHTShortArrayRead { @@ -96,6 +99,40 @@ class _DHTShortArrayRead implements DHTShortArrayRead { return out; } + /// Get a list of the positions that were written offline and not flushed yet + @override + Future> getOfflinePositions() async { + final indexOffline = {}; + final inspects = await [ + _head._headRecord.inspect(), + ..._head._linkedRecords.map((lr) => lr.inspect()) + ].wait; + + // Add to offline index + var strideOffset = 0; + for (final inspect in inspects) { + for (final r in inspect.offlineSubkeys) { + for (var i = r.low; i <= r.high; i++) { + // If this is the head record, ignore the first head subkey + if (strideOffset != 0 || i != 0) { + indexOffline.add(i + ((strideOffset == 0) ? -1 : strideOffset)); + } + } + } + strideOffset += _head._stride; + } + + // See which positions map to offline indexes + final positionOffline = {}; + for (var i = 0; i < _head._index.length; i++) { + final idx = _head._index[i]; + if (indexOffline.contains(idx)) { + positionOffline.add(i); + } + } + return positionOffline; + } + //////////////////////////////////////////////////////////////////////////// // Fields final _DHTShortArrayHead _head; diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_write.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_write.dart index a91543c..af6204e 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_write.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_write.dart @@ -92,12 +92,11 @@ extension DHTShortArrayWriteExt on DHTShortArrayWrite { } //////////////////////////////////////////////////////////////////////////// -// Writer-only implementation +// Writer implementation -class _DHTShortArrayWrite implements DHTShortArrayWrite { - _DHTShortArrayWrite._(_DHTShortArrayHead head) - : _head = head, - _reader = _DHTShortArrayRead._(head); +class _DHTShortArrayWrite extends _DHTShortArrayRead + implements DHTShortArrayWrite { + _DHTShortArrayWrite._(super.head) : super._(); @override Future tryAddItem(Uint8List value) async { @@ -187,23 +186,4 @@ class _DHTShortArrayWrite implements DHTShortArrayWrite { } return (oldValue, true); } - - //////////////////////////////////////////////////////////////////////////// - // Reader passthrough - - @override - int get length => _reader.length; - - @override - Future getItem(int pos, {bool forceRefresh = false}) => - _reader.getItem(pos, forceRefresh: forceRefresh); - - @override - Future?> getAllItems({bool forceRefresh = false}) => - _reader.getAllItems(forceRefresh: forceRefresh); - - //////////////////////////////////////////////////////////////////////////// - // Fields - final _DHTShortArrayHead _head; - final _DHTShortArrayRead _reader; } diff --git a/packages/veilid_support/lib/src/async_table_db_backed_cubit.dart b/packages/veilid_support/lib/src/async_table_db_backed_cubit.dart index a50d893..da278d4 100644 --- a/packages/veilid_support/lib/src/async_table_db_backed_cubit.dart +++ b/packages/veilid_support/lib/src/async_table_db_backed_cubit.dart @@ -2,48 +2,47 @@ import 'dart:async'; import 'package:async_tools/async_tools.dart'; import 'package:bloc/bloc.dart'; +import 'package:meta/meta.dart'; +import 'package:mutex/mutex.dart'; import 'table_db.dart'; -abstract class AsyncTableDBBackedCubit extends Cubit> - with TableDBBacked { +abstract class AsyncTableDBBackedCubit extends Cubit> + with TableDBBackedJson { AsyncTableDBBackedCubit() : super(const AsyncValue.loading()) { - unawaited(Future.delayed(Duration.zero, _build)); + _initWait.add(_build); + } + + @override + Future close() async { + // Ensure the init finished + await _initWait(); + // Wait for any setStates to finish + await _mutex.acquire(); + + await super.close(); } Future _build() async { try { - emit(AsyncValue.data(await load())); + await _mutex.protect(() async { + emit(AsyncValue.data(await load())); + }); } on Exception catch (e, stackTrace) { emit(AsyncValue.error(e, stackTrace)); } } - Future readyData() async { - final stateStream = stream.distinct(); - await for (final AsyncValue av in stateStream) { - final d = av.when( - data: (value) => value, loading: () => null, error: (e, s) => null); - if (d != null) { - return d; - } - final ef = av.when( - data: (value) => null, - loading: () => null, - error: Future.error); - if (ef != null) { - return ef; - } - } - return Future.error( - StateError("data never became ready in cubit '$runtimeType'")); - } - - Future setState(State newState) async { + @protected + Future setState(T? newState) async { + await _initWait(); try { emit(AsyncValue.data(await store(newState))); } on Exception catch (e, stackTrace) { emit(AsyncValue.error(e, stackTrace)); } } + + final WaitSet _initWait = WaitSet(); + final Mutex _mutex = Mutex(); } diff --git a/packages/veilid_support/lib/src/identity.dart b/packages/veilid_support/lib/src/identity.dart index 5810bc5..9d26a7d 100644 --- a/packages/veilid_support/lib/src/identity.dart +++ b/packages/veilid_support/lib/src/identity.dart @@ -93,7 +93,7 @@ extension IdentityMasterExtension on IdentityMaster { /// Deletes a master identity and the identity record under it Future delete() async { final pool = DHTRecordPool.instance; - await pool.delete(masterRecordKey); + await pool.deleteRecord(masterRecordKey); } Future get identityCrypto => @@ -111,6 +111,9 @@ extension IdentityMasterExtension on IdentityMaster { TypedKey identityPublicTypedKey() => TypedKey(kind: identityRecordKey.kind, value: identityPublicKey); + TypedKey masterPublicTypedKey() => + TypedKey(kind: identityRecordKey.kind, value: masterPublicKey); + Future validateIdentitySecret( SecretKey identitySecret) async { final cs = await identityCrypto; diff --git a/packages/veilid_support/lib/src/persistent_queue_cubit.dart b/packages/veilid_support/lib/src/persistent_queue_cubit.dart new file mode 100644 index 0000000..6cc79c1 --- /dev/null +++ b/packages/veilid_support/lib/src/persistent_queue_cubit.dart @@ -0,0 +1,194 @@ +import 'dart:async'; +import 'dart:typed_data'; + +import 'package:async_tools/async_tools.dart'; +import 'package:bloc/bloc.dart'; +import 'package:fast_immutable_collections/fast_immutable_collections.dart'; +import 'package:mutex/mutex.dart'; +import 'package:protobuf/protobuf.dart'; + +import 'table_db.dart'; + +class PersistentQueueCubit + extends Cubit>> with TableDBBackedFromBuffer> { + // + PersistentQueueCubit( + {required String table, + required String key, + required T Function(Uint8List) fromBuffer, + bool deleteOnClose = true}) + : _table = table, + _key = key, + _fromBuffer = fromBuffer, + _deleteOnClose = deleteOnClose, + super(const AsyncValue.loading()) { + _initWait.add(_build); + unawaited(Future.delayed(Duration.zero, () async { + await for (final elem in _syncAddController.stream) { + await addAll(elem); + } + })); + } + + @override + Future close() async { + // Ensure the init finished + await _initWait(); + + // Close the sync add stream + await _syncAddController.close(); + + // Wait for any setStates to finish + await _stateMutex.acquire(); + + // Clean up table if desired + if (_deleteOnClose) { + await delete(); + } + + await super.close(); + } + + Future _build() async { + await _stateMutex.protect(() async { + try { + emit(AsyncValue.data(await load() ?? await store(IList.empty()))); + } on Exception catch (e, stackTrace) { + emit(AsyncValue.error(e, stackTrace)); + } + }); + } + + Future _setStateInner(IList newState) async { + emit(AsyncValue.data(await store(newState))); + } + + Future add(T item) async { + await _initWait(); + await _stateMutex.protect(() async { + final queue = state.asData!.value.add(item); + await _setStateInner(queue); + }); + } + + Future addAll(IList items) async { + await _initWait(); + await _stateMutex.protect(() async { + var queue = state.asData!.value; + for (final item in items) { + queue = queue.add(item); + } + await _setStateInner(queue); + }); + } + + void addSync(T item) { + _syncAddController.sink.add([item].toIList()); + } + + void addAllSync(IList items) { + _syncAddController.sink.add(items.toIList()); + } + + Future get isEmpty async { + await _initWait(); + return state.asData!.value.isEmpty; + } + + Future get isNotEmpty async { + await _initWait(); + return state.asData!.value.isNotEmpty; + } + + Future get length async { + await _initWait(); + return state.asData!.value.length; + } + + // Future pop() async { + // await _initWait(); + // return _processingMutex.protect(() async => _stateMutex.protect(() async { + // final removedItem = Output(); + // final queue = state.asData!.value.removeAt(0, removedItem); + // await _setStateInner(queue); + // return removedItem.value; + // })); + // } + + // Future> popAll() async { + // await _initWait(); + // return _processingMutex.protect(() async => _stateMutex.protect(() async { + // final queue = state.asData!.value; + // await _setStateInner(IList.empty); + // return queue; + // })); + // } + + Future process(Future Function(IList) closure, + {int? count}) async { + await _initWait(); + // Only one processor at a time + return _processingMutex.protect(() async { + // Take 'count' items from the front of the list + final toProcess = await _stateMutex.protect(() async { + final queue = state.asData!.value; + final processCount = (count ?? queue.length).clamp(0, queue.length); + return queue.take(processCount).toIList(); + }); + + // Run the processing closure + final processCount = toProcess.length; + final out = await closure(toProcess); + + // If there was nothing to process just return + if (toProcess.isEmpty) { + return out; + } + + // If there was no exception, remove the processed items + return _stateMutex.protect(() async { + // Get the queue from the state again as items could + // have been added during processing + final queue = state.asData!.value; + final newQueue = queue.skip(processCount).toIList(); + await _setStateInner(newQueue); + return out; + }); + }); + } + + // TableDBBacked + @override + String tableKeyName() => _key; + + @override + String tableName() => _table; + + @override + IList valueFromBuffer(Uint8List bytes) { + final reader = CodedBufferReader(bytes); + var out = IList(); + while (!reader.isAtEnd()) { + out = out.add(_fromBuffer(reader.readBytesAsView())); + } + return out; + } + + @override + Uint8List valueToBuffer(IList val) { + final writer = CodedBufferWriter(); + for (final elem in val) { + writer.writeRawBytes(elem.writeToBuffer()); + } + return writer.toBuffer(); + } + + final String _table; + final String _key; + final T Function(Uint8List) _fromBuffer; + final bool _deleteOnClose; + final WaitSet _initWait = WaitSet(); + final Mutex _stateMutex = Mutex(); + final Mutex _processingMutex = Mutex(); + final StreamController> _syncAddController = StreamController(); +} diff --git a/packages/veilid_support/lib/src/table_db.dart b/packages/veilid_support/lib/src/table_db.dart index 1e09fc4..d0fb72e 100644 --- a/packages/veilid_support/lib/src/table_db.dart +++ b/packages/veilid_support/lib/src/table_db.dart @@ -1,6 +1,9 @@ import 'dart:async'; +import 'dart:convert'; +import 'dart:typed_data'; import 'package:async_tools/async_tools.dart'; +import 'package:meta/meta.dart'; import 'package:veilid/veilid.dart'; Future tableScope( @@ -32,14 +35,19 @@ Future transactionScope( } } -abstract mixin class TableDBBacked { +abstract mixin class TableDBBackedJson { + @protected String tableName(); + @protected String tableKeyName(); - T valueFromJson(Object? obj); - Object? valueToJson(T val); + @protected + T? valueFromJson(Object? obj); + @protected + Object? valueToJson(T? val); /// Load things from storage - Future load() async { + @protected + Future load() async { final obj = await tableScope(tableName(), (tdb) async { final objJson = await tdb.loadStringJson(0, tableKeyName()); return valueFromJson(objJson); @@ -48,28 +56,98 @@ abstract mixin class TableDBBacked { } /// Store things to storage + @protected Future store(T obj) async { await tableScope(tableName(), (tdb) async { await tdb.storeStringJson(0, tableKeyName(), valueToJson(obj)); }); return obj; } + + /// Delete things from storage + @protected + Future delete() async { + final obj = await tableScope(tableName(), (tdb) async { + final objJson = await tdb.deleteStringJson(0, tableKeyName()); + return valueFromJson(objJson); + }); + return obj; + } } -class TableDBValue extends TableDBBacked { +abstract mixin class TableDBBackedFromBuffer { + @protected + String tableName(); + @protected + String tableKeyName(); + @protected + T valueFromBuffer(Uint8List bytes); + @protected + Uint8List valueToBuffer(T val); + + /// Load things from storage + @protected + Future load() async { + final obj = await tableScope(tableName(), (tdb) async { + final objBytes = await tdb.load(0, utf8.encode(tableKeyName())); + if (objBytes == null) { + return null; + } + return valueFromBuffer(objBytes); + }); + return obj; + } + + /// Store things to storage + @protected + Future store(T obj) async { + await tableScope(tableName(), (tdb) async { + await tdb.store(0, utf8.encode(tableKeyName()), valueToBuffer(obj)); + }); + return obj; + } + + /// Delete things from storage + @protected + Future delete() async { + final obj = await tableScope(tableName(), (tdb) async { + final objBytes = await tdb.delete(0, utf8.encode(tableKeyName())); + if (objBytes == null) { + return null; + } + return valueFromBuffer(objBytes); + }); + return obj; + } +} + +class TableDBValue extends TableDBBackedJson { TableDBValue({ required String tableName, required String tableKeyName, - required T Function(Object? obj) valueFromJson, - required Object? Function(T obj) valueToJson, + required T? Function(Object? obj) valueFromJson, + required Object? Function(T? obj) valueToJson, + required T Function() makeInitialValue, }) : _tableName = tableName, _valueFromJson = valueFromJson, _valueToJson = valueToJson, _tableKeyName = tableKeyName, - _streamController = StreamController.broadcast(); + _makeInitialValue = makeInitialValue, + _streamController = StreamController.broadcast() { + _initWait.add(() async { + await get(); + }); + } - AsyncData? get value => _value; - T get requireValue => _value!.value; + Future init() async { + await _initWait(); + } + + Future close() async { + await _initWait(); + } + + T get value => _value!.value; Stream get stream => _streamController.stream; Future get() async { @@ -77,7 +155,7 @@ class TableDBValue extends TableDBBacked { if (val != null) { return val.value; } - final loadedValue = await load(); + final loadedValue = await load() ?? await store(_makeInitialValue()); _value = AsyncData(loadedValue); return loadedValue; } @@ -88,11 +166,13 @@ class TableDBValue extends TableDBBacked { } AsyncData? _value; + final T Function() _makeInitialValue; final String _tableName; final String _tableKeyName; - final T Function(Object? obj) _valueFromJson; - final Object? Function(T obj) _valueToJson; + final T? Function(Object? obj) _valueFromJson; + final Object? Function(T? obj) _valueToJson; final StreamController _streamController; + final WaitSet _initWait = WaitSet(); ////////////////////////////////////////////////////////////// /// AsyncTableDBBacked @@ -101,7 +181,7 @@ class TableDBValue extends TableDBBacked { @override String tableKeyName() => _tableKeyName; @override - T valueFromJson(Object? obj) => _valueFromJson(obj); + T? valueFromJson(Object? obj) => _valueFromJson(obj); @override - Object? valueToJson(T val) => _valueToJson(val); + Object? valueToJson(T? val) => _valueToJson(val); } diff --git a/packages/veilid_support/lib/veilid_support.dart b/packages/veilid_support/lib/veilid_support.dart index 56db796..3cad8e0 100644 --- a/packages/veilid_support/lib/veilid_support.dart +++ b/packages/veilid_support/lib/veilid_support.dart @@ -10,6 +10,7 @@ export 'src/config.dart'; export 'src/identity.dart'; export 'src/json_tools.dart'; export 'src/memory_tools.dart'; +export 'src/persistent_queue_cubit.dart'; export 'src/protobuf_tools.dart'; export 'src/table_db.dart'; export 'src/veilid_log.dart' hide veilidLoggy;