persistent queue fixes

This commit is contained in:
Christien Rioux 2025-06-01 15:09:22 -04:00
parent b7752a7e95
commit fa72782f39
8 changed files with 131 additions and 134 deletions

View file

@ -37,10 +37,10 @@ packages:
dependency: "direct dev"
description:
name: async_tools
sha256: afd5426e76631172f8ce6a6359b264b092fa9d2a52cd2528100115be9525e067
sha256: "9611c1efeae7e6d342721d0c2caf2e4783d91fba6a9637d7badfa2dccf8de2a2"
url: "https://pub.dev"
source: hosted
version: "0.1.9"
version: "0.1.10"
bloc:
dependency: transitive
description:
@ -53,10 +53,10 @@ packages:
dependency: transitive
description:
name: bloc_advanced_tools
sha256: dfb142569814952af8d93e7fe045972d847e29382471687db59913e253202f6e
sha256: "63e57000df7259e3007dbfbbfd7dae3e0eca60eb2ac93cbe0c5a3de0e77c9972"
url: "https://pub.dev"
source: hosted
version: "0.1.12"
version: "0.1.13"
boolean_selector:
dependency: transitive
description:
@ -65,6 +65,14 @@ packages:
url: "https://pub.dev"
source: hosted
version: "2.1.2"
buffer:
dependency: transitive
description:
name: buffer
sha256: "389da2ec2c16283c8787e0adaede82b1842102f8c8aae2f49003a766c5c6b3d1"
url: "https://pub.dev"
source: hosted
version: "1.2.3"
change_case:
dependency: transitive
description:

View file

@ -1,10 +1,10 @@
name: example
description: "Veilid Support Example"
publish_to: 'none' # Remove this line if you wish to publish to pub.dev
publish_to: "none" # Remove this line if you wish to publish to pub.dev
version: 1.0.0+1
environment:
sdk: '>=3.3.4 <4.0.0'
sdk: ">=3.3.4 <4.0.0"
dependencies:
collection: ^1.19.1
@ -15,7 +15,7 @@ dependencies:
path: ../
dev_dependencies:
async_tools: ^0.1.9
async_tools: ^0.1.10
integration_test:
sdk: flutter
lint_hard: ^6.0.0
@ -23,5 +23,9 @@ dev_dependencies:
veilid_test:
path: ../../../../veilid/veilid-flutter/packages/veilid_test
# dependency_overrides:
# async_tools:
# path: ../../../../dart_async_tools
flutter:
uses-material-design: true

View file

@ -2,13 +2,15 @@ import 'dart:async';
import 'dart:typed_data';
import 'package:async_tools/async_tools.dart';
import 'package:buffer/buffer.dart';
import 'package:fast_immutable_collections/fast_immutable_collections.dart';
import 'package:protobuf/protobuf.dart';
import 'config.dart';
import 'table_db.dart';
import 'veilid_log.dart';
const _ksfSyncAdd = 'ksfSyncAdd';
class PersistentQueue<T> with TableDBBackedFromBuffer<IList<T>> {
//
PersistentQueue(
@ -17,7 +19,7 @@ class PersistentQueue<T> with TableDBBackedFromBuffer<IList<T>> {
required T Function(Uint8List) fromBuffer,
required Uint8List Function(T) toBuffer,
required Future<void> Function(IList<T>) closure,
bool deleteOnClose = true,
bool deleteOnClose = false,
void Function(Object, StackTrace)? onError})
: _table = table,
_key = key,
@ -33,13 +35,12 @@ class PersistentQueue<T> with TableDBBackedFromBuffer<IList<T>> {
// Ensure the init finished
await _initWait();
// Close the sync add stream
await _syncAddController.close();
await _syncAddTask;
// Finish all sync adds
await serialFutureClose((this, _ksfSyncAdd));
// Stop the processing trigger
await _sspQueueReady.close();
await _queueReady.close();
await _processorTask;
// No more queue actions
await _queueMutex.acquire();
@ -50,7 +51,13 @@ class PersistentQueue<T> with TableDBBackedFromBuffer<IList<T>> {
}
}
Future<void> get wait async {
set deleteOnClose(bool d) {
_deleteOnClose = d;
}
bool get deleteOnClose => _deleteOnClose;
Future<void> get waitEmpty async {
// Ensure the init finished
await _initWait();
@ -64,21 +71,13 @@ class PersistentQueue<T> with TableDBBackedFromBuffer<IList<T>> {
Future<void> _init(Completer<void> _) async {
// Start the processor
_processorTask = Future.delayed(Duration.zero, () async {
_sspQueueReady.follow(_queueReady.stream, true, (more) async {
await _initWait();
await for (final _ in _queueReady.stream) {
if (more) {
await _process();
}
});
// Start the sync add controller
_syncAddTask = Future.delayed(Duration.zero, () async {
await _initWait();
await for (final elem in _syncAddController.stream) {
await addAll(elem);
}
});
// Load the queue if we have one
try {
await _queueMutex.protect(() async {
@ -98,7 +97,7 @@ class PersistentQueue<T> with TableDBBackedFromBuffer<IList<T>> {
assert(_queueMutex.isLocked, 'must be locked');
if (_queue.isNotEmpty) {
if (!_queueReady.isClosed) {
_queueReady.sink.add(null);
_queueReady.sink.add(true);
}
} else {
_queueDoneCompleter?.complete();
@ -127,46 +126,24 @@ class PersistentQueue<T> with TableDBBackedFromBuffer<IList<T>> {
}
void addSync(T item) {
_syncAddController.sink.add([item]);
serialFuture((this, _ksfSyncAdd), () async {
await add(item);
});
}
void addAllSync(Iterable<T> items) {
_syncAddController.sink.add(items);
serialFuture((this, _ksfSyncAdd), () async {
await addAll(items);
});
}
// Future<bool> get isEmpty async {
// await _initWait();
// return state.asData!.value.isEmpty;
// }
Future<void> pause() async {
await _sspQueueReady.pause();
}
// Future<bool> get isNotEmpty async {
// await _initWait();
// return state.asData!.value.isNotEmpty;
// }
// Future<int> get length async {
// await _initWait();
// return state.asData!.value.length;
// }
// Future<T?> pop() async {
// await _initWait();
// return _processingMutex.protect(() async => _stateMutex.protect(() async {
// final removedItem = Output<T>();
// final queue = state.asData!.value.removeAt(0, removedItem);
// await _setStateInner(queue);
// return removedItem.value;
// }));
// }
// Future<IList<T>> popAll() async {
// await _initWait();
// return _processingMutex.protect(() async => _stateMutex.protect(() async {
// final queue = state.asData!.value;
// await _setStateInner(IList<T>.empty);
// return queue;
// }));
// }
Future<void> resume() async {
await _sspQueueReady.resume();
}
Future<void> _process() async {
try {
@ -210,9 +187,10 @@ class PersistentQueue<T> with TableDBBackedFromBuffer<IList<T>> {
IList<T> valueFromBuffer(Uint8List bytes) {
var out = IList<T>();
try {
final reader = CodedBufferReader(bytes);
while (!reader.isAtEnd()) {
final bytes = reader.readBytesAsView();
final reader = ByteDataReader()..add(bytes);
while (reader.remainingLength != 0) {
final count = reader.readUint32();
final bytes = reader.read(count);
try {
final item = _fromBuffer(bytes);
out = out.add(item);
@ -236,26 +214,29 @@ class PersistentQueue<T> with TableDBBackedFromBuffer<IList<T>> {
@override
Uint8List valueToBuffer(IList<T> val) {
final writer = CodedBufferWriter();
final writer = ByteDataWriter();
for (final elem in val) {
writer.writeRawBytes(_toBuffer(elem));
final bytes = _toBuffer(elem);
final count = bytes.lengthInBytes;
writer
..writeUint32(count)
..write(bytes);
}
return writer.toBuffer();
return writer.toBytes();
}
final String _table;
final String _key;
final T Function(Uint8List) _fromBuffer;
final Uint8List Function(T) _toBuffer;
final bool _deleteOnClose;
bool _deleteOnClose;
final WaitSet<void, void> _initWait = WaitSet();
final _queueMutex = Mutex(debugLockTimeout: kIsDebugMode ? 60 : null);
var _queue = IList<T>.empty();
final StreamController<Iterable<T>> _syncAddController = StreamController();
final StreamController<void> _queueReady = StreamController();
final Future<void> Function(IList<T>) _closure;
final void Function(Object, StackTrace)? _onError;
late Future<void> _processorTask;
late Future<void> _syncAddTask;
Completer<void>? _queueDoneCompleter;
final StreamController<bool> _queueReady = StreamController();
final _sspQueueReady = SingleStateProcessor<bool>();
}

View file

@ -46,7 +46,7 @@ class _TableDBArrayBase {
await _initWait();
}
Future<void> _init(_) async {
Future<void> _init(Completer<void> _) async {
// Load the array details
await _mutex.protect(() async {
_tableDB = await Veilid.instance.openTableDB(_table, 1);
@ -102,27 +102,27 @@ class _TableDBArrayBase {
Future<void> _add(Uint8List value) async {
await _initWait();
return _writeTransaction((t) async => _addInner(t, value));
return _writeTransaction((t) => _addInner(t, value));
}
Future<void> _addAll(List<Uint8List> values) async {
await _initWait();
return _writeTransaction((t) async => _addAllInner(t, values));
return _writeTransaction((t) => _addAllInner(t, values));
}
Future<void> _insert(int pos, Uint8List value) async {
await _initWait();
return _writeTransaction((t) async => _insertInner(t, pos, value));
return _writeTransaction((t) => _insertInner(t, pos, value));
}
Future<void> _insertAll(int pos, List<Uint8List> values) async {
await _initWait();
return _writeTransaction((t) async => _insertAllInner(t, pos, values));
return _writeTransaction((t) => _insertAllInner(t, pos, values));
}
Future<Uint8List> _get(int pos) async {
await _initWait();
return _mutex.protect(() async {
return _mutex.protect(() {
if (!_open) {
throw StateError('not open');
}
@ -132,7 +132,7 @@ class _TableDBArrayBase {
Future<List<Uint8List>> _getRange(int start, [int? end]) async {
await _initWait();
return _mutex.protect(() async {
return _mutex.protect(() {
if (!_open) {
throw StateError('not open');
}
@ -142,14 +142,13 @@ class _TableDBArrayBase {
Future<void> _remove(int pos, {Output<Uint8List>? out}) async {
await _initWait();
return _writeTransaction((t) async => _removeInner(t, pos, out: out));
return _writeTransaction((t) => _removeInner(t, pos, out: out));
}
Future<void> _removeRange(int start, int end,
{Output<List<Uint8List>>? out}) async {
await _initWait();
return _writeTransaction(
(t) async => _removeRangeInner(t, start, end, out: out));
return _writeTransaction((t) => _removeRangeInner(t, start, end, out: out));
}
Future<void> clear() async {
@ -331,24 +330,24 @@ class _TableDBArrayBase {
////////////////////////////////////////////////////////////
// Private implementation
static final Uint8List _headKey = Uint8List.fromList([$_, $H, $E, $A, $D]);
static final _headKey = Uint8List.fromList([$_, $H, $E, $A, $D]);
static Uint8List _entryKey(int k) =>
(ByteData(4)..setUint32(0, k)).buffer.asUint8List();
static Uint8List _chunkKey(int n) =>
(ByteData(2)..setUint16(0, n)).buffer.asUint8List();
Future<T> _writeTransaction<T>(
Future<T> Function(VeilidTableDBTransaction) closure) async =>
Future<T> Function(VeilidTableDBTransaction) closure) =>
_mutex.protect(() async {
if (!_open) {
throw StateError('not open');
}
final _oldLength = _length;
final _oldNextFree = _nextFree;
final _oldMaxEntry = _maxEntry;
final _oldHeadDelta = _headDelta;
final _oldTailDelta = _tailDelta;
final oldLength = _length;
final oldNextFree = _nextFree;
final oldMaxEntry = _maxEntry;
final oldHeadDelta = _headDelta;
final oldTailDelta = _tailDelta;
try {
final out = await transactionScope(_tableDB, (t) async {
final out = await closure(t);
@ -365,11 +364,11 @@ class _TableDBArrayBase {
return out;
} on Exception {
// restore head
_length = _oldLength;
_nextFree = _oldNextFree;
_maxEntry = _oldMaxEntry;
_headDelta = _oldHeadDelta;
_tailDelta = _oldTailDelta;
_length = oldLength;
_nextFree = oldNextFree;
_maxEntry = oldMaxEntry;
_headDelta = oldHeadDelta;
_tailDelta = oldTailDelta;
// invalidate caches because they could have been written to
_chunkCache.clear();
_dirtyChunks.clear();
@ -415,7 +414,7 @@ class _TableDBArrayBase {
_dirtyChunks[chunkNumber] = chunk;
}
Future<void> _insertIndexEntry(int pos) async => _insertIndexEntries(pos, 1);
Future<void> _insertIndexEntry(int pos) => _insertIndexEntries(pos, 1);
Future<void> _insertIndexEntries(int start, int length) async {
if (length == 0) {
@ -474,7 +473,7 @@ class _TableDBArrayBase {
_tailDelta += length;
}
Future<void> _removeIndexEntry(int pos) async => _removeIndexEntries(pos, 1);
Future<void> _removeIndexEntry(int pos) => _removeIndexEntries(pos, 1);
Future<void> _removeIndexEntries(int start, int length) async {
if (length == 0) {
@ -624,20 +623,20 @@ class _TableDBArrayBase {
var _initDone = false;
final VeilidCrypto _crypto;
final WaitSet<void, void> _initWait = WaitSet();
final Mutex _mutex = Mutex(debugLockTimeout: kIsDebugMode ? 60 : null);
final _mutex = Mutex(debugLockTimeout: kIsDebugMode ? 60 : null);
// Change tracking
int _headDelta = 0;
int _tailDelta = 0;
var _headDelta = 0;
var _tailDelta = 0;
// Head state
int _length = 0;
int _nextFree = 0;
int _maxEntry = 0;
static const int _indexStride = 16384;
var _length = 0;
var _nextFree = 0;
var _maxEntry = 0;
static const _indexStride = 16384;
final List<(int, Uint8List)> _chunkCache = [];
final Map<int, Uint8List> _dirtyChunks = {};
static const int _chunkCacheLength = 3;
static const _chunkCacheLength = 3;
final StreamController<TableDBArrayUpdate> _changeStream =
StreamController.broadcast();
@ -711,13 +710,12 @@ class TableDBArrayJson<T> extends _TableDBArrayBase {
Future<void> add(T value) => _add(jsonEncodeBytes(value));
Future<void> addAll(List<T> values) async =>
Future<void> addAll(List<T> values) =>
_addAll(values.map(jsonEncodeBytes).toList());
Future<void> insert(int pos, T value) async =>
_insert(pos, jsonEncodeBytes(value));
Future<void> insert(int pos, T value) => _insert(pos, jsonEncodeBytes(value));
Future<void> insertAll(int pos, List<T> values) async =>
Future<void> insertAll(int pos, List<T> values) =>
_insertAll(pos, values.map(jsonEncodeBytes).toList());
Future<T> get(
@ -774,13 +772,12 @@ class TableDBArrayProtobuf<T extends GeneratedMessage>
Future<void> add(T value) => _add(value.writeToBuffer());
Future<void> addAll(List<T> values) async =>
Future<void> addAll(List<T> values) =>
_addAll(values.map((x) => x.writeToBuffer()).toList());
Future<void> insert(int pos, T value) async =>
_insert(pos, value.writeToBuffer());
Future<void> insert(int pos, T value) => _insert(pos, value.writeToBuffer());
Future<void> insertAll(int pos, List<T> values) async =>
Future<void> insertAll(int pos, List<T> values) =>
_insertAll(pos, values.map((x) => x.writeToBuffer()).toList());
Future<T> get(

View file

@ -37,10 +37,10 @@ packages:
dependency: "direct main"
description:
name: async_tools
sha256: afd5426e76631172f8ce6a6359b264b092fa9d2a52cd2528100115be9525e067
sha256: "9611c1efeae7e6d342721d0c2caf2e4783d91fba6a9637d7badfa2dccf8de2a2"
url: "https://pub.dev"
source: hosted
version: "0.1.9"
version: "0.1.10"
bloc:
dependency: "direct main"
description:
@ -53,10 +53,10 @@ packages:
dependency: "direct main"
description:
name: bloc_advanced_tools
sha256: dfb142569814952af8d93e7fe045972d847e29382471687db59913e253202f6e
sha256: "63e57000df7259e3007dbfbbfd7dae3e0eca60eb2ac93cbe0c5a3de0e77c9972"
url: "https://pub.dev"
source: hosted
version: "0.1.12"
version: "0.1.13"
boolean_selector:
dependency: transitive
description:
@ -65,6 +65,14 @@ packages:
url: "https://pub.dev"
source: hosted
version: "2.1.2"
buffer:
dependency: "direct main"
description:
name: buffer
sha256: "389da2ec2c16283c8787e0adaede82b1842102f8c8aae2f49003a766c5c6b3d1"
url: "https://pub.dev"
source: hosted
version: "1.2.3"
build:
dependency: transitive
description:

View file

@ -7,9 +7,10 @@ environment:
sdk: ">=3.2.0 <4.0.0"
dependencies:
async_tools: ^0.1.9
async_tools: ^0.1.10
bloc: ^9.0.0
bloc_advanced_tools: ^0.1.12
bloc_advanced_tools: ^0.1.13
buffer: ^1.2.3
charcode: ^1.4.0
collection: ^1.19.1
convert: ^3.1.2
@ -29,10 +30,10 @@ dependencies:
path: ../../../veilid/veilid-flutter
# dependency_overrides:
# async_tools:
# path: ../../../dart_async_tools
# bloc_advanced_tools:
# path: ../../../bloc_advanced_tools
# async_tools:
# path: ../../../dart_async_tools
# bloc_advanced_tools:
# path: ../../../bloc_advanced_tools
dev_dependencies:
build_runner: ^2.4.15

View file

@ -92,10 +92,9 @@ packages:
async_tools:
dependency: "direct main"
description:
name: async_tools
sha256: afd5426e76631172f8ce6a6359b264b092fa9d2a52cd2528100115be9525e067
url: "https://pub.dev"
source: hosted
path: "../dart_async_tools"
relative: true
source: path
version: "0.1.9"
auto_size_text:
dependency: "direct main"
@ -156,10 +155,9 @@ packages:
bloc_advanced_tools:
dependency: "direct main"
description:
name: bloc_advanced_tools
sha256: dfb142569814952af8d93e7fe045972d847e29382471687db59913e253202f6e
url: "https://pub.dev"
source: hosted
path: "../bloc_advanced_tools"
relative: true
source: path
version: "0.1.12"
blurry_modal_progress_hud:
dependency: "direct main"

View file

@ -15,13 +15,13 @@ dependencies:
animated_theme_switcher: ^2.0.10
ansicolor: ^2.0.3
archive: ^4.0.4
async_tools: ^0.1.9
async_tools: ^0.1.10
auto_size_text: ^3.0.0
awesome_extensions: ^2.0.21
badges: ^3.1.2
basic_utils: ^5.8.2
bloc: ^9.0.0
bloc_advanced_tools: ^0.1.12
bloc_advanced_tools: ^0.1.13
blurry_modal_progress_hud: ^1.1.1
change_case: ^2.2.0
charcode: ^1.4.0
@ -108,10 +108,10 @@ dependencies:
dependency_overrides:
intl: ^0.20.2 # Until flutter_translate updates intl
# async_tools:
# path: ../dart_async_tools
# bloc_advanced_tools:
# path: ../bloc_advanced_tools
# async_tools:
# path: ../dart_async_tools
# bloc_advanced_tools:
# path: ../bloc_advanced_tools
# searchable_listview:
# path: ../Searchable-Listview
# flutter_chat_core: