diff --git a/packages/veilid_support/example/integration_test/app_test.dart b/packages/veilid_support/example/integration_test/app_test.dart index 6912fd3..a0f3b7f 100644 --- a/packages/veilid_support/example/integration_test/app_test.dart +++ b/packages/veilid_support/example/integration_test/app_test.dart @@ -36,116 +36,116 @@ void main() { setUpAll(veilidFixture.attach); tearDownAll(veilidFixture.detach); - group('TableDB Tests', () { - group('TableDBArray Tests', () { - // test('create/delete TableDBArray', testTableDBArrayCreateDelete); + // group('TableDB Tests', () { + // group('TableDBArray Tests', () { + // // test('create/delete TableDBArray', testTableDBArrayCreateDelete); - group('TableDBArray Add/Get Tests', () { - for (final params in [ - // - (99, 3, 15), - (100, 4, 16), - (101, 5, 17), - // - (511, 3, 127), - (512, 4, 128), - (513, 5, 129), - // - (4095, 3, 1023), - (4096, 4, 1024), - (4097, 5, 1025), - // - (65535, 3, 16383), - (65536, 4, 16384), - (65537, 5, 16385), - ]) { - final count = params.$1; - final singles = params.$2; - final batchSize = params.$3; + // group('TableDBArray Add/Get Tests', () { + // for (final params in [ + // // + // (99, 3, 15), + // (100, 4, 16), + // (101, 5, 17), + // // + // (511, 3, 127), + // (512, 4, 128), + // (513, 5, 129), + // // + // (4095, 3, 1023), + // (4096, 4, 1024), + // (4097, 5, 1025), + // // + // (65535, 3, 16383), + // (65536, 4, 16384), + // (65537, 5, 16385), + // ]) { + // final count = params.$1; + // final singles = params.$2; + // final batchSize = params.$3; - test( - timeout: const Timeout(Duration(seconds: 480)), - 'add/remove TableDBArray count = $count batchSize=$batchSize', - makeTestTableDBArrayAddGetClear( - count: count, - singles: singles, - batchSize: batchSize, - crypto: const VeilidCryptoPublic()), - ); - } - }); + // test( + // timeout: const Timeout(Duration(seconds: 480)), + // 'add/remove TableDBArray count = $count batchSize=$batchSize', + // makeTestTableDBArrayAddGetClear( + // count: count, + // singles: singles, + // batchSize: batchSize, + // crypto: const VeilidCryptoPublic()), + // ); + // } + // }); - group('TableDBArray Insert Tests', () { - for (final params in [ - // - (99, 3, 15), - (100, 4, 16), - (101, 5, 17), - // - (511, 3, 127), - (512, 4, 128), - (513, 5, 129), - // - (4095, 3, 1023), - (4096, 4, 1024), - (4097, 5, 1025), - // - (65535, 3, 16383), - (65536, 4, 16384), - (65537, 5, 16385), - ]) { - final count = params.$1; - final singles = params.$2; - final batchSize = params.$3; + // group('TableDBArray Insert Tests', () { + // for (final params in [ + // // + // (99, 3, 15), + // (100, 4, 16), + // (101, 5, 17), + // // + // (511, 3, 127), + // (512, 4, 128), + // (513, 5, 129), + // // + // (4095, 3, 1023), + // (4096, 4, 1024), + // (4097, 5, 1025), + // // + // (65535, 3, 16383), + // (65536, 4, 16384), + // (65537, 5, 16385), + // ]) { + // final count = params.$1; + // final singles = params.$2; + // final batchSize = params.$3; - test( - timeout: const Timeout(Duration(seconds: 480)), - 'insert TableDBArray count=$count singles=$singles batchSize=$batchSize', - makeTestTableDBArrayInsert( - count: count, - singles: singles, - batchSize: batchSize, - crypto: const VeilidCryptoPublic()), - ); - } - }); + // test( + // timeout: const Timeout(Duration(seconds: 480)), + // 'insert TableDBArray count=$count singles=$singles batchSize=$batchSize', + // makeTestTableDBArrayInsert( + // count: count, + // singles: singles, + // batchSize: batchSize, + // crypto: const VeilidCryptoPublic()), + // ); + // } + // }); - group('TableDBArray Remove Tests', () { - for (final params in [ - // - (99, 3, 15), - (100, 4, 16), - (101, 5, 17), - // - (511, 3, 127), - (512, 4, 128), - (513, 5, 129), - // - (4095, 3, 1023), - (4096, 4, 1024), - (4097, 5, 1025), - // - (16383, 3, 4095), - (16384, 4, 4096), - (16385, 5, 4097), - ]) { - final count = params.$1; - final singles = params.$2; - final batchSize = params.$3; + // group('TableDBArray Remove Tests', () { + // for (final params in [ + // // + // (99, 3, 15), + // (100, 4, 16), + // (101, 5, 17), + // // + // (511, 3, 127), + // (512, 4, 128), + // (513, 5, 129), + // // + // (4095, 3, 1023), + // (4096, 4, 1024), + // (4097, 5, 1025), + // // + // (16383, 3, 4095), + // (16384, 4, 4096), + // (16385, 5, 4097), + // ]) { + // final count = params.$1; + // final singles = params.$2; + // final batchSize = params.$3; - test( - timeout: const Timeout(Duration(seconds: 480)), - 'remove TableDBArray count=$count singles=$singles batchSize=$batchSize', - makeTestTableDBArrayRemove( - count: count, - singles: singles, - batchSize: batchSize, - crypto: const VeilidCryptoPublic()), - ); - } - }); - }); - }); + // test( + // timeout: const Timeout(Duration(seconds: 480)), + // 'remove TableDBArray count=$count singles=$singles batchSize=$batchSize', + // makeTestTableDBArrayRemove( + // count: count, + // singles: singles, + // batchSize: batchSize, + // crypto: const VeilidCryptoPublic()), + // ); + // } + // }); + // }); + // }); group('DHT Support Tests', () { setUpAll(updateProcessorFixture.setUp); diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log.dart index 985b11f..d226b68 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log.dart @@ -9,7 +9,6 @@ import 'package:meta/meta.dart'; import '../../../veilid_support.dart'; import '../../proto/proto.dart' as proto; -import '../interfaces/dht_add.dart'; part 'dht_log_spine.dart'; part 'dht_log_read.dart'; @@ -42,7 +41,7 @@ class DHTLogUpdate extends Equatable { /// * The head and tail position of the log /// - subkeyIdx = pos / recordsPerSubkey /// - recordIdx = pos % recordsPerSubkey -class DHTLog implements DHTDeleteable { +class DHTLog implements DHTDeleteable { //////////////////////////////////////////////////////////////// // Constructors @@ -172,24 +171,24 @@ class DHTLog implements DHTDeleteable { /// Add a reference to this log @override - Future ref() async => _mutex.protect(() async { + Future ref() async => _mutex.protect(() async { _openCount++; - return this; }); /// Free all resources for the DHTLog @override - Future close() async => _mutex.protect(() async { + Future close() async => _mutex.protect(() async { if (_openCount == 0) { throw StateError('already closed'); } _openCount--; if (_openCount != 0) { - return; + return false; } await _watchController?.close(); _watchController = null; await _spine.close(); + return true; }); /// Free all resources for the DHTLog and delete it from the DHT diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart index 70155e8..0950c76 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart @@ -1,6 +1,6 @@ part of 'dht_log.dart'; -class _DHTLogPosition extends DHTCloseable<_DHTLogPosition, DHTShortArray> { +class _DHTLogPosition extends DHTCloseable { _DHTLogPosition._({ required _DHTLogSpine dhtLogSpine, required this.shortArray, @@ -12,13 +12,11 @@ class _DHTLogPosition extends DHTCloseable<_DHTLogPosition, DHTShortArray> { final _DHTLogSpine _dhtLogSpine; final DHTShortArray shortArray; - var _openCount = 1; final int _segmentNumber; - final Mutex _mutex = Mutex(); /// Check if the DHTLogPosition is open @override - bool get isOpen => _openCount > 0; + bool get isOpen => shortArray.isOpen; /// The type of the openable scope @override @@ -26,32 +24,13 @@ class _DHTLogPosition extends DHTCloseable<_DHTLogPosition, DHTShortArray> { /// Add a reference to this log @override - Future<_DHTLogPosition> ref() async => _mutex.protect(() async { - _openCount++; - return this; - }); + Future ref() async { + await shortArray.ref(); + } /// Free all resources for the DHTLogPosition @override - Future close() async => _mutex.protect(() async { - if (_openCount == 0) { - throw StateError('already closed'); - } - _openCount--; - if (_openCount != 0) { - return; - } - await _dhtLogSpine._segmentClosed(_segmentNumber); - }); -} - -class _OpenedSegment { - _OpenedSegment._({ - required this.shortArray, - }); - - final DHTShortArray shortArray; - int openCount = 1; + Future close() async => _dhtLogSpine._segmentClosed(_segmentNumber); } class _DHTLogSegmentLookup extends Equatable { @@ -81,7 +60,7 @@ class _DHTLogSpine { _tail = tail, _segmentStride = stride, _openedSegments = {}, - _spineCache = []; + _openCache = []; // Create a new spine record and push it to the network static Future<_DHTLogSpine> create( @@ -130,8 +109,8 @@ class _DHTLogSpine { return; } final futures = >[_spineRecord.close()]; - for (final (_, sc) in _spineCache) { - futures.add(sc.close()); + for (final seg in _openCache.toList()) { + futures.add(_segmentClosed(seg)); } await Future.wait(futures); @@ -308,7 +287,7 @@ class _DHTLogSpine { segmentKeyBytes); } - Future _openOrCreateSegmentInner(int segmentNumber) async { + Future _openOrCreateSegment(int segmentNumber) async { assert(_spineMutex.isLocked, 'should be in mutex here'); assert(_spineRecord.writer != null, 'should be writable'); @@ -366,7 +345,7 @@ class _DHTLogSpine { } } - Future _openSegmentInner(int segmentNumber) async { + Future _openSegment(int segmentNumber) async { assert(_spineMutex.isLocked, 'should be in mutex here'); // Lookup what subkey and segment subrange has this position's segment @@ -395,59 +374,6 @@ class _DHTLogSpine { return segmentRec; } - Future _openOrCreateSegment(int segmentNumber) async { - assert(_spineMutex.isLocked, 'should be in mutex here'); - - // See if we already have this in the cache - for (var i = 0; i < _spineCache.length; i++) { - if (_spineCache[i].$1 == segmentNumber) { - // Touch the element - final x = _spineCache.removeAt(i); - _spineCache.add(x); - // Return the shortarray for this position - return x.$2.ref(); - } - } - - // If we don't have it in the cache, get/create it and then cache a ref - final segment = await _openOrCreateSegmentInner(segmentNumber); - _spineCache.add((segmentNumber, await segment.ref())); - if (_spineCache.length > _spineCacheLength) { - // Trim the LRU cache - final (_, sa) = _spineCache.removeAt(0); - await sa.close(); - } - return segment; - } - - Future _openSegment(int segmentNumber) async { - assert(_spineMutex.isLocked, 'should be in mutex here'); - - // See if we already have this in the cache - for (var i = 0; i < _spineCache.length; i++) { - if (_spineCache[i].$1 == segmentNumber) { - // Touch the element - final x = _spineCache.removeAt(i); - _spineCache.add(x); - // Return the shortarray for this position - return x.$2.ref(); - } - } - - // If we don't have it in the cache, get it and then cache it - final segment = await _openSegmentInner(segmentNumber); - if (segment == null) { - return null; - } - _spineCache.add((segmentNumber, await segment.ref())); - if (_spineCache.length > _spineCacheLength) { - // Trim the LRU cache - final (_, sa) = _spineCache.removeAt(0); - await sa.close(); - } - return segment; - } - _DHTLogSegmentLookup _lookupSegment(int segmentNumber) { assert(_spineMutex.isLocked, 'should be in mutex here'); @@ -471,13 +397,15 @@ class _DHTLogSpine { int segmentNumber, int segmentPos, {bool onlyOpened = false}) async => _spineCacheMutex.protect(() async { - // Get the segment shortArray + // See if we have this segment opened already final openedSegment = _openedSegments[segmentNumber]; - late final DHTShortArray shortArray; + late DHTShortArray shortArray; if (openedSegment != null) { - openedSegment.openCount++; - shortArray = openedSegment.shortArray; + // If so, return a ref + await openedSegment.ref(); + shortArray = openedSegment; } else { + // Otherwise open a segment if (onlyOpened) { return null; } @@ -488,13 +416,26 @@ class _DHTLogSpine { if (newShortArray == null) { return null; } - - _openedSegments[segmentNumber] = - _OpenedSegment._(shortArray: newShortArray); - + // Keep in the opened segments table + _openedSegments[segmentNumber] = newShortArray; shortArray = newShortArray; } + // LRU cache the segment number + if (!_openCache.remove(segmentNumber)) { + // If this is new to the cache ref it when it goes in + await shortArray.ref(); + } + _openCache.add(segmentNumber); + if (_openCache.length > _openCacheSize) { + // Trim the LRU cache + final lruseg = _openCache.removeAt(0); + final lrusa = _openedSegments[lruseg]!; + if (await lrusa.close()) { + _openedSegments.remove(lruseg); + } + } + return _DHTLogPosition._( dhtLogSpine: this, shortArray: shortArray, @@ -521,15 +462,15 @@ class _DHTLogSpine { return lookupPositionBySegmentNumber(segmentNumber, segmentPos); } - Future _segmentClosed(int segmentNumber) async { + Future _segmentClosed(int segmentNumber) async { assert(_spineMutex.isLocked, 'should be locked'); - await _spineCacheMutex.protect(() async { - final os = _openedSegments[segmentNumber]!; - os.openCount--; - if (os.openCount == 0) { + return _spineCacheMutex.protect(() async { + final sa = _openedSegments[segmentNumber]!; + if (await sa.close()) { _openedSegments.remove(segmentNumber); - await os.shortArray.close(); + return true; } + return false; }); } @@ -693,21 +634,26 @@ class _DHTLogSpine { // and force their short arrays to refresh their heads if // they are opened final segmentsToRefresh = <_DHTLogPosition>[]; - for (var curTail = oldTail; - curTail != _tail; - curTail = (curTail + - (DHTShortArray.maxElements - - (curTail % DHTShortArray.maxElements))) % - _positionLimit) { + var curTail = oldTail; + final endSegmentNumber = _tail ~/ DHTShortArray.maxElements; + while (true) { final segmentNumber = curTail ~/ DHTShortArray.maxElements; final segmentPos = curTail % DHTShortArray.maxElements; final dhtLogPosition = await lookupPositionBySegmentNumber( segmentNumber, segmentPos, onlyOpened: true); - if (dhtLogPosition == null) { - continue; + if (dhtLogPosition != null) { + segmentsToRefresh.add(dhtLogPosition); } - segmentsToRefresh.add(dhtLogPosition); + + if (segmentNumber == endSegmentNumber) { + break; + } + + curTail = (curTail + + (DHTShortArray.maxElements - + (curTail % DHTShortArray.maxElements))) % + _positionLimit; } // Refresh the segments that have probably changed @@ -759,7 +705,7 @@ class _DHTLogSpine { // LRU cache of DHT spine elements accessed recently // Pair of position and associated shortarray segment final Mutex _spineCacheMutex = Mutex(); - final List<(int, DHTShortArray)> _spineCache; - final Map _openedSegments; - static const int _spineCacheLength = 3; + final List _openCache; + final Map _openedSegments; + static const int _openCacheSize = 3; } diff --git a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record.dart b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record.dart index 521bf1f..4bd0ee6 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record.dart @@ -36,7 +36,7 @@ enum DHTRecordRefreshMode { ///////////////////////////////////////////////// -class DHTRecord implements DHTDeleteable { +class DHTRecord implements DHTDeleteable { DHTRecord._( {required VeilidRoutingContext routingContext, required SharedDHTRecordData sharedDHTRecordData, @@ -64,25 +64,25 @@ class DHTRecord implements DHTDeleteable { /// Add a reference to this DHTRecord @override - Future ref() async => _mutex.protect(() async { + Future ref() async => _mutex.protect(() async { _openCount++; - return this; }); /// Free all resources for the DHTRecord @override - Future close() async => _mutex.protect(() async { + Future close() async => _mutex.protect(() async { if (_openCount == 0) { throw StateError('already closed'); } _openCount--; if (_openCount != 0) { - return; + return false; } await _watchController?.close(); _watchController = null; await DHTRecordPool.instance._recordClosed(this); + return true; }); /// Free all resources for the DHTRecord and delete it from the DHT diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array.dart index a84f02d..10ddf01 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array.dart @@ -13,7 +13,7 @@ part 'dht_short_array_write.dart'; /////////////////////////////////////////////////////////////////////// -class DHTShortArray implements DHTDeleteable { +class DHTShortArray implements DHTDeleteable { //////////////////////////////////////////////////////////////// // Constructors @@ -148,25 +148,25 @@ class DHTShortArray implements DHTDeleteable { /// Add a reference to this shortarray @override - Future ref() async => _mutex.protect(() async { + Future ref() async => _mutex.protect(() async { _openCount++; - return this; }); /// Free all resources for the DHTShortArray @override - Future close() async => _mutex.protect(() async { + Future close() async => _mutex.protect(() async { if (_openCount == 0) { throw StateError('already closed'); } _openCount--; if (_openCount != 0) { - return; + return false; } await _watchController?.close(); _watchController = null; await _head.close(); + return true; }); /// Free all resources for the DHTShortArray and delete it from the DHT diff --git a/packages/veilid_support/lib/dht_support/src/interfaces/dht_closeable.dart b/packages/veilid_support/lib/dht_support/src/interfaces/dht_closeable.dart index 65e9db1..c913340 100644 --- a/packages/veilid_support/lib/dht_support/src/interfaces/dht_closeable.dart +++ b/packages/veilid_support/lib/dht_support/src/interfaces/dht_closeable.dart @@ -2,19 +2,23 @@ import 'dart:async'; import 'package:meta/meta.dart'; -abstract class DHTCloseable { +abstract class DHTCloseable { + // Public interface + Future ref(); + Future close(); + + // Internal implementation + @protected bool get isOpen; @protected FutureOr scoped(); - Future ref(); - Future close(); } -abstract class DHTDeleteable extends DHTCloseable { +abstract class DHTDeleteable extends DHTCloseable { Future delete(); } -extension DHTCloseableExt on DHTCloseable { +extension DHTCloseableExt on DHTCloseable { /// Runs a closure that guarantees the DHTCloseable /// will be closed upon exit, even if an uncaught exception is thrown Future scope(Future Function(D) scopeFunction) async { @@ -29,7 +33,7 @@ extension DHTCloseableExt on DHTCloseable { } } -extension DHTDeletableExt on DHTDeleteable { +extension DHTDeletableExt on DHTDeleteable { /// Runs a closure that guarantees the DHTCloseable /// will be closed upon exit, and deleted if an an /// uncaught exception is thrown