fix persistent queue

This commit is contained in:
Christien Rioux 2024-04-20 21:24:03 -04:00
parent 8ac9a93f72
commit 37b1717a71
4 changed files with 259 additions and 279 deletions

View File

@ -63,14 +63,6 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
_remoteConversationRecordKey = remoteConversationRecordKey, _remoteConversationRecordKey = remoteConversationRecordKey,
_remoteMessagesRecordKey = remoteMessagesRecordKey, _remoteMessagesRecordKey = remoteMessagesRecordKey,
_reconciledChatRecord = reconciledChatRecord, _reconciledChatRecord = reconciledChatRecord,
_unreconciledMessagesQueue = PersistentQueueCubit<proto.Message>(
table: 'SingleContactUnreconciledMessages',
key: remoteConversationRecordKey.toString(),
fromBuffer: proto.Message.fromBuffer),
_sendingMessagesQueue = PersistentQueueCubit<proto.Message>(
table: 'SingleContactSendingMessages',
key: remoteConversationRecordKey.toString(),
fromBuffer: proto.Message.fromBuffer),
super(const AsyncValue.loading()) { super(const AsyncValue.loading()) {
// Async Init // Async Init
_initWait.add(_init); _initWait.add(_init);
@ -93,6 +85,20 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
// Initialize everything // Initialize everything
Future<void> _init() async { Future<void> _init() async {
// Late initialization of queues with closures
_unreconciledMessagesQueue = PersistentQueue<proto.Message>(
table: 'SingleContactUnreconciledMessages',
key: _remoteConversationRecordKey.toString(),
fromBuffer: proto.Message.fromBuffer,
closure: _processUnreconciledMessages,
);
_sendingMessagesQueue = PersistentQueue<proto.Message>(
table: 'SingleContactSendingMessages',
key: _remoteConversationRecordKey.toString(),
fromBuffer: proto.Message.fromBuffer,
closure: _processSendingMessages,
);
// Make crypto // Make crypto
await _initMessagesCrypto(); await _initMessagesCrypto();
@ -104,32 +110,6 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
// Remote messages key // Remote messages key
await _initRcvdMessagesCubit(); await _initRcvdMessagesCubit();
// Unreconciled messages processing queue listener
Future.delayed(Duration.zero, () async {
await for (final entry in _unreconciledMessagesQueue.stream) {
final data = entry.asData;
if (data != null && data.value.isNotEmpty) {
// Process data using recoverable processing mechanism
await _unreconciledMessagesQueue.process((messages) async {
await _processUnreconciledMessages(data.value);
});
}
}
});
// Sending messages processing queue listener
Future.delayed(Duration.zero, () async {
await for (final entry in _sendingMessagesQueue.stream) {
final data = entry.asData;
if (data != null && data.value.isNotEmpty) {
// Process data using recoverable processing mechanism
await _sendingMessagesQueue.process((messages) async {
await _processSendingMessages(data.value);
});
}
}
});
} }
// Make crypto // Make crypto
@ -145,8 +125,8 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
_sentMessagesCubit = DHTShortArrayCubit( _sentMessagesCubit = DHTShortArrayCubit(
open: () async => DHTShortArray.openWrite( open: () async => DHTShortArray.openWrite(
_localMessagesRecordKey, writer, _localMessagesRecordKey, writer,
debugName: debugName: 'SingleContactMessagesCubit::_initSentMessagesCubit::'
'SingleContactMessagesCubit::_initSentMessagesCubit::SentMessages', 'SentMessages',
parent: _localConversationRecordKey, parent: _localConversationRecordKey,
crypto: _messagesCrypto), crypto: _messagesCrypto),
decodeElement: proto.Message.fromBuffer); decodeElement: proto.Message.fromBuffer);
@ -176,7 +156,8 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
_reconciledMessagesCubit = DHTShortArrayCubit( _reconciledMessagesCubit = DHTShortArrayCubit(
open: () async => DHTShortArray.openOwned(_reconciledChatRecord, open: () async => DHTShortArray.openOwned(_reconciledChatRecord,
debugName: 'SingleContactMessagesCubit::_initReconciledMessages::' debugName:
'SingleContactMessagesCubit::_initReconciledMessagesCubit::'
'ReconciledMessages', 'ReconciledMessages',
parent: accountRecordKey), parent: accountRecordKey),
decodeElement: proto.Message.fromBuffer); decodeElement: proto.Message.fromBuffer);
@ -185,34 +166,35 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
_updateReconciledMessagesState(_reconciledMessagesCubit!.state); _updateReconciledMessagesState(_reconciledMessagesCubit!.state);
} }
// Called when the remote messages list gets a change ////////////////////////////////////////////////////////////////////////////
// Called when the sent messages cubit gets a change
// This will re-render when messages are sent from another machine
void _updateSentMessagesState(
DHTShortArrayBusyState<proto.Message> avmessages) {
final sentMessages = avmessages.state.asData?.value;
if (sentMessages == null) {
return;
}
// Don't reconcile, the sending machine will have already added
// to the reconciliation queue on that machine
// Update the view
_renderState();
}
// Called when the received messages cubit gets a change
void _updateRcvdMessagesState( void _updateRcvdMessagesState(
DHTShortArrayBusyState<proto.Message> avmessages) { DHTShortArrayBusyState<proto.Message> avmessages) {
final remoteMessages = avmessages.state.asData?.value; final rcvdMessages = avmessages.state.asData?.value;
if (remoteMessages == null) { if (rcvdMessages == null) {
return; return;
} }
// Add remote messages updates to queue to process asynchronously // Add remote messages updates to queue to process asynchronously
// Ignore offline state because remote messages are always fully delivered // Ignore offline state because remote messages are always fully delivered
// This may happen once per client but should be idempotent // This may happen once per client but should be idempotent
_unreconciledMessagesQueue _unreconciledMessagesQueue.addAllSync(rcvdMessages.map((x) => x.value));
.addAllSync(remoteMessages.map((x) => x.value).toIList());
// Update the view
_renderState();
}
// Called when the send messages list gets a change
// This will re-render when messages are sent from another machine
void _updateSentMessagesState(
DHTShortArrayBusyState<proto.Message> avmessages) {
final remoteMessages = avmessages.state.asData?.value;
if (remoteMessages == null) {
return;
}
// Don't reconcile, the sending machine will have already added
// to the reconciliation queue on that machine
// Update the view // Update the view
_renderState(); _renderState();
@ -227,6 +209,25 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
_renderState(); _renderState();
} }
// Async process to reconcile messages sent or received in the background
Future<void> _processUnreconciledMessages(
IList<proto.Message> messages) async {
await _reconciledMessagesCubit!
.operateWrite((reconciledMessagesWriter) async {
await _reconcileMessagesInner(
reconciledMessagesWriter: reconciledMessagesWriter,
messages: messages);
});
}
// Async process to send messages in the background
Future<void> _processSendingMessages(IList<proto.Message> messages) async {
for (final message in messages) {
await _sentMessagesCubit!.operateWriteEventual(
(writer) => writer.tryAddItem(message.writeToBuffer()));
}
}
Future<void> _reconcileMessagesInner( Future<void> _reconcileMessagesInner(
{required DHTShortArrayWrite reconciledMessagesWriter, {required DHTShortArrayWrite reconciledMessagesWriter,
required IList<proto.Message> messages}) async { required IList<proto.Message> messages}) async {
@ -288,25 +289,6 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
} }
} }
// Async process to reconcile messages sent or received in the background
Future<void> _processUnreconciledMessages(
IList<proto.Message> messages) async {
await _reconciledMessagesCubit!
.operateWrite((reconciledMessagesWriter) async {
await _reconcileMessagesInner(
reconciledMessagesWriter: reconciledMessagesWriter,
messages: messages);
});
}
// Async process to send messages in the background
Future<void> _processSendingMessages(IList<proto.Message> messages) async {
for (final message in messages) {
await _sentMessagesCubit!.operateWriteEventual(
(writer) => writer.tryAddItem(message.writeToBuffer()));
}
}
// Produce a state for this cubit from the input cubits and queues // Produce a state for this cubit from the input cubits and queues
void _renderState() { void _renderState() {
// Get all reconciled messages // Get all reconciled messages
@ -315,15 +297,12 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
// Get all sent messages // Get all sent messages
final sentMessages = _sentMessagesCubit?.state.state.asData?.value; final sentMessages = _sentMessagesCubit?.state.state.asData?.value;
// Get all items in the unreconciled queue // Get all items in the unreconciled queue
final unreconciledMessages = _unreconciledMessagesQueue.state.asData?.value; final unreconciledMessages = _unreconciledMessagesQueue.queue;
// Get all items in the unsent queue // Get all items in the unsent queue
final sendingMessages = _sendingMessagesQueue.state.asData?.value; final sendingMessages = _sendingMessagesQueue.queue;
// If we aren't ready to render a state, say we're loading // If we aren't ready to render a state, say we're loading
if (reconciledMessages == null || if (reconciledMessages == null || sentMessages == null) {
sentMessages == null ||
unreconciledMessages == null ||
sendingMessages == null) {
emit(const AsyncLoading()); emit(const AsyncLoading());
return; return;
} }
@ -428,8 +407,8 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
DHTShortArrayCubit<proto.Message>? _rcvdMessagesCubit; DHTShortArrayCubit<proto.Message>? _rcvdMessagesCubit;
DHTShortArrayCubit<proto.Message>? _reconciledMessagesCubit; DHTShortArrayCubit<proto.Message>? _reconciledMessagesCubit;
final PersistentQueueCubit<proto.Message> _unreconciledMessagesQueue; late final PersistentQueue<proto.Message> _unreconciledMessagesQueue;
final PersistentQueueCubit<proto.Message> _sendingMessagesQueue; late final PersistentQueue<proto.Message> _sendingMessagesQueue;
StreamSubscription<DHTShortArrayBusyState<proto.Message>>? _sentSubscription; StreamSubscription<DHTShortArrayBusyState<proto.Message>>? _sentSubscription;
StreamSubscription<DHTShortArrayBusyState<proto.Message>>? _rcvdSubscription; StreamSubscription<DHTShortArrayBusyState<proto.Message>>? _rcvdSubscription;

View File

@ -0,0 +1,195 @@
import 'dart:async';
import 'dart:typed_data';
import 'package:async_tools/async_tools.dart';
import 'package:fast_immutable_collections/fast_immutable_collections.dart';
import 'package:mutex/mutex.dart';
import 'package:protobuf/protobuf.dart';
import 'table_db.dart';
class PersistentQueue<T extends GeneratedMessage>
/*extends Cubit<AsyncValue<IList<T>>>*/ with
TableDBBackedFromBuffer<IList<T>> {
//
PersistentQueue(
{required String table,
required String key,
required T Function(Uint8List) fromBuffer,
required Future<void> Function(IList<T>) closure,
bool deleteOnClose = true})
: _table = table,
_key = key,
_fromBuffer = fromBuffer,
_closure = closure,
_deleteOnClose = deleteOnClose {
_initWait.add(_init);
}
Future<void> close() async {
// Ensure the init finished
await _initWait();
// Close the sync add stream
await _syncAddController.close();
// Stop the processing trigger
await _queueReady.close();
// Wait for any setStates to finish
await _queueMutex.acquire();
// Clean up table if desired
if (_deleteOnClose) {
await delete();
}
}
Future<void> _init() async {
// Start the processor
unawaited(Future.delayed(Duration.zero, () async {
await _initWait();
await for (final _ in _queueReady.stream) {
await _process();
}
}));
// Start the sync add controller
unawaited(Future.delayed(Duration.zero, () async {
await _initWait();
await for (final elem in _syncAddController.stream) {
await addAll(elem);
}
}));
// Load the queue if we have one
await _queueMutex.protect(() async {
_queue = await load() ?? await store(IList<T>.empty());
});
}
Future<void> _updateQueueInner(IList<T> newQueue) async {
_queue = await store(newQueue);
if (_queue.isNotEmpty) {
_queueReady.sink.add(null);
}
}
Future<void> add(T item) async {
await _initWait();
await _queueMutex.protect(() async {
final newQueue = _queue.add(item);
await _updateQueueInner(newQueue);
});
}
Future<void> addAll(Iterable<T> items) async {
await _initWait();
await _queueMutex.protect(() async {
final newQueue = _queue.addAll(items);
await _updateQueueInner(newQueue);
});
}
void addSync(T item) {
_syncAddController.sink.add([item]);
}
void addAllSync(Iterable<T> items) {
_syncAddController.sink.add(items);
}
// Future<bool> get isEmpty async {
// await _initWait();
// return state.asData!.value.isEmpty;
// }
// Future<bool> get isNotEmpty async {
// await _initWait();
// return state.asData!.value.isNotEmpty;
// }
// Future<int> get length async {
// await _initWait();
// return state.asData!.value.length;
// }
// Future<T?> pop() async {
// await _initWait();
// return _processingMutex.protect(() async => _stateMutex.protect(() async {
// final removedItem = Output<T>();
// final queue = state.asData!.value.removeAt(0, removedItem);
// await _setStateInner(queue);
// return removedItem.value;
// }));
// }
// Future<IList<T>> popAll() async {
// await _initWait();
// return _processingMutex.protect(() async => _stateMutex.protect(() async {
// final queue = state.asData!.value;
// await _setStateInner(IList<T>.empty);
// return queue;
// }));
// }
Future<void> _process() async {
// Take a copy of the current queue
// (doesn't need queue mutex because this is a sync operation)
final toProcess = _queue;
final processCount = toProcess.length;
if (processCount == 0) {
return;
}
// Run the processing closure
await _closure(toProcess);
// If there was no exception, remove the processed items
await _queueMutex.protect(() async {
// Get the queue from the state again as items could
// have been added during processing
final newQueue = _queue.skip(processCount).toIList();
await _updateQueueInner(newQueue);
});
}
IList<T> get queue => _queue;
// TableDBBacked
@override
String tableKeyName() => _key;
@override
String tableName() => _table;
@override
IList<T> valueFromBuffer(Uint8List bytes) {
final reader = CodedBufferReader(bytes);
var out = IList<T>();
while (!reader.isAtEnd()) {
out = out.add(_fromBuffer(reader.readBytesAsView()));
}
return out;
}
@override
Uint8List valueToBuffer(IList<T> val) {
final writer = CodedBufferWriter();
for (final elem in val) {
writer.writeRawBytes(elem.writeToBuffer());
}
return writer.toBuffer();
}
final String _table;
final String _key;
final T Function(Uint8List) _fromBuffer;
final bool _deleteOnClose;
final WaitSet _initWait = WaitSet();
final Mutex _queueMutex = Mutex();
IList<T> _queue = IList<T>.empty();
final StreamController<Iterable<T>> _syncAddController = StreamController();
final StreamController<void> _queueReady = StreamController();
final Future<void> Function(IList<T>) _closure;
}

View File

@ -1,194 +0,0 @@
import 'dart:async';
import 'dart:typed_data';
import 'package:async_tools/async_tools.dart';
import 'package:bloc/bloc.dart';
import 'package:fast_immutable_collections/fast_immutable_collections.dart';
import 'package:mutex/mutex.dart';
import 'package:protobuf/protobuf.dart';
import 'table_db.dart';
class PersistentQueueCubit<T extends GeneratedMessage>
extends Cubit<AsyncValue<IList<T>>> with TableDBBackedFromBuffer<IList<T>> {
//
PersistentQueueCubit(
{required String table,
required String key,
required T Function(Uint8List) fromBuffer,
bool deleteOnClose = true})
: _table = table,
_key = key,
_fromBuffer = fromBuffer,
_deleteOnClose = deleteOnClose,
super(const AsyncValue.loading()) {
_initWait.add(_build);
unawaited(Future.delayed(Duration.zero, () async {
await for (final elem in _syncAddController.stream) {
await addAll(elem);
}
}));
}
@override
Future<void> close() async {
// Ensure the init finished
await _initWait();
// Close the sync add stream
await _syncAddController.close();
// Wait for any setStates to finish
await _stateMutex.acquire();
// Clean up table if desired
if (_deleteOnClose) {
await delete();
}
await super.close();
}
Future<void> _build() async {
await _stateMutex.protect(() async {
try {
emit(AsyncValue.data(await load() ?? await store(IList<T>.empty())));
} on Exception catch (e, stackTrace) {
emit(AsyncValue.error(e, stackTrace));
}
});
}
Future<void> _setStateInner(IList<T> newState) async {
emit(AsyncValue.data(await store(newState)));
}
Future<void> add(T item) async {
await _initWait();
await _stateMutex.protect(() async {
final queue = state.asData!.value.add(item);
await _setStateInner(queue);
});
}
Future<void> addAll(IList<T> items) async {
await _initWait();
await _stateMutex.protect(() async {
var queue = state.asData!.value;
for (final item in items) {
queue = queue.add(item);
}
await _setStateInner(queue);
});
}
void addSync(T item) {
_syncAddController.sink.add([item].toIList());
}
void addAllSync(IList<T> items) {
_syncAddController.sink.add(items.toIList());
}
Future<bool> get isEmpty async {
await _initWait();
return state.asData!.value.isEmpty;
}
Future<bool> get isNotEmpty async {
await _initWait();
return state.asData!.value.isNotEmpty;
}
Future<int> get length async {
await _initWait();
return state.asData!.value.length;
}
// Future<T?> pop() async {
// await _initWait();
// return _processingMutex.protect(() async => _stateMutex.protect(() async {
// final removedItem = Output<T>();
// final queue = state.asData!.value.removeAt(0, removedItem);
// await _setStateInner(queue);
// return removedItem.value;
// }));
// }
// Future<IList<T>> popAll() async {
// await _initWait();
// return _processingMutex.protect(() async => _stateMutex.protect(() async {
// final queue = state.asData!.value;
// await _setStateInner(IList<T>.empty);
// return queue;
// }));
// }
Future<R> process<R>(Future<R> Function(IList<T>) closure,
{int? count}) async {
await _initWait();
// Only one processor at a time
return _processingMutex.protect(() async {
// Take 'count' items from the front of the list
final toProcess = await _stateMutex.protect(() async {
final queue = state.asData!.value;
final processCount = (count ?? queue.length).clamp(0, queue.length);
return queue.take(processCount).toIList();
});
// Run the processing closure
final processCount = toProcess.length;
final out = await closure(toProcess);
// If there was nothing to process just return
if (toProcess.isEmpty) {
return out;
}
// If there was no exception, remove the processed items
return _stateMutex.protect(() async {
// Get the queue from the state again as items could
// have been added during processing
final queue = state.asData!.value;
final newQueue = queue.skip(processCount).toIList();
await _setStateInner(newQueue);
return out;
});
});
}
// TableDBBacked
@override
String tableKeyName() => _key;
@override
String tableName() => _table;
@override
IList<T> valueFromBuffer(Uint8List bytes) {
final reader = CodedBufferReader(bytes);
var out = IList<T>();
while (!reader.isAtEnd()) {
out = out.add(_fromBuffer(reader.readBytesAsView()));
}
return out;
}
@override
Uint8List valueToBuffer(IList<T> val) {
final writer = CodedBufferWriter();
for (final elem in val) {
writer.writeRawBytes(elem.writeToBuffer());
}
return writer.toBuffer();
}
final String _table;
final String _key;
final T Function(Uint8List) _fromBuffer;
final bool _deleteOnClose;
final WaitSet _initWait = WaitSet();
final Mutex _stateMutex = Mutex();
final Mutex _processingMutex = Mutex();
final StreamController<IList<T>> _syncAddController = StreamController();
}

View File

@ -10,7 +10,7 @@ export 'src/config.dart';
export 'src/identity.dart'; export 'src/identity.dart';
export 'src/json_tools.dart'; export 'src/json_tools.dart';
export 'src/memory_tools.dart'; export 'src/memory_tools.dart';
export 'src/persistent_queue_cubit.dart'; export 'src/persistent_queue.dart';
export 'src/protobuf_tools.dart'; export 'src/protobuf_tools.dart';
export 'src/table_db.dart'; export 'src/table_db.dart';
export 'src/veilid_log.dart' hide veilidLoggy; export 'src/veilid_log.dart' hide veilidLoggy;