diff --git a/lib/chat/cubits/reconciliation/message_reconciliation.dart b/lib/chat/cubits/reconciliation/message_reconciliation.dart index 683b46d..f6d46c3 100644 --- a/lib/chat/cubits/reconciliation/message_reconciliation.dart +++ b/lib/chat/cubits/reconciliation/message_reconciliation.dart @@ -32,11 +32,10 @@ class MessageReconciliation { final activeInputQueues = await _updateAuthorInputQueues(); // Process all input queues together - await _outputCubit - .operate((reconciledArray) async => _reconcileInputQueues( - reconciledArray: reconciledArray, - activeInputQueues: activeInputQueues, - )); + await _outputCubit.operate((reconciledArray) => _reconcileInputQueues( + reconciledArray: reconciledArray, + activeInputQueues: activeInputQueues, + )); }); } @@ -273,5 +272,5 @@ class MessageReconciliation { final TableDBArrayProtobufCubit _outputCubit; final void Function(Object, StackTrace?) _onError; - static const int _maxReconcileChunk = 65536; + static const _maxReconcileChunk = 65536; } diff --git a/lib/chat/cubits/single_contact_messages_cubit.dart b/lib/chat/cubits/single_contact_messages_cubit.dart index 77b1bb9..0e20229 100644 --- a/lib/chat/cubits/single_contact_messages_cubit.dart +++ b/lib/chat/cubits/single_contact_messages_cubit.dart @@ -102,7 +102,7 @@ class SingleContactMessagesCubit extends Cubit { } // Initialize everything - Future _init(Completer _cancel) async { + Future _init(Completer _) async { _unsentMessagesQueue = PersistentQueue( table: 'SingleContactUnsentMessages', key: _remoteConversationRecordKey.toString(), @@ -126,6 +126,9 @@ class SingleContactMessagesCubit extends Cubit { // Command execution background process _commandRunnerFut = Future.delayed(Duration.zero, _commandRunner); + + // Run reconciliation once for all input queues + _reconciliation.reconcileMessages(null); } // Make crypto @@ -198,7 +201,7 @@ class SingleContactMessagesCubit extends Cubit { }); } - Future _makeLocalMessagesCrypto() async => + Future _makeLocalMessagesCrypto() => VeilidCryptoPrivate.fromTypedKey( _accountInfo.userLogin!.identitySecret, 'tabledb'); @@ -210,7 +213,7 @@ class SingleContactMessagesCubit extends Cubit { final crypto = await _makeLocalMessagesCrypto(); _reconciledMessagesCubit = TableDBArrayProtobufCubit( - open: () async => TableDBArrayProtobuf.make( + open: () => TableDBArrayProtobuf.make( table: tableName, crypto: crypto, fromBuffer: proto.ReconciledMessage.fromBuffer), diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart index d9f5df2..93cdcc4 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart @@ -608,14 +608,13 @@ class _DHTLogSpine { Future watch() async { // This will update any existing watches if necessary try { - await _spineRecord.watch(subkeys: [ValueSubkeyRange.single(0)]); - // Update changes to the head record // xxx: check if this localChanges can be false... // xxx: Don't watch for local changes because this class already handles // xxx: notifying listeners and knows when it makes local changes _subscription ??= await _spineRecord.listen(localChanges: true, _onSpineChanged); + await _spineRecord.watch(subkeys: [ValueSubkeyRange.single(0)]); } on Exception { // If anything fails, try to cancel the watches await cancelWatch(); diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart index 66b5baa..1785b28 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart @@ -482,13 +482,13 @@ class _DHTShortArrayHead { Future watch() async { // This will update any existing watches if necessary try { - await _headRecord.watch(subkeys: [ValueSubkeyRange.single(0)]); - // Update changes to the head record // Don't watch for local changes because this class already handles // notifying listeners and knows when it makes local changes _subscription ??= await _headRecord.listen(localChanges: false, _onHeadValueChanged); + + await _headRecord.watch(subkeys: [ValueSubkeyRange.single(0)]); } on Exception { // If anything fails, try to cancel the watches await cancelWatch();