mirror of
https://gitlab.com/veilid/veilidchat.git
synced 2025-04-19 15:06:05 -04:00
Merge branch veilidchat:main into main
This commit is contained in:
commit
ec22bf895c
@ -96,6 +96,7 @@
|
||||
"failed_to_accept": "Failed to accept contact invitation",
|
||||
"failed_to_reject": "Failed to reject contact invitation",
|
||||
"invalid_invitation": "Invalid invitation",
|
||||
"try_again_online": "Invitation could not be reached, try again when online",
|
||||
"protected_with_pin": "Contact invitation is protected with a PIN",
|
||||
"protected_with_password": "Contact invitation is protected with a password",
|
||||
"invalid_pin": "Invalid PIN",
|
||||
|
@ -144,7 +144,7 @@ EXTERNAL SOURCES:
|
||||
SPEC CHECKSUMS:
|
||||
camera_avfoundation: 759172d1a77ae7be0de08fc104cfb79738b8a59e
|
||||
Flutter: e0871f40cf51350855a761d2e70bf5af5b9b5de7
|
||||
flutter_native_splash: 52501b97d1c0a5f898d687f1646226c1f93c56ef
|
||||
flutter_native_splash: edf599c81f74d093a4daf8e17bd7a018854bc778
|
||||
GoogleDataTransport: 54dee9d48d14580407f8f5fbf2f496e92437a2f2
|
||||
GoogleMLKit: 2bd0dc6253c4d4f227aad460f69215a504b2980e
|
||||
GoogleToolboxForMac: 8bef7c7c5cf7291c687cf5354f39f9db6399ad34
|
||||
@ -160,7 +160,7 @@ SPEC CHECKSUMS:
|
||||
pasteboard: 982969ebaa7c78af3e6cc7761e8f5e77565d9ce0
|
||||
path_provider_foundation: 3784922295ac71e43754bd15e0653ccfd36a147c
|
||||
PromisesObjC: c50d2056b5253dadbd6c2bea79b0674bd5a52fa4
|
||||
share_plus: c3fef564749587fc939ef86ffb283ceac0baf9f5
|
||||
share_plus: 8875f4f2500512ea181eef553c3e27dba5135aad
|
||||
shared_preferences_foundation: b4c3b4cddf1c21f02770737f147a3f5da9d39695
|
||||
smart_auth: 4bedbc118723912d0e45a07e8ab34039c19e04f2
|
||||
sqflite: 673a0e54cc04b7d6dba8d24fb8095b31c3a99eec
|
||||
|
@ -23,13 +23,11 @@ class RenderStateElement {
|
||||
if (!isLocal) {
|
||||
return null;
|
||||
}
|
||||
if (reconciled && sent) {
|
||||
if (!reconciledOffline && !sentOffline) {
|
||||
return MessageSendState.delivered;
|
||||
}
|
||||
return MessageSendState.sent;
|
||||
}
|
||||
|
||||
if (sent && !sentOffline) {
|
||||
return MessageSendState.delivered;
|
||||
}
|
||||
if (reconciled && !reconciledOffline) {
|
||||
return MessageSendState.sent;
|
||||
}
|
||||
return MessageSendState.sending;
|
||||
@ -63,14 +61,6 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
|
||||
_remoteConversationRecordKey = remoteConversationRecordKey,
|
||||
_remoteMessagesRecordKey = remoteMessagesRecordKey,
|
||||
_reconciledChatRecord = reconciledChatRecord,
|
||||
_unreconciledMessagesQueue = PersistentQueueCubit<proto.Message>(
|
||||
table: 'SingleContactUnreconciledMessages',
|
||||
key: remoteConversationRecordKey.toString(),
|
||||
fromBuffer: proto.Message.fromBuffer),
|
||||
_sendingMessagesQueue = PersistentQueueCubit<proto.Message>(
|
||||
table: 'SingleContactSendingMessages',
|
||||
key: remoteConversationRecordKey.toString(),
|
||||
fromBuffer: proto.Message.fromBuffer),
|
||||
super(const AsyncValue.loading()) {
|
||||
// Async Init
|
||||
_initWait.add(_init);
|
||||
@ -93,6 +83,20 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
|
||||
|
||||
// Initialize everything
|
||||
Future<void> _init() async {
|
||||
// Late initialization of queues with closures
|
||||
_unreconciledMessagesQueue = PersistentQueue<proto.Message>(
|
||||
table: 'SingleContactUnreconciledMessages',
|
||||
key: _remoteConversationRecordKey.toString(),
|
||||
fromBuffer: proto.Message.fromBuffer,
|
||||
closure: _processUnreconciledMessages,
|
||||
);
|
||||
_sendingMessagesQueue = PersistentQueue<proto.Message>(
|
||||
table: 'SingleContactSendingMessages',
|
||||
key: _remoteConversationRecordKey.toString(),
|
||||
fromBuffer: proto.Message.fromBuffer,
|
||||
closure: _processSendingMessages,
|
||||
);
|
||||
|
||||
// Make crypto
|
||||
await _initMessagesCrypto();
|
||||
|
||||
@ -104,32 +108,6 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
|
||||
|
||||
// Remote messages key
|
||||
await _initRcvdMessagesCubit();
|
||||
|
||||
// Unreconciled messages processing queue listener
|
||||
Future.delayed(Duration.zero, () async {
|
||||
await for (final entry in _unreconciledMessagesQueue.stream) {
|
||||
final data = entry.asData;
|
||||
if (data != null && data.value.isNotEmpty) {
|
||||
// Process data using recoverable processing mechanism
|
||||
await _unreconciledMessagesQueue.process((messages) async {
|
||||
await _processUnreconciledMessages(data.value);
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Sending messages processing queue listener
|
||||
Future.delayed(Duration.zero, () async {
|
||||
await for (final entry in _sendingMessagesQueue.stream) {
|
||||
final data = entry.asData;
|
||||
if (data != null && data.value.isNotEmpty) {
|
||||
// Process data using recoverable processing mechanism
|
||||
await _sendingMessagesQueue.process((messages) async {
|
||||
await _processSendingMessages(data.value);
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Make crypto
|
||||
@ -145,8 +123,8 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
|
||||
_sentMessagesCubit = DHTShortArrayCubit(
|
||||
open: () async => DHTShortArray.openWrite(
|
||||
_localMessagesRecordKey, writer,
|
||||
debugName:
|
||||
'SingleContactMessagesCubit::_initSentMessagesCubit::SentMessages',
|
||||
debugName: 'SingleContactMessagesCubit::_initSentMessagesCubit::'
|
||||
'SentMessages',
|
||||
parent: _localConversationRecordKey,
|
||||
crypto: _messagesCrypto),
|
||||
decodeElement: proto.Message.fromBuffer);
|
||||
@ -176,7 +154,8 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
|
||||
|
||||
_reconciledMessagesCubit = DHTShortArrayCubit(
|
||||
open: () async => DHTShortArray.openOwned(_reconciledChatRecord,
|
||||
debugName: 'SingleContactMessagesCubit::_initReconciledMessages::'
|
||||
debugName:
|
||||
'SingleContactMessagesCubit::_initReconciledMessagesCubit::'
|
||||
'ReconciledMessages',
|
||||
parent: accountRecordKey),
|
||||
decodeElement: proto.Message.fromBuffer);
|
||||
@ -185,34 +164,35 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
|
||||
_updateReconciledMessagesState(_reconciledMessagesCubit!.state);
|
||||
}
|
||||
|
||||
// Called when the remote messages list gets a change
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
// Called when the sent messages cubit gets a change
|
||||
// This will re-render when messages are sent from another machine
|
||||
void _updateSentMessagesState(
|
||||
DHTShortArrayBusyState<proto.Message> avmessages) {
|
||||
final sentMessages = avmessages.state.asData?.value;
|
||||
if (sentMessages == null) {
|
||||
return;
|
||||
}
|
||||
// Don't reconcile, the sending machine will have already added
|
||||
// to the reconciliation queue on that machine
|
||||
|
||||
// Update the view
|
||||
_renderState();
|
||||
}
|
||||
|
||||
// Called when the received messages cubit gets a change
|
||||
void _updateRcvdMessagesState(
|
||||
DHTShortArrayBusyState<proto.Message> avmessages) {
|
||||
final remoteMessages = avmessages.state.asData?.value;
|
||||
if (remoteMessages == null) {
|
||||
final rcvdMessages = avmessages.state.asData?.value;
|
||||
if (rcvdMessages == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Add remote messages updates to queue to process asynchronously
|
||||
// Ignore offline state because remote messages are always fully delivered
|
||||
// This may happen once per client but should be idempotent
|
||||
_unreconciledMessagesQueue
|
||||
.addAllSync(remoteMessages.map((x) => x.value).toIList());
|
||||
|
||||
// Update the view
|
||||
_renderState();
|
||||
}
|
||||
|
||||
// Called when the send messages list gets a change
|
||||
// This will re-render when messages are sent from another machine
|
||||
void _updateSentMessagesState(
|
||||
DHTShortArrayBusyState<proto.Message> avmessages) {
|
||||
final remoteMessages = avmessages.state.asData?.value;
|
||||
if (remoteMessages == null) {
|
||||
return;
|
||||
}
|
||||
// Don't reconcile, the sending machine will have already added
|
||||
// to the reconciliation queue on that machine
|
||||
_unreconciledMessagesQueue.addAllSync(rcvdMessages.map((x) => x.value));
|
||||
|
||||
// Update the view
|
||||
_renderState();
|
||||
@ -227,6 +207,25 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
|
||||
_renderState();
|
||||
}
|
||||
|
||||
// Async process to reconcile messages sent or received in the background
|
||||
Future<void> _processUnreconciledMessages(
|
||||
IList<proto.Message> messages) async {
|
||||
await _reconciledMessagesCubit!
|
||||
.operateWrite((reconciledMessagesWriter) async {
|
||||
await _reconcileMessagesInner(
|
||||
reconciledMessagesWriter: reconciledMessagesWriter,
|
||||
messages: messages);
|
||||
});
|
||||
}
|
||||
|
||||
// Async process to send messages in the background
|
||||
Future<void> _processSendingMessages(IList<proto.Message> messages) async {
|
||||
for (final message in messages) {
|
||||
await _sentMessagesCubit!.operateWriteEventual(
|
||||
(writer) => writer.tryAddItem(message.writeToBuffer()));
|
||||
}
|
||||
}
|
||||
|
||||
Future<void> _reconcileMessagesInner(
|
||||
{required DHTShortArrayWrite reconciledMessagesWriter,
|
||||
required IList<proto.Message> messages}) async {
|
||||
@ -288,25 +287,6 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
|
||||
}
|
||||
}
|
||||
|
||||
// Async process to reconcile messages sent or received in the background
|
||||
Future<void> _processUnreconciledMessages(
|
||||
IList<proto.Message> messages) async {
|
||||
await _reconciledMessagesCubit!
|
||||
.operateWrite((reconciledMessagesWriter) async {
|
||||
await _reconcileMessagesInner(
|
||||
reconciledMessagesWriter: reconciledMessagesWriter,
|
||||
messages: messages);
|
||||
});
|
||||
}
|
||||
|
||||
// Async process to send messages in the background
|
||||
Future<void> _processSendingMessages(IList<proto.Message> messages) async {
|
||||
for (final message in messages) {
|
||||
await _sentMessagesCubit!.operateWriteEventual(
|
||||
(writer) => writer.tryAddItem(message.writeToBuffer()));
|
||||
}
|
||||
}
|
||||
|
||||
// Produce a state for this cubit from the input cubits and queues
|
||||
void _renderState() {
|
||||
// Get all reconciled messages
|
||||
@ -315,15 +295,12 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
|
||||
// Get all sent messages
|
||||
final sentMessages = _sentMessagesCubit?.state.state.asData?.value;
|
||||
// Get all items in the unreconciled queue
|
||||
final unreconciledMessages = _unreconciledMessagesQueue.state.asData?.value;
|
||||
final unreconciledMessages = _unreconciledMessagesQueue.queue;
|
||||
// Get all items in the unsent queue
|
||||
final sendingMessages = _sendingMessagesQueue.state.asData?.value;
|
||||
final sendingMessages = _sendingMessagesQueue.queue;
|
||||
|
||||
// If we aren't ready to render a state, say we're loading
|
||||
if (reconciledMessages == null ||
|
||||
sentMessages == null ||
|
||||
unreconciledMessages == null ||
|
||||
sendingMessages == null) {
|
||||
if (reconciledMessages == null || sentMessages == null) {
|
||||
emit(const AsyncLoading());
|
||||
return;
|
||||
}
|
||||
@ -428,8 +405,8 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
|
||||
DHTShortArrayCubit<proto.Message>? _rcvdMessagesCubit;
|
||||
DHTShortArrayCubit<proto.Message>? _reconciledMessagesCubit;
|
||||
|
||||
final PersistentQueueCubit<proto.Message> _unreconciledMessagesQueue;
|
||||
final PersistentQueueCubit<proto.Message> _sendingMessagesQueue;
|
||||
late final PersistentQueue<proto.Message> _unreconciledMessagesQueue;
|
||||
late final PersistentQueue<proto.Message> _sendingMessagesQueue;
|
||||
|
||||
StreamSubscription<DHTShortArrayBusyState<proto.Message>>? _sentSubscription;
|
||||
StreamSubscription<DHTShortArrayBusyState<proto.Message>>? _rcvdSubscription;
|
||||
|
@ -224,8 +224,13 @@ class InvitationDialogState extends State<InvitationDialog> {
|
||||
_validInvitation = null;
|
||||
widget.onValidationFailed();
|
||||
});
|
||||
} on VeilidAPIException {
|
||||
final errorText = translate('invitation_dialog.invalid_invitation');
|
||||
} on VeilidAPIException catch (e) {
|
||||
late final String errorText;
|
||||
if (e is VeilidAPIExceptionTryAgain) {
|
||||
errorText = translate('invitation_dialog.try_again_online');
|
||||
} else {
|
||||
errorText = translate('invitation_dialog.invalid_invitation');
|
||||
}
|
||||
if (mounted) {
|
||||
showErrorToast(context, errorText);
|
||||
}
|
||||
|
@ -4,8 +4,10 @@ import 'loggy.dart';
|
||||
|
||||
const Map<String, LogLevel> _blocChangeLogLevels = {
|
||||
'ConnectionStateCubit': LogLevel.off,
|
||||
'ActiveConversationMessagesBlocMapCubit': LogLevel.off,
|
||||
'ActiveSingleContactChatBlocMapCubit': LogLevel.off,
|
||||
'ActiveConversationsBlocMapCubit': LogLevel.off,
|
||||
'DHTShortArrayCubit<Message>': LogLevel.off,
|
||||
'PersistentQueueCubit<Message>': LogLevel.off,
|
||||
};
|
||||
const Map<String, LogLevel> _blocCreateCloseLogLevels = {};
|
||||
const Map<String, LogLevel> _blocErrorLogLevels = {};
|
||||
|
@ -390,14 +390,14 @@ class DHTRecord {
|
||||
// range we care about, don't pass it through
|
||||
final overlappedFirstSubkey = overlappedSubkeys.firstSubkey;
|
||||
final updateFirstSubkey = subkeys.firstSubkey;
|
||||
final updatedData = (overlappedFirstSubkey != null &&
|
||||
updateFirstSubkey != null &&
|
||||
overlappedFirstSubkey == updateFirstSubkey)
|
||||
? data
|
||||
: null;
|
||||
// Report only watched subkeys
|
||||
watchController?.add(DHTRecordWatchChange(
|
||||
local: local, data: updatedData, subkeys: overlappedSubkeys));
|
||||
if (overlappedFirstSubkey != null && updateFirstSubkey != null) {
|
||||
final updatedData =
|
||||
overlappedFirstSubkey == updateFirstSubkey ? data : null;
|
||||
|
||||
// Report only watched subkeys
|
||||
watchController?.add(DHTRecordWatchChange(
|
||||
local: local, data: updatedData, subkeys: overlappedSubkeys));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
195
packages/veilid_support/lib/src/persistent_queue.dart
Normal file
195
packages/veilid_support/lib/src/persistent_queue.dart
Normal file
@ -0,0 +1,195 @@
|
||||
import 'dart:async';
|
||||
import 'dart:typed_data';
|
||||
|
||||
import 'package:async_tools/async_tools.dart';
|
||||
import 'package:fast_immutable_collections/fast_immutable_collections.dart';
|
||||
import 'package:mutex/mutex.dart';
|
||||
import 'package:protobuf/protobuf.dart';
|
||||
|
||||
import 'table_db.dart';
|
||||
|
||||
class PersistentQueue<T extends GeneratedMessage>
|
||||
/*extends Cubit<AsyncValue<IList<T>>>*/ with
|
||||
TableDBBackedFromBuffer<IList<T>> {
|
||||
//
|
||||
PersistentQueue(
|
||||
{required String table,
|
||||
required String key,
|
||||
required T Function(Uint8List) fromBuffer,
|
||||
required Future<void> Function(IList<T>) closure,
|
||||
bool deleteOnClose = true})
|
||||
: _table = table,
|
||||
_key = key,
|
||||
_fromBuffer = fromBuffer,
|
||||
_closure = closure,
|
||||
_deleteOnClose = deleteOnClose {
|
||||
_initWait.add(_init);
|
||||
}
|
||||
|
||||
Future<void> close() async {
|
||||
// Ensure the init finished
|
||||
await _initWait();
|
||||
|
||||
// Close the sync add stream
|
||||
await _syncAddController.close();
|
||||
|
||||
// Stop the processing trigger
|
||||
await _queueReady.close();
|
||||
|
||||
// Wait for any setStates to finish
|
||||
await _queueMutex.acquire();
|
||||
|
||||
// Clean up table if desired
|
||||
if (_deleteOnClose) {
|
||||
await delete();
|
||||
}
|
||||
}
|
||||
|
||||
Future<void> _init() async {
|
||||
// Start the processor
|
||||
unawaited(Future.delayed(Duration.zero, () async {
|
||||
await _initWait();
|
||||
await for (final _ in _queueReady.stream) {
|
||||
await _process();
|
||||
}
|
||||
}));
|
||||
|
||||
// Start the sync add controller
|
||||
unawaited(Future.delayed(Duration.zero, () async {
|
||||
await _initWait();
|
||||
await for (final elem in _syncAddController.stream) {
|
||||
await addAll(elem);
|
||||
}
|
||||
}));
|
||||
|
||||
// Load the queue if we have one
|
||||
await _queueMutex.protect(() async {
|
||||
_queue = await load() ?? await store(IList<T>.empty());
|
||||
});
|
||||
}
|
||||
|
||||
Future<void> _updateQueueInner(IList<T> newQueue) async {
|
||||
_queue = await store(newQueue);
|
||||
if (_queue.isNotEmpty) {
|
||||
_queueReady.sink.add(null);
|
||||
}
|
||||
}
|
||||
|
||||
Future<void> add(T item) async {
|
||||
await _initWait();
|
||||
await _queueMutex.protect(() async {
|
||||
final newQueue = _queue.add(item);
|
||||
await _updateQueueInner(newQueue);
|
||||
});
|
||||
}
|
||||
|
||||
Future<void> addAll(Iterable<T> items) async {
|
||||
await _initWait();
|
||||
await _queueMutex.protect(() async {
|
||||
final newQueue = _queue.addAll(items);
|
||||
await _updateQueueInner(newQueue);
|
||||
});
|
||||
}
|
||||
|
||||
void addSync(T item) {
|
||||
_syncAddController.sink.add([item]);
|
||||
}
|
||||
|
||||
void addAllSync(Iterable<T> items) {
|
||||
_syncAddController.sink.add(items);
|
||||
}
|
||||
|
||||
// Future<bool> get isEmpty async {
|
||||
// await _initWait();
|
||||
// return state.asData!.value.isEmpty;
|
||||
// }
|
||||
|
||||
// Future<bool> get isNotEmpty async {
|
||||
// await _initWait();
|
||||
// return state.asData!.value.isNotEmpty;
|
||||
// }
|
||||
|
||||
// Future<int> get length async {
|
||||
// await _initWait();
|
||||
// return state.asData!.value.length;
|
||||
// }
|
||||
|
||||
// Future<T?> pop() async {
|
||||
// await _initWait();
|
||||
// return _processingMutex.protect(() async => _stateMutex.protect(() async {
|
||||
// final removedItem = Output<T>();
|
||||
// final queue = state.asData!.value.removeAt(0, removedItem);
|
||||
// await _setStateInner(queue);
|
||||
// return removedItem.value;
|
||||
// }));
|
||||
// }
|
||||
|
||||
// Future<IList<T>> popAll() async {
|
||||
// await _initWait();
|
||||
// return _processingMutex.protect(() async => _stateMutex.protect(() async {
|
||||
// final queue = state.asData!.value;
|
||||
// await _setStateInner(IList<T>.empty);
|
||||
// return queue;
|
||||
// }));
|
||||
// }
|
||||
|
||||
Future<void> _process() async {
|
||||
// Take a copy of the current queue
|
||||
// (doesn't need queue mutex because this is a sync operation)
|
||||
final toProcess = _queue;
|
||||
final processCount = toProcess.length;
|
||||
if (processCount == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Run the processing closure
|
||||
await _closure(toProcess);
|
||||
|
||||
// If there was no exception, remove the processed items
|
||||
await _queueMutex.protect(() async {
|
||||
// Get the queue from the state again as items could
|
||||
// have been added during processing
|
||||
final newQueue = _queue.skip(processCount).toIList();
|
||||
await _updateQueueInner(newQueue);
|
||||
});
|
||||
}
|
||||
|
||||
IList<T> get queue => _queue;
|
||||
|
||||
// TableDBBacked
|
||||
@override
|
||||
String tableKeyName() => _key;
|
||||
|
||||
@override
|
||||
String tableName() => _table;
|
||||
|
||||
@override
|
||||
IList<T> valueFromBuffer(Uint8List bytes) {
|
||||
final reader = CodedBufferReader(bytes);
|
||||
var out = IList<T>();
|
||||
while (!reader.isAtEnd()) {
|
||||
out = out.add(_fromBuffer(reader.readBytesAsView()));
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
@override
|
||||
Uint8List valueToBuffer(IList<T> val) {
|
||||
final writer = CodedBufferWriter();
|
||||
for (final elem in val) {
|
||||
writer.writeRawBytes(elem.writeToBuffer());
|
||||
}
|
||||
return writer.toBuffer();
|
||||
}
|
||||
|
||||
final String _table;
|
||||
final String _key;
|
||||
final T Function(Uint8List) _fromBuffer;
|
||||
final bool _deleteOnClose;
|
||||
final WaitSet _initWait = WaitSet();
|
||||
final Mutex _queueMutex = Mutex();
|
||||
IList<T> _queue = IList<T>.empty();
|
||||
final StreamController<Iterable<T>> _syncAddController = StreamController();
|
||||
final StreamController<void> _queueReady = StreamController();
|
||||
final Future<void> Function(IList<T>) _closure;
|
||||
}
|
@ -1,194 +0,0 @@
|
||||
import 'dart:async';
|
||||
import 'dart:typed_data';
|
||||
|
||||
import 'package:async_tools/async_tools.dart';
|
||||
import 'package:bloc/bloc.dart';
|
||||
import 'package:fast_immutable_collections/fast_immutable_collections.dart';
|
||||
import 'package:mutex/mutex.dart';
|
||||
import 'package:protobuf/protobuf.dart';
|
||||
|
||||
import 'table_db.dart';
|
||||
|
||||
class PersistentQueueCubit<T extends GeneratedMessage>
|
||||
extends Cubit<AsyncValue<IList<T>>> with TableDBBackedFromBuffer<IList<T>> {
|
||||
//
|
||||
PersistentQueueCubit(
|
||||
{required String table,
|
||||
required String key,
|
||||
required T Function(Uint8List) fromBuffer,
|
||||
bool deleteOnClose = true})
|
||||
: _table = table,
|
||||
_key = key,
|
||||
_fromBuffer = fromBuffer,
|
||||
_deleteOnClose = deleteOnClose,
|
||||
super(const AsyncValue.loading()) {
|
||||
_initWait.add(_build);
|
||||
unawaited(Future.delayed(Duration.zero, () async {
|
||||
await for (final elem in _syncAddController.stream) {
|
||||
await addAll(elem);
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
@override
|
||||
Future<void> close() async {
|
||||
// Ensure the init finished
|
||||
await _initWait();
|
||||
|
||||
// Close the sync add stream
|
||||
await _syncAddController.close();
|
||||
|
||||
// Wait for any setStates to finish
|
||||
await _stateMutex.acquire();
|
||||
|
||||
// Clean up table if desired
|
||||
if (_deleteOnClose) {
|
||||
await delete();
|
||||
}
|
||||
|
||||
await super.close();
|
||||
}
|
||||
|
||||
Future<void> _build() async {
|
||||
await _stateMutex.protect(() async {
|
||||
try {
|
||||
emit(AsyncValue.data(await load() ?? await store(IList<T>.empty())));
|
||||
} on Exception catch (e, stackTrace) {
|
||||
emit(AsyncValue.error(e, stackTrace));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Future<void> _setStateInner(IList<T> newState) async {
|
||||
emit(AsyncValue.data(await store(newState)));
|
||||
}
|
||||
|
||||
Future<void> add(T item) async {
|
||||
await _initWait();
|
||||
await _stateMutex.protect(() async {
|
||||
final queue = state.asData!.value.add(item);
|
||||
await _setStateInner(queue);
|
||||
});
|
||||
}
|
||||
|
||||
Future<void> addAll(IList<T> items) async {
|
||||
await _initWait();
|
||||
await _stateMutex.protect(() async {
|
||||
var queue = state.asData!.value;
|
||||
for (final item in items) {
|
||||
queue = queue.add(item);
|
||||
}
|
||||
await _setStateInner(queue);
|
||||
});
|
||||
}
|
||||
|
||||
void addSync(T item) {
|
||||
_syncAddController.sink.add([item].toIList());
|
||||
}
|
||||
|
||||
void addAllSync(IList<T> items) {
|
||||
_syncAddController.sink.add(items.toIList());
|
||||
}
|
||||
|
||||
Future<bool> get isEmpty async {
|
||||
await _initWait();
|
||||
return state.asData!.value.isEmpty;
|
||||
}
|
||||
|
||||
Future<bool> get isNotEmpty async {
|
||||
await _initWait();
|
||||
return state.asData!.value.isNotEmpty;
|
||||
}
|
||||
|
||||
Future<int> get length async {
|
||||
await _initWait();
|
||||
return state.asData!.value.length;
|
||||
}
|
||||
|
||||
// Future<T?> pop() async {
|
||||
// await _initWait();
|
||||
// return _processingMutex.protect(() async => _stateMutex.protect(() async {
|
||||
// final removedItem = Output<T>();
|
||||
// final queue = state.asData!.value.removeAt(0, removedItem);
|
||||
// await _setStateInner(queue);
|
||||
// return removedItem.value;
|
||||
// }));
|
||||
// }
|
||||
|
||||
// Future<IList<T>> popAll() async {
|
||||
// await _initWait();
|
||||
// return _processingMutex.protect(() async => _stateMutex.protect(() async {
|
||||
// final queue = state.asData!.value;
|
||||
// await _setStateInner(IList<T>.empty);
|
||||
// return queue;
|
||||
// }));
|
||||
// }
|
||||
|
||||
Future<R> process<R>(Future<R> Function(IList<T>) closure,
|
||||
{int? count}) async {
|
||||
await _initWait();
|
||||
// Only one processor at a time
|
||||
return _processingMutex.protect(() async {
|
||||
// Take 'count' items from the front of the list
|
||||
final toProcess = await _stateMutex.protect(() async {
|
||||
final queue = state.asData!.value;
|
||||
final processCount = (count ?? queue.length).clamp(0, queue.length);
|
||||
return queue.take(processCount).toIList();
|
||||
});
|
||||
|
||||
// Run the processing closure
|
||||
final processCount = toProcess.length;
|
||||
final out = await closure(toProcess);
|
||||
|
||||
// If there was nothing to process just return
|
||||
if (toProcess.isEmpty) {
|
||||
return out;
|
||||
}
|
||||
|
||||
// If there was no exception, remove the processed items
|
||||
return _stateMutex.protect(() async {
|
||||
// Get the queue from the state again as items could
|
||||
// have been added during processing
|
||||
final queue = state.asData!.value;
|
||||
final newQueue = queue.skip(processCount).toIList();
|
||||
await _setStateInner(newQueue);
|
||||
return out;
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// TableDBBacked
|
||||
@override
|
||||
String tableKeyName() => _key;
|
||||
|
||||
@override
|
||||
String tableName() => _table;
|
||||
|
||||
@override
|
||||
IList<T> valueFromBuffer(Uint8List bytes) {
|
||||
final reader = CodedBufferReader(bytes);
|
||||
var out = IList<T>();
|
||||
while (!reader.isAtEnd()) {
|
||||
out = out.add(_fromBuffer(reader.readBytesAsView()));
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
@override
|
||||
Uint8List valueToBuffer(IList<T> val) {
|
||||
final writer = CodedBufferWriter();
|
||||
for (final elem in val) {
|
||||
writer.writeRawBytes(elem.writeToBuffer());
|
||||
}
|
||||
return writer.toBuffer();
|
||||
}
|
||||
|
||||
final String _table;
|
||||
final String _key;
|
||||
final T Function(Uint8List) _fromBuffer;
|
||||
final bool _deleteOnClose;
|
||||
final WaitSet _initWait = WaitSet();
|
||||
final Mutex _stateMutex = Mutex();
|
||||
final Mutex _processingMutex = Mutex();
|
||||
final StreamController<IList<T>> _syncAddController = StreamController();
|
||||
}
|
@ -10,7 +10,7 @@ export 'src/config.dart';
|
||||
export 'src/identity.dart';
|
||||
export 'src/json_tools.dart';
|
||||
export 'src/memory_tools.dart';
|
||||
export 'src/persistent_queue_cubit.dart';
|
||||
export 'src/persistent_queue.dart';
|
||||
export 'src/protobuf_tools.dart';
|
||||
export 'src/table_db.dart';
|
||||
export 'src/veilid_log.dart' hide veilidLoggy;
|
||||
|
88
pubspec.lock
88
pubspec.lock
@ -45,10 +45,10 @@ packages:
|
||||
dependency: transitive
|
||||
description:
|
||||
name: args
|
||||
sha256: eef6c46b622e0494a36c5a12d10d77fb4e855501a91c1b9ef9339326e58f0596
|
||||
sha256: "7cf60b9f0cc88203c5a190b4cd62a99feea42759a7fa695010eb5de1c0b2252a"
|
||||
url: "https://pub.dev"
|
||||
source: hosted
|
||||
version: "2.4.2"
|
||||
version: "2.5.0"
|
||||
async:
|
||||
dependency: transitive
|
||||
description:
|
||||
@ -68,10 +68,10 @@ packages:
|
||||
dependency: "direct main"
|
||||
description:
|
||||
name: awesome_extensions
|
||||
sha256: "7d235d64a81543a7e200a91b1149bef7d32241290fa483bae25b31be41449a7c"
|
||||
sha256: c3bf11d07a69fe10ff5541717b920661c7a87a791ee182851f1c92a2d15b95a2
|
||||
url: "https://pub.dev"
|
||||
source: hosted
|
||||
version: "2.0.13"
|
||||
version: "2.0.14"
|
||||
badges:
|
||||
dependency: "direct main"
|
||||
description:
|
||||
@ -219,18 +219,18 @@ packages:
|
||||
dependency: transitive
|
||||
description:
|
||||
name: camera_android
|
||||
sha256: ae5b9a996dfb8d77b02031b67f5500873d6402f33bd6a5283e932eef08542a51
|
||||
sha256: "7b0aba6398afa8475e2bc9115d976efb49cf8db781e922572d443795c04a4f4f"
|
||||
url: "https://pub.dev"
|
||||
source: hosted
|
||||
version: "0.10.9"
|
||||
version: "0.10.9+1"
|
||||
camera_avfoundation:
|
||||
dependency: transitive
|
||||
description:
|
||||
name: camera_avfoundation
|
||||
sha256: "5d009ae48de1c8ab621b1c4496dadb6e2a83f3223b76c6e6a4a252414105f561"
|
||||
sha256: "9dbbb253aaf201a69c40cf95571f366ca936305d2de012684e21f6f1b1433d31"
|
||||
url: "https://pub.dev"
|
||||
source: hosted
|
||||
version: "0.9.15"
|
||||
version: "0.9.15+4"
|
||||
camera_platform_interface:
|
||||
dependency: transitive
|
||||
description:
|
||||
@ -371,10 +371,10 @@ packages:
|
||||
dependency: "direct main"
|
||||
description:
|
||||
name: cupertino_icons
|
||||
sha256: d57953e10f9f8327ce64a508a355f0b1ec902193f66288e8cb5070e7c47eeb2d
|
||||
sha256: ba631d1c7f7bef6b729a622b7b752645a2d076dba9976925b8f25725a30e1ee6
|
||||
url: "https://pub.dev"
|
||||
source: hosted
|
||||
version: "1.0.6"
|
||||
version: "1.0.8"
|
||||
dart_style:
|
||||
dependency: transitive
|
||||
description:
|
||||
@ -533,10 +533,10 @@ packages:
|
||||
dependency: transitive
|
||||
description:
|
||||
name: flutter_plugin_android_lifecycle
|
||||
sha256: b068ffc46f82a55844acfa4fdbb61fad72fa2aef0905548419d97f0f95c456da
|
||||
sha256: "8cf40eebf5dec866a6d1956ad7b4f7016e6c0cc69847ab946833b7d43743809f"
|
||||
url: "https://pub.dev"
|
||||
source: hosted
|
||||
version: "2.0.17"
|
||||
version: "2.0.19"
|
||||
flutter_shaders:
|
||||
dependency: transitive
|
||||
description:
|
||||
@ -594,10 +594,10 @@ packages:
|
||||
dependency: "direct dev"
|
||||
description:
|
||||
name: freezed
|
||||
sha256: "24f77b50776d4285cc4b3a1665bb79852714c09b878363efbe64788c179c4284"
|
||||
sha256: a434911f643466d78462625df76fd9eb13e57348ff43fe1f77bbe909522c67a1
|
||||
url: "https://pub.dev"
|
||||
source: hosted
|
||||
version: "2.5.0"
|
||||
version: "2.5.2"
|
||||
freezed_annotation:
|
||||
dependency: "direct main"
|
||||
description:
|
||||
@ -634,10 +634,10 @@ packages:
|
||||
dependency: "direct main"
|
||||
description:
|
||||
name: go_router
|
||||
sha256: "5ed2687bc961f33a752017ccaa7edead3e5601b28b6376a5901bf24728556b85"
|
||||
sha256: "771c8feb40ad0ef639973d7ecf1b43d55ffcedb2207fd43fab030f5639e40446"
|
||||
url: "https://pub.dev"
|
||||
source: hosted
|
||||
version: "13.2.2"
|
||||
version: "13.2.4"
|
||||
graphs:
|
||||
dependency: transitive
|
||||
description:
|
||||
@ -826,10 +826,10 @@ packages:
|
||||
dependency: "direct main"
|
||||
description:
|
||||
name: motion_toast
|
||||
sha256: f3fe9f92d9956814a1aa040c22c8a6c432cfb0c9f783163d9ec64915838e4837
|
||||
sha256: "4763b2aa3499d0bf00ffd9737479b73141d0397f532542893156efb4a5ac1994"
|
||||
url: "https://pub.dev"
|
||||
source: hosted
|
||||
version: "2.9.0"
|
||||
version: "2.9.1"
|
||||
mutex:
|
||||
dependency: "direct main"
|
||||
description:
|
||||
@ -889,18 +889,18 @@ packages:
|
||||
dependency: "direct main"
|
||||
description:
|
||||
name: path_provider
|
||||
sha256: b27217933eeeba8ff24845c34003b003b2b22151de3c908d0e679e8fe1aa078b
|
||||
sha256: c9e7d3a4cd1410877472158bee69963a4579f78b68c65a2b7d40d1a7a88bb161
|
||||
url: "https://pub.dev"
|
||||
source: hosted
|
||||
version: "2.1.2"
|
||||
version: "2.1.3"
|
||||
path_provider_android:
|
||||
dependency: transitive
|
||||
description:
|
||||
name: path_provider_android
|
||||
sha256: "477184d672607c0a3bf68fbbf601805f92ef79c82b64b4d6eb318cbca4c48668"
|
||||
sha256: a248d8146ee5983446bf03ed5ea8f6533129a12b11f12057ad1b4a67a2b3b41d
|
||||
url: "https://pub.dev"
|
||||
source: hosted
|
||||
version: "2.2.2"
|
||||
version: "2.2.4"
|
||||
path_provider_foundation:
|
||||
dependency: transitive
|
||||
description:
|
||||
@ -977,10 +977,10 @@ packages:
|
||||
dependency: transitive
|
||||
description:
|
||||
name: pointycastle
|
||||
sha256: "70fe966348fe08c34bf929582f1d8247d9d9408130723206472b4687227e4333"
|
||||
sha256: "79fbafed02cfdbe85ef3fd06c7f4bc2cbcba0177e61b765264853d4253b21744"
|
||||
url: "https://pub.dev"
|
||||
source: hosted
|
||||
version: "3.8.0"
|
||||
version: "3.9.0"
|
||||
pool:
|
||||
dependency: transitive
|
||||
description:
|
||||
@ -1041,10 +1041,10 @@ packages:
|
||||
dependency: "direct main"
|
||||
description:
|
||||
name: qr_code_dart_scan
|
||||
sha256: "8e9732d5b6e4e28d50647dc6d7713bf421148cadf28c768a10e9810bf6f3d87a"
|
||||
sha256: "948271f8dc39ab3798341783f0ab7bfdb723054fdc9ea0928c0a5be8503ee01c"
|
||||
url: "https://pub.dev"
|
||||
source: hosted
|
||||
version: "0.7.6"
|
||||
version: "0.8.0"
|
||||
qr_flutter:
|
||||
dependency: "direct main"
|
||||
description:
|
||||
@ -1113,18 +1113,18 @@ packages:
|
||||
dependency: "direct main"
|
||||
description:
|
||||
name: searchable_listview
|
||||
sha256: "5535ea3efa4599cf23ce52870a9580b52ece5d691aa90655ebec76d5081c9592"
|
||||
sha256: d8513a968bdd540cb011220a5670b23b346e04a7bcb99690a859ed58092f72a4
|
||||
url: "https://pub.dev"
|
||||
source: hosted
|
||||
version: "2.11.1"
|
||||
version: "2.11.2"
|
||||
share_plus:
|
||||
dependency: "direct main"
|
||||
description:
|
||||
name: share_plus
|
||||
sha256: "05ec043470319bfbabe0adbc90d3a84cbff0426b9d9f3a6e2ad3e131fa5fa629"
|
||||
sha256: fb5319f3aab4c5dda5ebb92dca978179ba21f8c783ee4380910ef4c1c6824f51
|
||||
url: "https://pub.dev"
|
||||
source: hosted
|
||||
version: "8.0.2"
|
||||
version: "8.0.3"
|
||||
share_plus_platform_interface:
|
||||
dependency: transitive
|
||||
description:
|
||||
@ -1137,18 +1137,18 @@ packages:
|
||||
dependency: "direct main"
|
||||
description:
|
||||
name: shared_preferences
|
||||
sha256: "81429e4481e1ccfb51ede496e916348668fd0921627779233bd24cc3ff6abd02"
|
||||
sha256: d3bbe5553a986e83980916ded2f0b435ef2e1893dfaa29d5a7a790d0eca12180
|
||||
url: "https://pub.dev"
|
||||
source: hosted
|
||||
version: "2.2.2"
|
||||
version: "2.2.3"
|
||||
shared_preferences_android:
|
||||
dependency: transitive
|
||||
description:
|
||||
name: shared_preferences_android
|
||||
sha256: "8568a389334b6e83415b6aae55378e158fbc2314e074983362d20c562780fb06"
|
||||
sha256: "1ee8bf911094a1b592de7ab29add6f826a7331fb854273d55918693d5364a1f2"
|
||||
url: "https://pub.dev"
|
||||
source: hosted
|
||||
version: "2.2.1"
|
||||
version: "2.2.2"
|
||||
shared_preferences_foundation:
|
||||
dependency: transitive
|
||||
description:
|
||||
@ -1398,18 +1398,18 @@ packages:
|
||||
dependency: transitive
|
||||
description:
|
||||
name: url_launcher
|
||||
sha256: "0ecc004c62fd3ed36a2ffcbe0dd9700aee63bd7532d0b642a488b1ec310f492e"
|
||||
sha256: "6ce1e04375be4eed30548f10a315826fd933c1e493206eab82eed01f438c8d2e"
|
||||
url: "https://pub.dev"
|
||||
source: hosted
|
||||
version: "6.2.5"
|
||||
version: "6.2.6"
|
||||
url_launcher_android:
|
||||
dependency: transitive
|
||||
description:
|
||||
name: url_launcher_android
|
||||
sha256: d4ed0711849dd8e33eb2dd69c25db0d0d3fdc37e0a62e629fe32f57a22db2745
|
||||
sha256: "360a6ed2027f18b73c8d98e159dda67a61b7f2e0f6ec26e86c3ada33b0621775"
|
||||
url: "https://pub.dev"
|
||||
source: hosted
|
||||
version: "6.3.0"
|
||||
version: "6.3.1"
|
||||
url_launcher_ios:
|
||||
dependency: transitive
|
||||
description:
|
||||
@ -1446,10 +1446,10 @@ packages:
|
||||
dependency: transitive
|
||||
description:
|
||||
name: url_launcher_web
|
||||
sha256: "3692a459204a33e04bc94f5fb91158faf4f2c8903281ddd82915adecdb1a901d"
|
||||
sha256: "8d9e750d8c9338601e709cd0885f95825086bd8b642547f26bda435aade95d8a"
|
||||
url: "https://pub.dev"
|
||||
source: hosted
|
||||
version: "2.3.0"
|
||||
version: "2.3.1"
|
||||
url_launcher_windows:
|
||||
dependency: transitive
|
||||
description:
|
||||
@ -1462,10 +1462,10 @@ packages:
|
||||
dependency: "direct main"
|
||||
description:
|
||||
name: uuid
|
||||
sha256: cd210a09f7c18cbe5a02511718e0334de6559871052c90a90c0cca46a4aa81c8
|
||||
sha256: "814e9e88f21a176ae1359149021870e87f7cddaf633ab678a5d2b0bff7fd1ba8"
|
||||
url: "https://pub.dev"
|
||||
source: hosted
|
||||
version: "4.3.3"
|
||||
version: "4.4.0"
|
||||
vector_graphics:
|
||||
dependency: transitive
|
||||
description:
|
||||
@ -1540,10 +1540,10 @@ packages:
|
||||
dependency: transitive
|
||||
description:
|
||||
name: web_socket_channel
|
||||
sha256: "1d8e795e2a8b3730c41b8a98a2dff2e0fb57ae6f0764a1c46ec5915387d257b2"
|
||||
sha256: "58c6666b342a38816b2e7e50ed0f1e261959630becd4c879c4f26bfa14aa5a42"
|
||||
url: "https://pub.dev"
|
||||
source: hosted
|
||||
version: "2.4.4"
|
||||
version: "2.4.5"
|
||||
win32:
|
||||
dependency: transitive
|
||||
description:
|
||||
|
22
pubspec.yaml
22
pubspec.yaml
@ -13,7 +13,7 @@ dependencies:
|
||||
archive: ^3.4.10
|
||||
async_tools:
|
||||
path: packages/async_tools
|
||||
awesome_extensions: ^2.0.13
|
||||
awesome_extensions: ^2.0.14
|
||||
badges: ^3.1.2
|
||||
basic_utils: ^5.7.0
|
||||
bloc: ^8.1.4
|
||||
@ -25,7 +25,7 @@ dependencies:
|
||||
circular_profile_avatar: ^2.0.5
|
||||
circular_reveal_animation: ^2.0.1
|
||||
cool_dropdown: ^2.1.0
|
||||
cupertino_icons: ^1.0.6
|
||||
cupertino_icons: ^1.0.8
|
||||
equatable: ^2.0.5
|
||||
fast_immutable_collections: ^10.2.2
|
||||
fixnum: ^1.1.0
|
||||
@ -46,7 +46,7 @@ dependencies:
|
||||
flutter_translate: ^4.0.4
|
||||
form_builder_validators: ^9.1.0
|
||||
freezed_annotation: ^2.4.1
|
||||
go_router: ^13.2.2
|
||||
go_router: ^13.2.4
|
||||
hydrated_bloc: ^9.1.5
|
||||
image: ^4.1.7
|
||||
intl: ^0.18.1
|
||||
@ -54,30 +54,30 @@ dependencies:
|
||||
loggy: ^2.0.3
|
||||
meta: ^1.11.0
|
||||
mobile_scanner: ^4.0.1
|
||||
motion_toast: ^2.9.0
|
||||
motion_toast: ^2.9.1
|
||||
mutex:
|
||||
path: packages/mutex
|
||||
pasteboard: ^0.2.0
|
||||
path: ^1.9.0
|
||||
path_provider: ^2.1.2
|
||||
path_provider: ^2.1.3
|
||||
pinput: ^4.0.0
|
||||
preload_page_view: ^0.2.0
|
||||
protobuf: ^3.1.0
|
||||
provider: ^6.1.2
|
||||
qr_code_dart_scan: ^0.7.6
|
||||
qr_code_dart_scan: ^0.8.0
|
||||
qr_flutter: ^4.1.0
|
||||
quickalert: ^1.1.0
|
||||
radix_colors: ^1.0.4
|
||||
reorderable_grid: ^1.0.10
|
||||
searchable_listview: ^2.11.1
|
||||
share_plus: ^8.0.2
|
||||
shared_preferences: ^2.2.2
|
||||
searchable_listview: ^2.11.2
|
||||
share_plus: ^8.0.3
|
||||
shared_preferences: ^2.2.3
|
||||
signal_strength_indicator: ^0.4.1
|
||||
split_view: ^3.2.1
|
||||
stack_trace: ^1.11.1
|
||||
stream_transform: ^2.1.0
|
||||
stylish_bottom_bar: ^1.1.0
|
||||
uuid: ^4.3.3
|
||||
uuid: ^4.4.0
|
||||
veilid:
|
||||
# veilid: ^0.0.1
|
||||
path: ../veilid/veilid-flutter
|
||||
@ -89,7 +89,7 @@ dependencies:
|
||||
|
||||
dev_dependencies:
|
||||
build_runner: ^2.4.9
|
||||
freezed: ^2.5.0
|
||||
freezed: ^2.5.2
|
||||
icons_launcher: ^2.1.7
|
||||
json_serializable: ^6.7.1
|
||||
lint_hard: ^4.0.0
|
||||
|
Loading…
x
Reference in New Issue
Block a user