diff --git a/packages/veilid_support/example/pubspec.lock b/packages/veilid_support/example/pubspec.lock index bc4ab6f..5c4355b 100644 --- a/packages/veilid_support/example/pubspec.lock +++ b/packages/veilid_support/example/pubspec.lock @@ -37,10 +37,10 @@ packages: dependency: "direct dev" description: name: async_tools - sha256: afd5426e76631172f8ce6a6359b264b092fa9d2a52cd2528100115be9525e067 + sha256: "9611c1efeae7e6d342721d0c2caf2e4783d91fba6a9637d7badfa2dccf8de2a2" url: "https://pub.dev" source: hosted - version: "0.1.9" + version: "0.1.10" bloc: dependency: transitive description: @@ -53,10 +53,10 @@ packages: dependency: transitive description: name: bloc_advanced_tools - sha256: dfb142569814952af8d93e7fe045972d847e29382471687db59913e253202f6e + sha256: "63e57000df7259e3007dbfbbfd7dae3e0eca60eb2ac93cbe0c5a3de0e77c9972" url: "https://pub.dev" source: hosted - version: "0.1.12" + version: "0.1.13" boolean_selector: dependency: transitive description: @@ -65,6 +65,14 @@ packages: url: "https://pub.dev" source: hosted version: "2.1.2" + buffer: + dependency: transitive + description: + name: buffer + sha256: "389da2ec2c16283c8787e0adaede82b1842102f8c8aae2f49003a766c5c6b3d1" + url: "https://pub.dev" + source: hosted + version: "1.2.3" change_case: dependency: transitive description: diff --git a/packages/veilid_support/example/pubspec.yaml b/packages/veilid_support/example/pubspec.yaml index 42f9885..86c8e7e 100644 --- a/packages/veilid_support/example/pubspec.yaml +++ b/packages/veilid_support/example/pubspec.yaml @@ -1,10 +1,10 @@ name: example description: "Veilid Support Example" -publish_to: 'none' # Remove this line if you wish to publish to pub.dev +publish_to: "none" # Remove this line if you wish to publish to pub.dev version: 1.0.0+1 environment: - sdk: '>=3.3.4 <4.0.0' + sdk: ">=3.3.4 <4.0.0" dependencies: collection: ^1.19.1 @@ -15,7 +15,7 @@ dependencies: path: ../ dev_dependencies: - async_tools: ^0.1.9 + async_tools: ^0.1.10 integration_test: sdk: flutter lint_hard: ^6.0.0 @@ -23,5 +23,9 @@ dev_dependencies: veilid_test: path: ../../../../veilid/veilid-flutter/packages/veilid_test +# dependency_overrides: +# async_tools: +# path: ../../../../dart_async_tools + flutter: uses-material-design: true diff --git a/packages/veilid_support/lib/src/persistent_queue.dart b/packages/veilid_support/lib/src/persistent_queue.dart index 4e5e541..efb4c86 100644 --- a/packages/veilid_support/lib/src/persistent_queue.dart +++ b/packages/veilid_support/lib/src/persistent_queue.dart @@ -2,13 +2,15 @@ import 'dart:async'; import 'dart:typed_data'; import 'package:async_tools/async_tools.dart'; +import 'package:buffer/buffer.dart'; import 'package:fast_immutable_collections/fast_immutable_collections.dart'; -import 'package:protobuf/protobuf.dart'; import 'config.dart'; import 'table_db.dart'; import 'veilid_log.dart'; +const _ksfSyncAdd = 'ksfSyncAdd'; + class PersistentQueue with TableDBBackedFromBuffer> { // PersistentQueue( @@ -17,7 +19,7 @@ class PersistentQueue with TableDBBackedFromBuffer> { required T Function(Uint8List) fromBuffer, required Uint8List Function(T) toBuffer, required Future Function(IList) closure, - bool deleteOnClose = true, + bool deleteOnClose = false, void Function(Object, StackTrace)? onError}) : _table = table, _key = key, @@ -33,13 +35,12 @@ class PersistentQueue with TableDBBackedFromBuffer> { // Ensure the init finished await _initWait(); - // Close the sync add stream - await _syncAddController.close(); - await _syncAddTask; + // Finish all sync adds + await serialFutureClose((this, _ksfSyncAdd)); // Stop the processing trigger + await _sspQueueReady.close(); await _queueReady.close(); - await _processorTask; // No more queue actions await _queueMutex.acquire(); @@ -50,7 +51,13 @@ class PersistentQueue with TableDBBackedFromBuffer> { } } - Future get wait async { + set deleteOnClose(bool d) { + _deleteOnClose = d; + } + + bool get deleteOnClose => _deleteOnClose; + + Future get waitEmpty async { // Ensure the init finished await _initWait(); @@ -64,21 +71,13 @@ class PersistentQueue with TableDBBackedFromBuffer> { Future _init(Completer _) async { // Start the processor - _processorTask = Future.delayed(Duration.zero, () async { + _sspQueueReady.follow(_queueReady.stream, true, (more) async { await _initWait(); - await for (final _ in _queueReady.stream) { + if (more) { await _process(); } }); - // Start the sync add controller - _syncAddTask = Future.delayed(Duration.zero, () async { - await _initWait(); - await for (final elem in _syncAddController.stream) { - await addAll(elem); - } - }); - // Load the queue if we have one try { await _queueMutex.protect(() async { @@ -98,7 +97,7 @@ class PersistentQueue with TableDBBackedFromBuffer> { assert(_queueMutex.isLocked, 'must be locked'); if (_queue.isNotEmpty) { if (!_queueReady.isClosed) { - _queueReady.sink.add(null); + _queueReady.sink.add(true); } } else { _queueDoneCompleter?.complete(); @@ -127,46 +126,24 @@ class PersistentQueue with TableDBBackedFromBuffer> { } void addSync(T item) { - _syncAddController.sink.add([item]); + serialFuture((this, _ksfSyncAdd), () async { + await add(item); + }); } void addAllSync(Iterable items) { - _syncAddController.sink.add(items); + serialFuture((this, _ksfSyncAdd), () async { + await addAll(items); + }); } - // Future get isEmpty async { - // await _initWait(); - // return state.asData!.value.isEmpty; - // } + Future pause() async { + await _sspQueueReady.pause(); + } - // Future get isNotEmpty async { - // await _initWait(); - // return state.asData!.value.isNotEmpty; - // } - - // Future get length async { - // await _initWait(); - // return state.asData!.value.length; - // } - - // Future pop() async { - // await _initWait(); - // return _processingMutex.protect(() async => _stateMutex.protect(() async { - // final removedItem = Output(); - // final queue = state.asData!.value.removeAt(0, removedItem); - // await _setStateInner(queue); - // return removedItem.value; - // })); - // } - - // Future> popAll() async { - // await _initWait(); - // return _processingMutex.protect(() async => _stateMutex.protect(() async { - // final queue = state.asData!.value; - // await _setStateInner(IList.empty); - // return queue; - // })); - // } + Future resume() async { + await _sspQueueReady.resume(); + } Future _process() async { try { @@ -210,9 +187,10 @@ class PersistentQueue with TableDBBackedFromBuffer> { IList valueFromBuffer(Uint8List bytes) { var out = IList(); try { - final reader = CodedBufferReader(bytes); - while (!reader.isAtEnd()) { - final bytes = reader.readBytesAsView(); + final reader = ByteDataReader()..add(bytes); + while (reader.remainingLength != 0) { + final count = reader.readUint32(); + final bytes = reader.read(count); try { final item = _fromBuffer(bytes); out = out.add(item); @@ -236,26 +214,29 @@ class PersistentQueue with TableDBBackedFromBuffer> { @override Uint8List valueToBuffer(IList val) { - final writer = CodedBufferWriter(); + final writer = ByteDataWriter(); for (final elem in val) { - writer.writeRawBytes(_toBuffer(elem)); + final bytes = _toBuffer(elem); + final count = bytes.lengthInBytes; + writer + ..writeUint32(count) + ..write(bytes); } - return writer.toBuffer(); + return writer.toBytes(); } final String _table; final String _key; final T Function(Uint8List) _fromBuffer; final Uint8List Function(T) _toBuffer; - final bool _deleteOnClose; + bool _deleteOnClose; final WaitSet _initWait = WaitSet(); final _queueMutex = Mutex(debugLockTimeout: kIsDebugMode ? 60 : null); var _queue = IList.empty(); - final StreamController> _syncAddController = StreamController(); - final StreamController _queueReady = StreamController(); final Future Function(IList) _closure; final void Function(Object, StackTrace)? _onError; - late Future _processorTask; - late Future _syncAddTask; Completer? _queueDoneCompleter; + + final StreamController _queueReady = StreamController(); + final _sspQueueReady = SingleStateProcessor(); } diff --git a/packages/veilid_support/lib/src/table_db_array.dart b/packages/veilid_support/lib/src/table_db_array.dart index 8b59336..53adeb0 100644 --- a/packages/veilid_support/lib/src/table_db_array.dart +++ b/packages/veilid_support/lib/src/table_db_array.dart @@ -46,7 +46,7 @@ class _TableDBArrayBase { await _initWait(); } - Future _init(_) async { + Future _init(Completer _) async { // Load the array details await _mutex.protect(() async { _tableDB = await Veilid.instance.openTableDB(_table, 1); @@ -102,27 +102,27 @@ class _TableDBArrayBase { Future _add(Uint8List value) async { await _initWait(); - return _writeTransaction((t) async => _addInner(t, value)); + return _writeTransaction((t) => _addInner(t, value)); } Future _addAll(List values) async { await _initWait(); - return _writeTransaction((t) async => _addAllInner(t, values)); + return _writeTransaction((t) => _addAllInner(t, values)); } Future _insert(int pos, Uint8List value) async { await _initWait(); - return _writeTransaction((t) async => _insertInner(t, pos, value)); + return _writeTransaction((t) => _insertInner(t, pos, value)); } Future _insertAll(int pos, List values) async { await _initWait(); - return _writeTransaction((t) async => _insertAllInner(t, pos, values)); + return _writeTransaction((t) => _insertAllInner(t, pos, values)); } Future _get(int pos) async { await _initWait(); - return _mutex.protect(() async { + return _mutex.protect(() { if (!_open) { throw StateError('not open'); } @@ -132,7 +132,7 @@ class _TableDBArrayBase { Future> _getRange(int start, [int? end]) async { await _initWait(); - return _mutex.protect(() async { + return _mutex.protect(() { if (!_open) { throw StateError('not open'); } @@ -142,14 +142,13 @@ class _TableDBArrayBase { Future _remove(int pos, {Output? out}) async { await _initWait(); - return _writeTransaction((t) async => _removeInner(t, pos, out: out)); + return _writeTransaction((t) => _removeInner(t, pos, out: out)); } Future _removeRange(int start, int end, {Output>? out}) async { await _initWait(); - return _writeTransaction( - (t) async => _removeRangeInner(t, start, end, out: out)); + return _writeTransaction((t) => _removeRangeInner(t, start, end, out: out)); } Future clear() async { @@ -331,24 +330,24 @@ class _TableDBArrayBase { //////////////////////////////////////////////////////////// // Private implementation - static final Uint8List _headKey = Uint8List.fromList([$_, $H, $E, $A, $D]); + static final _headKey = Uint8List.fromList([$_, $H, $E, $A, $D]); static Uint8List _entryKey(int k) => (ByteData(4)..setUint32(0, k)).buffer.asUint8List(); static Uint8List _chunkKey(int n) => (ByteData(2)..setUint16(0, n)).buffer.asUint8List(); Future _writeTransaction( - Future Function(VeilidTableDBTransaction) closure) async => + Future Function(VeilidTableDBTransaction) closure) => _mutex.protect(() async { if (!_open) { throw StateError('not open'); } - final _oldLength = _length; - final _oldNextFree = _nextFree; - final _oldMaxEntry = _maxEntry; - final _oldHeadDelta = _headDelta; - final _oldTailDelta = _tailDelta; + final oldLength = _length; + final oldNextFree = _nextFree; + final oldMaxEntry = _maxEntry; + final oldHeadDelta = _headDelta; + final oldTailDelta = _tailDelta; try { final out = await transactionScope(_tableDB, (t) async { final out = await closure(t); @@ -365,11 +364,11 @@ class _TableDBArrayBase { return out; } on Exception { // restore head - _length = _oldLength; - _nextFree = _oldNextFree; - _maxEntry = _oldMaxEntry; - _headDelta = _oldHeadDelta; - _tailDelta = _oldTailDelta; + _length = oldLength; + _nextFree = oldNextFree; + _maxEntry = oldMaxEntry; + _headDelta = oldHeadDelta; + _tailDelta = oldTailDelta; // invalidate caches because they could have been written to _chunkCache.clear(); _dirtyChunks.clear(); @@ -415,7 +414,7 @@ class _TableDBArrayBase { _dirtyChunks[chunkNumber] = chunk; } - Future _insertIndexEntry(int pos) async => _insertIndexEntries(pos, 1); + Future _insertIndexEntry(int pos) => _insertIndexEntries(pos, 1); Future _insertIndexEntries(int start, int length) async { if (length == 0) { @@ -474,7 +473,7 @@ class _TableDBArrayBase { _tailDelta += length; } - Future _removeIndexEntry(int pos) async => _removeIndexEntries(pos, 1); + Future _removeIndexEntry(int pos) => _removeIndexEntries(pos, 1); Future _removeIndexEntries(int start, int length) async { if (length == 0) { @@ -624,20 +623,20 @@ class _TableDBArrayBase { var _initDone = false; final VeilidCrypto _crypto; final WaitSet _initWait = WaitSet(); - final Mutex _mutex = Mutex(debugLockTimeout: kIsDebugMode ? 60 : null); + final _mutex = Mutex(debugLockTimeout: kIsDebugMode ? 60 : null); // Change tracking - int _headDelta = 0; - int _tailDelta = 0; + var _headDelta = 0; + var _tailDelta = 0; // Head state - int _length = 0; - int _nextFree = 0; - int _maxEntry = 0; - static const int _indexStride = 16384; + var _length = 0; + var _nextFree = 0; + var _maxEntry = 0; + static const _indexStride = 16384; final List<(int, Uint8List)> _chunkCache = []; final Map _dirtyChunks = {}; - static const int _chunkCacheLength = 3; + static const _chunkCacheLength = 3; final StreamController _changeStream = StreamController.broadcast(); @@ -711,13 +710,12 @@ class TableDBArrayJson extends _TableDBArrayBase { Future add(T value) => _add(jsonEncodeBytes(value)); - Future addAll(List values) async => + Future addAll(List values) => _addAll(values.map(jsonEncodeBytes).toList()); - Future insert(int pos, T value) async => - _insert(pos, jsonEncodeBytes(value)); + Future insert(int pos, T value) => _insert(pos, jsonEncodeBytes(value)); - Future insertAll(int pos, List values) async => + Future insertAll(int pos, List values) => _insertAll(pos, values.map(jsonEncodeBytes).toList()); Future get( @@ -774,13 +772,12 @@ class TableDBArrayProtobuf Future add(T value) => _add(value.writeToBuffer()); - Future addAll(List values) async => + Future addAll(List values) => _addAll(values.map((x) => x.writeToBuffer()).toList()); - Future insert(int pos, T value) async => - _insert(pos, value.writeToBuffer()); + Future insert(int pos, T value) => _insert(pos, value.writeToBuffer()); - Future insertAll(int pos, List values) async => + Future insertAll(int pos, List values) => _insertAll(pos, values.map((x) => x.writeToBuffer()).toList()); Future get( diff --git a/packages/veilid_support/pubspec.lock b/packages/veilid_support/pubspec.lock index b4c7eef..0d2320e 100644 --- a/packages/veilid_support/pubspec.lock +++ b/packages/veilid_support/pubspec.lock @@ -37,10 +37,10 @@ packages: dependency: "direct main" description: name: async_tools - sha256: afd5426e76631172f8ce6a6359b264b092fa9d2a52cd2528100115be9525e067 + sha256: "9611c1efeae7e6d342721d0c2caf2e4783d91fba6a9637d7badfa2dccf8de2a2" url: "https://pub.dev" source: hosted - version: "0.1.9" + version: "0.1.10" bloc: dependency: "direct main" description: @@ -53,10 +53,10 @@ packages: dependency: "direct main" description: name: bloc_advanced_tools - sha256: dfb142569814952af8d93e7fe045972d847e29382471687db59913e253202f6e + sha256: "63e57000df7259e3007dbfbbfd7dae3e0eca60eb2ac93cbe0c5a3de0e77c9972" url: "https://pub.dev" source: hosted - version: "0.1.12" + version: "0.1.13" boolean_selector: dependency: transitive description: @@ -65,6 +65,14 @@ packages: url: "https://pub.dev" source: hosted version: "2.1.2" + buffer: + dependency: "direct main" + description: + name: buffer + sha256: "389da2ec2c16283c8787e0adaede82b1842102f8c8aae2f49003a766c5c6b3d1" + url: "https://pub.dev" + source: hosted + version: "1.2.3" build: dependency: transitive description: diff --git a/packages/veilid_support/pubspec.yaml b/packages/veilid_support/pubspec.yaml index 8864fa6..5fdb74b 100644 --- a/packages/veilid_support/pubspec.yaml +++ b/packages/veilid_support/pubspec.yaml @@ -7,9 +7,10 @@ environment: sdk: ">=3.2.0 <4.0.0" dependencies: - async_tools: ^0.1.9 + async_tools: ^0.1.10 bloc: ^9.0.0 - bloc_advanced_tools: ^0.1.12 + bloc_advanced_tools: ^0.1.13 + buffer: ^1.2.3 charcode: ^1.4.0 collection: ^1.19.1 convert: ^3.1.2 @@ -29,10 +30,10 @@ dependencies: path: ../../../veilid/veilid-flutter # dependency_overrides: -# async_tools: -# path: ../../../dart_async_tools -# bloc_advanced_tools: -# path: ../../../bloc_advanced_tools +# async_tools: +# path: ../../../dart_async_tools +# bloc_advanced_tools: +# path: ../../../bloc_advanced_tools dev_dependencies: build_runner: ^2.4.15 diff --git a/pubspec.lock b/pubspec.lock index 3b7f823..ae51a3d 100644 --- a/pubspec.lock +++ b/pubspec.lock @@ -92,10 +92,9 @@ packages: async_tools: dependency: "direct main" description: - name: async_tools - sha256: afd5426e76631172f8ce6a6359b264b092fa9d2a52cd2528100115be9525e067 - url: "https://pub.dev" - source: hosted + path: "../dart_async_tools" + relative: true + source: path version: "0.1.9" auto_size_text: dependency: "direct main" @@ -156,10 +155,9 @@ packages: bloc_advanced_tools: dependency: "direct main" description: - name: bloc_advanced_tools - sha256: dfb142569814952af8d93e7fe045972d847e29382471687db59913e253202f6e - url: "https://pub.dev" - source: hosted + path: "../bloc_advanced_tools" + relative: true + source: path version: "0.1.12" blurry_modal_progress_hud: dependency: "direct main" diff --git a/pubspec.yaml b/pubspec.yaml index fccff99..e605c0e 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -15,13 +15,13 @@ dependencies: animated_theme_switcher: ^2.0.10 ansicolor: ^2.0.3 archive: ^4.0.4 - async_tools: ^0.1.9 + async_tools: ^0.1.10 auto_size_text: ^3.0.0 awesome_extensions: ^2.0.21 badges: ^3.1.2 basic_utils: ^5.8.2 bloc: ^9.0.0 - bloc_advanced_tools: ^0.1.12 + bloc_advanced_tools: ^0.1.13 blurry_modal_progress_hud: ^1.1.1 change_case: ^2.2.0 charcode: ^1.4.0 @@ -108,10 +108,10 @@ dependencies: dependency_overrides: intl: ^0.20.2 # Until flutter_translate updates intl -# async_tools: -# path: ../dart_async_tools -# bloc_advanced_tools: -# path: ../bloc_advanced_tools +# async_tools: +# path: ../dart_async_tools +# bloc_advanced_tools: +# path: ../bloc_advanced_tools # searchable_listview: # path: ../Searchable-Listview # flutter_chat_core: