eliminate race condition with listen/watch

This commit is contained in:
Christien Rioux 2025-05-18 11:33:08 -04:00
parent 34f9bea6eb
commit 61855521dc
4 changed files with 14 additions and 13 deletions

View file

@ -32,8 +32,7 @@ class MessageReconciliation {
final activeInputQueues = await _updateAuthorInputQueues(); final activeInputQueues = await _updateAuthorInputQueues();
// Process all input queues together // Process all input queues together
await _outputCubit await _outputCubit.operate((reconciledArray) => _reconcileInputQueues(
.operate((reconciledArray) async => _reconcileInputQueues(
reconciledArray: reconciledArray, reconciledArray: reconciledArray,
activeInputQueues: activeInputQueues, activeInputQueues: activeInputQueues,
)); ));
@ -273,5 +272,5 @@ class MessageReconciliation {
final TableDBArrayProtobufCubit<proto.ReconciledMessage> _outputCubit; final TableDBArrayProtobufCubit<proto.ReconciledMessage> _outputCubit;
final void Function(Object, StackTrace?) _onError; final void Function(Object, StackTrace?) _onError;
static const int _maxReconcileChunk = 65536; static const _maxReconcileChunk = 65536;
} }

View file

@ -102,7 +102,7 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
} }
// Initialize everything // Initialize everything
Future<void> _init(Completer<void> _cancel) async { Future<void> _init(Completer<void> _) async {
_unsentMessagesQueue = PersistentQueue<proto.Message>( _unsentMessagesQueue = PersistentQueue<proto.Message>(
table: 'SingleContactUnsentMessages', table: 'SingleContactUnsentMessages',
key: _remoteConversationRecordKey.toString(), key: _remoteConversationRecordKey.toString(),
@ -126,6 +126,9 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
// Command execution background process // Command execution background process
_commandRunnerFut = Future.delayed(Duration.zero, _commandRunner); _commandRunnerFut = Future.delayed(Duration.zero, _commandRunner);
// Run reconciliation once for all input queues
_reconciliation.reconcileMessages(null);
} }
// Make crypto // Make crypto
@ -198,7 +201,7 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
}); });
} }
Future<VeilidCrypto> _makeLocalMessagesCrypto() async => Future<VeilidCrypto> _makeLocalMessagesCrypto() =>
VeilidCryptoPrivate.fromTypedKey( VeilidCryptoPrivate.fromTypedKey(
_accountInfo.userLogin!.identitySecret, 'tabledb'); _accountInfo.userLogin!.identitySecret, 'tabledb');
@ -210,7 +213,7 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
final crypto = await _makeLocalMessagesCrypto(); final crypto = await _makeLocalMessagesCrypto();
_reconciledMessagesCubit = TableDBArrayProtobufCubit( _reconciledMessagesCubit = TableDBArrayProtobufCubit(
open: () async => TableDBArrayProtobuf.make( open: () => TableDBArrayProtobuf.make(
table: tableName, table: tableName,
crypto: crypto, crypto: crypto,
fromBuffer: proto.ReconciledMessage.fromBuffer), fromBuffer: proto.ReconciledMessage.fromBuffer),

View file

@ -608,14 +608,13 @@ class _DHTLogSpine {
Future<void> watch() async { Future<void> watch() async {
// This will update any existing watches if necessary // This will update any existing watches if necessary
try { try {
await _spineRecord.watch(subkeys: [ValueSubkeyRange.single(0)]);
// Update changes to the head record // Update changes to the head record
// xxx: check if this localChanges can be false... // xxx: check if this localChanges can be false...
// xxx: Don't watch for local changes because this class already handles // xxx: Don't watch for local changes because this class already handles
// xxx: notifying listeners and knows when it makes local changes // xxx: notifying listeners and knows when it makes local changes
_subscription ??= _subscription ??=
await _spineRecord.listen(localChanges: true, _onSpineChanged); await _spineRecord.listen(localChanges: true, _onSpineChanged);
await _spineRecord.watch(subkeys: [ValueSubkeyRange.single(0)]);
} on Exception { } on Exception {
// If anything fails, try to cancel the watches // If anything fails, try to cancel the watches
await cancelWatch(); await cancelWatch();

View file

@ -482,13 +482,13 @@ class _DHTShortArrayHead {
Future<void> watch() async { Future<void> watch() async {
// This will update any existing watches if necessary // This will update any existing watches if necessary
try { try {
await _headRecord.watch(subkeys: [ValueSubkeyRange.single(0)]);
// Update changes to the head record // Update changes to the head record
// Don't watch for local changes because this class already handles // Don't watch for local changes because this class already handles
// notifying listeners and knows when it makes local changes // notifying listeners and knows when it makes local changes
_subscription ??= _subscription ??=
await _headRecord.listen(localChanges: false, _onHeadValueChanged); await _headRecord.listen(localChanges: false, _onHeadValueChanged);
await _headRecord.watch(subkeys: [ValueSubkeyRange.single(0)]);
} on Exception { } on Exception {
// If anything fails, try to cancel the watches // If anything fails, try to cancel the watches
await cancelWatch(); await cancelWatch();