From 8cd73b2844c3894dc6fde77a1e18890294e29288 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Wed, 15 May 2024 22:45:50 -0400 Subject: [PATCH] checkpoint --- .../example/integration_test/app_test.dart | 44 ++++-- .../fixtures/dht_record_pool_fixture.dart | 13 +- .../integration_test/test_dht_log.dart | 130 ++++++++++++++++++ .../test_dht_record_pool.dart | 74 +++++----- .../test_dht_short_array.dart | 38 ++++- .../lib/dht_support/src/dht_log/dht_log.dart | 1 + .../src/dht_log/dht_log_append.dart | 60 ++++++-- .../src/dht_log/dht_log_spine.dart | 83 +++++++++-- .../src/dht_record/dht_record_pool.dart | 106 +++++++++++--- .../dht_short_array/dht_short_array_head.dart | 8 +- .../dht_short_array_write.dart | 40 ++++-- .../src/interfaces/dht_append_truncate.dart | 7 + .../src/interfaces/dht_openable.dart | 8 +- .../src/interfaces/dht_random_write.dart | 15 ++ 14 files changed, 513 insertions(+), 114 deletions(-) create mode 100644 packages/veilid_support/example/integration_test/test_dht_log.dart diff --git a/packages/veilid_support/example/integration_test/app_test.dart b/packages/veilid_support/example/integration_test/app_test.dart index 4a5b1e1..ba7785c 100644 --- a/packages/veilid_support/example/integration_test/app_test.dart +++ b/packages/veilid_support/example/integration_test/app_test.dart @@ -7,6 +7,7 @@ import 'package:integration_test/integration_test.dart'; import 'package:veilid_test/veilid_test.dart'; import 'fixtures/fixtures.dart'; +import 'test_dht_log.dart'; import 'test_dht_record_pool.dart'; import 'test_dht_short_array.dart'; @@ -38,24 +39,37 @@ void main() { test('create pool', testDHTRecordPoolCreate); - group('DHTRecordPool Tests', () { + // group('DHTRecordPool Tests', () { + // setUpAll(dhtRecordPoolFixture.setUp); + // tearDownAll(dhtRecordPoolFixture.tearDown); + + // test('create/delete record', testDHTRecordCreateDelete); + // test('record scopes', testDHTRecordScopes); + // test('create/delete deep record', testDHTRecordDeepCreateDelete); + // }); + + // group('DHTShortArray Tests', () { + // setUpAll(dhtRecordPoolFixture.setUp); + // tearDownAll(dhtRecordPoolFixture.tearDown); + + // for (final stride in [256, 16 /*64, 32, 16, 8, 4, 2, 1 */]) { + // test('create shortarray stride=$stride', + // makeTestDHTShortArrayCreateDelete(stride: stride)); + // test('add shortarray stride=$stride', + // makeTestDHTShortArrayAdd(stride: 256)); + // } + // }); + + group('DHTLog Tests', () { setUpAll(dhtRecordPoolFixture.setUp); tearDownAll(dhtRecordPoolFixture.tearDown); - test('create/delete record', testDHTRecordCreateDelete); - test('record scopes', testDHTRecordScopes); - test('create/delete deep record', testDHTRecordDeepCreateDelete); - }); - - group('DHTShortArray Tests', () { - setUpAll(dhtRecordPoolFixture.setUp); - tearDownAll(dhtRecordPoolFixture.tearDown); - - for (final stride in [256, 64, 32, 16, 8, 4, 2, 1]) { - test('create shortarray stride=$stride', - makeTestDHTShortArrayCreateDelete(stride: stride)); - test('add shortarray stride=$stride', - makeTestDHTShortArrayAdd(stride: 256)); + for (final stride in [256, 16 /*64, 32, 16, 8, 4, 2, 1 */]) { + test('create log stride=$stride', + makeTestDHTLogCreateDelete(stride: stride)); + test('add/truncate log stride=$stride', + makeTestDHTLogAddTruncate(stride: 256), + timeout: const Timeout(Duration(seconds: 480))); } }); }); diff --git a/packages/veilid_support/example/integration_test/fixtures/dht_record_pool_fixture.dart b/packages/veilid_support/example/integration_test/fixtures/dht_record_pool_fixture.dart index 216d00f..d38181f 100644 --- a/packages/veilid_support/example/integration_test/fixtures/dht_record_pool_fixture.dart +++ b/packages/veilid_support/example/integration_test/fixtures/dht_record_pool_fixture.dart @@ -1,6 +1,7 @@ import 'dart:async'; import 'package:async_tools/async_tools.dart'; +import 'package:flutter/foundation.dart'; import 'package:veilid_support/veilid_support.dart'; import 'package:veilid_test/veilid_test.dart'; @@ -12,9 +13,13 @@ class DHTRecordPoolFixture implements TickerFixtureTickable { UpdateProcessorFixture updateProcessorFixture; TickerFixture tickerFixture; - Future setUp() async { + Future setUp({bool purge = true}) async { await _fixtureMutex.acquire(); - await DHTRecordPool.init(); + if (purge) { + await Veilid.instance.debug('record purge local'); + await Veilid.instance.debug('record purge remote'); + } + await DHTRecordPool.init(logger: debugPrintSynchronously); tickerFixture.register(this); } @@ -22,6 +27,10 @@ class DHTRecordPoolFixture implements TickerFixtureTickable { assert(_fixtureMutex.isLocked, 'should not tearDown without setUp'); tickerFixture.unregister(this); await DHTRecordPool.close(); + + final recordList = await Veilid.instance.debug('record list local'); + debugPrintSynchronously('DHT Record List:\n$recordList'); + _fixtureMutex.release(); } diff --git a/packages/veilid_support/example/integration_test/test_dht_log.dart b/packages/veilid_support/example/integration_test/test_dht_log.dart new file mode 100644 index 0000000..fcdabad --- /dev/null +++ b/packages/veilid_support/example/integration_test/test_dht_log.dart @@ -0,0 +1,130 @@ +import 'dart:convert'; + +import 'package:flutter_test/flutter_test.dart'; +import 'package:veilid_support/veilid_support.dart'; + +Future Function() makeTestDHTLogCreateDelete({required int stride}) => + () async { + // Close before delete + { + final dlog = await DHTLog.create( + debugName: 'log_create_delete 1 stride $stride', stride: stride); + expect(await dlog.operate((r) async => r.length), isZero); + expect(dlog.isOpen, isTrue); + await dlog.close(); + expect(dlog.isOpen, isFalse); + await dlog.delete(); + // Operate should fail + await expectLater(() async => dlog.operate((r) async => r.length), + throwsA(isA())); + } + + // Close after delete + { + final dlog = await DHTLog.create( + debugName: 'log_create_delete 2 stride $stride', stride: stride); + await dlog.delete(); + // Operate should still succeed because things aren't closed + expect(await dlog.operate((r) async => r.length), isZero); + await dlog.close(); + // Operate should fail + await expectLater(() async => dlog.operate((r) async => r.length), + throwsA(isA())); + } + + // Close after delete multiple + // Okay to request delete multiple times before close + { + final dlog = await DHTLog.create( + debugName: 'log_create_delete 3 stride $stride', stride: stride); + await dlog.delete(); + await dlog.delete(); + // Operate should still succeed because things aren't closed + expect(await dlog.operate((r) async => r.length), isZero); + await dlog.close(); + await dlog.close(); + // Operate should fail + await expectLater(() async => dlog.operate((r) async => r.length), + throwsA(isA())); + } + }; + +Future Function() makeTestDHTLogAddTruncate({required int stride}) => + () async { + final startTime = DateTime.now(); + + final dlog = await DHTLog.create( + debugName: 'log_add 1 stride $stride', stride: stride); + + final dataset = Iterable.generate(1000) + .map((n) => utf8.encode('elem $n')) + .toList(); + + print('adding\n'); + { + final res = await dlog.operateAppend((w) async { + const chunk = 50; + for (var n = 0; n < dataset.length; n += chunk) { + print('$n-${n + chunk - 1} '); + final success = + await w.tryAppendItems(dataset.sublist(n, n + chunk)); + expect(success, isTrue); + } + }); + expect(res, isNull); + } + + print('get all\n'); + { + final dataset2 = await dlog.operate((r) async => r.getItemRange(0)); + expect(dataset2, equals(dataset)); + } + { + final dataset3 = + await dlog.operate((r) async => r.getItemRange(64, length: 128)); + expect(dataset3, equals(dataset.sublist(64, 64 + 128))); + } + { + final dataset4 = + await dlog.operate((r) async => r.getItemRange(0, length: 1000)); + expect(dataset4, equals(dataset.sublist(0, 1000))); + } + { + final dataset5 = + await dlog.operate((r) async => r.getItemRange(500, length: 499)); + expect(dataset5, equals(dataset.sublist(500, 999))); + } + print('truncate\n'); + { + await dlog.operateAppend((w) async => w.truncate(5)); + } + { + final dataset6 = await dlog + .operate((r) async => r.getItemRange(500 - 5, length: 499)); + expect(dataset6, equals(dataset.sublist(500, 999))); + } + print('truncate 2\n'); + { + await dlog.operateAppend((w) async => w.truncate(251)); + } + { + final dataset7 = await dlog + .operate((r) async => r.getItemRange(500 - 256, length: 499)); + expect(dataset7, equals(dataset.sublist(500, 999))); + } + print('clear\n'); + { + await dlog.operateAppend((w) async => w.clear()); + } + print('get all\n'); + { + final dataset8 = await dlog.operate((r) async => r.getItemRange(0)); + expect(dataset8, isEmpty); + } + + await dlog.delete(); + await dlog.close(); + + final endTime = DateTime.now(); + print('Duration: ${endTime.difference(startTime)}'); + }; diff --git a/packages/veilid_support/example/integration_test/test_dht_record_pool.dart b/packages/veilid_support/example/integration_test/test_dht_record_pool.dart index 2f05d0b..45b26a7 100644 --- a/packages/veilid_support/example/integration_test/test_dht_record_pool.dart +++ b/packages/veilid_support/example/integration_test/test_dht_record_pool.dart @@ -151,17 +151,29 @@ Future testDHTRecordDeepCreateDelete() async { // Make root record final recroot = await pool.createRecord(debugName: 'test_deep_create_delete'); - for (var d = 0; d < numIterations; d++) { - // Make child set 1 - var parent = recroot; - final children = []; - for (var n = 0; n < numChildren; n++) { - final child = - await pool.createRecord(debugName: 'deep $n', parent: parent.key); - children.add(child); - parent = child; - } + // Make child set 1 + var parent = recroot; + final children = []; + for (var n = 0; n < numChildren; n++) { + final child = + await pool.createRecord(debugName: 'deep $n', parent: parent.key); + children.add(child); + parent = child; + } + // Should mark for deletion + expect(await pool.deleteRecord(recroot.key), isFalse); + + // Root should still be valid + expect(await pool.isValidRecordKey(recroot.key), isTrue); + + // Close root record + await recroot.close(); + + // Root should still be valid because children still exist + expect(await pool.isValidRecordKey(recroot.key), isTrue); + + for (var d = 0; d < numIterations; d++) { // Make child set 2 final children2 = []; parent = recroot; @@ -171,31 +183,31 @@ Future testDHTRecordDeepCreateDelete() async { children2.add(child); parent = child; } - // Should fail to delete root - await expectLater( - () async => pool.deleteRecord(recroot.key), throwsA(isA())); - - // Close child set 1 - await children.map((c) => c.close()).wait; - - // Delete child set 1 in reverse order - for (var n = numChildren - 1; n >= 0; n--) { - await pool.deleteRecord(children[n].key); - } - - // Should fail to delete root - await expectLater( - () async => pool.deleteRecord(recroot.key), throwsA(isA())); - - // Close child set 1 - await children2.map((c) => c.close()).wait; // Delete child set 2 in reverse order for (var n = numChildren - 1; n >= 0; n--) { - await pool.deleteRecord(children2[n].key); + expect(await pool.deleteRecord(children2[n].key), isFalse); } + + // Root should still be there + expect(await pool.isValidRecordKey(recroot.key), isTrue); + + // Close child set 2 + await children2.map((c) => c.close()).wait; + + // All child set 2 should be invalid + for (final c2 in children2) { + // Children should be invalid and deleted now + expect(await pool.isValidRecordKey(c2.key), isFalse); + } + + // Root should still be valid + expect(await pool.isValidRecordKey(recroot.key), isTrue); } - // Should be able to delete root now - await pool.deleteRecord(recroot.key); + // Close child set 1 + await children.map((c) => c.close()).wait; + + // Root should have gone away + expect(await pool.isValidRecordKey(recroot.key), isFalse); } diff --git a/packages/veilid_support/example/integration_test/test_dht_short_array.dart b/packages/veilid_support/example/integration_test/test_dht_short_array.dart index c2fcc2b..6ba2d23 100644 --- a/packages/veilid_support/example/integration_test/test_dht_short_array.dart +++ b/packages/veilid_support/example/integration_test/test_dht_short_array.dart @@ -61,10 +61,10 @@ Future Function() makeTestDHTShortArrayAdd({required int stride}) => .map((n) => utf8.encode('elem $n')) .toList(); - print('adding\n'); + print('adding singles\n'); { final res = await arr.operateWrite((w) async { - for (var n = 0; n < dataset.length; n++) { + for (var n = 4; n < 8; n++) { print('$n '); final success = await w.tryAddItem(dataset[n]); expect(success, isTrue); @@ -73,6 +73,40 @@ Future Function() makeTestDHTShortArrayAdd({required int stride}) => expect(res, isNull); } + print('adding batch\n'); + { + final res = await arr.operateWrite((w) async { + print('${dataset.length ~/ 2}-${dataset.length}'); + final success = await w.tryAddItems( + dataset.sublist(dataset.length ~/ 2, dataset.length)); + expect(success, isTrue); + }); + expect(res, isNull); + } + + print('inserting singles\n'); + { + final res = await arr.operateWrite((w) async { + for (var n = 0; n < 4; n++) { + print('$n '); + final success = await w.tryInsertItem(n, dataset[n]); + expect(success, isTrue); + } + }); + expect(res, isNull); + } + + print('inserting batch\n'); + { + final res = await arr.operateWrite((w) async { + print('8-${dataset.length ~/ 2}'); + final success = await w.tryInsertItems( + 8, dataset.sublist(8, dataset.length ~/ 2)); + expect(success, isTrue); + }); + expect(res, isNull); + } + //print('get all\n'); { final dataset2 = await arr.operate((r) async => r.getItemRange(0)); diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log.dart index 7513c8b..f7b606c 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log.dart @@ -1,4 +1,5 @@ import 'dart:async'; +import 'dart:math'; import 'dart:typed_data'; import 'package:async_tools/async_tools.dart'; diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_append.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_append.dart index 6a172a7..96c3eb4 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_append.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_append.dart @@ -9,34 +9,74 @@ class _DHTLogAppend extends _DHTLogRead implements DHTAppendTruncateRandomRead { @override Future tryAppendItem(Uint8List value) async { // Allocate empty index at the end of the list - final endPos = _spine.length; + final insertPos = _spine.length; _spine.allocateTail(1); - final lookup = await _spine.lookupPosition(endPos); + final lookup = await _spine.lookupPosition(insertPos); if (lookup == null) { throw StateError("can't write to dht log"); } + // Write item to the segment - return lookup.shortArray - .operateWrite((write) async => write.tryWriteItem(lookup.pos, value)); + return lookup.shortArray.operateWrite((write) async { + // If this a new segment, then clear it in case we have wrapped around + if (lookup.pos == 0) { + await write.clear(); + } else if (lookup.pos != write.length) { + // We should always be appending at the length + throw StateError('appending should be at the end'); + } + return write.tryAddItem(value); + }); + } + + @override + Future tryAppendItems(List values) async { + // Allocate empty index at the end of the list + final insertPos = _spine.length; + _spine.allocateTail(values.length); + + // Look up the first position and shortarray + for (var valueIdx = 0; valueIdx < values.length;) { + final remaining = values.length - valueIdx; + + final lookup = await _spine.lookupPosition(insertPos + valueIdx); + if (lookup == null) { + throw StateError("can't write to dht log"); + } + + final sacount = min(remaining, DHTShortArray.maxElements - lookup.pos); + final success = await lookup.shortArray.operateWrite((write) async { + // If this a new segment, then clear it in case we have wrapped around + if (lookup.pos == 0) { + await write.clear(); + } else if (lookup.pos != write.length) { + // We should always be appending at the length + throw StateError('appending should be at the end'); + } + return write.tryAddItems(values.sublist(valueIdx, valueIdx + sacount)); + }); + if (!success) { + return false; + } + valueIdx += sacount; + } + return true; } @override Future truncate(int count) async { - final len = _spine.length; - if (count > len) { - count = len; - } + count = min(count, _spine.length); if (count == 0) { return; } if (count < 0) { throw StateError('can not remove negative items'); } - _spine.releaseHead(count); + await _spine.releaseHead(count); } @override Future clear() async { - _spine.releaseHead(_spine.length); + await _spine.releaseHead(_spine.length); } } diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart index 65f7110..7ce3dbf 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart @@ -83,12 +83,8 @@ class _DHTLogSpine { Future delete() async { await _spineMutex.protect(() async { - final pool = DHTRecordPool.instance; - final futures = >[pool.deleteRecord(_spineRecord.key)]; - for (final (_, sc) in _spineCache) { - futures.add(sc.delete()); - } - await Future.wait(futures); + // Will deep delete all segment records as they are children + await _spineRecord.delete(); }); } @@ -218,7 +214,7 @@ class _DHTLogSpine { static TypedKey? _getSegmentKey(Uint8List subkeyData, int segment) { final decodedLength = TypedKey.decodedLength(); final segmentKeyBytes = subkeyData.sublist( - decodedLength * segment, (decodedLength + 1) * segment); + decodedLength * segment, decodedLength * (segment + 1)); if (segmentKeyBytes.equals(_emptySegmentKey)) { return null; } @@ -234,7 +230,7 @@ class _DHTLogSpine { } else { segmentKeyBytes = segmentKey.decode(); } - subkeyData.setRange(decodedLength * segment, (decodedLength + 1) * segment, + subkeyData.setRange(decodedLength * segment, decodedLength * (segment + 1), segmentKeyBytes); } @@ -435,7 +431,7 @@ class _DHTLogSpine { _tail = (_tail + count) % _positionLimit; } - void releaseHead(int count) { + Future releaseHead(int count) async { assert(_spineMutex.isLocked, 'should be locked'); final currentLength = length; @@ -447,6 +443,73 @@ class _DHTLogSpine { } _head = (_head + count) % _positionLimit; + await _purgeUnusedSegments(); + } + + Future _deleteSegmentsContiguous(int start, int end) async { + assert(_spineMutex.isLocked, 'should be in mutex here'); + + final startSegmentNumber = start ~/ DHTShortArray.maxElements; + final startSegmentPos = start % DHTShortArray.maxElements; + + final endSegmentNumber = end ~/ DHTShortArray.maxElements; + final endSegmentPos = end % DHTShortArray.maxElements; + + final firstDeleteSegment = + (startSegmentPos == 0) ? startSegmentNumber : startSegmentNumber + 1; + final lastDeleteSegment = + (endSegmentPos == 0) ? endSegmentNumber - 1 : endSegmentNumber - 2; + + int? lastSubkey; + Uint8List? subkeyData; + for (var segmentNumber = firstDeleteSegment; + segmentNumber <= lastDeleteSegment; + segmentNumber++) { + // Lookup what subkey and segment subrange has this position's segment + // shortarray + final l = lookupSegment(segmentNumber); + final subkey = l.subkey; + final segment = l.segment; + + if (lastSubkey != subkey) { + // Flush subkey writes + if (lastSubkey != null) { + await _spineRecord.eventualWriteBytes(subkeyData!, + subkey: lastSubkey); + } + + xxx debug this, it takes forever + + // Get next subkey + subkeyData = await _spineRecord.get(subkey: subkey); + if (subkeyData != null) { + lastSubkey = subkey; + } else { + lastSubkey = null; + } + } + if (subkeyData != null) { + final segmentKey = _getSegmentKey(subkeyData, segment); + if (segmentKey != null) { + await DHTRecordPool.instance.deleteRecord(segmentKey); + _setSegmentKey(subkeyData, segment, null); + } + } + } + // Flush subkey writes + if (lastSubkey != null) { + await _spineRecord.eventualWriteBytes(subkeyData!, subkey: lastSubkey); + } + } + + Future _purgeUnusedSegments() async { + assert(_spineMutex.isLocked, 'should be in mutex here'); + if (_head < _tail) { + await _deleteSegmentsContiguous(0, _head); + await _deleteSegmentsContiguous(_tail, _positionLimit); + } else if (_head > _tail) { + await _deleteSegmentsContiguous(_tail, _head); + } } ///////////////////////////////////////////////////////////////////////////// @@ -532,7 +595,7 @@ class _DHTLogSpine { // Position of the start of the log (oldest items) int _head; - // Position of the end of the log (newest items) + // Position of the end of the log (newest items) (exclusive) int _tail; // LRU cache of DHT spine elements accessed recently diff --git a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart index a64f461..a4748df 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart @@ -91,7 +91,6 @@ class SharedDHTRecordData { Map subkeySeqCache = {}; bool needsWatchStateUpdate = false; WatchState? unionWatchState; - bool deleteOnClose = false; } // Per opened record data @@ -128,6 +127,7 @@ class DHTRecordPool with TableDBBackedJson { : _state = const DHTRecordPoolAllocations(), _mutex = Mutex(), _opened = {}, + _markedForDelete = {}, _routingContext = routingContext, _veilid = veilid; @@ -140,6 +140,8 @@ class DHTRecordPool with TableDBBackedJson { final Mutex _mutex; // Which DHT records are currently open final Map _opened; + // Which DHT records are marked for deletion + final Set _markedForDelete; // Default routing context to use for new keys final VeilidRoutingContext _routingContext; // Convenience accessor @@ -288,6 +290,8 @@ class DHTRecordPool with TableDBBackedJson { return openedRecordInfo; } + // Called when a DHTRecord is closed + // Cleans up the opened record housekeeping and processes any late deletions Future _recordClosed(DHTRecord record) async { await _mutex.protect(() async { final key = record.key; @@ -301,14 +305,37 @@ class DHTRecordPool with TableDBBackedJson { } if (openedRecordInfo.records.isEmpty) { await _routingContext.closeDHTRecord(key); - if (openedRecordInfo.shared.deleteOnClose) { - await _deleteRecordInner(key); - } _opened.remove(key); + + await _checkForLateDeletesInner(key); } }); } + // Check to see if this key can finally be deleted + // If any parents are marked for deletion, try them first + Future _checkForLateDeletesInner(TypedKey key) async { + // Get parent list in bottom up order including our own key + final parents = []; + TypedKey? nextParent = key; + while (nextParent != null) { + parents.add(nextParent); + nextParent = getParentRecordKey(nextParent); + } + + // If any parent is ready to delete all its children do it + for (final parent in parents) { + if (_markedForDelete.contains(parent)) { + final deleted = await _deleteRecordInner(parent); + if (!deleted) { + // If we couldn't delete a child then no 'marked for delete' parents + // above us will be ready to delete either + break; + } + } + } + } + // Collect all dependencies (including the record itself) // in reverse (bottom-up/delete order) List _collectChildrenInner(TypedKey recordKey) { @@ -328,7 +355,13 @@ class DHTRecordPool with TableDBBackedJson { return allDeps.reversedView; } - String _debugChildren(TypedKey recordKey, {List? allDeps}) { + /// Collect all dependencies (including the record itself) + /// in reverse (bottom-up/delete order) + Future> collectChildren(TypedKey recordKey) => + _mutex.protect(() async => _collectChildrenInner(recordKey)); + + /// Print children + String debugChildren(TypedKey recordKey, {List? allDeps}) { allDeps ??= _collectChildrenInner(recordKey); // ignore: avoid_print var out = @@ -342,32 +375,48 @@ class DHTRecordPool with TableDBBackedJson { return out; } - Future _deleteRecordInner(TypedKey recordKey) async { - log('deleteDHTRecord: key=$recordKey'); + // Actual delete function + Future _finalizeDeleteRecordInner(TypedKey recordKey) async { + log('_finalizeDeleteRecordInner: key=$recordKey'); // Remove this child from parents await _removeDependenciesInner([recordKey]); await _routingContext.deleteDHTRecord(recordKey); + _markedForDelete.remove(recordKey); } - Future deleteRecord(TypedKey recordKey) async { - await _mutex.protect(() async { - final allDeps = _collectChildrenInner(recordKey); - - if (allDeps.singleOrNull != recordKey) { - final dbgstr = _debugChildren(recordKey, allDeps: allDeps); - throw StateError('must delete children first: $dbgstr'); + // Deep delete mechanism inside mutex + Future _deleteRecordInner(TypedKey recordKey) async { + final toDelete = _readyForDeleteInner(recordKey); + if (toDelete.isNotEmpty) { + // delete now + for (final deleteKey in toDelete) { + await _finalizeDeleteRecordInner(deleteKey); } + return true; + } + // mark for deletion + _markedForDelete.add(recordKey); + return false; + } - final ori = _opened[recordKey]; - if (ori != null) { - // delete after close - ori.shared.deleteOnClose = true; - } else { - // delete now - await _deleteRecordInner(recordKey); + /// Delete a record and its children if they are all closed + /// otherwise mark that record for deletion eventually + /// Returns true if the deletion was processed immediately + /// Returns false if the deletion was marked for later + Future deleteRecord(TypedKey recordKey) async => + _mutex.protect(() async => _deleteRecordInner(recordKey)); + + // If everything underneath is closed including itself, return the + // list of children (and itself) to finally actually delete + List _readyForDeleteInner(TypedKey recordKey) { + final allDeps = _collectChildrenInner(recordKey); + for (final dep in allDeps) { + if (_opened.containsKey(dep)) { + return []; } - }); + } + return allDeps; } void _validateParentInner(TypedKey? parent, TypedKey child) { @@ -456,6 +505,19 @@ class DHTRecordPool with TableDBBackedJson { } } + bool _isValidRecordKeyInner(TypedKey key) { + if (_state.rootRecords.contains(key)) { + return true; + } + if (_state.childrenByParent.containsKey(key.toJson())) { + return true; + } + return false; + } + + Future isValidRecordKey(TypedKey key) => + _mutex.protect(() async => _isValidRecordKeyInner(key)); + /////////////////////////////////////////////////////////////////////// /// Create a root DHTRecord that has no dependent records diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart index 0a2b7d2..e2bf392 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart @@ -67,12 +67,8 @@ class _DHTShortArrayHead { Future delete() async { await _headMutex.protect(() async { - final pool = DHTRecordPool.instance; - final futures = >[pool.deleteRecord(_headRecord.key)]; - for (final lr in _linkedRecords) { - futures.add(pool.deleteRecord(lr.key)); - } - await Future.wait(futures); + // Will deep delete all linked records as they are children + await _headRecord.delete(); }); } diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_write.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_write.dart index 0d51663..dbd8984 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_write.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_write.dart @@ -8,19 +8,12 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead _DHTShortArrayWrite._(super.head) : super._(); @override - Future tryAddItem(Uint8List value) async { - // Allocate empty index at the end of the list - final pos = _head.length; - _head.allocateIndex(pos); + Future tryAddItem(Uint8List value) => + tryInsertItem(_head.length, value); - // Write item - final ok = await tryWriteItem(pos, value); - if (!ok) { - _head.freeIndex(pos); - } - - return ok; - } + @override + Future tryAddItems(List values) => + tryInsertItems(_head.length, values); @override Future tryInsertItem(int pos, Uint8List value) async { @@ -35,6 +28,29 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead return true; } + @override + Future tryInsertItems(int pos, List values) async { + // Allocate empty indices at the end of the list + for (var i = 0; i < values.length; i++) { + _head.allocateIndex(pos + i); + } + + // Write items + var success = true; + final dws = DelayedWaitSet(); + for (var i = 0; i < values.length; i++) { + dws.add(() async { + final ok = await tryWriteItem(pos + i, values[i]); + if (!ok) { + _head.freeIndex(pos + i); + success = false; + } + }); + } + await dws(chunkSize: maxDHTConcurrency, onChunkDone: (_) => success); + return success; + } + @override Future swapItem(int aPos, int bPos) async { if (aPos < 0 || aPos >= _head.length) { diff --git a/packages/veilid_support/lib/dht_support/src/interfaces/dht_append_truncate.dart b/packages/veilid_support/lib/dht_support/src/interfaces/dht_append_truncate.dart index babcc7d..d98037c 100644 --- a/packages/veilid_support/lib/dht_support/src/interfaces/dht_append_truncate.dart +++ b/packages/veilid_support/lib/dht_support/src/interfaces/dht_append_truncate.dart @@ -14,6 +14,13 @@ abstract class DHTAppendTruncate { /// This may throw an exception if the number elements added exceeds limits. Future tryAppendItem(Uint8List value); + /// Try to add a list of items to the end of the DHT data structure. + /// Return true if the elements were successfully added, and false if the + /// state changed before the element could be added or a newer value was found + /// on the network. + /// This may throw an exception if the number elements added exceeds limits. + Future tryAppendItems(List values); + /// Try to remove a number of items from the head of the DHT data structure. /// Throws StateError if count < 0 Future truncate(int count); diff --git a/packages/veilid_support/lib/dht_support/src/interfaces/dht_openable.dart b/packages/veilid_support/lib/dht_support/src/interfaces/dht_openable.dart index 1ee1140..e28f703 100644 --- a/packages/veilid_support/lib/dht_support/src/interfaces/dht_openable.dart +++ b/packages/veilid_support/lib/dht_support/src/interfaces/dht_openable.dart @@ -29,12 +29,12 @@ extension DHTOpenableExt on D { } try { - final out = await scopeFunction(this); - await close(); - return out; - } on Exception catch (_) { + return await scopeFunction(this); + } on Exception { await delete(); rethrow; + } finally { + await close(); } } diff --git a/packages/veilid_support/lib/dht_support/src/interfaces/dht_random_write.dart b/packages/veilid_support/lib/dht_support/src/interfaces/dht_random_write.dart index 53f307c..17a450e 100644 --- a/packages/veilid_support/lib/dht_support/src/interfaces/dht_random_write.dart +++ b/packages/veilid_support/lib/dht_support/src/interfaces/dht_random_write.dart @@ -30,6 +30,13 @@ abstract class DHTRandomWrite { /// built-in limit of 'maxElements = 256' entries. Future tryAddItem(Uint8List value); + /// Try to add a list of items to the end of the DHTArray. Return true if the + /// elements were successfully added, and false if the state changed before + /// the elements could be added or a newer value was found on the network. + /// This may throw an exception if the number elements added exceeds the + /// built-in limit of 'maxElements = 256' entries. + Future tryAddItems(List values); + /// Try to insert an item as position 'pos' of the DHTArray. /// Return true if the element was successfully inserted, and false if the /// state changed before the element could be inserted or a newer value was @@ -38,6 +45,14 @@ abstract class DHTRandomWrite { /// built-in limit of 'maxElements = 256' entries. Future tryInsertItem(int pos, Uint8List value); + /// Try to insert items at position 'pos' of the DHTArray. + /// Return true if the elements were successfully inserted, and false if the + /// state changed before the elements could be inserted or a newer value was + /// found on the network. + /// This may throw an exception if the number elements added exceeds the + /// built-in limit of 'maxElements = 256' entries. + Future tryInsertItems(int pos, List values); + /// Swap items at position 'aPos' and 'bPos' in the DHTArray. /// Throws IndexError if either of the positions swapped exceed /// the length of the list