simplify reference counting

This commit is contained in:
Christien Rioux 2024-06-08 12:59:56 -04:00
parent 68aad4a94e
commit 0b9835b23d
6 changed files with 186 additions and 237 deletions

View file

@ -36,116 +36,116 @@ void main() {
setUpAll(veilidFixture.attach); setUpAll(veilidFixture.attach);
tearDownAll(veilidFixture.detach); tearDownAll(veilidFixture.detach);
group('TableDB Tests', () { // group('TableDB Tests', () {
group('TableDBArray Tests', () { // group('TableDBArray Tests', () {
// test('create/delete TableDBArray', testTableDBArrayCreateDelete); // // test('create/delete TableDBArray', testTableDBArrayCreateDelete);
group('TableDBArray Add/Get Tests', () { // group('TableDBArray Add/Get Tests', () {
for (final params in [ // for (final params in [
// // //
(99, 3, 15), // (99, 3, 15),
(100, 4, 16), // (100, 4, 16),
(101, 5, 17), // (101, 5, 17),
// // //
(511, 3, 127), // (511, 3, 127),
(512, 4, 128), // (512, 4, 128),
(513, 5, 129), // (513, 5, 129),
// // //
(4095, 3, 1023), // (4095, 3, 1023),
(4096, 4, 1024), // (4096, 4, 1024),
(4097, 5, 1025), // (4097, 5, 1025),
// // //
(65535, 3, 16383), // (65535, 3, 16383),
(65536, 4, 16384), // (65536, 4, 16384),
(65537, 5, 16385), // (65537, 5, 16385),
]) { // ]) {
final count = params.$1; // final count = params.$1;
final singles = params.$2; // final singles = params.$2;
final batchSize = params.$3; // final batchSize = params.$3;
test( // test(
timeout: const Timeout(Duration(seconds: 480)), // timeout: const Timeout(Duration(seconds: 480)),
'add/remove TableDBArray count = $count batchSize=$batchSize', // 'add/remove TableDBArray count = $count batchSize=$batchSize',
makeTestTableDBArrayAddGetClear( // makeTestTableDBArrayAddGetClear(
count: count, // count: count,
singles: singles, // singles: singles,
batchSize: batchSize, // batchSize: batchSize,
crypto: const VeilidCryptoPublic()), // crypto: const VeilidCryptoPublic()),
); // );
} // }
}); // });
group('TableDBArray Insert Tests', () { // group('TableDBArray Insert Tests', () {
for (final params in [ // for (final params in [
// // //
(99, 3, 15), // (99, 3, 15),
(100, 4, 16), // (100, 4, 16),
(101, 5, 17), // (101, 5, 17),
// // //
(511, 3, 127), // (511, 3, 127),
(512, 4, 128), // (512, 4, 128),
(513, 5, 129), // (513, 5, 129),
// // //
(4095, 3, 1023), // (4095, 3, 1023),
(4096, 4, 1024), // (4096, 4, 1024),
(4097, 5, 1025), // (4097, 5, 1025),
// // //
(65535, 3, 16383), // (65535, 3, 16383),
(65536, 4, 16384), // (65536, 4, 16384),
(65537, 5, 16385), // (65537, 5, 16385),
]) { // ]) {
final count = params.$1; // final count = params.$1;
final singles = params.$2; // final singles = params.$2;
final batchSize = params.$3; // final batchSize = params.$3;
test( // test(
timeout: const Timeout(Duration(seconds: 480)), // timeout: const Timeout(Duration(seconds: 480)),
'insert TableDBArray count=$count singles=$singles batchSize=$batchSize', // 'insert TableDBArray count=$count singles=$singles batchSize=$batchSize',
makeTestTableDBArrayInsert( // makeTestTableDBArrayInsert(
count: count, // count: count,
singles: singles, // singles: singles,
batchSize: batchSize, // batchSize: batchSize,
crypto: const VeilidCryptoPublic()), // crypto: const VeilidCryptoPublic()),
); // );
} // }
}); // });
group('TableDBArray Remove Tests', () { // group('TableDBArray Remove Tests', () {
for (final params in [ // for (final params in [
// // //
(99, 3, 15), // (99, 3, 15),
(100, 4, 16), // (100, 4, 16),
(101, 5, 17), // (101, 5, 17),
// // //
(511, 3, 127), // (511, 3, 127),
(512, 4, 128), // (512, 4, 128),
(513, 5, 129), // (513, 5, 129),
// // //
(4095, 3, 1023), // (4095, 3, 1023),
(4096, 4, 1024), // (4096, 4, 1024),
(4097, 5, 1025), // (4097, 5, 1025),
// // //
(16383, 3, 4095), // (16383, 3, 4095),
(16384, 4, 4096), // (16384, 4, 4096),
(16385, 5, 4097), // (16385, 5, 4097),
]) { // ]) {
final count = params.$1; // final count = params.$1;
final singles = params.$2; // final singles = params.$2;
final batchSize = params.$3; // final batchSize = params.$3;
test( // test(
timeout: const Timeout(Duration(seconds: 480)), // timeout: const Timeout(Duration(seconds: 480)),
'remove TableDBArray count=$count singles=$singles batchSize=$batchSize', // 'remove TableDBArray count=$count singles=$singles batchSize=$batchSize',
makeTestTableDBArrayRemove( // makeTestTableDBArrayRemove(
count: count, // count: count,
singles: singles, // singles: singles,
batchSize: batchSize, // batchSize: batchSize,
crypto: const VeilidCryptoPublic()), // crypto: const VeilidCryptoPublic()),
); // );
} // }
}); // });
}); // });
}); // });
group('DHT Support Tests', () { group('DHT Support Tests', () {
setUpAll(updateProcessorFixture.setUp); setUpAll(updateProcessorFixture.setUp);

View file

@ -9,7 +9,6 @@ import 'package:meta/meta.dart';
import '../../../veilid_support.dart'; import '../../../veilid_support.dart';
import '../../proto/proto.dart' as proto; import '../../proto/proto.dart' as proto;
import '../interfaces/dht_add.dart';
part 'dht_log_spine.dart'; part 'dht_log_spine.dart';
part 'dht_log_read.dart'; part 'dht_log_read.dart';
@ -42,7 +41,7 @@ class DHTLogUpdate extends Equatable {
/// * The head and tail position of the log /// * The head and tail position of the log
/// - subkeyIdx = pos / recordsPerSubkey /// - subkeyIdx = pos / recordsPerSubkey
/// - recordIdx = pos % recordsPerSubkey /// - recordIdx = pos % recordsPerSubkey
class DHTLog implements DHTDeleteable<DHTLog, DHTLog> { class DHTLog implements DHTDeleteable<DHTLog> {
//////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////
// Constructors // Constructors
@ -172,24 +171,24 @@ class DHTLog implements DHTDeleteable<DHTLog, DHTLog> {
/// Add a reference to this log /// Add a reference to this log
@override @override
Future<DHTLog> ref() async => _mutex.protect(() async { Future<void> ref() async => _mutex.protect(() async {
_openCount++; _openCount++;
return this;
}); });
/// Free all resources for the DHTLog /// Free all resources for the DHTLog
@override @override
Future<void> close() async => _mutex.protect(() async { Future<bool> close() async => _mutex.protect(() async {
if (_openCount == 0) { if (_openCount == 0) {
throw StateError('already closed'); throw StateError('already closed');
} }
_openCount--; _openCount--;
if (_openCount != 0) { if (_openCount != 0) {
return; return false;
} }
await _watchController?.close(); await _watchController?.close();
_watchController = null; _watchController = null;
await _spine.close(); await _spine.close();
return true;
}); });
/// Free all resources for the DHTLog and delete it from the DHT /// Free all resources for the DHTLog and delete it from the DHT

View file

@ -1,6 +1,6 @@
part of 'dht_log.dart'; part of 'dht_log.dart';
class _DHTLogPosition extends DHTCloseable<_DHTLogPosition, DHTShortArray> { class _DHTLogPosition extends DHTCloseable<DHTShortArray> {
_DHTLogPosition._({ _DHTLogPosition._({
required _DHTLogSpine dhtLogSpine, required _DHTLogSpine dhtLogSpine,
required this.shortArray, required this.shortArray,
@ -12,13 +12,11 @@ class _DHTLogPosition extends DHTCloseable<_DHTLogPosition, DHTShortArray> {
final _DHTLogSpine _dhtLogSpine; final _DHTLogSpine _dhtLogSpine;
final DHTShortArray shortArray; final DHTShortArray shortArray;
var _openCount = 1;
final int _segmentNumber; final int _segmentNumber;
final Mutex _mutex = Mutex();
/// Check if the DHTLogPosition is open /// Check if the DHTLogPosition is open
@override @override
bool get isOpen => _openCount > 0; bool get isOpen => shortArray.isOpen;
/// The type of the openable scope /// The type of the openable scope
@override @override
@ -26,32 +24,13 @@ class _DHTLogPosition extends DHTCloseable<_DHTLogPosition, DHTShortArray> {
/// Add a reference to this log /// Add a reference to this log
@override @override
Future<_DHTLogPosition> ref() async => _mutex.protect(() async { Future<void> ref() async {
_openCount++; await shortArray.ref();
return this; }
});
/// Free all resources for the DHTLogPosition /// Free all resources for the DHTLogPosition
@override @override
Future<void> close() async => _mutex.protect(() async { Future<bool> close() async => _dhtLogSpine._segmentClosed(_segmentNumber);
if (_openCount == 0) {
throw StateError('already closed');
}
_openCount--;
if (_openCount != 0) {
return;
}
await _dhtLogSpine._segmentClosed(_segmentNumber);
});
}
class _OpenedSegment {
_OpenedSegment._({
required this.shortArray,
});
final DHTShortArray shortArray;
int openCount = 1;
} }
class _DHTLogSegmentLookup extends Equatable { class _DHTLogSegmentLookup extends Equatable {
@ -81,7 +60,7 @@ class _DHTLogSpine {
_tail = tail, _tail = tail,
_segmentStride = stride, _segmentStride = stride,
_openedSegments = {}, _openedSegments = {},
_spineCache = []; _openCache = [];
// Create a new spine record and push it to the network // Create a new spine record and push it to the network
static Future<_DHTLogSpine> create( static Future<_DHTLogSpine> create(
@ -130,8 +109,8 @@ class _DHTLogSpine {
return; return;
} }
final futures = <Future<void>>[_spineRecord.close()]; final futures = <Future<void>>[_spineRecord.close()];
for (final (_, sc) in _spineCache) { for (final seg in _openCache.toList()) {
futures.add(sc.close()); futures.add(_segmentClosed(seg));
} }
await Future.wait(futures); await Future.wait(futures);
@ -308,7 +287,7 @@ class _DHTLogSpine {
segmentKeyBytes); segmentKeyBytes);
} }
Future<DHTShortArray> _openOrCreateSegmentInner(int segmentNumber) async { Future<DHTShortArray> _openOrCreateSegment(int segmentNumber) async {
assert(_spineMutex.isLocked, 'should be in mutex here'); assert(_spineMutex.isLocked, 'should be in mutex here');
assert(_spineRecord.writer != null, 'should be writable'); assert(_spineRecord.writer != null, 'should be writable');
@ -366,7 +345,7 @@ class _DHTLogSpine {
} }
} }
Future<DHTShortArray?> _openSegmentInner(int segmentNumber) async { Future<DHTShortArray?> _openSegment(int segmentNumber) async {
assert(_spineMutex.isLocked, 'should be in mutex here'); assert(_spineMutex.isLocked, 'should be in mutex here');
// Lookup what subkey and segment subrange has this position's segment // Lookup what subkey and segment subrange has this position's segment
@ -395,59 +374,6 @@ class _DHTLogSpine {
return segmentRec; return segmentRec;
} }
Future<DHTShortArray> _openOrCreateSegment(int segmentNumber) async {
assert(_spineMutex.isLocked, 'should be in mutex here');
// See if we already have this in the cache
for (var i = 0; i < _spineCache.length; i++) {
if (_spineCache[i].$1 == segmentNumber) {
// Touch the element
final x = _spineCache.removeAt(i);
_spineCache.add(x);
// Return the shortarray for this position
return x.$2.ref();
}
}
// If we don't have it in the cache, get/create it and then cache a ref
final segment = await _openOrCreateSegmentInner(segmentNumber);
_spineCache.add((segmentNumber, await segment.ref()));
if (_spineCache.length > _spineCacheLength) {
// Trim the LRU cache
final (_, sa) = _spineCache.removeAt(0);
await sa.close();
}
return segment;
}
Future<DHTShortArray?> _openSegment(int segmentNumber) async {
assert(_spineMutex.isLocked, 'should be in mutex here');
// See if we already have this in the cache
for (var i = 0; i < _spineCache.length; i++) {
if (_spineCache[i].$1 == segmentNumber) {
// Touch the element
final x = _spineCache.removeAt(i);
_spineCache.add(x);
// Return the shortarray for this position
return x.$2.ref();
}
}
// If we don't have it in the cache, get it and then cache it
final segment = await _openSegmentInner(segmentNumber);
if (segment == null) {
return null;
}
_spineCache.add((segmentNumber, await segment.ref()));
if (_spineCache.length > _spineCacheLength) {
// Trim the LRU cache
final (_, sa) = _spineCache.removeAt(0);
await sa.close();
}
return segment;
}
_DHTLogSegmentLookup _lookupSegment(int segmentNumber) { _DHTLogSegmentLookup _lookupSegment(int segmentNumber) {
assert(_spineMutex.isLocked, 'should be in mutex here'); assert(_spineMutex.isLocked, 'should be in mutex here');
@ -471,13 +397,15 @@ class _DHTLogSpine {
int segmentNumber, int segmentPos, int segmentNumber, int segmentPos,
{bool onlyOpened = false}) async => {bool onlyOpened = false}) async =>
_spineCacheMutex.protect(() async { _spineCacheMutex.protect(() async {
// Get the segment shortArray // See if we have this segment opened already
final openedSegment = _openedSegments[segmentNumber]; final openedSegment = _openedSegments[segmentNumber];
late final DHTShortArray shortArray; late DHTShortArray shortArray;
if (openedSegment != null) { if (openedSegment != null) {
openedSegment.openCount++; // If so, return a ref
shortArray = openedSegment.shortArray; await openedSegment.ref();
shortArray = openedSegment;
} else { } else {
// Otherwise open a segment
if (onlyOpened) { if (onlyOpened) {
return null; return null;
} }
@ -488,13 +416,26 @@ class _DHTLogSpine {
if (newShortArray == null) { if (newShortArray == null) {
return null; return null;
} }
// Keep in the opened segments table
_openedSegments[segmentNumber] = _openedSegments[segmentNumber] = newShortArray;
_OpenedSegment._(shortArray: newShortArray);
shortArray = newShortArray; shortArray = newShortArray;
} }
// LRU cache the segment number
if (!_openCache.remove(segmentNumber)) {
// If this is new to the cache ref it when it goes in
await shortArray.ref();
}
_openCache.add(segmentNumber);
if (_openCache.length > _openCacheSize) {
// Trim the LRU cache
final lruseg = _openCache.removeAt(0);
final lrusa = _openedSegments[lruseg]!;
if (await lrusa.close()) {
_openedSegments.remove(lruseg);
}
}
return _DHTLogPosition._( return _DHTLogPosition._(
dhtLogSpine: this, dhtLogSpine: this,
shortArray: shortArray, shortArray: shortArray,
@ -521,15 +462,15 @@ class _DHTLogSpine {
return lookupPositionBySegmentNumber(segmentNumber, segmentPos); return lookupPositionBySegmentNumber(segmentNumber, segmentPos);
} }
Future<void> _segmentClosed(int segmentNumber) async { Future<bool> _segmentClosed(int segmentNumber) async {
assert(_spineMutex.isLocked, 'should be locked'); assert(_spineMutex.isLocked, 'should be locked');
await _spineCacheMutex.protect(() async { return _spineCacheMutex.protect(() async {
final os = _openedSegments[segmentNumber]!; final sa = _openedSegments[segmentNumber]!;
os.openCount--; if (await sa.close()) {
if (os.openCount == 0) {
_openedSegments.remove(segmentNumber); _openedSegments.remove(segmentNumber);
await os.shortArray.close(); return true;
} }
return false;
}); });
} }
@ -693,23 +634,28 @@ class _DHTLogSpine {
// and force their short arrays to refresh their heads if // and force their short arrays to refresh their heads if
// they are opened // they are opened
final segmentsToRefresh = <_DHTLogPosition>[]; final segmentsToRefresh = <_DHTLogPosition>[];
for (var curTail = oldTail; var curTail = oldTail;
curTail != _tail; final endSegmentNumber = _tail ~/ DHTShortArray.maxElements;
curTail = (curTail + while (true) {
(DHTShortArray.maxElements -
(curTail % DHTShortArray.maxElements))) %
_positionLimit) {
final segmentNumber = curTail ~/ DHTShortArray.maxElements; final segmentNumber = curTail ~/ DHTShortArray.maxElements;
final segmentPos = curTail % DHTShortArray.maxElements; final segmentPos = curTail % DHTShortArray.maxElements;
final dhtLogPosition = await lookupPositionBySegmentNumber( final dhtLogPosition = await lookupPositionBySegmentNumber(
segmentNumber, segmentPos, segmentNumber, segmentPos,
onlyOpened: true); onlyOpened: true);
if (dhtLogPosition == null) { if (dhtLogPosition != null) {
continue;
}
segmentsToRefresh.add(dhtLogPosition); segmentsToRefresh.add(dhtLogPosition);
} }
if (segmentNumber == endSegmentNumber) {
break;
}
curTail = (curTail +
(DHTShortArray.maxElements -
(curTail % DHTShortArray.maxElements))) %
_positionLimit;
}
// Refresh the segments that have probably changed // Refresh the segments that have probably changed
await segmentsToRefresh.map((p) async { await segmentsToRefresh.map((p) async {
await p.shortArray.refresh(); await p.shortArray.refresh();
@ -759,7 +705,7 @@ class _DHTLogSpine {
// LRU cache of DHT spine elements accessed recently // LRU cache of DHT spine elements accessed recently
// Pair of position and associated shortarray segment // Pair of position and associated shortarray segment
final Mutex _spineCacheMutex = Mutex(); final Mutex _spineCacheMutex = Mutex();
final List<(int, DHTShortArray)> _spineCache; final List<int> _openCache;
final Map<int, _OpenedSegment> _openedSegments; final Map<int, DHTShortArray> _openedSegments;
static const int _spineCacheLength = 3; static const int _openCacheSize = 3;
} }

View file

@ -36,7 +36,7 @@ enum DHTRecordRefreshMode {
///////////////////////////////////////////////// /////////////////////////////////////////////////
class DHTRecord implements DHTDeleteable<DHTRecord, DHTRecord> { class DHTRecord implements DHTDeleteable<DHTRecord> {
DHTRecord._( DHTRecord._(
{required VeilidRoutingContext routingContext, {required VeilidRoutingContext routingContext,
required SharedDHTRecordData sharedDHTRecordData, required SharedDHTRecordData sharedDHTRecordData,
@ -64,25 +64,25 @@ class DHTRecord implements DHTDeleteable<DHTRecord, DHTRecord> {
/// Add a reference to this DHTRecord /// Add a reference to this DHTRecord
@override @override
Future<DHTRecord> ref() async => _mutex.protect(() async { Future<void> ref() async => _mutex.protect(() async {
_openCount++; _openCount++;
return this;
}); });
/// Free all resources for the DHTRecord /// Free all resources for the DHTRecord
@override @override
Future<void> close() async => _mutex.protect(() async { Future<bool> close() async => _mutex.protect(() async {
if (_openCount == 0) { if (_openCount == 0) {
throw StateError('already closed'); throw StateError('already closed');
} }
_openCount--; _openCount--;
if (_openCount != 0) { if (_openCount != 0) {
return; return false;
} }
await _watchController?.close(); await _watchController?.close();
_watchController = null; _watchController = null;
await DHTRecordPool.instance._recordClosed(this); await DHTRecordPool.instance._recordClosed(this);
return true;
}); });
/// Free all resources for the DHTRecord and delete it from the DHT /// Free all resources for the DHTRecord and delete it from the DHT

View file

@ -13,7 +13,7 @@ part 'dht_short_array_write.dart';
/////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////
class DHTShortArray implements DHTDeleteable<DHTShortArray, DHTShortArray> { class DHTShortArray implements DHTDeleteable<DHTShortArray> {
//////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////
// Constructors // Constructors
@ -148,25 +148,25 @@ class DHTShortArray implements DHTDeleteable<DHTShortArray, DHTShortArray> {
/// Add a reference to this shortarray /// Add a reference to this shortarray
@override @override
Future<DHTShortArray> ref() async => _mutex.protect(() async { Future<void> ref() async => _mutex.protect(() async {
_openCount++; _openCount++;
return this;
}); });
/// Free all resources for the DHTShortArray /// Free all resources for the DHTShortArray
@override @override
Future<void> close() async => _mutex.protect(() async { Future<bool> close() async => _mutex.protect(() async {
if (_openCount == 0) { if (_openCount == 0) {
throw StateError('already closed'); throw StateError('already closed');
} }
_openCount--; _openCount--;
if (_openCount != 0) { if (_openCount != 0) {
return; return false;
} }
await _watchController?.close(); await _watchController?.close();
_watchController = null; _watchController = null;
await _head.close(); await _head.close();
return true;
}); });
/// Free all resources for the DHTShortArray and delete it from the DHT /// Free all resources for the DHTShortArray and delete it from the DHT

View file

@ -2,19 +2,23 @@ import 'dart:async';
import 'package:meta/meta.dart'; import 'package:meta/meta.dart';
abstract class DHTCloseable<C, D> { abstract class DHTCloseable<D> {
// Public interface
Future<void> ref();
Future<bool> close();
// Internal implementation
@protected
bool get isOpen; bool get isOpen;
@protected @protected
FutureOr<D> scoped(); FutureOr<D> scoped();
Future<C> ref();
Future<void> close();
} }
abstract class DHTDeleteable<C, D> extends DHTCloseable<C, D> { abstract class DHTDeleteable<D> extends DHTCloseable<D> {
Future<void> delete(); Future<void> delete();
} }
extension DHTCloseableExt<C, D> on DHTCloseable<C, D> { extension DHTCloseableExt<D> on DHTCloseable<D> {
/// Runs a closure that guarantees the DHTCloseable /// Runs a closure that guarantees the DHTCloseable
/// will be closed upon exit, even if an uncaught exception is thrown /// will be closed upon exit, even if an uncaught exception is thrown
Future<T> scope<T>(Future<T> Function(D) scopeFunction) async { Future<T> scope<T>(Future<T> Function(D) scopeFunction) async {
@ -29,7 +33,7 @@ extension DHTCloseableExt<C, D> on DHTCloseable<C, D> {
} }
} }
extension DHTDeletableExt<C, D> on DHTDeleteable<C, D> { extension DHTDeletableExt<D> on DHTDeleteable<D> {
/// Runs a closure that guarantees the DHTCloseable /// Runs a closure that guarantees the DHTCloseable
/// will be closed upon exit, and deleted if an an /// will be closed upon exit, and deleted if an an
/// uncaught exception is thrown /// uncaught exception is thrown