diff --git a/packages/veilid_support/example/integration_test/app_test.dart b/packages/veilid_support/example/integration_test/app_test.dart index 1577d7a..b7a807f 100644 --- a/packages/veilid_support/example/integration_test/app_test.dart +++ b/packages/veilid_support/example/integration_test/app_test.dart @@ -1,6 +1,7 @@ import 'package:flutter/foundation.dart'; import 'package:integration_test/integration_test.dart'; import 'package:test/test.dart'; +import 'package:veilid_support/veilid_support.dart'; import 'package:veilid_test/veilid_test.dart'; import 'fixtures/fixtures.dart'; @@ -37,59 +38,159 @@ void main() { group('TableDB Tests', () { group('TableDBArray Tests', () { - test('create TableDBArray', makeTestTableDBArrayCreateDelete()); - test( - timeout: const Timeout(Duration(seconds: 480)), - 'add/truncate TableDBArray', - makeTestDHTLogAddTruncate(), - ); + // test('create/delete TableDBArray', testTableDBArrayCreateDelete); + + // group('TableDBArray Add/Get Tests', () { + // for (final params in [ + // // + // (99, 3, 15), + // (100, 4, 16), + // (101, 5, 17), + // // + // (511, 3, 127), + // (512, 4, 128), + // (513, 5, 129), + // // + // (4095, 3, 1023), + // (4096, 4, 1024), + // (4097, 5, 1025), + // // + // (65535, 3, 16383), + // (65536, 4, 16384), + // (65537, 5, 16385), + // ]) { + // final count = params.$1; + // final singles = params.$2; + // final batchSize = params.$3; + + // test( + // // timeout: const Timeout(Duration(seconds: 480)), + // 'add/remove TableDBArray count = $count batchSize=$batchSize', + // makeTestTableDBArrayAddGetClear( + // count: count, + // singles: singles, + // batchSize: batchSize, + // crypto: const VeilidCryptoPublic()), + // ); + // } + // }); + + // group('TableDBArray Insert Tests', () { + // for (final params in [ + // // + // (99, 3, 15), + // (100, 4, 16), + // (101, 5, 17), + // // + // (511, 3, 127), + // (512, 4, 128), + // (513, 5, 129), + // // + // (4095, 3, 1023), + // (4096, 4, 1024), + // (4097, 5, 1025), + // // + // (65535, 3, 16383), + // (65536, 4, 16384), + // (65537, 5, 16385), + // ]) { + // final count = params.$1; + // final singles = params.$2; + // final batchSize = params.$3; + + // test( + // // timeout: const Timeout(Duration(seconds: 480)), + // 'insert TableDBArray count=$count singles=$singles batchSize=$batchSize', + // makeTestTableDBArrayInsert( + // count: count, + // singles: singles, + // batchSize: batchSize, + // crypto: const VeilidCryptoPublic()), + // ); + // } + // }); + + group('TableDBArray Remove Tests', () { + for (final params in [ + // + (99, 3, 15), + (100, 4, 16), + (101, 5, 17), + // + (511, 3, 127), + (512, 4, 128), + (513, 5, 129), + // + (4095, 3, 1023), + (4096, 4, 1024), + (4097, 5, 1025), + // + (65535, 3, 16383), + (65536, 4, 16384), + (65537, 5, 16385), + ]) { + final count = params.$1; + final singles = params.$2; + final batchSize = params.$3; + + test( + // timeout: const Timeout(Duration(seconds: 480)), + 'remove TableDBArray count=$count singles=$singles batchSize=$batchSize', + makeTestTableDBArrayRemove( + count: count, + singles: singles, + batchSize: batchSize, + crypto: const VeilidCryptoPublic()), + ); + } + }); }); }); - group('DHT Support Tests', () { - setUpAll(updateProcessorFixture.setUp); - setUpAll(tickerFixture.setUp); - tearDownAll(tickerFixture.tearDown); - tearDownAll(updateProcessorFixture.tearDown); + // group('DHT Support Tests', () { + // setUpAll(updateProcessorFixture.setUp); + // setUpAll(tickerFixture.setUp); + // tearDownAll(tickerFixture.tearDown); + // tearDownAll(updateProcessorFixture.tearDown); - test('create pool', testDHTRecordPoolCreate); + // test('create pool', testDHTRecordPoolCreate); - group('DHTRecordPool Tests', () { - setUpAll(dhtRecordPoolFixture.setUp); - tearDownAll(dhtRecordPoolFixture.tearDown); + // group('DHTRecordPool Tests', () { + // setUpAll(dhtRecordPoolFixture.setUp); + // tearDownAll(dhtRecordPoolFixture.tearDown); - test('create/delete record', testDHTRecordCreateDelete); - test('record scopes', testDHTRecordScopes); - test('create/delete deep record', testDHTRecordDeepCreateDelete); - }); + // test('create/delete record', testDHTRecordCreateDelete); + // test('record scopes', testDHTRecordScopes); + // test('create/delete deep record', testDHTRecordDeepCreateDelete); + // }); - group('DHTShortArray Tests', () { - setUpAll(dhtRecordPoolFixture.setUp); - tearDownAll(dhtRecordPoolFixture.tearDown); + // group('DHTShortArray Tests', () { + // setUpAll(dhtRecordPoolFixture.setUp); + // tearDownAll(dhtRecordPoolFixture.tearDown); - for (final stride in [256, 16 /*64, 32, 16, 8, 4, 2, 1 */]) { - test('create shortarray stride=$stride', - makeTestDHTShortArrayCreateDelete(stride: stride)); - test('add shortarray stride=$stride', - makeTestDHTShortArrayAdd(stride: stride)); - } - }); + // for (final stride in [256, 16 /*64, 32, 16, 8, 4, 2, 1 */]) { + // test('create shortarray stride=$stride', + // makeTestDHTShortArrayCreateDelete(stride: stride)); + // test('add shortarray stride=$stride', + // makeTestDHTShortArrayAdd(stride: stride)); + // } + // }); - group('DHTLog Tests', () { - setUpAll(dhtRecordPoolFixture.setUp); - tearDownAll(dhtRecordPoolFixture.tearDown); + // group('DHTLog Tests', () { + // setUpAll(dhtRecordPoolFixture.setUp); + // tearDownAll(dhtRecordPoolFixture.tearDown); - for (final stride in [256, 16 /*64, 32, 16, 8, 4, 2, 1 */]) { - test('create log stride=$stride', - makeTestDHTLogCreateDelete(stride: stride)); - test( - timeout: const Timeout(Duration(seconds: 480)), - 'add/truncate log stride=$stride', - makeTestDHTLogAddTruncate(stride: stride), - ); - } - }); - }); + // for (final stride in [256, 16 /*64, 32, 16, 8, 4, 2, 1 */]) { + // test('create log stride=$stride', + // makeTestDHTLogCreateDelete(stride: stride)); + // test( + // timeout: const Timeout(Duration(seconds: 480)), + // 'add/truncate log stride=$stride', + // makeTestDHTLogAddTruncate(stride: stride), + // ); + // } + // }); + // }); }); }); } diff --git a/packages/veilid_support/example/integration_test/test_table_db_array.dart b/packages/veilid_support/example/integration_test/test_table_db_array.dart index e9087f6..81607a6 100644 --- a/packages/veilid_support/example/integration_test/test_table_db_array.dart +++ b/packages/veilid_support/example/integration_test/test_table_db_array.dart @@ -1,134 +1,213 @@ import 'dart:convert'; +import 'dart:math'; +import 'dart:typed_data'; import 'package:test/test.dart'; import 'package:veilid_support/veilid_support.dart'; -Future Function() makeTestTableDBArrayCreateDelete() => () async { - // Close before delete - { - final arr = await TableDBArray( - table: 'test', crypto: const VeilidCryptoPublic()); - - expect(await arr.operate((r) async => r.length), isZero); - expect(arr.isOpen, isTrue); - await arr.close(); - expect(arr.isOpen, isFalse); - await arr.delete(); - // Operate should fail - await expectLater(() async => arr.operate((r) async => r.length), - throwsA(isA())); - } - - // Close after delete - { - final arr = await DHTShortArray.create( - debugName: 'sa_create_delete 2 stride $stride', stride: stride); - await arr.delete(); - // Operate should still succeed because things aren't closed - expect(await arr.operate((r) async => r.length), isZero); - await arr.close(); - // Operate should fail - await expectLater(() async => arr.operate((r) async => r.length), - throwsA(isA())); - } - - // Close after delete multiple - // Okay to request delete multiple times before close - { - final arr = await DHTShortArray.create( - debugName: 'sa_create_delete 3 stride $stride', stride: stride); - await arr.delete(); - await arr.delete(); - // Operate should still succeed because things aren't closed - expect(await arr.operate((r) async => r.length), isZero); - await arr.close(); - await expectLater(() async => arr.close(), throwsA(isA())); - // Operate should fail - await expectLater(() async => arr.operate((r) async => r.length), - throwsA(isA())); - } - }; - -Future Function() makeTestTableDBArrayAdd({required int stride}) => - () async { - final arr = await DHTShortArray.create( - debugName: 'sa_add 1 stride $stride', stride: stride); - - final dataset = Iterable.generate(256) - .map((n) => utf8.encode('elem $n')) - .toList(); - - print('adding singles\n'); - { - final res = await arr.operateWrite((w) async { - for (var n = 4; n < 8; n++) { - print('$n '); - final success = await w.tryAddItem(dataset[n]); - expect(success, isTrue); - } - }); - expect(res, isNull); - } - - print('adding batch\n'); - { - final res = await arr.operateWrite((w) async { - print('${dataset.length ~/ 2}-${dataset.length}'); - final success = await w.tryAddItems( - dataset.sublist(dataset.length ~/ 2, dataset.length)); - expect(success, isTrue); - }); - expect(res, isNull); - } - - print('inserting singles\n'); - { - final res = await arr.operateWrite((w) async { - for (var n = 0; n < 4; n++) { - print('$n '); - final success = await w.tryInsertItem(n, dataset[n]); - expect(success, isTrue); - } - }); - expect(res, isNull); - } - - print('inserting batch\n'); - { - final res = await arr.operateWrite((w) async { - print('8-${dataset.length ~/ 2}'); - final success = await w.tryInsertItems( - 8, dataset.sublist(8, dataset.length ~/ 2)); - expect(success, isTrue); - }); - expect(res, isNull); - } - - //print('get all\n'); - { - final dataset2 = await arr.operate((r) async => r.getItemRange(0)); - expect(dataset2, equals(dataset)); - } - { - final dataset3 = - await arr.operate((r) async => r.getItemRange(64, length: 128)); - expect(dataset3, equals(dataset.sublist(64, 64 + 128))); - } - - //print('clear\n'); - { - await arr.operateWriteEventual((w) async { - await w.clear(); - return true; - }); - } - - //print('get all\n'); - { - final dataset4 = await arr.operate((r) async => r.getItemRange(0)); - expect(dataset4, isEmpty); - } +Future testTableDBArrayCreateDelete() async { + // Close before delete + { + final arr = + TableDBArray(table: 'testArray', crypto: const VeilidCryptoPublic()); + expect(() => arr.length, throwsA(isA())); + expect(arr.isOpen, isTrue); + await arr.initWait(); + expect(arr.isOpen, isTrue); + expect(arr.length, isZero); + await arr.close(); + expect(arr.isOpen, isFalse); + await arr.delete(); + expect(arr.isOpen, isFalse); + } + // Async create with close after delete and then reopen + { + final arr = await TableDBArray.make( + table: 'testArray', crypto: const VeilidCryptoPublic()); + expect(arr.length, isZero); + expect(arr.isOpen, isTrue); + await expectLater(() async { await arr.delete(); - await arr.close(); + }, throwsA(isA())); + expect(arr.isOpen, isTrue); + await arr.close(); + expect(arr.isOpen, isFalse); + + final arr2 = await TableDBArray.make( + table: 'testArray', crypto: const VeilidCryptoPublic()); + expect(arr2.isOpen, isTrue); + expect(arr.isOpen, isFalse); + await arr2.close(); + expect(arr2.isOpen, isFalse); + await arr2.delete(); + } +} + +Uint8List makeData(int n) => utf8.encode('elem $n'); +List makeDataBatch(int n, int batchSize) => + List.generate(batchSize, (x) => makeData(n + x)); + +Future Function() makeTestTableDBArrayAddGetClear( + {required int count, + required int singles, + required int batchSize, + required VeilidCrypto crypto}) => + () async { + final arr = await TableDBArray.make(table: 'testArray', crypto: crypto); + + print('adding'); + { + for (var n = 0; n < count;) { + var toAdd = min(batchSize, count - n); + for (var s = 0; s < min(singles, toAdd); s++) { + await arr.add(makeData(n)); + toAdd--; + n++; + } + + await arr.addAll(makeDataBatch(n, toAdd)); + n += toAdd; + + print(' $n/$count'); + } + } + + print('get singles'); + { + for (var n = 0; n < batchSize; n++) { + expect(await arr.get(n), equals(makeData(n))); + } + } + + print('get batch'); + { + for (var n = batchSize; n < count; n += batchSize) { + final toGet = min(batchSize, count - n); + expect(await arr.getRange(n, toGet), equals(makeDataBatch(n, toGet))); + } + } + + print('clear'); + { + await arr.clear(); + expect(arr.length, isZero); + } + + await arr.close(delete: true); + }; + +Future Function() makeTestTableDBArrayInsert( + {required int count, + required int singles, + required int batchSize, + required VeilidCrypto crypto}) => + () async { + final arr = await TableDBArray.make(table: 'testArray', crypto: crypto); + + final match = []; + + print('inserting'); + { + for (var n = 0; n < count;) { + final start = n; + var toAdd = min(batchSize, count - n); + for (var s = 0; s < min(singles, toAdd); s++) { + final data = makeData(n); + await arr.insert(start, data); + match.insert(start, data); + toAdd--; + n++; + } + + final data = makeDataBatch(n, toAdd); + await arr.insertAll(start, data); + match.insertAll(start, data); + n += toAdd; + + print(' $n/$count'); + } + } + + print('get singles'); + { + for (var n = 0; n < batchSize; n++) { + expect(await arr.get(n), equals(match[n])); + } + } + + print('get batch'); + { + for (var n = batchSize; n < count; n += batchSize) { + final toGet = min(batchSize, count - n); + expect(await arr.getRange(n, toGet), + equals(match.sublist(n, n + toGet))); + } + } + + print('clear'); + { + await arr.clear(); + expect(arr.length, isZero); + } + + await arr.close(delete: true); + }; + + +Future Function() makeTestTableDBArrayRemove( + {required int count, + required int singles, + required int batchSize, + required VeilidCrypto crypto}) => + () async { + final arr = await TableDBArray.make(table: 'testArray', crypto: crypto); + + final match = []; +xxx removal test + print('inserting'); + { + for (var n = 0; n < count;) { + final start = n; + var toAdd = min(batchSize, count - n); + for (var s = 0; s < min(singles, toAdd); s++) { + final data = makeData(n); + await arr.insert(start, data); + match.insert(start, data); + toAdd--; + n++; + } + + final data = makeDataBatch(n, toAdd); + await arr.insertAll(start, data); + match.insertAll(start, data); + n += toAdd; + + print(' $n/$count'); + } + } + + print('get singles'); + { + for (var n = 0; n < batchSize; n++) { + expect(await arr.get(n), equals(match[n])); + } + } + + print('get batch'); + { + for (var n = batchSize; n < count; n += batchSize) { + final toGet = min(batchSize, count - n); + expect(await arr.getRange(n, toGet), + equals(match.sublist(n, n + toGet))); + } + } + + print('clear'); + { + await arr.clear(); + expect(arr.length, isZero); + } + + await arr.close(delete: true); }; diff --git a/packages/veilid_support/lib/src/table_db_array.dart b/packages/veilid_support/lib/src/table_db_array.dart index 504fb16..dbabd3a 100644 --- a/packages/veilid_support/lib/src/table_db_array.dart +++ b/packages/veilid_support/lib/src/table_db_array.dart @@ -16,10 +16,25 @@ class TableDBArray { _initWait.add(_init); } + static Future make({ + required String table, + required VeilidCrypto crypto, + }) async { + final out = TableDBArray(table: table, crypto: crypto); + await out._initWait(); + return out; + } + + Future initWait() async { + await _initWait(); + } + Future _init() async { // Load the array details await _mutex.protect(() async { _tableDB = await Veilid.instance.openTableDB(_table, 1); + await _loadHead(); + _initDone = true; }); } @@ -27,23 +42,45 @@ class TableDBArray { // Ensure the init finished await _initWait(); - await _mutex.acquire(); - - await _changeStream.close(); - _tableDB.close(); - + // Allow multiple attempts to close + if (_open) { + await _mutex.protect(() async { + await _changeStream.close(); + _tableDB.close(); + _open = false; + }); + } if (delete) { await Veilid.instance.deleteTableDB(_table); } } + Future delete() async { + await _initWait(); + if (_open) { + throw StateError('should be closed first'); + } + await Veilid.instance.deleteTableDB(_table); + } + Future> listen(void Function() onChanged) async => _changeStream.stream.listen((_) => onChanged()); //////////////////////////////////////////////////////////// // Public interface - int get length => _length; + int get length { + if (!_open) { + throw StateError('not open'); + } + if (!_initDone) { + throw StateError('not initialized'); + } + + return _length; + } + + bool get isOpen => _open; Future add(Uint8List value) async { await _initWait(); @@ -67,12 +104,22 @@ class TableDBArray { Future get(int pos) async { await _initWait(); - return _mutex.protect(() async => _getInner(pos)); + return _mutex.protect(() async { + if (!_open) { + throw StateError('not open'); + } + return _getInner(pos); + }); } - Future> getAll(int start, int length) async { + Future> getRange(int start, int length) async { await _initWait(); - return _mutex.protect(() async => _getAllInner(start, length)); + return _mutex.protect(() async { + if (!_open) { + throw StateError('not open'); + } + return _getRangeInner(start, length); + }); } Future remove(int pos, {Output? out}) async { @@ -96,6 +143,7 @@ class TableDBArray { } _length = 0; _nextFree = 0; + _maxEntry = 0; _dirtyChunks.clear(); _chunkCache.clear(); }); @@ -175,7 +223,7 @@ class TableDBArray { return (await _loadEntry(entry))!; } - Future> _getAllInner(int start, int length) async { + Future> _getRangeInner(int start, int length) async { if (length < 0) { throw StateError('length should not be negative'); } @@ -252,11 +300,16 @@ class TableDBArray { Future _writeTransaction( Future Function(VeilidTableDBTransaction) closure) async => _mutex.protect(() async { + if (!_open) { + throw StateError('not open'); + } + final _oldLength = _length; final _oldNextFree = _nextFree; + final _oldMaxEntry = _maxEntry; try { final out = await transactionScope(_tableDB, (t) async { - final out = closure(t); + final out = await closure(t); await _saveHead(t); await _flushDirtyChunks(t); return out; @@ -267,6 +320,7 @@ class TableDBArray { // restore head _length = _oldLength; _nextFree = _oldNextFree; + _maxEntry = _oldMaxEntry; // invalidate caches because they could have been written to _chunkCache.clear(); _dirtyChunks.clear(); @@ -322,34 +376,35 @@ class TableDBArray { if (start < 0 || start >= _length) { throw IndexError.withLength(start, _length); } - final end = start + length - 1; // Slide everything over in reverse - final toCopyTotal = _length - start; - var dest = end + toCopyTotal; var src = _length - 1; + var dest = src + length; (int, Uint8List)? lastSrcChunk; (int, Uint8List)? lastDestChunk; while (src >= start) { + final remaining = (src - start) + 1; final srcChunkNumber = src ~/ _indexStride; final srcIndex = src % _indexStride; - final srcLength = srcIndex + 1; + final srcLength = min(remaining, srcIndex + 1); final srcChunk = (lastSrcChunk != null && (lastSrcChunk.$1 == srcChunkNumber)) ? lastSrcChunk.$2 : await _loadIndexChunk(srcChunkNumber); + _dirtyChunks[srcChunkNumber] = srcChunk; lastSrcChunk = (srcChunkNumber, srcChunk); final destChunkNumber = dest ~/ _indexStride; final destIndex = dest % _indexStride; - final destLength = destIndex + 1; + final destLength = min(remaining, destIndex + 1); final destChunk = (lastDestChunk != null && (lastDestChunk.$1 == destChunkNumber)) ? lastDestChunk.$2 : await _loadIndexChunk(destChunkNumber); + _dirtyChunks[destChunkNumber] = destChunk; lastDestChunk = (destChunkNumber, destChunk); final toCopy = min(srcLength, destLength); @@ -395,6 +450,7 @@ class TableDBArray { (lastSrcChunk != null && (lastSrcChunk.$1 == srcChunkNumber)) ? lastSrcChunk.$2 : await _loadIndexChunk(srcChunkNumber); + _dirtyChunks[srcChunkNumber] = srcChunk; lastSrcChunk = (srcChunkNumber, srcChunk); final destChunkNumber = dest ~/ _indexStride; @@ -405,6 +461,7 @@ class TableDBArray { (lastDestChunk != null && (lastDestChunk.$1 == destChunkNumber)) ? lastDestChunk.$2 : await _loadIndexChunk(destChunkNumber); + _dirtyChunks[destChunkNumber] = destChunk; lastDestChunk = (destChunkNumber, destChunk); final toCopy = min(srcLength, destLength); @@ -453,7 +510,7 @@ class TableDBArray { Future _flushDirtyChunks(VeilidTableDBTransaction t) async { for (final ec in _dirtyChunks.entries) { - await _tableDB.store(0, _chunkKey(ec.key), ec.value); + await t.store(0, _chunkKey(ec.key), ec.value); } _dirtyChunks.clear(); } @@ -464,25 +521,28 @@ class TableDBArray { if (headBytes == null) { _length = 0; _nextFree = 0; + _maxEntry = 0; } else { final b = headBytes.buffer.asByteData(); _length = b.getUint32(0); _nextFree = b.getUint32(4); + _maxEntry = b.getUint32(8); } } Future _saveHead(VeilidTableDBTransaction t) async { assert(_mutex.isLocked, 'should be locked'); - final b = ByteData(8) + final b = ByteData(12) ..setUint32(0, _length) - ..setUint32(4, _nextFree); + ..setUint32(4, _nextFree) + ..setUint32(8, _maxEntry); await t.store(0, _headKey, b.buffer.asUint8List()); } Future _allocateEntry() async { assert(_mutex.isLocked, 'should be locked'); if (_nextFree == 0) { - return _length; + return _maxEntry++; } // pop endogenous free list final free = _nextFree; @@ -501,6 +561,8 @@ class TableDBArray { final String _table; late final VeilidTableDB _tableDB; + var _open = true; + var _initDone = false; final VeilidCrypto _crypto; final WaitSet _initWait = WaitSet(); final Mutex _mutex = Mutex(); @@ -508,6 +570,7 @@ class TableDBArray { // Head state int _length = 0; int _nextFree = 0; + int _maxEntry = 0; static const int _indexStride = 16384; final List<(int, Uint8List)> _chunkCache = []; final Map _dirtyChunks = {};