dht log passes tests

This commit is contained in:
Christien Rioux 2024-05-16 14:07:25 -04:00
parent 8cd73b2844
commit cf837e2176
17 changed files with 267 additions and 179 deletions

View File

@ -12,6 +12,8 @@ import 'test_dht_record_pool.dart';
import 'test_dht_short_array.dart';
void main() {
final startTime = DateTime.now();
IntegrationTestWidgetsFlutterBinding.ensureInitialized();
final veilidFixture =
DefaultVeilidFixture(programName: 'veilid_support integration test');
@ -39,26 +41,26 @@ void main() {
test('create pool', testDHTRecordPoolCreate);
// group('DHTRecordPool Tests', () {
// setUpAll(dhtRecordPoolFixture.setUp);
// tearDownAll(dhtRecordPoolFixture.tearDown);
group('DHTRecordPool Tests', () {
setUpAll(dhtRecordPoolFixture.setUp);
tearDownAll(dhtRecordPoolFixture.tearDown);
// test('create/delete record', testDHTRecordCreateDelete);
// test('record scopes', testDHTRecordScopes);
// test('create/delete deep record', testDHTRecordDeepCreateDelete);
// });
test('create/delete record', testDHTRecordCreateDelete);
test('record scopes', testDHTRecordScopes);
test('create/delete deep record', testDHTRecordDeepCreateDelete);
});
// group('DHTShortArray Tests', () {
// setUpAll(dhtRecordPoolFixture.setUp);
// tearDownAll(dhtRecordPoolFixture.tearDown);
group('DHTShortArray Tests', () {
setUpAll(dhtRecordPoolFixture.setUp);
tearDownAll(dhtRecordPoolFixture.tearDown);
// for (final stride in [256, 16 /*64, 32, 16, 8, 4, 2, 1 */]) {
// test('create shortarray stride=$stride',
// makeTestDHTShortArrayCreateDelete(stride: stride));
// test('add shortarray stride=$stride',
// makeTestDHTShortArrayAdd(stride: 256));
// }
// });
for (final stride in [256, 16 /*64, 32, 16, 8, 4, 2, 1 */]) {
test('create shortarray stride=$stride',
makeTestDHTShortArrayCreateDelete(stride: stride));
test('add shortarray stride=$stride',
makeTestDHTShortArrayAdd(stride: 256));
}
});
group('DHTLog Tests', () {
setUpAll(dhtRecordPoolFixture.setUp);
@ -75,4 +77,7 @@ void main() {
});
});
});
final endTime = DateTime.now();
print('Duration: ${endTime.difference(startTime)}');
}

View File

@ -42,7 +42,7 @@ Future<void> Function() makeTestDHTLogCreateDelete({required int stride}) =>
// Operate should still succeed because things aren't closed
expect(await dlog.operate((r) async => r.length), isZero);
await dlog.close();
await dlog.close();
await expectLater(() async => dlog.close(), throwsA(isA<StateError>()));
// Operate should fail
await expectLater(() async => dlog.operate((r) async => r.length),
throwsA(isA<StateError>()));
@ -51,8 +51,6 @@ Future<void> Function() makeTestDHTLogCreateDelete({required int stride}) =>
Future<void> Function() makeTestDHTLogAddTruncate({required int stride}) =>
() async {
final startTime = DateTime.now();
final dlog = await DHTLog.create(
debugName: 'log_add 1 stride $stride', stride: stride);
@ -121,10 +119,8 @@ Future<void> Function() makeTestDHTLogAddTruncate({required int stride}) =>
final dataset8 = await dlog.operate((r) async => r.getItemRange(0));
expect(dataset8, isEmpty);
}
print('delete and close\n');
await dlog.delete();
await dlog.close();
final endTime = DateTime.now();
print('Duration: ${endTime.difference(startTime)}');
};

View File

@ -48,7 +48,7 @@ Future<void> testDHTRecordCreateDelete() async {
// Set should succeed still
await rec3.tryWriteBytes(utf8.encode('test'));
await rec3.close();
await rec3.close();
await expectLater(() async => rec3.close(), throwsA(isA<StateError>()));
// Set should fail
await expectLater(() async => rec3.tryWriteBytes(utf8.encode('test')),
throwsA(isA<VeilidAPIException>()));
@ -84,7 +84,7 @@ Future<void> testDHTRecordScopes() async {
} on Exception {
assert(false, 'should not throw');
}
await rec2.close();
await expectLater(() async => rec2.close(), throwsA(isA<StateError>()));
await pool.deleteRecord(rec2.key);
}
@ -115,6 +115,7 @@ Future<void> testDHTRecordGetSet() async {
final val = await rec.get();
await pool.deleteRecord(rec.key);
expect(val, isNull);
await rec.close();
}
// Test set then get
@ -125,6 +126,7 @@ Future<void> testDHTRecordGetSet() async {
// Invalid subkey should throw
await expectLater(
() async => rec2.get(subkey: 1), throwsA(isA<VeilidAPIException>()));
await rec2.close();
await pool.deleteRecord(rec2.key);
}

View File

@ -43,7 +43,7 @@ Future<void> Function() makeTestDHTShortArrayCreateDelete(
// Operate should still succeed because things aren't closed
expect(await arr.operate((r) async => r.length), isZero);
await arr.close();
await arr.close();
await expectLater(() async => arr.close(), throwsA(isA<StateError>()));
// Operate should fail
await expectLater(() async => arr.operate((r) async => r.length),
throwsA(isA<StateError>()));
@ -52,8 +52,6 @@ Future<void> Function() makeTestDHTShortArrayCreateDelete(
Future<void> Function() makeTestDHTShortArrayAdd({required int stride}) =>
() async {
final startTime = DateTime.now();
final arr = await DHTShortArray.create(
debugName: 'sa_add 1 stride $stride', stride: stride);
@ -131,7 +129,4 @@ Future<void> Function() makeTestDHTShortArrayAdd({required int stride}) =>
await arr.delete();
await arr.close();
final endTime = DateTime.now();
print('Duration: ${endTime.difference(startTime)}');
};

View File

@ -42,11 +42,13 @@ class DHTLogUpdate extends Equatable {
/// * The head and tail position of the log
/// - subkeyIdx = pos / recordsPerSubkey
/// - recordIdx = pos % recordsPerSubkey
class DHTLog implements DHTOpenable {
class DHTLog implements DHTOpenable<DHTLog> {
////////////////////////////////////////////////////////////////
// Constructors
DHTLog._({required _DHTLogSpine spine}) : _spine = spine {
DHTLog._({required _DHTLogSpine spine})
: _spine = spine,
_openCount = 1 {
_spine.onUpdatedSpine = (update) {
_watchController?.sink.add(update);
};
@ -162,18 +164,29 @@ class DHTLog implements DHTOpenable {
/// Check if the DHTLog is open
@override
bool get isOpen => _spine.isOpen;
bool get isOpen => _openCount > 0;
/// Add a reference to this log
@override
Future<DHTLog> ref() async => _mutex.protect(() async {
_openCount++;
return this;
});
/// Free all resources for the DHTLog
@override
Future<void> close() async {
if (!isOpen) {
Future<void> close() async => _mutex.protect(() async {
if (_openCount == 0) {
throw StateError('already closed');
}
_openCount--;
if (_openCount != 0) {
return;
}
await _watchController?.close();
_watchController = null;
await _spine.close();
}
});
/// Free all resources for the DHTLog and delete it from the DHT
/// Will wait until the short array is closed to delete it
@ -284,6 +297,10 @@ class DHTLog implements DHTOpenable {
// Internal representation refreshed from spine record
final _DHTLogSpine _spine;
// Openable
int _openCount;
final _mutex = Mutex();
// Watch mutex to ensure we keep the representation valid
final Mutex _listenMutex = Mutex();
// Stream of external changes

View File

@ -17,7 +17,7 @@ class _DHTLogAppend extends _DHTLogRead implements DHTAppendTruncateRandomRead {
}
// Write item to the segment
return lookup.shortArray.operateWrite((write) async {
return lookup.shortArray.scope((sa) => sa.operateWrite((write) async {
// If this a new segment, then clear it in case we have wrapped around
if (lookup.pos == 0) {
await write.clear();
@ -26,7 +26,7 @@ class _DHTLogAppend extends _DHTLogRead implements DHTAppendTruncateRandomRead {
throw StateError('appending should be at the end');
}
return write.tryAddItem(value);
});
}));
}
@override
@ -45,16 +45,19 @@ class _DHTLogAppend extends _DHTLogRead implements DHTAppendTruncateRandomRead {
}
final sacount = min(remaining, DHTShortArray.maxElements - lookup.pos);
final success = await lookup.shortArray.operateWrite((write) async {
// If this a new segment, then clear it in case we have wrapped around
final success =
await lookup.shortArray.scope((sa) => sa.operateWrite((write) async {
// If this a new segment, then clear it in
// case we have wrapped around
if (lookup.pos == 0) {
await write.clear();
} else if (lookup.pos != write.length) {
// We should always be appending at the length
throw StateError('appending should be at the end');
}
return write.tryAddItems(values.sublist(valueIdx, valueIdx + sacount));
});
return write
.tryAddItems(values.sublist(valueIdx, valueIdx + sacount));
}));
if (!success) {
return false;
}

View File

@ -19,8 +19,8 @@ class _DHTLogRead implements DHTRandomRead {
return null;
}
return lookup.shortArray.operate(
(read) => read.getItem(lookup.pos, forceRefresh: forceRefresh));
return lookup.shortArray.scope((sa) => sa.operate(
(read) => read.getItem(lookup.pos, forceRefresh: forceRefresh)));
}
(int, int) _clampStartLen(int start, int? len) {
@ -71,7 +71,7 @@ class _DHTLogRead implements DHTRandomRead {
// Check each segment for offline positions
var foundOffline = false;
await lookup.shortArray.operate((read) async {
await lookup.shortArray.scope((sa) => sa.operate((read) async {
final segmentOffline = await read.getOfflinePositions();
// For each shortarray segment go through their segment positions
@ -86,7 +86,7 @@ class _DHTLogRead implements DHTRandomRead {
foundOffline = true;
}
}
});
}));
// If we found nothing offline in this segment then we can stop
if (!foundOffline) {

View File

@ -15,6 +15,13 @@ class _DHTLogSegmentLookup extends Equatable {
List<Object?> get props => [subkey, segment];
}
class _SubkeyData {
_SubkeyData({required this.subkey, required this.data});
int subkey;
Uint8List data;
bool changed = false;
}
class _DHTLogSpine {
_DHTLogSpine._(
{required DHTRecord spineRecord,
@ -47,7 +54,7 @@ class _DHTLogSpine {
static Future<_DHTLogSpine> load({required DHTRecord spineRecord}) async {
// Get an updated spine head record copy if one exists
final spineHead = await spineRecord.getProtobuf(proto.DHTLog.fromBuffer,
subkey: 0, refreshMode: DHTRecordRefreshMode.refresh);
subkey: 0, refreshMode: DHTRecordRefreshMode.network);
if (spineHead == null) {
throw StateError('spine head missing during refresh');
}
@ -234,7 +241,7 @@ class _DHTLogSpine {
segmentKeyBytes);
}
Future<DHTShortArray> _getOrCreateSegmentInner(int segmentNumber) async {
Future<DHTShortArray> _openOrCreateSegmentInner(int segmentNumber) async {
assert(_spineMutex.isLocked, 'should be in mutex here');
assert(_spineRecord.writer != null, 'should be writable');
@ -292,7 +299,7 @@ class _DHTLogSpine {
}
}
Future<DHTShortArray?> _getSegmentInner(int segmentNumber) async {
Future<DHTShortArray?> _openSegmentInner(int segmentNumber) async {
assert(_spineMutex.isLocked, 'should be in mutex here');
// Lookup what subkey and segment subrange has this position's segment
@ -321,7 +328,7 @@ class _DHTLogSpine {
return segmentRec;
}
Future<DHTShortArray> getOrCreateSegment(int segmentNumber) async {
Future<DHTShortArray> _openOrCreateSegment(int segmentNumber) async {
assert(_spineMutex.isLocked, 'should be in mutex here');
// See if we already have this in the cache
@ -331,21 +338,22 @@ class _DHTLogSpine {
final x = _spineCache.removeAt(i);
_spineCache.add(x);
// Return the shortarray for this position
return x.$2;
return x.$2.ref();
}
}
// If we don't have it in the cache, get/create it and then cache it
final segment = await _getOrCreateSegmentInner(segmentNumber);
_spineCache.add((segmentNumber, segment));
// If we don't have it in the cache, get/create it and then cache a ref
final segment = await _openOrCreateSegmentInner(segmentNumber);
_spineCache.add((segmentNumber, await segment.ref()));
if (_spineCache.length > _spineCacheLength) {
// Trim the LRU cache
_spineCache.removeAt(0);
final (_, sa) = _spineCache.removeAt(0);
await sa.close();
}
return segment;
}
Future<DHTShortArray?> getSegment(int segmentNumber) async {
Future<DHTShortArray?> _openSegment(int segmentNumber) async {
assert(_spineMutex.isLocked, 'should be in mutex here');
// See if we already have this in the cache
@ -355,19 +363,20 @@ class _DHTLogSpine {
final x = _spineCache.removeAt(i);
_spineCache.add(x);
// Return the shortarray for this position
return x.$2;
return x.$2.ref();
}
}
// If we don't have it in the cache, get it and then cache it
final segment = await _getSegmentInner(segmentNumber);
final segment = await _openSegmentInner(segmentNumber);
if (segment == null) {
return null;
}
_spineCache.add((segmentNumber, segment));
_spineCache.add((segmentNumber, await segment.ref()));
if (_spineCache.length > _spineCacheLength) {
// Trim the LRU cache
_spineCache.removeAt(0);
final (_, sa) = _spineCache.removeAt(0);
await sa.close();
}
return segment;
}
@ -409,8 +418,8 @@ class _DHTLogSpine {
// Get the segment shortArray
final shortArray = (_spineRecord.writer == null)
? await getSegment(segmentNumber)
: await getOrCreateSegment(segmentNumber);
? await _openSegment(segmentNumber)
: await _openOrCreateSegment(segmentNumber);
if (shortArray == null) {
return null;
}
@ -442,12 +451,16 @@ class _DHTLogSpine {
throw StateError('ring buffer underflow');
}
final oldHead = _head;
_head = (_head + count) % _positionLimit;
await _purgeUnusedSegments();
final newHead = _head;
await _purgeSegments(oldHead, newHead);
}
Future<void> _deleteSegmentsContiguous(int start, int end) async {
assert(_spineMutex.isLocked, 'should be in mutex here');
DHTRecordPool.instance
.log('_deleteSegmentsContiguous: start=$start, end=$end');
final startSegmentNumber = start ~/ DHTShortArray.maxElements;
final startSegmentPos = start % DHTShortArray.maxElements;
@ -460,8 +473,7 @@ class _DHTLogSpine {
final lastDeleteSegment =
(endSegmentPos == 0) ? endSegmentNumber - 1 : endSegmentNumber - 2;
int? lastSubkey;
Uint8List? subkeyData;
_SubkeyData? lastSubkeyData;
for (var segmentNumber = firstDeleteSegment;
segmentNumber <= lastDeleteSegment;
segmentNumber++) {
@ -471,44 +483,48 @@ class _DHTLogSpine {
final subkey = l.subkey;
final segment = l.segment;
if (lastSubkey != subkey) {
if (subkey != lastSubkeyData?.subkey) {
// Flush subkey writes
if (lastSubkey != null) {
await _spineRecord.eventualWriteBytes(subkeyData!,
subkey: lastSubkey);
if (lastSubkeyData != null && lastSubkeyData.changed) {
await _spineRecord.eventualWriteBytes(lastSubkeyData.data,
subkey: lastSubkeyData.subkey);
}
xxx debug this, it takes forever
// Get next subkey
subkeyData = await _spineRecord.get(subkey: subkey);
if (subkeyData != null) {
lastSubkey = subkey;
// Get next subkey if available locally
final data = await _spineRecord.get(
subkey: subkey, refreshMode: DHTRecordRefreshMode.local);
if (data != null) {
lastSubkeyData = _SubkeyData(subkey: subkey, data: data);
} else {
lastSubkey = null;
lastSubkeyData = null;
// If the subkey was not available locally we can go to the
// last segment number at the end of this subkey
segmentNumber = ((subkey + 1) * DHTLog.segmentsPerSubkey) - 1;
}
}
if (subkeyData != null) {
final segmentKey = _getSegmentKey(subkeyData, segment);
if (lastSubkeyData != null) {
final segmentKey = _getSegmentKey(lastSubkeyData.data, segment);
if (segmentKey != null) {
await DHTRecordPool.instance.deleteRecord(segmentKey);
_setSegmentKey(subkeyData, segment, null);
_setSegmentKey(lastSubkeyData.data, segment, null);
lastSubkeyData.changed = true;
}
}
}
// Flush subkey writes
if (lastSubkey != null) {
await _spineRecord.eventualWriteBytes(subkeyData!, subkey: lastSubkey);
if (lastSubkeyData != null) {
await _spineRecord.eventualWriteBytes(lastSubkeyData.data,
subkey: lastSubkeyData.subkey);
}
}
Future<void> _purgeUnusedSegments() async {
Future<void> _purgeSegments(int from, int to) async {
assert(_spineMutex.isLocked, 'should be in mutex here');
if (_head < _tail) {
await _deleteSegmentsContiguous(0, _head);
await _deleteSegmentsContiguous(_tail, _positionLimit);
} else if (_head > _tail) {
await _deleteSegmentsContiguous(_tail, _head);
if (from < to) {
await _deleteSegmentsContiguous(from, to);
} else if (from > to) {
await _deleteSegmentsContiguous(from, _positionLimit);
await _deleteSegmentsContiguous(0, to);
}
}

View File

@ -39,7 +39,7 @@ class DefaultDHTRecordCubit<T> extends DHTRecordCubit<T> {
final firstSubkey = subkeys.firstOrNull!.low;
if (firstSubkey != defaultSubkey || updatedata == null) {
final maybeData =
await record.get(refreshMode: DHTRecordRefreshMode.refresh);
await record.get(refreshMode: DHTRecordRefreshMode.network);
if (maybeData == null) {
return null;
}

View File

@ -16,19 +16,27 @@ class DHTRecordWatchChange extends Equatable {
/// Refresh mode for DHT record 'get'
enum DHTRecordRefreshMode {
/// Return existing subkey values if they exist locally already
existing,
/// And then check the network for a newer value
/// This is the default refresh mode
cached,
/// Return existing subkey values only if they exist locally already
local,
/// Always check the network for a newer subkey value
refresh,
network,
/// Always check the network for a newer subkey value but only
/// return that value if its sequence number is newer than the local value
refreshOnlyUpdates,
update;
bool get _forceRefresh => this == network || this == update;
bool get _inspectLocal => this == local || this == update;
}
/////////////////////////////////////////////////
class DHTRecord implements DHTOpenable {
class DHTRecord implements DHTOpenable<DHTRecord> {
DHTRecord._(
{required VeilidRoutingContext routingContext,
required SharedDHTRecordData sharedDHTRecordData,
@ -40,7 +48,7 @@ class DHTRecord implements DHTOpenable {
_routingContext = routingContext,
_defaultSubkey = defaultSubkey,
_writer = writer,
_open = true,
_openCount = 1,
_sharedDHTRecordData = sharedDHTRecordData;
////////////////////////////////////////////////////////////////////////////
@ -48,25 +56,37 @@ class DHTRecord implements DHTOpenable {
/// Check if the DHTRecord is open
@override
bool get isOpen => _open;
bool get isOpen => _openCount > 0;
/// Add a reference to this DHTRecord
@override
Future<DHTRecord> ref() async => _mutex.protect(() async {
_openCount++;
return this;
});
/// Free all resources for the DHTRecord
@override
Future<void> close() async {
if (!_open) {
Future<void> close() async => _mutex.protect(() async {
if (_openCount == 0) {
throw StateError('already closed');
}
_openCount--;
if (_openCount != 0) {
return;
}
await watchController?.close();
await _watchController?.close();
_watchController = null;
await DHTRecordPool.instance._recordClosed(this);
_open = false;
}
});
/// Free all resources for the DHTRecord and delete it from the DHT
/// Will wait until the record is closed to delete it
@override
Future<void> delete() async {
Future<void> delete() async => _mutex.protect(() async {
await DHTRecordPool.instance.deleteRecord(key);
}
});
////////////////////////////////////////////////////////////////////////////
// Public API
@ -95,25 +115,37 @@ class DHTRecord implements DHTOpenable {
Future<Uint8List?> get(
{int subkey = -1,
DHTRecordCrypto? crypto,
DHTRecordRefreshMode refreshMode = DHTRecordRefreshMode.existing,
DHTRecordRefreshMode refreshMode = DHTRecordRefreshMode.cached,
Output<int>? outSeqNum}) async {
subkey = subkeyOrDefault(subkey);
// Get the last sequence number if we need it
final lastSeq =
refreshMode._inspectLocal ? await _localSubkeySeq(subkey) : null;
// See if we only ever want the locally stored value
if (refreshMode == DHTRecordRefreshMode.local && lastSeq == null) {
// If it's not available locally already just return null now
return null;
}
final valueData = await _routingContext.getDHTValue(key, subkey,
forceRefresh: refreshMode != DHTRecordRefreshMode.existing);
forceRefresh: refreshMode._forceRefresh);
if (valueData == null) {
return null;
}
final lastSeq = _sharedDHTRecordData.subkeySeqCache[subkey];
if (refreshMode == DHTRecordRefreshMode.refreshOnlyUpdates &&
// See if this get resulted in a newer sequence number
if (refreshMode == DHTRecordRefreshMode.update &&
lastSeq != null &&
valueData.seq <= lastSeq) {
// If we're only returning updates then punt now
return null;
}
// If we're returning a value, decrypt it
final out = (crypto ?? _crypto).decrypt(valueData.data, subkey);
if (outSeqNum != null) {
outSeqNum.save(valueData.seq);
}
_sharedDHTRecordData.subkeySeqCache[subkey] = valueData.seq;
return out;
}
@ -128,7 +160,7 @@ class DHTRecord implements DHTOpenable {
Future<T?> getJson<T>(T Function(dynamic) fromJson,
{int subkey = -1,
DHTRecordCrypto? crypto,
DHTRecordRefreshMode refreshMode = DHTRecordRefreshMode.existing,
DHTRecordRefreshMode refreshMode = DHTRecordRefreshMode.cached,
Output<int>? outSeqNum}) async {
final data = await get(
subkey: subkey,
@ -154,7 +186,7 @@ class DHTRecord implements DHTOpenable {
T Function(List<int> i) fromBuffer,
{int subkey = -1,
DHTRecordCrypto? crypto,
DHTRecordRefreshMode refreshMode = DHTRecordRefreshMode.existing,
DHTRecordRefreshMode refreshMode = DHTRecordRefreshMode.cached,
Output<int>? outSeqNum}) async {
final data = await get(
subkey: subkey,
@ -176,7 +208,7 @@ class DHTRecord implements DHTOpenable {
KeyPair? writer,
Output<int>? outSeqNum}) async {
subkey = subkeyOrDefault(subkey);
final lastSeq = _sharedDHTRecordData.subkeySeqCache[subkey];
final lastSeq = await _localSubkeySeq(subkey);
final encryptedNewValue =
await (crypto ?? _crypto).encrypt(newValue, subkey);
@ -198,7 +230,6 @@ class DHTRecord implements DHTOpenable {
if (isUpdated && outSeqNum != null) {
outSeqNum.save(newValueData.seq);
}
_sharedDHTRecordData.subkeySeqCache[subkey] = newValueData.seq;
// See if the encrypted data returned is exactly the same
// if so, shortcut and don't bother decrypting it
@ -228,7 +259,7 @@ class DHTRecord implements DHTOpenable {
KeyPair? writer,
Output<int>? outSeqNum}) async {
subkey = subkeyOrDefault(subkey);
final lastSeq = _sharedDHTRecordData.subkeySeqCache[subkey];
final lastSeq = await _localSubkeySeq(subkey);
final encryptedNewValue =
await (crypto ?? _crypto).encrypt(newValue, subkey);
@ -254,7 +285,6 @@ class DHTRecord implements DHTOpenable {
if (outSeqNum != null) {
outSeqNum.save(newValueData.seq);
}
_sharedDHTRecordData.subkeySeqCache[subkey] = newValueData.seq;
// The encrypted data returned should be exactly the same
// as what we are trying to set,
@ -402,13 +432,13 @@ class DHTRecord implements DHTOpenable {
DHTRecordCrypto? crypto,
}) async {
// Set up watch requirements
watchController ??=
_watchController ??=
StreamController<DHTRecordWatchChange>.broadcast(onCancel: () {
// If there are no more listeners then we can get rid of the controller
watchController = null;
_watchController = null;
});
return watchController!.stream.listen(
return _watchController!.stream.listen(
(change) {
if (change.local && !localChanges) {
return;
@ -431,8 +461,8 @@ class DHTRecord implements DHTOpenable {
},
cancelOnError: true,
onError: (e) async {
await watchController!.close();
watchController = null;
await _watchController!.close();
_watchController = null;
});
}
@ -455,6 +485,14 @@ class DHTRecord implements DHTOpenable {
//////////////////////////////////////////////////////////////////////////
Future<int?> _localSubkeySeq(int subkey) async {
final rr = await _routingContext.inspectDHTRecord(
key,
subkeys: [ValueSubkeyRange.single(subkey)],
);
return rr.localSeqs.firstOrNull ?? 0xFFFFFFFF;
}
void _addValueChange(
{required bool local,
required Uint8List? data,
@ -464,7 +502,7 @@ class DHTRecord implements DHTOpenable {
final watchedSubkeys = ws.subkeys;
if (watchedSubkeys == null) {
// Report all subkeys
watchController?.add(
_watchController?.add(
DHTRecordWatchChange(local: local, data: data, subkeys: subkeys));
} else {
// Only some subkeys are being watched, see if the reported update
@ -479,7 +517,7 @@ class DHTRecord implements DHTOpenable {
overlappedFirstSubkey == updateFirstSubkey ? data : null;
// Report only watched subkeys
watchController?.add(DHTRecordWatchChange(
_watchController?.add(DHTRecordWatchChange(
local: local, data: updatedData, subkeys: overlappedSubkeys));
}
}
@ -504,10 +542,9 @@ class DHTRecord implements DHTOpenable {
final KeyPair? _writer;
final DHTRecordCrypto _crypto;
final String debugName;
bool _open;
@internal
StreamController<DHTRecordWatchChange>? watchController;
final _mutex = Mutex();
int _openCount;
StreamController<DHTRecordWatchChange>? _watchController;
@internal
WatchState? watchState;
}

View File

@ -93,7 +93,7 @@ class DHTRecordCubit<T> extends Cubit<AsyncValue<T>> {
for (final skr in subkeys) {
for (var sk = skr.low; sk <= skr.high; sk++) {
final data = await _record.get(
subkey: sk, refreshMode: DHTRecordRefreshMode.refreshOnlyUpdates);
subkey: sk, refreshMode: DHTRecordRefreshMode.update);
if (data != null) {
final newState = await _stateFunction(_record, updateSubkeys, data);
if (newState != null) {

View File

@ -88,7 +88,6 @@ class SharedDHTRecordData {
DHTRecordDescriptor recordDescriptor;
KeyPair? defaultWriter;
VeilidRoutingContext defaultRoutingContext;
Map<int, int> subkeySeqCache = {};
bool needsWatchStateUpdate = false;
WatchState? unionWatchState;
}

View File

@ -13,12 +13,13 @@ part 'dht_short_array_write.dart';
///////////////////////////////////////////////////////////////////////
class DHTShortArray implements DHTOpenable {
class DHTShortArray implements DHTOpenable<DHTShortArray> {
////////////////////////////////////////////////////////////////
// Constructors
DHTShortArray._({required DHTRecord headRecord})
: _head = _DHTShortArrayHead(headRecord: headRecord) {
: _head = _DHTShortArrayHead(headRecord: headRecord),
_openCount = 1 {
_head.onUpdatedHead = () {
_watchController?.sink.add(null);
};
@ -139,18 +140,30 @@ class DHTShortArray implements DHTOpenable {
/// Check if the shortarray is open
@override
bool get isOpen => _head.isOpen;
bool get isOpen => _openCount > 0;
/// Add a reference to this shortarray
@override
Future<DHTShortArray> ref() async => _mutex.protect(() async {
_openCount++;
return this;
});
/// Free all resources for the DHTShortArray
@override
Future<void> close() async {
if (!isOpen) {
Future<void> close() async => _mutex.protect(() async {
if (_openCount == 0) {
throw StateError('already closed');
}
_openCount--;
if (_openCount != 0) {
return;
}
await _watchController?.close();
_watchController = null;
await _head.close();
}
});
/// Free all resources for the DHTShortArray and delete it from the DHT
/// Will wait until the short array is closed to delete it
@ -255,6 +268,10 @@ class DHTShortArray implements DHTOpenable {
// Internal representation refreshed from head record
final _DHTShortArrayHead _head;
// Openable
int _openCount;
final _mutex = Mutex();
// Watch mutex to ensure we keep the representation valid
final Mutex _listenMutex = Mutex();
// Stream of external changes

View File

@ -248,7 +248,7 @@ class _DHTShortArrayHead {
Future<void> _loadHead() async {
// Get an updated head record copy if one exists
final head = await _headRecord.getProtobuf(proto.DHTShortArray.fromBuffer,
subkey: 0, refreshMode: DHTRecordRefreshMode.refresh);
subkey: 0, refreshMode: DHTRecordRefreshMode.network);
if (head == null) {
throw StateError('shortarray head missing during refresh');
}

View File

@ -22,8 +22,8 @@ class _DHTShortArrayRead implements DHTRandomRead {
final out = lookup.record.get(
subkey: lookup.recordSubkey,
refreshMode: refresh
? DHTRecordRefreshMode.refresh
: DHTRecordRefreshMode.existing,
? DHTRecordRefreshMode.network
: DHTRecordRefreshMode.cached,
outSeqNum: outSeqNum);
if (outSeqNum.value != null) {
_head.updatePositionSeq(pos, false, outSeqNum.value!);

View File

@ -1,12 +1,13 @@
import 'dart:async';
abstract class DHTOpenable {
abstract class DHTOpenable<C> {
bool get isOpen;
Future<C> ref();
Future<void> close();
Future<void> delete();
}
extension DHTOpenableExt<D extends DHTOpenable> on D {
extension DHTOpenableExt<D extends DHTOpenable<D>> on D {
/// Runs a closure that guarantees the DHTOpenable
/// will be closed upon exit, even if an uncaught exception is thrown
Future<T> scope<T>(Future<T> Function(D) scopeFunction) async {

View File

@ -301,7 +301,7 @@ Future<IdentityMaster> openIdentityMaster(
'IdentityMaster::openIdentityMaster::IdentityMasterRecord'))
.deleteScope((masterRec) async {
final identityMaster = (await masterRec.getJson(IdentityMaster.fromJson,
refreshMode: DHTRecordRefreshMode.refresh))!;
refreshMode: DHTRecordRefreshMode.network))!;
// Validate IdentityMaster
final masterRecordKey = masterRec.key;