From 8ac9a93f7226afe181be22d7948c946939590128 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Sat, 20 Apr 2024 13:43:17 -0400 Subject: [PATCH 1/3] updates and stuff --- ios/Podfile.lock | 4 +- lib/tools/state_logger.dart | 4 +- pubspec.lock | 88 ++++++++++++++++++------------------- pubspec.yaml | 22 +++++----- 4 files changed, 60 insertions(+), 58 deletions(-) diff --git a/ios/Podfile.lock b/ios/Podfile.lock index 636ef83..1411245 100644 --- a/ios/Podfile.lock +++ b/ios/Podfile.lock @@ -144,7 +144,7 @@ EXTERNAL SOURCES: SPEC CHECKSUMS: camera_avfoundation: 759172d1a77ae7be0de08fc104cfb79738b8a59e Flutter: e0871f40cf51350855a761d2e70bf5af5b9b5de7 - flutter_native_splash: 52501b97d1c0a5f898d687f1646226c1f93c56ef + flutter_native_splash: edf599c81f74d093a4daf8e17bd7a018854bc778 GoogleDataTransport: 54dee9d48d14580407f8f5fbf2f496e92437a2f2 GoogleMLKit: 2bd0dc6253c4d4f227aad460f69215a504b2980e GoogleToolboxForMac: 8bef7c7c5cf7291c687cf5354f39f9db6399ad34 @@ -160,7 +160,7 @@ SPEC CHECKSUMS: pasteboard: 982969ebaa7c78af3e6cc7761e8f5e77565d9ce0 path_provider_foundation: 3784922295ac71e43754bd15e0653ccfd36a147c PromisesObjC: c50d2056b5253dadbd6c2bea79b0674bd5a52fa4 - share_plus: c3fef564749587fc939ef86ffb283ceac0baf9f5 + share_plus: 8875f4f2500512ea181eef553c3e27dba5135aad shared_preferences_foundation: b4c3b4cddf1c21f02770737f147a3f5da9d39695 smart_auth: 4bedbc118723912d0e45a07e8ab34039c19e04f2 sqflite: 673a0e54cc04b7d6dba8d24fb8095b31c3a99eec diff --git a/lib/tools/state_logger.dart b/lib/tools/state_logger.dart index 150309c..b17727f 100644 --- a/lib/tools/state_logger.dart +++ b/lib/tools/state_logger.dart @@ -4,8 +4,10 @@ import 'loggy.dart'; const Map _blocChangeLogLevels = { 'ConnectionStateCubit': LogLevel.off, - 'ActiveConversationMessagesBlocMapCubit': LogLevel.off, + 'ActiveSingleContactChatBlocMapCubit': LogLevel.off, + 'ActiveConversationsBlocMapCubit': LogLevel.off, 'DHTShortArrayCubit': LogLevel.off, + 'PersistentQueueCubit': LogLevel.off, }; const Map _blocCreateCloseLogLevels = {}; const Map _blocErrorLogLevels = {}; diff --git a/pubspec.lock b/pubspec.lock index b6e252f..6d53fc0 100644 --- a/pubspec.lock +++ b/pubspec.lock @@ -45,10 +45,10 @@ packages: dependency: transitive description: name: args - sha256: eef6c46b622e0494a36c5a12d10d77fb4e855501a91c1b9ef9339326e58f0596 + sha256: "7cf60b9f0cc88203c5a190b4cd62a99feea42759a7fa695010eb5de1c0b2252a" url: "https://pub.dev" source: hosted - version: "2.4.2" + version: "2.5.0" async: dependency: transitive description: @@ -68,10 +68,10 @@ packages: dependency: "direct main" description: name: awesome_extensions - sha256: "7d235d64a81543a7e200a91b1149bef7d32241290fa483bae25b31be41449a7c" + sha256: c3bf11d07a69fe10ff5541717b920661c7a87a791ee182851f1c92a2d15b95a2 url: "https://pub.dev" source: hosted - version: "2.0.13" + version: "2.0.14" badges: dependency: "direct main" description: @@ -219,18 +219,18 @@ packages: dependency: transitive description: name: camera_android - sha256: ae5b9a996dfb8d77b02031b67f5500873d6402f33bd6a5283e932eef08542a51 + sha256: "7b0aba6398afa8475e2bc9115d976efb49cf8db781e922572d443795c04a4f4f" url: "https://pub.dev" source: hosted - version: "0.10.9" + version: "0.10.9+1" camera_avfoundation: dependency: transitive description: name: camera_avfoundation - sha256: "5d009ae48de1c8ab621b1c4496dadb6e2a83f3223b76c6e6a4a252414105f561" + sha256: "9dbbb253aaf201a69c40cf95571f366ca936305d2de012684e21f6f1b1433d31" url: "https://pub.dev" source: hosted - version: "0.9.15" + version: "0.9.15+4" camera_platform_interface: dependency: transitive description: @@ -371,10 +371,10 @@ packages: dependency: "direct main" description: name: cupertino_icons - sha256: d57953e10f9f8327ce64a508a355f0b1ec902193f66288e8cb5070e7c47eeb2d + sha256: ba631d1c7f7bef6b729a622b7b752645a2d076dba9976925b8f25725a30e1ee6 url: "https://pub.dev" source: hosted - version: "1.0.6" + version: "1.0.8" dart_style: dependency: transitive description: @@ -533,10 +533,10 @@ packages: dependency: transitive description: name: flutter_plugin_android_lifecycle - sha256: b068ffc46f82a55844acfa4fdbb61fad72fa2aef0905548419d97f0f95c456da + sha256: "8cf40eebf5dec866a6d1956ad7b4f7016e6c0cc69847ab946833b7d43743809f" url: "https://pub.dev" source: hosted - version: "2.0.17" + version: "2.0.19" flutter_shaders: dependency: transitive description: @@ -594,10 +594,10 @@ packages: dependency: "direct dev" description: name: freezed - sha256: "24f77b50776d4285cc4b3a1665bb79852714c09b878363efbe64788c179c4284" + sha256: a434911f643466d78462625df76fd9eb13e57348ff43fe1f77bbe909522c67a1 url: "https://pub.dev" source: hosted - version: "2.5.0" + version: "2.5.2" freezed_annotation: dependency: "direct main" description: @@ -634,10 +634,10 @@ packages: dependency: "direct main" description: name: go_router - sha256: "5ed2687bc961f33a752017ccaa7edead3e5601b28b6376a5901bf24728556b85" + sha256: "771c8feb40ad0ef639973d7ecf1b43d55ffcedb2207fd43fab030f5639e40446" url: "https://pub.dev" source: hosted - version: "13.2.2" + version: "13.2.4" graphs: dependency: transitive description: @@ -826,10 +826,10 @@ packages: dependency: "direct main" description: name: motion_toast - sha256: f3fe9f92d9956814a1aa040c22c8a6c432cfb0c9f783163d9ec64915838e4837 + sha256: "4763b2aa3499d0bf00ffd9737479b73141d0397f532542893156efb4a5ac1994" url: "https://pub.dev" source: hosted - version: "2.9.0" + version: "2.9.1" mutex: dependency: "direct main" description: @@ -889,18 +889,18 @@ packages: dependency: "direct main" description: name: path_provider - sha256: b27217933eeeba8ff24845c34003b003b2b22151de3c908d0e679e8fe1aa078b + sha256: c9e7d3a4cd1410877472158bee69963a4579f78b68c65a2b7d40d1a7a88bb161 url: "https://pub.dev" source: hosted - version: "2.1.2" + version: "2.1.3" path_provider_android: dependency: transitive description: name: path_provider_android - sha256: "477184d672607c0a3bf68fbbf601805f92ef79c82b64b4d6eb318cbca4c48668" + sha256: a248d8146ee5983446bf03ed5ea8f6533129a12b11f12057ad1b4a67a2b3b41d url: "https://pub.dev" source: hosted - version: "2.2.2" + version: "2.2.4" path_provider_foundation: dependency: transitive description: @@ -977,10 +977,10 @@ packages: dependency: transitive description: name: pointycastle - sha256: "70fe966348fe08c34bf929582f1d8247d9d9408130723206472b4687227e4333" + sha256: "79fbafed02cfdbe85ef3fd06c7f4bc2cbcba0177e61b765264853d4253b21744" url: "https://pub.dev" source: hosted - version: "3.8.0" + version: "3.9.0" pool: dependency: transitive description: @@ -1041,10 +1041,10 @@ packages: dependency: "direct main" description: name: qr_code_dart_scan - sha256: "8e9732d5b6e4e28d50647dc6d7713bf421148cadf28c768a10e9810bf6f3d87a" + sha256: "948271f8dc39ab3798341783f0ab7bfdb723054fdc9ea0928c0a5be8503ee01c" url: "https://pub.dev" source: hosted - version: "0.7.6" + version: "0.8.0" qr_flutter: dependency: "direct main" description: @@ -1113,18 +1113,18 @@ packages: dependency: "direct main" description: name: searchable_listview - sha256: "5535ea3efa4599cf23ce52870a9580b52ece5d691aa90655ebec76d5081c9592" + sha256: d8513a968bdd540cb011220a5670b23b346e04a7bcb99690a859ed58092f72a4 url: "https://pub.dev" source: hosted - version: "2.11.1" + version: "2.11.2" share_plus: dependency: "direct main" description: name: share_plus - sha256: "05ec043470319bfbabe0adbc90d3a84cbff0426b9d9f3a6e2ad3e131fa5fa629" + sha256: fb5319f3aab4c5dda5ebb92dca978179ba21f8c783ee4380910ef4c1c6824f51 url: "https://pub.dev" source: hosted - version: "8.0.2" + version: "8.0.3" share_plus_platform_interface: dependency: transitive description: @@ -1137,18 +1137,18 @@ packages: dependency: "direct main" description: name: shared_preferences - sha256: "81429e4481e1ccfb51ede496e916348668fd0921627779233bd24cc3ff6abd02" + sha256: d3bbe5553a986e83980916ded2f0b435ef2e1893dfaa29d5a7a790d0eca12180 url: "https://pub.dev" source: hosted - version: "2.2.2" + version: "2.2.3" shared_preferences_android: dependency: transitive description: name: shared_preferences_android - sha256: "8568a389334b6e83415b6aae55378e158fbc2314e074983362d20c562780fb06" + sha256: "1ee8bf911094a1b592de7ab29add6f826a7331fb854273d55918693d5364a1f2" url: "https://pub.dev" source: hosted - version: "2.2.1" + version: "2.2.2" shared_preferences_foundation: dependency: transitive description: @@ -1398,18 +1398,18 @@ packages: dependency: transitive description: name: url_launcher - sha256: "0ecc004c62fd3ed36a2ffcbe0dd9700aee63bd7532d0b642a488b1ec310f492e" + sha256: "6ce1e04375be4eed30548f10a315826fd933c1e493206eab82eed01f438c8d2e" url: "https://pub.dev" source: hosted - version: "6.2.5" + version: "6.2.6" url_launcher_android: dependency: transitive description: name: url_launcher_android - sha256: d4ed0711849dd8e33eb2dd69c25db0d0d3fdc37e0a62e629fe32f57a22db2745 + sha256: "360a6ed2027f18b73c8d98e159dda67a61b7f2e0f6ec26e86c3ada33b0621775" url: "https://pub.dev" source: hosted - version: "6.3.0" + version: "6.3.1" url_launcher_ios: dependency: transitive description: @@ -1446,10 +1446,10 @@ packages: dependency: transitive description: name: url_launcher_web - sha256: "3692a459204a33e04bc94f5fb91158faf4f2c8903281ddd82915adecdb1a901d" + sha256: "8d9e750d8c9338601e709cd0885f95825086bd8b642547f26bda435aade95d8a" url: "https://pub.dev" source: hosted - version: "2.3.0" + version: "2.3.1" url_launcher_windows: dependency: transitive description: @@ -1462,10 +1462,10 @@ packages: dependency: "direct main" description: name: uuid - sha256: cd210a09f7c18cbe5a02511718e0334de6559871052c90a90c0cca46a4aa81c8 + sha256: "814e9e88f21a176ae1359149021870e87f7cddaf633ab678a5d2b0bff7fd1ba8" url: "https://pub.dev" source: hosted - version: "4.3.3" + version: "4.4.0" vector_graphics: dependency: transitive description: @@ -1540,10 +1540,10 @@ packages: dependency: transitive description: name: web_socket_channel - sha256: "1d8e795e2a8b3730c41b8a98a2dff2e0fb57ae6f0764a1c46ec5915387d257b2" + sha256: "58c6666b342a38816b2e7e50ed0f1e261959630becd4c879c4f26bfa14aa5a42" url: "https://pub.dev" source: hosted - version: "2.4.4" + version: "2.4.5" win32: dependency: transitive description: diff --git a/pubspec.yaml b/pubspec.yaml index 3853a75..f8ac76c 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -13,7 +13,7 @@ dependencies: archive: ^3.4.10 async_tools: path: packages/async_tools - awesome_extensions: ^2.0.13 + awesome_extensions: ^2.0.14 badges: ^3.1.2 basic_utils: ^5.7.0 bloc: ^8.1.4 @@ -25,7 +25,7 @@ dependencies: circular_profile_avatar: ^2.0.5 circular_reveal_animation: ^2.0.1 cool_dropdown: ^2.1.0 - cupertino_icons: ^1.0.6 + cupertino_icons: ^1.0.8 equatable: ^2.0.5 fast_immutable_collections: ^10.2.2 fixnum: ^1.1.0 @@ -46,7 +46,7 @@ dependencies: flutter_translate: ^4.0.4 form_builder_validators: ^9.1.0 freezed_annotation: ^2.4.1 - go_router: ^13.2.2 + go_router: ^13.2.4 hydrated_bloc: ^9.1.5 image: ^4.1.7 intl: ^0.18.1 @@ -54,30 +54,30 @@ dependencies: loggy: ^2.0.3 meta: ^1.11.0 mobile_scanner: ^4.0.1 - motion_toast: ^2.9.0 + motion_toast: ^2.9.1 mutex: path: packages/mutex pasteboard: ^0.2.0 path: ^1.9.0 - path_provider: ^2.1.2 + path_provider: ^2.1.3 pinput: ^4.0.0 preload_page_view: ^0.2.0 protobuf: ^3.1.0 provider: ^6.1.2 - qr_code_dart_scan: ^0.7.6 + qr_code_dart_scan: ^0.8.0 qr_flutter: ^4.1.0 quickalert: ^1.1.0 radix_colors: ^1.0.4 reorderable_grid: ^1.0.10 - searchable_listview: ^2.11.1 - share_plus: ^8.0.2 - shared_preferences: ^2.2.2 + searchable_listview: ^2.11.2 + share_plus: ^8.0.3 + shared_preferences: ^2.2.3 signal_strength_indicator: ^0.4.1 split_view: ^3.2.1 stack_trace: ^1.11.1 stream_transform: ^2.1.0 stylish_bottom_bar: ^1.1.0 - uuid: ^4.3.3 + uuid: ^4.4.0 veilid: # veilid: ^0.0.1 path: ../veilid/veilid-flutter @@ -89,7 +89,7 @@ dependencies: dev_dependencies: build_runner: ^2.4.9 - freezed: ^2.5.0 + freezed: ^2.5.2 icons_launcher: ^2.1.7 json_serializable: ^6.7.1 lint_hard: ^4.0.0 From 37b1717a710745277f3a87da66e5c64ab7319cb5 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Sat, 20 Apr 2024 21:24:03 -0400 Subject: [PATCH 2/3] fix persistent queue --- .../cubits/single_contact_messages_cubit.dart | 147 ++++++------- .../lib/src/persistent_queue.dart | 195 ++++++++++++++++++ .../lib/src/persistent_queue_cubit.dart | 194 ----------------- .../veilid_support/lib/veilid_support.dart | 2 +- 4 files changed, 259 insertions(+), 279 deletions(-) create mode 100644 packages/veilid_support/lib/src/persistent_queue.dart delete mode 100644 packages/veilid_support/lib/src/persistent_queue_cubit.dart diff --git a/lib/chat/cubits/single_contact_messages_cubit.dart b/lib/chat/cubits/single_contact_messages_cubit.dart index 85eb9d6..72a6a90 100644 --- a/lib/chat/cubits/single_contact_messages_cubit.dart +++ b/lib/chat/cubits/single_contact_messages_cubit.dart @@ -63,14 +63,6 @@ class SingleContactMessagesCubit extends Cubit { _remoteConversationRecordKey = remoteConversationRecordKey, _remoteMessagesRecordKey = remoteMessagesRecordKey, _reconciledChatRecord = reconciledChatRecord, - _unreconciledMessagesQueue = PersistentQueueCubit( - table: 'SingleContactUnreconciledMessages', - key: remoteConversationRecordKey.toString(), - fromBuffer: proto.Message.fromBuffer), - _sendingMessagesQueue = PersistentQueueCubit( - table: 'SingleContactSendingMessages', - key: remoteConversationRecordKey.toString(), - fromBuffer: proto.Message.fromBuffer), super(const AsyncValue.loading()) { // Async Init _initWait.add(_init); @@ -93,6 +85,20 @@ class SingleContactMessagesCubit extends Cubit { // Initialize everything Future _init() async { + // Late initialization of queues with closures + _unreconciledMessagesQueue = PersistentQueue( + table: 'SingleContactUnreconciledMessages', + key: _remoteConversationRecordKey.toString(), + fromBuffer: proto.Message.fromBuffer, + closure: _processUnreconciledMessages, + ); + _sendingMessagesQueue = PersistentQueue( + table: 'SingleContactSendingMessages', + key: _remoteConversationRecordKey.toString(), + fromBuffer: proto.Message.fromBuffer, + closure: _processSendingMessages, + ); + // Make crypto await _initMessagesCrypto(); @@ -104,32 +110,6 @@ class SingleContactMessagesCubit extends Cubit { // Remote messages key await _initRcvdMessagesCubit(); - - // Unreconciled messages processing queue listener - Future.delayed(Duration.zero, () async { - await for (final entry in _unreconciledMessagesQueue.stream) { - final data = entry.asData; - if (data != null && data.value.isNotEmpty) { - // Process data using recoverable processing mechanism - await _unreconciledMessagesQueue.process((messages) async { - await _processUnreconciledMessages(data.value); - }); - } - } - }); - - // Sending messages processing queue listener - Future.delayed(Duration.zero, () async { - await for (final entry in _sendingMessagesQueue.stream) { - final data = entry.asData; - if (data != null && data.value.isNotEmpty) { - // Process data using recoverable processing mechanism - await _sendingMessagesQueue.process((messages) async { - await _processSendingMessages(data.value); - }); - } - } - }); } // Make crypto @@ -145,8 +125,8 @@ class SingleContactMessagesCubit extends Cubit { _sentMessagesCubit = DHTShortArrayCubit( open: () async => DHTShortArray.openWrite( _localMessagesRecordKey, writer, - debugName: - 'SingleContactMessagesCubit::_initSentMessagesCubit::SentMessages', + debugName: 'SingleContactMessagesCubit::_initSentMessagesCubit::' + 'SentMessages', parent: _localConversationRecordKey, crypto: _messagesCrypto), decodeElement: proto.Message.fromBuffer); @@ -176,7 +156,8 @@ class SingleContactMessagesCubit extends Cubit { _reconciledMessagesCubit = DHTShortArrayCubit( open: () async => DHTShortArray.openOwned(_reconciledChatRecord, - debugName: 'SingleContactMessagesCubit::_initReconciledMessages::' + debugName: + 'SingleContactMessagesCubit::_initReconciledMessagesCubit::' 'ReconciledMessages', parent: accountRecordKey), decodeElement: proto.Message.fromBuffer); @@ -185,34 +166,35 @@ class SingleContactMessagesCubit extends Cubit { _updateReconciledMessagesState(_reconciledMessagesCubit!.state); } - // Called when the remote messages list gets a change + //////////////////////////////////////////////////////////////////////////// + + // Called when the sent messages cubit gets a change + // This will re-render when messages are sent from another machine + void _updateSentMessagesState( + DHTShortArrayBusyState avmessages) { + final sentMessages = avmessages.state.asData?.value; + if (sentMessages == null) { + return; + } + // Don't reconcile, the sending machine will have already added + // to the reconciliation queue on that machine + + // Update the view + _renderState(); + } + + // Called when the received messages cubit gets a change void _updateRcvdMessagesState( DHTShortArrayBusyState avmessages) { - final remoteMessages = avmessages.state.asData?.value; - if (remoteMessages == null) { + final rcvdMessages = avmessages.state.asData?.value; + if (rcvdMessages == null) { return; } // Add remote messages updates to queue to process asynchronously // Ignore offline state because remote messages are always fully delivered // This may happen once per client but should be idempotent - _unreconciledMessagesQueue - .addAllSync(remoteMessages.map((x) => x.value).toIList()); - - // Update the view - _renderState(); - } - - // Called when the send messages list gets a change - // This will re-render when messages are sent from another machine - void _updateSentMessagesState( - DHTShortArrayBusyState avmessages) { - final remoteMessages = avmessages.state.asData?.value; - if (remoteMessages == null) { - return; - } - // Don't reconcile, the sending machine will have already added - // to the reconciliation queue on that machine + _unreconciledMessagesQueue.addAllSync(rcvdMessages.map((x) => x.value)); // Update the view _renderState(); @@ -227,6 +209,25 @@ class SingleContactMessagesCubit extends Cubit { _renderState(); } + // Async process to reconcile messages sent or received in the background + Future _processUnreconciledMessages( + IList messages) async { + await _reconciledMessagesCubit! + .operateWrite((reconciledMessagesWriter) async { + await _reconcileMessagesInner( + reconciledMessagesWriter: reconciledMessagesWriter, + messages: messages); + }); + } + + // Async process to send messages in the background + Future _processSendingMessages(IList messages) async { + for (final message in messages) { + await _sentMessagesCubit!.operateWriteEventual( + (writer) => writer.tryAddItem(message.writeToBuffer())); + } + } + Future _reconcileMessagesInner( {required DHTShortArrayWrite reconciledMessagesWriter, required IList messages}) async { @@ -288,25 +289,6 @@ class SingleContactMessagesCubit extends Cubit { } } - // Async process to reconcile messages sent or received in the background - Future _processUnreconciledMessages( - IList messages) async { - await _reconciledMessagesCubit! - .operateWrite((reconciledMessagesWriter) async { - await _reconcileMessagesInner( - reconciledMessagesWriter: reconciledMessagesWriter, - messages: messages); - }); - } - - // Async process to send messages in the background - Future _processSendingMessages(IList messages) async { - for (final message in messages) { - await _sentMessagesCubit!.operateWriteEventual( - (writer) => writer.tryAddItem(message.writeToBuffer())); - } - } - // Produce a state for this cubit from the input cubits and queues void _renderState() { // Get all reconciled messages @@ -315,15 +297,12 @@ class SingleContactMessagesCubit extends Cubit { // Get all sent messages final sentMessages = _sentMessagesCubit?.state.state.asData?.value; // Get all items in the unreconciled queue - final unreconciledMessages = _unreconciledMessagesQueue.state.asData?.value; + final unreconciledMessages = _unreconciledMessagesQueue.queue; // Get all items in the unsent queue - final sendingMessages = _sendingMessagesQueue.state.asData?.value; + final sendingMessages = _sendingMessagesQueue.queue; // If we aren't ready to render a state, say we're loading - if (reconciledMessages == null || - sentMessages == null || - unreconciledMessages == null || - sendingMessages == null) { + if (reconciledMessages == null || sentMessages == null) { emit(const AsyncLoading()); return; } @@ -428,8 +407,8 @@ class SingleContactMessagesCubit extends Cubit { DHTShortArrayCubit? _rcvdMessagesCubit; DHTShortArrayCubit? _reconciledMessagesCubit; - final PersistentQueueCubit _unreconciledMessagesQueue; - final PersistentQueueCubit _sendingMessagesQueue; + late final PersistentQueue _unreconciledMessagesQueue; + late final PersistentQueue _sendingMessagesQueue; StreamSubscription>? _sentSubscription; StreamSubscription>? _rcvdSubscription; diff --git a/packages/veilid_support/lib/src/persistent_queue.dart b/packages/veilid_support/lib/src/persistent_queue.dart new file mode 100644 index 0000000..fb76a10 --- /dev/null +++ b/packages/veilid_support/lib/src/persistent_queue.dart @@ -0,0 +1,195 @@ +import 'dart:async'; +import 'dart:typed_data'; + +import 'package:async_tools/async_tools.dart'; +import 'package:fast_immutable_collections/fast_immutable_collections.dart'; +import 'package:mutex/mutex.dart'; +import 'package:protobuf/protobuf.dart'; + +import 'table_db.dart'; + +class PersistentQueue + /*extends Cubit>>*/ with + TableDBBackedFromBuffer> { + // + PersistentQueue( + {required String table, + required String key, + required T Function(Uint8List) fromBuffer, + required Future Function(IList) closure, + bool deleteOnClose = true}) + : _table = table, + _key = key, + _fromBuffer = fromBuffer, + _closure = closure, + _deleteOnClose = deleteOnClose { + _initWait.add(_init); + } + + Future close() async { + // Ensure the init finished + await _initWait(); + + // Close the sync add stream + await _syncAddController.close(); + + // Stop the processing trigger + await _queueReady.close(); + + // Wait for any setStates to finish + await _queueMutex.acquire(); + + // Clean up table if desired + if (_deleteOnClose) { + await delete(); + } + } + + Future _init() async { + // Start the processor + unawaited(Future.delayed(Duration.zero, () async { + await _initWait(); + await for (final _ in _queueReady.stream) { + await _process(); + } + })); + + // Start the sync add controller + unawaited(Future.delayed(Duration.zero, () async { + await _initWait(); + await for (final elem in _syncAddController.stream) { + await addAll(elem); + } + })); + + // Load the queue if we have one + await _queueMutex.protect(() async { + _queue = await load() ?? await store(IList.empty()); + }); + } + + Future _updateQueueInner(IList newQueue) async { + _queue = await store(newQueue); + if (_queue.isNotEmpty) { + _queueReady.sink.add(null); + } + } + + Future add(T item) async { + await _initWait(); + await _queueMutex.protect(() async { + final newQueue = _queue.add(item); + await _updateQueueInner(newQueue); + }); + } + + Future addAll(Iterable items) async { + await _initWait(); + await _queueMutex.protect(() async { + final newQueue = _queue.addAll(items); + await _updateQueueInner(newQueue); + }); + } + + void addSync(T item) { + _syncAddController.sink.add([item]); + } + + void addAllSync(Iterable items) { + _syncAddController.sink.add(items); + } + + // Future get isEmpty async { + // await _initWait(); + // return state.asData!.value.isEmpty; + // } + + // Future get isNotEmpty async { + // await _initWait(); + // return state.asData!.value.isNotEmpty; + // } + + // Future get length async { + // await _initWait(); + // return state.asData!.value.length; + // } + + // Future pop() async { + // await _initWait(); + // return _processingMutex.protect(() async => _stateMutex.protect(() async { + // final removedItem = Output(); + // final queue = state.asData!.value.removeAt(0, removedItem); + // await _setStateInner(queue); + // return removedItem.value; + // })); + // } + + // Future> popAll() async { + // await _initWait(); + // return _processingMutex.protect(() async => _stateMutex.protect(() async { + // final queue = state.asData!.value; + // await _setStateInner(IList.empty); + // return queue; + // })); + // } + + Future _process() async { + // Take a copy of the current queue + // (doesn't need queue mutex because this is a sync operation) + final toProcess = _queue; + final processCount = toProcess.length; + if (processCount == 0) { + return; + } + + // Run the processing closure + await _closure(toProcess); + + // If there was no exception, remove the processed items + await _queueMutex.protect(() async { + // Get the queue from the state again as items could + // have been added during processing + final newQueue = _queue.skip(processCount).toIList(); + await _updateQueueInner(newQueue); + }); + } + + IList get queue => _queue; + + // TableDBBacked + @override + String tableKeyName() => _key; + + @override + String tableName() => _table; + + @override + IList valueFromBuffer(Uint8List bytes) { + final reader = CodedBufferReader(bytes); + var out = IList(); + while (!reader.isAtEnd()) { + out = out.add(_fromBuffer(reader.readBytesAsView())); + } + return out; + } + + @override + Uint8List valueToBuffer(IList val) { + final writer = CodedBufferWriter(); + for (final elem in val) { + writer.writeRawBytes(elem.writeToBuffer()); + } + return writer.toBuffer(); + } + + final String _table; + final String _key; + final T Function(Uint8List) _fromBuffer; + final bool _deleteOnClose; + final WaitSet _initWait = WaitSet(); + final Mutex _queueMutex = Mutex(); + IList _queue = IList.empty(); + final StreamController> _syncAddController = StreamController(); + final StreamController _queueReady = StreamController(); + final Future Function(IList) _closure; +} diff --git a/packages/veilid_support/lib/src/persistent_queue_cubit.dart b/packages/veilid_support/lib/src/persistent_queue_cubit.dart deleted file mode 100644 index 6cc79c1..0000000 --- a/packages/veilid_support/lib/src/persistent_queue_cubit.dart +++ /dev/null @@ -1,194 +0,0 @@ -import 'dart:async'; -import 'dart:typed_data'; - -import 'package:async_tools/async_tools.dart'; -import 'package:bloc/bloc.dart'; -import 'package:fast_immutable_collections/fast_immutable_collections.dart'; -import 'package:mutex/mutex.dart'; -import 'package:protobuf/protobuf.dart'; - -import 'table_db.dart'; - -class PersistentQueueCubit - extends Cubit>> with TableDBBackedFromBuffer> { - // - PersistentQueueCubit( - {required String table, - required String key, - required T Function(Uint8List) fromBuffer, - bool deleteOnClose = true}) - : _table = table, - _key = key, - _fromBuffer = fromBuffer, - _deleteOnClose = deleteOnClose, - super(const AsyncValue.loading()) { - _initWait.add(_build); - unawaited(Future.delayed(Duration.zero, () async { - await for (final elem in _syncAddController.stream) { - await addAll(elem); - } - })); - } - - @override - Future close() async { - // Ensure the init finished - await _initWait(); - - // Close the sync add stream - await _syncAddController.close(); - - // Wait for any setStates to finish - await _stateMutex.acquire(); - - // Clean up table if desired - if (_deleteOnClose) { - await delete(); - } - - await super.close(); - } - - Future _build() async { - await _stateMutex.protect(() async { - try { - emit(AsyncValue.data(await load() ?? await store(IList.empty()))); - } on Exception catch (e, stackTrace) { - emit(AsyncValue.error(e, stackTrace)); - } - }); - } - - Future _setStateInner(IList newState) async { - emit(AsyncValue.data(await store(newState))); - } - - Future add(T item) async { - await _initWait(); - await _stateMutex.protect(() async { - final queue = state.asData!.value.add(item); - await _setStateInner(queue); - }); - } - - Future addAll(IList items) async { - await _initWait(); - await _stateMutex.protect(() async { - var queue = state.asData!.value; - for (final item in items) { - queue = queue.add(item); - } - await _setStateInner(queue); - }); - } - - void addSync(T item) { - _syncAddController.sink.add([item].toIList()); - } - - void addAllSync(IList items) { - _syncAddController.sink.add(items.toIList()); - } - - Future get isEmpty async { - await _initWait(); - return state.asData!.value.isEmpty; - } - - Future get isNotEmpty async { - await _initWait(); - return state.asData!.value.isNotEmpty; - } - - Future get length async { - await _initWait(); - return state.asData!.value.length; - } - - // Future pop() async { - // await _initWait(); - // return _processingMutex.protect(() async => _stateMutex.protect(() async { - // final removedItem = Output(); - // final queue = state.asData!.value.removeAt(0, removedItem); - // await _setStateInner(queue); - // return removedItem.value; - // })); - // } - - // Future> popAll() async { - // await _initWait(); - // return _processingMutex.protect(() async => _stateMutex.protect(() async { - // final queue = state.asData!.value; - // await _setStateInner(IList.empty); - // return queue; - // })); - // } - - Future process(Future Function(IList) closure, - {int? count}) async { - await _initWait(); - // Only one processor at a time - return _processingMutex.protect(() async { - // Take 'count' items from the front of the list - final toProcess = await _stateMutex.protect(() async { - final queue = state.asData!.value; - final processCount = (count ?? queue.length).clamp(0, queue.length); - return queue.take(processCount).toIList(); - }); - - // Run the processing closure - final processCount = toProcess.length; - final out = await closure(toProcess); - - // If there was nothing to process just return - if (toProcess.isEmpty) { - return out; - } - - // If there was no exception, remove the processed items - return _stateMutex.protect(() async { - // Get the queue from the state again as items could - // have been added during processing - final queue = state.asData!.value; - final newQueue = queue.skip(processCount).toIList(); - await _setStateInner(newQueue); - return out; - }); - }); - } - - // TableDBBacked - @override - String tableKeyName() => _key; - - @override - String tableName() => _table; - - @override - IList valueFromBuffer(Uint8List bytes) { - final reader = CodedBufferReader(bytes); - var out = IList(); - while (!reader.isAtEnd()) { - out = out.add(_fromBuffer(reader.readBytesAsView())); - } - return out; - } - - @override - Uint8List valueToBuffer(IList val) { - final writer = CodedBufferWriter(); - for (final elem in val) { - writer.writeRawBytes(elem.writeToBuffer()); - } - return writer.toBuffer(); - } - - final String _table; - final String _key; - final T Function(Uint8List) _fromBuffer; - final bool _deleteOnClose; - final WaitSet _initWait = WaitSet(); - final Mutex _stateMutex = Mutex(); - final Mutex _processingMutex = Mutex(); - final StreamController> _syncAddController = StreamController(); -} diff --git a/packages/veilid_support/lib/veilid_support.dart b/packages/veilid_support/lib/veilid_support.dart index 3cad8e0..fcbbaf4 100644 --- a/packages/veilid_support/lib/veilid_support.dart +++ b/packages/veilid_support/lib/veilid_support.dart @@ -10,7 +10,7 @@ export 'src/config.dart'; export 'src/identity.dart'; export 'src/json_tools.dart'; export 'src/memory_tools.dart'; -export 'src/persistent_queue_cubit.dart'; +export 'src/persistent_queue.dart'; export 'src/protobuf_tools.dart'; export 'src/table_db.dart'; export 'src/veilid_log.dart' hide veilidLoggy; From 64d0019e6e50fe20324336e77a323a647aaf8db7 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Sun, 21 Apr 2024 22:16:22 -0400 Subject: [PATCH 3/3] watchvalue fixes --- assets/i18n/en.json | 1 + .../cubits/single_contact_messages_cubit.dart | 10 ++++------ .../views/invitation_dialog.dart | 9 +++++++-- .../dht_support/src/dht_record/dht_record.dart | 16 ++++++++-------- 4 files changed, 20 insertions(+), 16 deletions(-) diff --git a/assets/i18n/en.json b/assets/i18n/en.json index 2a1e4ac..f074031 100644 --- a/assets/i18n/en.json +++ b/assets/i18n/en.json @@ -96,6 +96,7 @@ "failed_to_accept": "Failed to accept contact invitation", "failed_to_reject": "Failed to reject contact invitation", "invalid_invitation": "Invalid invitation", + "try_again_online": "Invitation could not be reached, try again when online", "protected_with_pin": "Contact invitation is protected with a PIN", "protected_with_password": "Contact invitation is protected with a password", "invalid_pin": "Invalid PIN", diff --git a/lib/chat/cubits/single_contact_messages_cubit.dart b/lib/chat/cubits/single_contact_messages_cubit.dart index 72a6a90..41c872d 100644 --- a/lib/chat/cubits/single_contact_messages_cubit.dart +++ b/lib/chat/cubits/single_contact_messages_cubit.dart @@ -23,13 +23,11 @@ class RenderStateElement { if (!isLocal) { return null; } - if (reconciled && sent) { - if (!reconciledOffline && !sentOffline) { - return MessageSendState.delivered; - } - return MessageSendState.sent; - } + if (sent && !sentOffline) { + return MessageSendState.delivered; + } + if (reconciled && !reconciledOffline) { return MessageSendState.sent; } return MessageSendState.sending; diff --git a/lib/contact_invitation/views/invitation_dialog.dart b/lib/contact_invitation/views/invitation_dialog.dart index 60d8784..2f1bd1c 100644 --- a/lib/contact_invitation/views/invitation_dialog.dart +++ b/lib/contact_invitation/views/invitation_dialog.dart @@ -224,8 +224,13 @@ class InvitationDialogState extends State { _validInvitation = null; widget.onValidationFailed(); }); - } on VeilidAPIException { - final errorText = translate('invitation_dialog.invalid_invitation'); + } on VeilidAPIException catch (e) { + late final String errorText; + if (e is VeilidAPIExceptionTryAgain) { + errorText = translate('invitation_dialog.try_again_online'); + } else { + errorText = translate('invitation_dialog.invalid_invitation'); + } if (mounted) { showErrorToast(context, errorText); } diff --git a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record.dart b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record.dart index af16842..f7df9a9 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record.dart @@ -390,14 +390,14 @@ class DHTRecord { // range we care about, don't pass it through final overlappedFirstSubkey = overlappedSubkeys.firstSubkey; final updateFirstSubkey = subkeys.firstSubkey; - final updatedData = (overlappedFirstSubkey != null && - updateFirstSubkey != null && - overlappedFirstSubkey == updateFirstSubkey) - ? data - : null; - // Report only watched subkeys - watchController?.add(DHTRecordWatchChange( - local: local, data: updatedData, subkeys: overlappedSubkeys)); + if (overlappedFirstSubkey != null && updateFirstSubkey != null) { + final updatedData = + overlappedFirstSubkey == updateFirstSubkey ? data : null; + + // Report only watched subkeys + watchController?.add(DHTRecordWatchChange( + local: local, data: updatedData, subkeys: overlappedSubkeys)); + } } } }