diff --git a/packages/veilid_support/example/integration_test/app_test.dart b/packages/veilid_support/example/integration_test/app_test.dart index ba7785c..b3548a2 100644 --- a/packages/veilid_support/example/integration_test/app_test.dart +++ b/packages/veilid_support/example/integration_test/app_test.dart @@ -12,6 +12,8 @@ import 'test_dht_record_pool.dart'; import 'test_dht_short_array.dart'; void main() { + final startTime = DateTime.now(); + IntegrationTestWidgetsFlutterBinding.ensureInitialized(); final veilidFixture = DefaultVeilidFixture(programName: 'veilid_support integration test'); @@ -39,26 +41,26 @@ void main() { test('create pool', testDHTRecordPoolCreate); - // group('DHTRecordPool Tests', () { - // setUpAll(dhtRecordPoolFixture.setUp); - // tearDownAll(dhtRecordPoolFixture.tearDown); + group('DHTRecordPool Tests', () { + setUpAll(dhtRecordPoolFixture.setUp); + tearDownAll(dhtRecordPoolFixture.tearDown); - // test('create/delete record', testDHTRecordCreateDelete); - // test('record scopes', testDHTRecordScopes); - // test('create/delete deep record', testDHTRecordDeepCreateDelete); - // }); + test('create/delete record', testDHTRecordCreateDelete); + test('record scopes', testDHTRecordScopes); + test('create/delete deep record', testDHTRecordDeepCreateDelete); + }); - // group('DHTShortArray Tests', () { - // setUpAll(dhtRecordPoolFixture.setUp); - // tearDownAll(dhtRecordPoolFixture.tearDown); + group('DHTShortArray Tests', () { + setUpAll(dhtRecordPoolFixture.setUp); + tearDownAll(dhtRecordPoolFixture.tearDown); - // for (final stride in [256, 16 /*64, 32, 16, 8, 4, 2, 1 */]) { - // test('create shortarray stride=$stride', - // makeTestDHTShortArrayCreateDelete(stride: stride)); - // test('add shortarray stride=$stride', - // makeTestDHTShortArrayAdd(stride: 256)); - // } - // }); + for (final stride in [256, 16 /*64, 32, 16, 8, 4, 2, 1 */]) { + test('create shortarray stride=$stride', + makeTestDHTShortArrayCreateDelete(stride: stride)); + test('add shortarray stride=$stride', + makeTestDHTShortArrayAdd(stride: 256)); + } + }); group('DHTLog Tests', () { setUpAll(dhtRecordPoolFixture.setUp); @@ -75,4 +77,7 @@ void main() { }); }); }); + + final endTime = DateTime.now(); + print('Duration: ${endTime.difference(startTime)}'); } diff --git a/packages/veilid_support/example/integration_test/test_dht_log.dart b/packages/veilid_support/example/integration_test/test_dht_log.dart index fcdabad..f8d758e 100644 --- a/packages/veilid_support/example/integration_test/test_dht_log.dart +++ b/packages/veilid_support/example/integration_test/test_dht_log.dart @@ -42,7 +42,7 @@ Future Function() makeTestDHTLogCreateDelete({required int stride}) => // Operate should still succeed because things aren't closed expect(await dlog.operate((r) async => r.length), isZero); await dlog.close(); - await dlog.close(); + await expectLater(() async => dlog.close(), throwsA(isA())); // Operate should fail await expectLater(() async => dlog.operate((r) async => r.length), throwsA(isA())); @@ -51,8 +51,6 @@ Future Function() makeTestDHTLogCreateDelete({required int stride}) => Future Function() makeTestDHTLogAddTruncate({required int stride}) => () async { - final startTime = DateTime.now(); - final dlog = await DHTLog.create( debugName: 'log_add 1 stride $stride', stride: stride); @@ -121,10 +119,8 @@ Future Function() makeTestDHTLogAddTruncate({required int stride}) => final dataset8 = await dlog.operate((r) async => r.getItemRange(0)); expect(dataset8, isEmpty); } + print('delete and close\n'); await dlog.delete(); await dlog.close(); - - final endTime = DateTime.now(); - print('Duration: ${endTime.difference(startTime)}'); }; diff --git a/packages/veilid_support/example/integration_test/test_dht_record_pool.dart b/packages/veilid_support/example/integration_test/test_dht_record_pool.dart index 45b26a7..2f52c00 100644 --- a/packages/veilid_support/example/integration_test/test_dht_record_pool.dart +++ b/packages/veilid_support/example/integration_test/test_dht_record_pool.dart @@ -48,7 +48,7 @@ Future testDHTRecordCreateDelete() async { // Set should succeed still await rec3.tryWriteBytes(utf8.encode('test')); await rec3.close(); - await rec3.close(); + await expectLater(() async => rec3.close(), throwsA(isA())); // Set should fail await expectLater(() async => rec3.tryWriteBytes(utf8.encode('test')), throwsA(isA())); @@ -84,7 +84,7 @@ Future testDHTRecordScopes() async { } on Exception { assert(false, 'should not throw'); } - await rec2.close(); + await expectLater(() async => rec2.close(), throwsA(isA())); await pool.deleteRecord(rec2.key); } @@ -115,6 +115,7 @@ Future testDHTRecordGetSet() async { final val = await rec.get(); await pool.deleteRecord(rec.key); expect(val, isNull); + await rec.close(); } // Test set then get @@ -125,6 +126,7 @@ Future testDHTRecordGetSet() async { // Invalid subkey should throw await expectLater( () async => rec2.get(subkey: 1), throwsA(isA())); + await rec2.close(); await pool.deleteRecord(rec2.key); } diff --git a/packages/veilid_support/example/integration_test/test_dht_short_array.dart b/packages/veilid_support/example/integration_test/test_dht_short_array.dart index 6ba2d23..7dead48 100644 --- a/packages/veilid_support/example/integration_test/test_dht_short_array.dart +++ b/packages/veilid_support/example/integration_test/test_dht_short_array.dart @@ -43,7 +43,7 @@ Future Function() makeTestDHTShortArrayCreateDelete( // Operate should still succeed because things aren't closed expect(await arr.operate((r) async => r.length), isZero); await arr.close(); - await arr.close(); + await expectLater(() async => arr.close(), throwsA(isA())); // Operate should fail await expectLater(() async => arr.operate((r) async => r.length), throwsA(isA())); @@ -52,8 +52,6 @@ Future Function() makeTestDHTShortArrayCreateDelete( Future Function() makeTestDHTShortArrayAdd({required int stride}) => () async { - final startTime = DateTime.now(); - final arr = await DHTShortArray.create( debugName: 'sa_add 1 stride $stride', stride: stride); @@ -131,7 +129,4 @@ Future Function() makeTestDHTShortArrayAdd({required int stride}) => await arr.delete(); await arr.close(); - - final endTime = DateTime.now(); - print('Duration: ${endTime.difference(startTime)}'); }; diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log.dart index f7b606c..5dd36a0 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log.dart @@ -42,11 +42,13 @@ class DHTLogUpdate extends Equatable { /// * The head and tail position of the log /// - subkeyIdx = pos / recordsPerSubkey /// - recordIdx = pos % recordsPerSubkey -class DHTLog implements DHTOpenable { +class DHTLog implements DHTOpenable { //////////////////////////////////////////////////////////////// // Constructors - DHTLog._({required _DHTLogSpine spine}) : _spine = spine { + DHTLog._({required _DHTLogSpine spine}) + : _spine = spine, + _openCount = 1 { _spine.onUpdatedSpine = (update) { _watchController?.sink.add(update); }; @@ -162,18 +164,29 @@ class DHTLog implements DHTOpenable { /// Check if the DHTLog is open @override - bool get isOpen => _spine.isOpen; + bool get isOpen => _openCount > 0; + + /// Add a reference to this log + @override + Future ref() async => _mutex.protect(() async { + _openCount++; + return this; + }); /// Free all resources for the DHTLog @override - Future close() async { - if (!isOpen) { - return; - } - await _watchController?.close(); - _watchController = null; - await _spine.close(); - } + Future close() async => _mutex.protect(() async { + if (_openCount == 0) { + throw StateError('already closed'); + } + _openCount--; + if (_openCount != 0) { + return; + } + await _watchController?.close(); + _watchController = null; + await _spine.close(); + }); /// Free all resources for the DHTLog and delete it from the DHT /// Will wait until the short array is closed to delete it @@ -284,6 +297,10 @@ class DHTLog implements DHTOpenable { // Internal representation refreshed from spine record final _DHTLogSpine _spine; + // Openable + int _openCount; + final _mutex = Mutex(); + // Watch mutex to ensure we keep the representation valid final Mutex _listenMutex = Mutex(); // Stream of external changes diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_append.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_append.dart index 96c3eb4..26c22da 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_append.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_append.dart @@ -17,16 +17,16 @@ class _DHTLogAppend extends _DHTLogRead implements DHTAppendTruncateRandomRead { } // Write item to the segment - return lookup.shortArray.operateWrite((write) async { - // If this a new segment, then clear it in case we have wrapped around - if (lookup.pos == 0) { - await write.clear(); - } else if (lookup.pos != write.length) { - // We should always be appending at the length - throw StateError('appending should be at the end'); - } - return write.tryAddItem(value); - }); + return lookup.shortArray.scope((sa) => sa.operateWrite((write) async { + // If this a new segment, then clear it in case we have wrapped around + if (lookup.pos == 0) { + await write.clear(); + } else if (lookup.pos != write.length) { + // We should always be appending at the length + throw StateError('appending should be at the end'); + } + return write.tryAddItem(value); + })); } @override @@ -45,16 +45,19 @@ class _DHTLogAppend extends _DHTLogRead implements DHTAppendTruncateRandomRead { } final sacount = min(remaining, DHTShortArray.maxElements - lookup.pos); - final success = await lookup.shortArray.operateWrite((write) async { - // If this a new segment, then clear it in case we have wrapped around - if (lookup.pos == 0) { - await write.clear(); - } else if (lookup.pos != write.length) { - // We should always be appending at the length - throw StateError('appending should be at the end'); - } - return write.tryAddItems(values.sublist(valueIdx, valueIdx + sacount)); - }); + final success = + await lookup.shortArray.scope((sa) => sa.operateWrite((write) async { + // If this a new segment, then clear it in + // case we have wrapped around + if (lookup.pos == 0) { + await write.clear(); + } else if (lookup.pos != write.length) { + // We should always be appending at the length + throw StateError('appending should be at the end'); + } + return write + .tryAddItems(values.sublist(valueIdx, valueIdx + sacount)); + })); if (!success) { return false; } diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_read.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_read.dart index 0919412..ea36fc2 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_read.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_read.dart @@ -19,8 +19,8 @@ class _DHTLogRead implements DHTRandomRead { return null; } - return lookup.shortArray.operate( - (read) => read.getItem(lookup.pos, forceRefresh: forceRefresh)); + return lookup.shortArray.scope((sa) => sa.operate( + (read) => read.getItem(lookup.pos, forceRefresh: forceRefresh))); } (int, int) _clampStartLen(int start, int? len) { @@ -71,22 +71,22 @@ class _DHTLogRead implements DHTRandomRead { // Check each segment for offline positions var foundOffline = false; - await lookup.shortArray.operate((read) async { - final segmentOffline = await read.getOfflinePositions(); + await lookup.shortArray.scope((sa) => sa.operate((read) async { + final segmentOffline = await read.getOfflinePositions(); - // For each shortarray segment go through their segment positions - // in reverse order and see if they are offline - for (var segmentPos = lookup.pos; - segmentPos >= 0 && pos >= 0; - segmentPos--, pos--) { - // If the position in the segment is offline, then - // mark the position in the log as offline - if (segmentOffline.contains(segmentPos)) { - positionOffline.add(pos); - foundOffline = true; - } - } - }); + // For each shortarray segment go through their segment positions + // in reverse order and see if they are offline + for (var segmentPos = lookup.pos; + segmentPos >= 0 && pos >= 0; + segmentPos--, pos--) { + // If the position in the segment is offline, then + // mark the position in the log as offline + if (segmentOffline.contains(segmentPos)) { + positionOffline.add(pos); + foundOffline = true; + } + } + })); // If we found nothing offline in this segment then we can stop if (!foundOffline) { diff --git a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart index 7ce3dbf..12a80c9 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart @@ -15,6 +15,13 @@ class _DHTLogSegmentLookup extends Equatable { List get props => [subkey, segment]; } +class _SubkeyData { + _SubkeyData({required this.subkey, required this.data}); + int subkey; + Uint8List data; + bool changed = false; +} + class _DHTLogSpine { _DHTLogSpine._( {required DHTRecord spineRecord, @@ -47,7 +54,7 @@ class _DHTLogSpine { static Future<_DHTLogSpine> load({required DHTRecord spineRecord}) async { // Get an updated spine head record copy if one exists final spineHead = await spineRecord.getProtobuf(proto.DHTLog.fromBuffer, - subkey: 0, refreshMode: DHTRecordRefreshMode.refresh); + subkey: 0, refreshMode: DHTRecordRefreshMode.network); if (spineHead == null) { throw StateError('spine head missing during refresh'); } @@ -234,7 +241,7 @@ class _DHTLogSpine { segmentKeyBytes); } - Future _getOrCreateSegmentInner(int segmentNumber) async { + Future _openOrCreateSegmentInner(int segmentNumber) async { assert(_spineMutex.isLocked, 'should be in mutex here'); assert(_spineRecord.writer != null, 'should be writable'); @@ -292,7 +299,7 @@ class _DHTLogSpine { } } - Future _getSegmentInner(int segmentNumber) async { + Future _openSegmentInner(int segmentNumber) async { assert(_spineMutex.isLocked, 'should be in mutex here'); // Lookup what subkey and segment subrange has this position's segment @@ -321,7 +328,7 @@ class _DHTLogSpine { return segmentRec; } - Future getOrCreateSegment(int segmentNumber) async { + Future _openOrCreateSegment(int segmentNumber) async { assert(_spineMutex.isLocked, 'should be in mutex here'); // See if we already have this in the cache @@ -331,21 +338,22 @@ class _DHTLogSpine { final x = _spineCache.removeAt(i); _spineCache.add(x); // Return the shortarray for this position - return x.$2; + return x.$2.ref(); } } - // If we don't have it in the cache, get/create it and then cache it - final segment = await _getOrCreateSegmentInner(segmentNumber); - _spineCache.add((segmentNumber, segment)); + // If we don't have it in the cache, get/create it and then cache a ref + final segment = await _openOrCreateSegmentInner(segmentNumber); + _spineCache.add((segmentNumber, await segment.ref())); if (_spineCache.length > _spineCacheLength) { // Trim the LRU cache - _spineCache.removeAt(0); + final (_, sa) = _spineCache.removeAt(0); + await sa.close(); } return segment; } - Future getSegment(int segmentNumber) async { + Future _openSegment(int segmentNumber) async { assert(_spineMutex.isLocked, 'should be in mutex here'); // See if we already have this in the cache @@ -355,19 +363,20 @@ class _DHTLogSpine { final x = _spineCache.removeAt(i); _spineCache.add(x); // Return the shortarray for this position - return x.$2; + return x.$2.ref(); } } // If we don't have it in the cache, get it and then cache it - final segment = await _getSegmentInner(segmentNumber); + final segment = await _openSegmentInner(segmentNumber); if (segment == null) { return null; } - _spineCache.add((segmentNumber, segment)); + _spineCache.add((segmentNumber, await segment.ref())); if (_spineCache.length > _spineCacheLength) { // Trim the LRU cache - _spineCache.removeAt(0); + final (_, sa) = _spineCache.removeAt(0); + await sa.close(); } return segment; } @@ -409,8 +418,8 @@ class _DHTLogSpine { // Get the segment shortArray final shortArray = (_spineRecord.writer == null) - ? await getSegment(segmentNumber) - : await getOrCreateSegment(segmentNumber); + ? await _openSegment(segmentNumber) + : await _openOrCreateSegment(segmentNumber); if (shortArray == null) { return null; } @@ -442,12 +451,16 @@ class _DHTLogSpine { throw StateError('ring buffer underflow'); } + final oldHead = _head; _head = (_head + count) % _positionLimit; - await _purgeUnusedSegments(); + final newHead = _head; + await _purgeSegments(oldHead, newHead); } Future _deleteSegmentsContiguous(int start, int end) async { assert(_spineMutex.isLocked, 'should be in mutex here'); + DHTRecordPool.instance + .log('_deleteSegmentsContiguous: start=$start, end=$end'); final startSegmentNumber = start ~/ DHTShortArray.maxElements; final startSegmentPos = start % DHTShortArray.maxElements; @@ -460,8 +473,7 @@ class _DHTLogSpine { final lastDeleteSegment = (endSegmentPos == 0) ? endSegmentNumber - 1 : endSegmentNumber - 2; - int? lastSubkey; - Uint8List? subkeyData; + _SubkeyData? lastSubkeyData; for (var segmentNumber = firstDeleteSegment; segmentNumber <= lastDeleteSegment; segmentNumber++) { @@ -471,44 +483,48 @@ class _DHTLogSpine { final subkey = l.subkey; final segment = l.segment; - if (lastSubkey != subkey) { + if (subkey != lastSubkeyData?.subkey) { // Flush subkey writes - if (lastSubkey != null) { - await _spineRecord.eventualWriteBytes(subkeyData!, - subkey: lastSubkey); + if (lastSubkeyData != null && lastSubkeyData.changed) { + await _spineRecord.eventualWriteBytes(lastSubkeyData.data, + subkey: lastSubkeyData.subkey); } - xxx debug this, it takes forever - - // Get next subkey - subkeyData = await _spineRecord.get(subkey: subkey); - if (subkeyData != null) { - lastSubkey = subkey; + // Get next subkey if available locally + final data = await _spineRecord.get( + subkey: subkey, refreshMode: DHTRecordRefreshMode.local); + if (data != null) { + lastSubkeyData = _SubkeyData(subkey: subkey, data: data); } else { - lastSubkey = null; + lastSubkeyData = null; + // If the subkey was not available locally we can go to the + // last segment number at the end of this subkey + segmentNumber = ((subkey + 1) * DHTLog.segmentsPerSubkey) - 1; } } - if (subkeyData != null) { - final segmentKey = _getSegmentKey(subkeyData, segment); + if (lastSubkeyData != null) { + final segmentKey = _getSegmentKey(lastSubkeyData.data, segment); if (segmentKey != null) { await DHTRecordPool.instance.deleteRecord(segmentKey); - _setSegmentKey(subkeyData, segment, null); + _setSegmentKey(lastSubkeyData.data, segment, null); + lastSubkeyData.changed = true; } } } // Flush subkey writes - if (lastSubkey != null) { - await _spineRecord.eventualWriteBytes(subkeyData!, subkey: lastSubkey); + if (lastSubkeyData != null) { + await _spineRecord.eventualWriteBytes(lastSubkeyData.data, + subkey: lastSubkeyData.subkey); } } - Future _purgeUnusedSegments() async { + Future _purgeSegments(int from, int to) async { assert(_spineMutex.isLocked, 'should be in mutex here'); - if (_head < _tail) { - await _deleteSegmentsContiguous(0, _head); - await _deleteSegmentsContiguous(_tail, _positionLimit); - } else if (_head > _tail) { - await _deleteSegmentsContiguous(_tail, _head); + if (from < to) { + await _deleteSegmentsContiguous(from, to); + } else if (from > to) { + await _deleteSegmentsContiguous(from, _positionLimit); + await _deleteSegmentsContiguous(0, to); } } diff --git a/packages/veilid_support/lib/dht_support/src/dht_record/default_dht_record_cubit.dart b/packages/veilid_support/lib/dht_support/src/dht_record/default_dht_record_cubit.dart index 0b4e0b6..a333160 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_record/default_dht_record_cubit.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_record/default_dht_record_cubit.dart @@ -39,7 +39,7 @@ class DefaultDHTRecordCubit extends DHTRecordCubit { final firstSubkey = subkeys.firstOrNull!.low; if (firstSubkey != defaultSubkey || updatedata == null) { final maybeData = - await record.get(refreshMode: DHTRecordRefreshMode.refresh); + await record.get(refreshMode: DHTRecordRefreshMode.network); if (maybeData == null) { return null; } diff --git a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record.dart b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record.dart index 3d625f8..7bf5129 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record.dart @@ -16,19 +16,27 @@ class DHTRecordWatchChange extends Equatable { /// Refresh mode for DHT record 'get' enum DHTRecordRefreshMode { /// Return existing subkey values if they exist locally already - existing, + /// And then check the network for a newer value + /// This is the default refresh mode + cached, + + /// Return existing subkey values only if they exist locally already + local, /// Always check the network for a newer subkey value - refresh, + network, /// Always check the network for a newer subkey value but only /// return that value if its sequence number is newer than the local value - refreshOnlyUpdates, + update; + + bool get _forceRefresh => this == network || this == update; + bool get _inspectLocal => this == local || this == update; } ///////////////////////////////////////////////// -class DHTRecord implements DHTOpenable { +class DHTRecord implements DHTOpenable { DHTRecord._( {required VeilidRoutingContext routingContext, required SharedDHTRecordData sharedDHTRecordData, @@ -40,7 +48,7 @@ class DHTRecord implements DHTOpenable { _routingContext = routingContext, _defaultSubkey = defaultSubkey, _writer = writer, - _open = true, + _openCount = 1, _sharedDHTRecordData = sharedDHTRecordData; //////////////////////////////////////////////////////////////////////////// @@ -48,25 +56,37 @@ class DHTRecord implements DHTOpenable { /// Check if the DHTRecord is open @override - bool get isOpen => _open; + bool get isOpen => _openCount > 0; + + /// Add a reference to this DHTRecord + @override + Future ref() async => _mutex.protect(() async { + _openCount++; + return this; + }); /// Free all resources for the DHTRecord @override - Future close() async { - if (!_open) { - return; - } - await watchController?.close(); - await DHTRecordPool.instance._recordClosed(this); - _open = false; - } + Future close() async => _mutex.protect(() async { + if (_openCount == 0) { + throw StateError('already closed'); + } + _openCount--; + if (_openCount != 0) { + return; + } + + await _watchController?.close(); + _watchController = null; + await DHTRecordPool.instance._recordClosed(this); + }); /// Free all resources for the DHTRecord and delete it from the DHT /// Will wait until the record is closed to delete it @override - Future delete() async { - await DHTRecordPool.instance.deleteRecord(key); - } + Future delete() async => _mutex.protect(() async { + await DHTRecordPool.instance.deleteRecord(key); + }); //////////////////////////////////////////////////////////////////////////// // Public API @@ -95,25 +115,37 @@ class DHTRecord implements DHTOpenable { Future get( {int subkey = -1, DHTRecordCrypto? crypto, - DHTRecordRefreshMode refreshMode = DHTRecordRefreshMode.existing, + DHTRecordRefreshMode refreshMode = DHTRecordRefreshMode.cached, Output? outSeqNum}) async { subkey = subkeyOrDefault(subkey); + + // Get the last sequence number if we need it + final lastSeq = + refreshMode._inspectLocal ? await _localSubkeySeq(subkey) : null; + + // See if we only ever want the locally stored value + if (refreshMode == DHTRecordRefreshMode.local && lastSeq == null) { + // If it's not available locally already just return null now + return null; + } + final valueData = await _routingContext.getDHTValue(key, subkey, - forceRefresh: refreshMode != DHTRecordRefreshMode.existing); + forceRefresh: refreshMode._forceRefresh); if (valueData == null) { return null; } - final lastSeq = _sharedDHTRecordData.subkeySeqCache[subkey]; - if (refreshMode == DHTRecordRefreshMode.refreshOnlyUpdates && + // See if this get resulted in a newer sequence number + if (refreshMode == DHTRecordRefreshMode.update && lastSeq != null && valueData.seq <= lastSeq) { + // If we're only returning updates then punt now return null; } + // If we're returning a value, decrypt it final out = (crypto ?? _crypto).decrypt(valueData.data, subkey); if (outSeqNum != null) { outSeqNum.save(valueData.seq); } - _sharedDHTRecordData.subkeySeqCache[subkey] = valueData.seq; return out; } @@ -128,7 +160,7 @@ class DHTRecord implements DHTOpenable { Future getJson(T Function(dynamic) fromJson, {int subkey = -1, DHTRecordCrypto? crypto, - DHTRecordRefreshMode refreshMode = DHTRecordRefreshMode.existing, + DHTRecordRefreshMode refreshMode = DHTRecordRefreshMode.cached, Output? outSeqNum}) async { final data = await get( subkey: subkey, @@ -154,7 +186,7 @@ class DHTRecord implements DHTOpenable { T Function(List i) fromBuffer, {int subkey = -1, DHTRecordCrypto? crypto, - DHTRecordRefreshMode refreshMode = DHTRecordRefreshMode.existing, + DHTRecordRefreshMode refreshMode = DHTRecordRefreshMode.cached, Output? outSeqNum}) async { final data = await get( subkey: subkey, @@ -176,7 +208,7 @@ class DHTRecord implements DHTOpenable { KeyPair? writer, Output? outSeqNum}) async { subkey = subkeyOrDefault(subkey); - final lastSeq = _sharedDHTRecordData.subkeySeqCache[subkey]; + final lastSeq = await _localSubkeySeq(subkey); final encryptedNewValue = await (crypto ?? _crypto).encrypt(newValue, subkey); @@ -198,7 +230,6 @@ class DHTRecord implements DHTOpenable { if (isUpdated && outSeqNum != null) { outSeqNum.save(newValueData.seq); } - _sharedDHTRecordData.subkeySeqCache[subkey] = newValueData.seq; // See if the encrypted data returned is exactly the same // if so, shortcut and don't bother decrypting it @@ -228,7 +259,7 @@ class DHTRecord implements DHTOpenable { KeyPair? writer, Output? outSeqNum}) async { subkey = subkeyOrDefault(subkey); - final lastSeq = _sharedDHTRecordData.subkeySeqCache[subkey]; + final lastSeq = await _localSubkeySeq(subkey); final encryptedNewValue = await (crypto ?? _crypto).encrypt(newValue, subkey); @@ -254,7 +285,6 @@ class DHTRecord implements DHTOpenable { if (outSeqNum != null) { outSeqNum.save(newValueData.seq); } - _sharedDHTRecordData.subkeySeqCache[subkey] = newValueData.seq; // The encrypted data returned should be exactly the same // as what we are trying to set, @@ -402,13 +432,13 @@ class DHTRecord implements DHTOpenable { DHTRecordCrypto? crypto, }) async { // Set up watch requirements - watchController ??= + _watchController ??= StreamController.broadcast(onCancel: () { // If there are no more listeners then we can get rid of the controller - watchController = null; + _watchController = null; }); - return watchController!.stream.listen( + return _watchController!.stream.listen( (change) { if (change.local && !localChanges) { return; @@ -431,8 +461,8 @@ class DHTRecord implements DHTOpenable { }, cancelOnError: true, onError: (e) async { - await watchController!.close(); - watchController = null; + await _watchController!.close(); + _watchController = null; }); } @@ -455,6 +485,14 @@ class DHTRecord implements DHTOpenable { ////////////////////////////////////////////////////////////////////////// + Future _localSubkeySeq(int subkey) async { + final rr = await _routingContext.inspectDHTRecord( + key, + subkeys: [ValueSubkeyRange.single(subkey)], + ); + return rr.localSeqs.firstOrNull ?? 0xFFFFFFFF; + } + void _addValueChange( {required bool local, required Uint8List? data, @@ -464,7 +502,7 @@ class DHTRecord implements DHTOpenable { final watchedSubkeys = ws.subkeys; if (watchedSubkeys == null) { // Report all subkeys - watchController?.add( + _watchController?.add( DHTRecordWatchChange(local: local, data: data, subkeys: subkeys)); } else { // Only some subkeys are being watched, see if the reported update @@ -479,7 +517,7 @@ class DHTRecord implements DHTOpenable { overlappedFirstSubkey == updateFirstSubkey ? data : null; // Report only watched subkeys - watchController?.add(DHTRecordWatchChange( + _watchController?.add(DHTRecordWatchChange( local: local, data: updatedData, subkeys: overlappedSubkeys)); } } @@ -504,10 +542,9 @@ class DHTRecord implements DHTOpenable { final KeyPair? _writer; final DHTRecordCrypto _crypto; final String debugName; - - bool _open; - @internal - StreamController? watchController; + final _mutex = Mutex(); + int _openCount; + StreamController? _watchController; @internal WatchState? watchState; } diff --git a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_cubit.dart b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_cubit.dart index 8616658..1cfcfcd 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_cubit.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_cubit.dart @@ -93,7 +93,7 @@ class DHTRecordCubit extends Cubit> { for (final skr in subkeys) { for (var sk = skr.low; sk <= skr.high; sk++) { final data = await _record.get( - subkey: sk, refreshMode: DHTRecordRefreshMode.refreshOnlyUpdates); + subkey: sk, refreshMode: DHTRecordRefreshMode.update); if (data != null) { final newState = await _stateFunction(_record, updateSubkeys, data); if (newState != null) { diff --git a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart index a4748df..a8e86a1 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart @@ -88,7 +88,6 @@ class SharedDHTRecordData { DHTRecordDescriptor recordDescriptor; KeyPair? defaultWriter; VeilidRoutingContext defaultRoutingContext; - Map subkeySeqCache = {}; bool needsWatchStateUpdate = false; WatchState? unionWatchState; } diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array.dart index 082a391..cd62fa6 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array.dart @@ -13,12 +13,13 @@ part 'dht_short_array_write.dart'; /////////////////////////////////////////////////////////////////////// -class DHTShortArray implements DHTOpenable { +class DHTShortArray implements DHTOpenable { //////////////////////////////////////////////////////////////// // Constructors DHTShortArray._({required DHTRecord headRecord}) - : _head = _DHTShortArrayHead(headRecord: headRecord) { + : _head = _DHTShortArrayHead(headRecord: headRecord), + _openCount = 1 { _head.onUpdatedHead = () { _watchController?.sink.add(null); }; @@ -139,18 +140,30 @@ class DHTShortArray implements DHTOpenable { /// Check if the shortarray is open @override - bool get isOpen => _head.isOpen; + bool get isOpen => _openCount > 0; + + /// Add a reference to this shortarray + @override + Future ref() async => _mutex.protect(() async { + _openCount++; + return this; + }); /// Free all resources for the DHTShortArray @override - Future close() async { - if (!isOpen) { - return; - } - await _watchController?.close(); - _watchController = null; - await _head.close(); - } + Future close() async => _mutex.protect(() async { + if (_openCount == 0) { + throw StateError('already closed'); + } + _openCount--; + if (_openCount != 0) { + return; + } + + await _watchController?.close(); + _watchController = null; + await _head.close(); + }); /// Free all resources for the DHTShortArray and delete it from the DHT /// Will wait until the short array is closed to delete it @@ -255,6 +268,10 @@ class DHTShortArray implements DHTOpenable { // Internal representation refreshed from head record final _DHTShortArrayHead _head; + // Openable + int _openCount; + final _mutex = Mutex(); + // Watch mutex to ensure we keep the representation valid final Mutex _listenMutex = Mutex(); // Stream of external changes diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart index e2bf392..1403e87 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart @@ -248,7 +248,7 @@ class _DHTShortArrayHead { Future _loadHead() async { // Get an updated head record copy if one exists final head = await _headRecord.getProtobuf(proto.DHTShortArray.fromBuffer, - subkey: 0, refreshMode: DHTRecordRefreshMode.refresh); + subkey: 0, refreshMode: DHTRecordRefreshMode.network); if (head == null) { throw StateError('shortarray head missing during refresh'); } diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_read.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_read.dart index 88cefde..919564c 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_read.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_read.dart @@ -22,8 +22,8 @@ class _DHTShortArrayRead implements DHTRandomRead { final out = lookup.record.get( subkey: lookup.recordSubkey, refreshMode: refresh - ? DHTRecordRefreshMode.refresh - : DHTRecordRefreshMode.existing, + ? DHTRecordRefreshMode.network + : DHTRecordRefreshMode.cached, outSeqNum: outSeqNum); if (outSeqNum.value != null) { _head.updatePositionSeq(pos, false, outSeqNum.value!); diff --git a/packages/veilid_support/lib/dht_support/src/interfaces/dht_openable.dart b/packages/veilid_support/lib/dht_support/src/interfaces/dht_openable.dart index e28f703..ffd58f9 100644 --- a/packages/veilid_support/lib/dht_support/src/interfaces/dht_openable.dart +++ b/packages/veilid_support/lib/dht_support/src/interfaces/dht_openable.dart @@ -1,12 +1,13 @@ import 'dart:async'; -abstract class DHTOpenable { +abstract class DHTOpenable { bool get isOpen; + Future ref(); Future close(); Future delete(); } -extension DHTOpenableExt on D { +extension DHTOpenableExt> on D { /// Runs a closure that guarantees the DHTOpenable /// will be closed upon exit, even if an uncaught exception is thrown Future scope(Future Function(D) scopeFunction) async { diff --git a/packages/veilid_support/lib/src/identity.dart b/packages/veilid_support/lib/src/identity.dart index 2645894..5721461 100644 --- a/packages/veilid_support/lib/src/identity.dart +++ b/packages/veilid_support/lib/src/identity.dart @@ -301,7 +301,7 @@ Future openIdentityMaster( 'IdentityMaster::openIdentityMaster::IdentityMasterRecord')) .deleteScope((masterRec) async { final identityMaster = (await masterRec.getJson(IdentityMaster.fromJson, - refreshMode: DHTRecordRefreshMode.refresh))!; + refreshMode: DHTRecordRefreshMode.network))!; // Validate IdentityMaster final masterRecordKey = masterRec.key;