diff --git a/lib/chat/cubits/single_contact_messages_cubit.dart b/lib/chat/cubits/single_contact_messages_cubit.dart index 878858b..a3e7656 100644 --- a/lib/chat/cubits/single_contact_messages_cubit.dart +++ b/lib/chat/cubits/single_contact_messages_cubit.dart @@ -107,6 +107,7 @@ class SingleContactMessagesCubit extends Cubit { table: 'SingleContactUnsentMessages', key: _remoteConversationRecordKey.toString(), fromBuffer: proto.Message.fromBuffer, + toBuffer: (x) => x.writeToBuffer(), closure: _processUnsentMessages, onError: (e, st) { log.error('Exception while processing unsent messages: $e\n$st\n'); diff --git a/packages/veilid_support/example/integration_test/app_test.dart b/packages/veilid_support/example/integration_test/app_test.dart index 5dc7acd..2d3d0e2 100644 --- a/packages/veilid_support/example/integration_test/app_test.dart +++ b/packages/veilid_support/example/integration_test/app_test.dart @@ -8,6 +8,7 @@ import 'fixtures/fixtures.dart'; import 'test_dht_log.dart'; import 'test_dht_record_pool.dart'; import 'test_dht_short_array.dart'; +import 'test_persistent_queue.dart'; import 'test_table_db_array.dart'; void main() { @@ -32,60 +33,70 @@ void main() { debugPrintSynchronously('Duration: ${endTime.difference(startTime)}'); }); - group('Attached Tests', () { + group('attached', () { setUpAll(veilidFixture.attach); tearDownAll(veilidFixture.detach); - group('DHT Support Tests', () { + group('persistent_queue', () { + test('persistent_queue:open_close', testPersistentQueueOpenClose); + test('persistent_queue:add', testPersistentQueueAdd); + test('persistent_queue:add_sync', testPersistentQueueAddSync); + test('persistent_queue:add_persist', testPersistentQueueAddPersist); + test('persistent_queue:add_sync_persist', + testPersistentQueueAddSyncPersist); + }); + + group('dht_support', () { setUpAll(updateProcessorFixture.setUp); setUpAll(tickerFixture.setUp); tearDownAll(tickerFixture.tearDown); tearDownAll(updateProcessorFixture.tearDown); - test('create pool', testDHTRecordPoolCreate); + test('create_pool', testDHTRecordPoolCreate); - group('DHTRecordPool Tests', () { + group('dht_record_pool', () { setUpAll(dhtRecordPoolFixture.setUp); tearDownAll(dhtRecordPoolFixture.tearDown); - test('create/delete record', testDHTRecordCreateDelete); - test('record scopes', testDHTRecordScopes); - test('create/delete deep record', testDHTRecordDeepCreateDelete); + test('dht_record_pool:create_delete', testDHTRecordCreateDelete); + test('dht_record_pool:scopes', testDHTRecordScopes); + test('dht_record_pool:deep_create_delete', + testDHTRecordDeepCreateDelete); }); - group('DHTShortArray Tests', () { + group('dht_short_array', () { setUpAll(dhtRecordPoolFixture.setUp); tearDownAll(dhtRecordPoolFixture.tearDown); for (final stride in [256, 16 /*64, 32, 16, 8, 4, 2, 1 */]) { - test('create shortarray stride=$stride', + test('dht_short_array:create_stride_$stride', makeTestDHTShortArrayCreateDelete(stride: stride)); - test('add shortarray stride=$stride', + test('dht_short_array:add_stride_$stride', makeTestDHTShortArrayAdd(stride: stride)); } }); - group('DHTLog Tests', () { + group('dht_log', () { setUpAll(dhtRecordPoolFixture.setUp); tearDownAll(dhtRecordPoolFixture.tearDown); for (final stride in [256, 16 /*64, 32, 16, 8, 4, 2, 1 */]) { - test('create log stride=$stride', + test('dht_log:create_stride_$stride', makeTestDHTLogCreateDelete(stride: stride)); test( timeout: const Timeout(Duration(seconds: 480)), - 'add/truncate log stride=$stride', + 'dht_log:add_truncate_stride_$stride', makeTestDHTLogAddTruncate(stride: stride), ); } }); }); - group('TableDB Tests', () { - group('TableDBArray Tests', () { - // test('create/delete TableDBArray', testTableDBArrayCreateDelete); + group('table_db', () { + group('table_db_array', () { + test('table_db_array:create_delete', testTableDBArrayCreateDelete); - group('TableDBArray Add/Get Tests', () { + group('table_db_array:add_get', () { for (final params in [ // (99, 3, 15), @@ -110,7 +121,7 @@ void main() { test( timeout: const Timeout(Duration(seconds: 480)), - 'add/remove TableDBArray count = $count batchSize=$batchSize', + 'table_db_array:add_remove_count=${count}_batchSize=$batchSize', makeTestTableDBArrayAddGetClear( count: count, singles: singles, @@ -120,7 +131,7 @@ void main() { } }); - group('TableDBArray Insert Tests', () { + group('table_db_array:insert', () { for (final params in [ // (99, 3, 15), @@ -145,7 +156,8 @@ void main() { test( timeout: const Timeout(Duration(seconds: 480)), - 'insert TableDBArray count=$count singles=$singles batchSize=$batchSize', + 'table_db_array:insert_count=${count}_' + 'singles=${singles}_batchSize=$batchSize', makeTestTableDBArrayInsert( count: count, singles: singles, @@ -155,7 +167,7 @@ void main() { } }); - group('TableDBArray Remove Tests', () { + group('table_db_array:remove', () { for (final params in [ // (99, 3, 15), @@ -180,7 +192,8 @@ void main() { test( timeout: const Timeout(Duration(seconds: 480)), - 'remove TableDBArray count=$count singles=$singles batchSize=$batchSize', + 'table_db_array:remove_count=${count}_' + 'singles=${singles}_batchSize=$batchSize', makeTestTableDBArrayRemove( count: count, singles: singles, diff --git a/packages/veilid_support/example/pubspec.lock b/packages/veilid_support/example/pubspec.lock index 9af9773..bc4ab6f 100644 --- a/packages/veilid_support/example/pubspec.lock +++ b/packages/veilid_support/example/pubspec.lock @@ -106,7 +106,7 @@ packages: source: hosted version: "1.1.2" collection: - dependency: transitive + dependency: "direct main" description: name: collection sha256: "2f5709ae4d3d59dd8f7cd309b4e023046b57d8a6c82130785d2b0e5868084e76" diff --git a/packages/veilid_support/example/pubspec.yaml b/packages/veilid_support/example/pubspec.yaml index b8333e6..42f9885 100644 --- a/packages/veilid_support/example/pubspec.yaml +++ b/packages/veilid_support/example/pubspec.yaml @@ -7,6 +7,7 @@ environment: sdk: '>=3.3.4 <4.0.0' dependencies: + collection: ^1.19.1 cupertino_icons: ^1.0.8 flutter: sdk: flutter diff --git a/packages/veilid_support/lib/src/persistent_queue.dart b/packages/veilid_support/lib/src/persistent_queue.dart index 939a5b3..4e5e541 100644 --- a/packages/veilid_support/lib/src/persistent_queue.dart +++ b/packages/veilid_support/lib/src/persistent_queue.dart @@ -9,19 +9,20 @@ import 'config.dart'; import 'table_db.dart'; import 'veilid_log.dart'; -class PersistentQueue - with TableDBBackedFromBuffer> { +class PersistentQueue with TableDBBackedFromBuffer> { // PersistentQueue( {required String table, required String key, required T Function(Uint8List) fromBuffer, + required Uint8List Function(T) toBuffer, required Future Function(IList) closure, bool deleteOnClose = true, void Function(Object, StackTrace)? onError}) : _table = table, _key = key, _fromBuffer = fromBuffer, + _toBuffer = toBuffer, _closure = closure, _deleteOnClose = deleteOnClose, _onError = onError { @@ -34,11 +35,13 @@ class PersistentQueue // Close the sync add stream await _syncAddController.close(); + await _syncAddTask; // Stop the processing trigger await _queueReady.close(); + await _processorTask; - // Wait for any setStates to finish + // No more queue actions await _queueMutex.acquire(); // Clean up table if desired @@ -47,27 +50,40 @@ class PersistentQueue } } + Future get wait async { + // Ensure the init finished + await _initWait(); + + if (_queue.isEmpty) { + return; + } + final completer = Completer(); + _queueDoneCompleter = completer; + await completer.future; + } + Future _init(Completer _) async { // Start the processor - unawaited(Future.delayed(Duration.zero, () async { + _processorTask = Future.delayed(Duration.zero, () async { await _initWait(); await for (final _ in _queueReady.stream) { await _process(); } - })); + }); // Start the sync add controller - unawaited(Future.delayed(Duration.zero, () async { + _syncAddTask = Future.delayed(Duration.zero, () async { await _initWait(); await for (final elem in _syncAddController.stream) { await addAll(elem); } - })); + }); // Load the queue if we have one try { await _queueMutex.protect(() async { _queue = await load() ?? await store(IList.empty()); + _sendUpdateEventsInner(); }); } on Exception catch (e, st) { if (_onError != null) { @@ -78,11 +94,20 @@ class PersistentQueue } } + void _sendUpdateEventsInner() { + assert(_queueMutex.isLocked, 'must be locked'); + if (_queue.isNotEmpty) { + if (!_queueReady.isClosed) { + _queueReady.sink.add(null); + } + } else { + _queueDoneCompleter?.complete(); + } + } + Future _updateQueueInner(IList newQueue) async { _queue = await store(newQueue); - if (_queue.isNotEmpty) { - _queueReady.sink.add(null); - } + _sendUpdateEventsInner(); } Future add(T item) async { @@ -213,7 +238,7 @@ class PersistentQueue Uint8List valueToBuffer(IList val) { final writer = CodedBufferWriter(); for (final elem in val) { - writer.writeRawBytes(elem.writeToBuffer()); + writer.writeRawBytes(_toBuffer(elem)); } return writer.toBuffer(); } @@ -221,12 +246,16 @@ class PersistentQueue final String _table; final String _key; final T Function(Uint8List) _fromBuffer; + final Uint8List Function(T) _toBuffer; final bool _deleteOnClose; final WaitSet _initWait = WaitSet(); - final Mutex _queueMutex = Mutex(debugLockTimeout: kIsDebugMode ? 60 : null); - IList _queue = IList.empty(); + final _queueMutex = Mutex(debugLockTimeout: kIsDebugMode ? 60 : null); + var _queue = IList.empty(); final StreamController> _syncAddController = StreamController(); final StreamController _queueReady = StreamController(); final Future Function(IList) _closure; final void Function(Object, StackTrace)? _onError; + late Future _processorTask; + late Future _syncAddTask; + Completer? _queueDoneCompleter; }