From b013e5efc9f4aee4df1012b69ae53e65dbffe6e4 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Fri, 19 Dec 2025 23:20:54 -0500 Subject: [PATCH] transactions work --- .../example/integration_test/app_test.dart | 10 +- .../test_dht_record_pool.dart | 12 +- .../src/dht_log/dht_log_spine.dart | 8 +- .../src/dht_record/dht_record.dart | 13 +- .../src/dht_record/dht_record_group.dart | 7 +- .../src/dht_record/dht_record_pool.dart | 11 +- .../src/dht_record/dht_transaction.dart | 320 ++++++++++++-- .../src/dht_short_array/dht_short_array.dart | 4 +- .../dht_short_array/dht_short_array_head.dart | 396 ++++++++++-------- 9 files changed, 542 insertions(+), 239 deletions(-) diff --git a/packages/veilid_support/example/integration_test/app_test.dart b/packages/veilid_support/example/integration_test/app_test.dart index 9f4dece..816d089 100644 --- a/packages/veilid_support/example/integration_test/app_test.dart +++ b/packages/veilid_support/example/integration_test/app_test.dart @@ -76,7 +76,10 @@ void main() { () => dhtRecordPoolFixture.setUp(defaultKind: cryptoKindVLD0)); tearDownAll(dhtRecordPoolFixture.tearDown); - for (final stride in [256, 16 /*64, 32, 16, 8, 4, 2, 1 */]) { + for (final stride in [ + DHTShortArray.maxElements, + DHTShortArray.minStride + ]) { test('dht_short_array:create_stride_$stride', makeTestDHTShortArrayCreateDelete(stride: stride)); test('dht_short_array:add_stride_$stride', @@ -91,7 +94,10 @@ void main() { () => dhtRecordPoolFixture.setUp(defaultKind: cryptoKindVLD0)); tearDownAll(dhtRecordPoolFixture.tearDown); - for (final stride in [256, 16 /*64, 32, 16, 8, 4, 2, 1 */]) { + for (final stride in [ + DHTShortArray.maxElements, + DHTShortArray.minStride + ]) { test('dht_log:create_stride_$stride', makeTestDHTLogCreateDelete(stride: stride)); test( diff --git a/packages/veilid_support/example/integration_test/test_dht_record_pool.dart b/packages/veilid_support/example/integration_test/test_dht_record_pool.dart index b26da08..ea39012 100644 --- a/packages/veilid_support/example/integration_test/test_dht_record_pool.dart +++ b/packages/veilid_support/example/integration_test/test_dht_record_pool.dart @@ -113,7 +113,7 @@ Future testDHTRecordGetSet() async { // Test get without set { final rec = await pool.createRecord(debugName: 'test_get_set 1'); - final val = await rec.get(); + final val = await rec.getBytes(); await pool.deleteRecord(rec.key); expect(val, isNull); await rec.close(); @@ -123,10 +123,10 @@ Future testDHTRecordGetSet() async { { final rec2 = await pool.createRecord(debugName: 'test_get_set 2'); expect(await rec2.tryWriteBytes(valdata), isNull); - expect(await rec2.get(), equals(valdata)); + expect(await rec2.getBytes(), equals(valdata)); // Invalid subkey should throw - await expectLater( - () async => rec2.get(subkey: 1), throwsA(isA())); + await expectLater(() async => rec2.getBytes(subkey: 1), + throwsA(isA())); await rec2.close(); await pool.deleteRecord(rec2.key); } @@ -135,12 +135,12 @@ Future testDHTRecordGetSet() async { { final rec3 = await pool.createRecord(debugName: 'test_get_set 3'); expect(await rec3.tryWriteBytes(valdata), isNull); - expect(await rec3.get(), equals(valdata)); + expect(await rec3.getBytes(), equals(valdata)); await rec3.close(); await pool.deleteRecord(rec3.key); final rec4 = await pool.openRecordRead(rec3.key, debugName: 'test_get_set 4'); - expect(await rec4.get(), equals(valdata)); + expect(await rec4.getBytes(), equals(valdata)); await rec4.close(); await pool.deleteRecord(rec4.key); } diff --git 
a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart index 5cc9045..a2118b1 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_log/dht_log_spine.dart @@ -397,7 +397,7 @@ class _DHTLogSpine { final segment = l.segment; try { - var subkeyData = await _spineRecord.get(subkey: subkey); + var subkeyData = await _spineRecord.getBytes(subkey: subkey); subkeyData ??= _makeEmptySubkey(); while (true) { @@ -466,7 +466,7 @@ class _DHTLogSpine { // See if we have the segment key locally try { RecordKey? segmentKey; - var subkeyData = await _spineRecord.get( + var subkeyData = await _spineRecord.getBytes( subkey: subkey, refreshMode: DHTRecordRefreshMode.local, ); @@ -475,7 +475,7 @@ class _DHTLogSpine { } if (segmentKey == null) { // If not, try from the network - subkeyData = await _spineRecord.get( + subkeyData = await _spineRecord.getBytes( subkey: subkey, refreshMode: DHTRecordRefreshMode.network, ); @@ -684,7 +684,7 @@ class _DHTLogSpine { } // Get next subkey if available locally - final data = await _spineRecord.get( + final data = await _spineRecord.getBytes( subkey: subkey, refreshMode: DHTRecordRefreshMode.local, ); diff --git a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record.dart b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record.dart index f9646cb..5778e40 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record.dart @@ -22,13 +22,14 @@ class DHTRecordWatchChange extends Equatable { enum DHTRecordRefreshMode { /// Return existing subkey values if they exist locally already /// If not, check the network for a value - /// This is the default refresh mode + /// This is the default refresh mode for non-transactional gets cached, /// Return existing subkey values only if they exist locally already local, /// Always check the network for a newer subkey value + /// This is the default refresh mode for transactional gets network, /// Always check the network for a newer subkey value but only @@ -153,12 +154,12 @@ class DHTRecord implements DHTDeleteable { /// value or always check the network /// * 'outSeqNum' optionally returns the sequence number of the value being /// returned if one was returned. - Future get({ + Future getBytes({ int subkey = -1, CryptoCodec? crypto, DHTRecordRefreshMode refreshMode = DHTRecordRefreshMode.cached, Output? outSeqNum, - }) => _wrapStats('get', () async { + }) => _wrapStats('getBytes', () async { subkey = subkeyOrDefault(subkey); // Get the last sequence number if we need it @@ -213,7 +214,7 @@ class DHTRecord implements DHTDeleteable { DHTRecordRefreshMode refreshMode = DHTRecordRefreshMode.cached, Output? outSeqNum, }) async { - final data = await get( + final data = await getBytes( subkey: subkey, crypto: crypto, refreshMode: refreshMode, @@ -240,7 +241,7 @@ class DHTRecord implements DHTDeleteable { DHTRecordRefreshMode refreshMode = DHTRecordRefreshMode.cached, Output? 
outSeqNum, }) async { - final data = await get( + final data = await getBytes( subkey: subkey, crypto: crypto, refreshMode: refreshMode, @@ -397,7 +398,7 @@ class DHTRecord implements DHTDeleteable { // Get the existing data, do not allow force refresh here // because if we need a refresh the setDHTValue will fail anyway - var oldValue = await get( + var oldValue = await getBytes( subkey: subkey, crypto: crypto, outSeqNum: outSeqNum, diff --git a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_group.dart b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_group.dart index 8e2a1ef..96b8097 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_group.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_group.dart @@ -356,6 +356,9 @@ class DHTRecordGroupTransaction { } } + /// Debug name + String get debugName => _inner._dhttx._inner.debugName ?? ''; + /// Check if the transaction is done bool get isDone => _inner._dhttx.isDone; @@ -378,7 +381,7 @@ class DHTRecordGroupTransaction { } subkey = member._record.subkeyOrDefault(subkey); - return _inner._dhttx.get(member._record.key, subkey); + return _inner._dhttx.getBytes(member._record.key, subkey: subkey); } /// Perform a set on a member within the transaction @@ -394,7 +397,7 @@ class DHTRecordGroupTransaction { subkey = member._record.subkeyOrDefault(subkey); - return _inner._dhttx.set( + return _inner._dhttx.tryWriteBytes( member._record.key, data, subkey: subkey, diff --git a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart index d90e606..9ef1d58 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart @@ -979,9 +979,18 @@ class DHTRecordPool with TableDBBackedJson { records.map((rec) => rec.key).toList(), options: options, ); + + // Pull the whole sequence number set for all records + final recordInfos = await records + .map( + (x) async => DHTTransactionRecordInfo( + x, + await dhttx.inspect(x.key, scope: DHTReportScope.syncGet), + [], + ), + ) + .wait; + return DHTTransaction._( pool: this, - records: records, + recordInfos: recordInfos, dhttx: dhttx, debugName: debugName, ); diff --git a/packages/veilid_support/lib/dht_support/src/dht_record/dht_transaction.dart b/packages/veilid_support/lib/dht_support/src/dht_record/dht_transaction.dart index f680caa..b51f855 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_record/dht_transaction.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_record/dht_transaction.dart @@ -1,11 +1,19 @@ part of 'dht_record_pool.dart'; +class DHTTransactionRecordInfo { + final DHTRecord record; + final DHTRecordReport report; + List valueChanges; + + DHTTransactionRecordInfo(this.record, this.report, this.valueChanges); +} + class DHTTransactionInner implements Finalize { - DHTRecordPool pool; - List records; - VeilidDHTTransaction dhttx; - String? debugName; - DHTTransactionInner(this.pool, this.records, this.dhttx, this.debugName); + final DHTRecordPool pool; + final Map recordInfos; + final VeilidDHTTransaction dhttx; + final String? debugName; + DHTTransactionInner(this.pool, this.recordInfos, this.dhttx, this.debugName); // Called by finalizer if commit or rollback was forgotten before drop @override @@ -14,15 +22,13 @@ class DHTTransactionInner implements Finalize { if (!dhttx.isDone) { pool.log( 'DHTTransaction($debugName) had to be finalized. 
' - 'Rollback or Commit should be explicit. ' - '$dhttx over $records', + 'Rollback or Commit should be explicit.', ); - } else if (records.isNotEmpty) { + } else if (recordInfos.isNotEmpty) { pool.log( 'DHTTransaction($debugName) had to be finalized ' 'even though it was done. ' - 'This should not be a reachable condition. ' - '$dhttx over $records', + 'This should not be a reachable condition.', ); } // Always do this regardless of error type @@ -35,14 +41,10 @@ class DHTTransactionInner implements Finalize { // Called by commit, rollback, or finalizer to close out the object Future close() async { if (!dhttx.isDone) { - pool.log( - 'DHTTransaction($debugName) should have been finished before close. ' - '$dhttx over $records', - ); await dhttx.rollback(); } - await records.map((x) => x.close()).wait; - records.clear(); + await recordInfos.values.map((x) => x.record.close()).wait; + recordInfos.clear(); } } @@ -55,45 +57,266 @@ class DHTTransaction { DHTTransaction._({ required DHTRecordPool pool, - required List records, + required List recordInfos, required VeilidDHTTransaction dhttx, required String? debugName, - }) : _inner = DHTTransactionInner(pool, records, dhttx, debugName) { + }) : _inner = DHTTransactionInner( + pool, + {for (final r in recordInfos) r.record.key: r}, + dhttx, + debugName, + ) { // Ref all the records. Unref'd in close() - for (final rec in _inner.records) { - rec.ref(); + for (final recordInfo in recordInfos) { + recordInfo.record.ref(); } // Attach finalizer to ensure things clean up even if the // user forgets to close() _finalizer.attach(this, _inner, detach: this); } - /// Check if the transaction is done - bool get isDone => _inner.dhttx.isDone; - //////////////////////////////////////////////////////////////////////////// // Public Interface - Future get(RecordKey key, int subkey) => - _inner.dhttx.get(key, subkey); + /// Check if the transaction is done + bool get isDone => _inner.dhttx.isDone; - Future set( + /// Get a record's subkey value from this transaction + /// Returns the most recent value data for this subkey or null if this subkey + /// has not yet been written to. + /// * 'refreshMode' determines whether or not to return a locally existing + /// value or always check the network + /// * 'outSeqNum' optionally returns the sequence number of the value being + /// returned if one was returned. + Future getBytes( + RecordKey key, { + int subkey = -1, + CryptoCodec? crypto, + DHTRecordRefreshMode refreshMode = DHTRecordRefreshMode.network, + Output? 
outSeqNum, + }) { + final recordInfo = _inner.recordInfos[key]!; + final record = recordInfo.record; + subkey = record.subkeyOrDefault(subkey); + final lastSeq = _localSubkeySeq(recordInfo, subkey); + final networkSeq = _networkSubkeySeq(recordInfo, subkey); + + return _wrapStats(record, 'DHTTransaction::getBytes', () async { + Uint8List data; + int seq; + + switch (refreshMode) { + case DHTRecordRefreshMode.cached: + if (lastSeq != null) { + // Available locally, return it + final localValueData = await record.routingContext.getDHTValue( + record.key, + subkey, + ); + data = localValueData!.data; + seq = lastSeq; + } else { + // Get it from the transaction + if (networkSeq == null) { + return null; + } + final valueData = (await _inner.dhttx.get(key, subkey))!; + data = valueData.data; + seq = valueData.seq; + } + case DHTRecordRefreshMode.local: + if (lastSeq == null) { + // If it's not available locally already just return null now + return null; + } + // Available locally, return it + final localValueData = await record.routingContext.getDHTValue( + record.key, + subkey, + ); + data = localValueData!.data; + seq = lastSeq; + case DHTRecordRefreshMode.network: + // Get it from the transaction + if (networkSeq == null) { + return null; + } + final valueData = (await _inner.dhttx.get(key, subkey))!; + data = valueData.data; + seq = valueData.seq; + case DHTRecordRefreshMode.update: + if (networkSeq == null || + (lastSeq != null && networkSeq <= lastSeq)) { + // If we're only returning updates then punt now + return null; + } + // Get it from the transaction + final valueData = (await _inner.dhttx.get(key, subkey))!; + data = valueData.data; + seq = valueData.seq; + } + + // If we're returning a value, decrypt it + final out = (crypto ?? record._crypto).decrypt(data); + if (outSeqNum != null) { + outSeqNum.save(seq); + } + return out; + }); + } + + xxx do json and migrated get + + + /// Attempt to write a byte buffer to a DHTTransaction record subkey + /// If a newer value was found on the network, it is returned + /// If the value was succesfully written, null is returned + Future tryWriteBytes( RecordKey key, - int subkey, - Uint8List data, { + Uint8List newValue, { + int subkey = -1, + CryptoCodec? crypto, DHTTransactionSetValueOptions? options, - }) => _inner.dhttx.set(key, subkey, data, options: options); + Output? outSeqNum, + }) { + final recordInfo = _inner.recordInfos[key]!; + final record = recordInfo.record; + subkey = record.subkeyOrDefault(subkey); + final lastSeq = _localSubkeySeq(recordInfo, subkey); + return _wrapStats(record, 'DHTTransaction::getBytes', () async { + final encryptedNewValue = await (crypto ?? record._crypto).encrypt( + newValue, + ); + + final newValueData = await dhtRetryLoop( + () => _inner.dhttx.set( + key, + subkey, + encryptedNewValue, + options: DHTTransactionSetValueOptions( + writer: options?.writer ?? record._writer, + ), + ), + ); + + // Record new sequence number + int newSeqNum; + if (newValueData == null) { + newSeqNum = (lastSeq != null ? 
lastSeq + 1 : 0); + } else { + newSeqNum = newValueData.seq; + } + final isUpdated = newSeqNum != lastSeq; + if (outSeqNum != null) { + outSeqNum.save(newSeqNum); + } + + // See if the encrypted data returned is exactly the same + // if so, shortcut and don't bother decrypting it + if (newValueData == null || newValueData.data.equals(encryptedNewValue)) { + if (isUpdated) { + _addValueChange(recordInfo, subkey); + } + return null; + } + + // Decrypt value to return it + final decryptedNewValue = await (crypto ?? record._crypto).decrypt( + newValueData.data, + ); + if (isUpdated) { + _addValueChange(recordInfo, subkey); + } + return decryptedNewValue; + }); + } + + /// Like 'tryWriteBytes' but with JSON marshal/unmarshal of the value + Future setJson( + RecordKey key, + T Function(dynamic) fromJson, + T newValue, { + int subkey = -1, + CryptoCodec? crypto, + DHTTransactionSetValueOptions? options, + Output? outSeqNum, + }) => + tryWriteBytes( + key, + jsonEncodeBytes(newValue), + subkey: subkey, + crypto: crypto, + options: options, + outSeqNum: outSeqNum, + ).then((out) { + if (out == null) { + return null; + } + return jsonDecodeBytes(fromJson, out); + }); + + /// Like 'tryWriteBytes' but with migrated marshal/unmarshal of the value + Future?> tryWriteMigrated( + RecordKey key, + MigrationCodec migrationCodec, + T newValue, { + int subkey = -1, + CryptoCodec? crypto, + DHTTransactionSetValueOptions? options, + Output? outSeqNum, + }) => + tryWriteBytes( + key, + migrationCodec.toBytes(newValue), + subkey: subkey, + crypto: crypto, + options: options, + outSeqNum: outSeqNum, + ).then((out) { + if (out == null) { + return null; + } + return migrationCodec.fromBytes(out); + }); + + /// Return the inspection state of a set of subkeys of a record in + /// the DHTTransaction. + /// See Veilid's 'inspectDHTRecord' call for details on how this works Future inspect( RecordKey key, { List? subkeys, DHTReportScope scope = DHTReportScope.local, - }) => _inner.dhttx.inspect(key, subkeys: subkeys, scope: scope); + }) async { + final recordInfo = _inner.recordInfos[key]!; + // Shortcut if we already have the report, because it never changes + // until the commit + if ((subkeys == null || subkeys.equals(recordInfo.report.subkeys)) && + scope == DHTReportScope.syncGet) { + return recordInfo.report; + } + + // Get a new report + return _inner.dhttx.inspect(key, subkeys: subkeys, scope: scope); + } + + /// Apply all changes locally and remotely for all transactional + /// gets and sets. No changes will be made remotely or locally until this is + /// called. Note that even if you only 'get' values, a commit can still + /// make local changes if the values retrieved from the network are newer + /// than the previous local values. Future commit() async { try { await _inner.dhttx.commit(); + // Notify all value changes to local-aware watches + for (final recordInfo in _inner.recordInfos.values) { + recordInfo.record._addValueChange( + local: true, + data: null, + subkeys: recordInfo.valueChanges, + ); + } + // Always do this regardless of error type // ignore: avoid_catches_without_on_clauses } finally { @@ -102,14 +325,35 @@ class DHTTransaction { } } + /// Drop all changes locally and remotely for all transactional + /// gets and sets. No changes will be made remotely or locally when this + /// is called. 
Future rollback() async { - try { - await _inner.dhttx.rollback(); - // Always do this regardless of error type - // ignore: avoid_catches_without_on_clauses - } finally { - await _inner.records.map((x) => x.close()).wait; - _finalizer.detach(this); - } + await _inner.close(); + _finalizer.detach(this); } + + //////////////////////////////////////////////////////////////////////////// + // Private Implementation + + int? _localSubkeySeq(DHTTransactionRecordInfo recordInfo, int subkey) => + recordInfo.report.localSeqs[subkey]; + + int? _networkSubkeySeq(DHTTransactionRecordInfo recordInfo, int subkey) => + recordInfo.report.networkSeqs[subkey]; + + void _addValueChange(DHTTransactionRecordInfo recordInfo, int subkey) { + recordInfo.valueChanges = recordInfo.valueChanges.insertSubkey(subkey); + } + + Future _wrapStats( + DHTRecord record, + String func, + Future Function() closure, + ) => DHTRecordPool.instance._stats.measure( + record.key, + record.debugName, + func, + closure, + ); } diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array.dart index 9ac4c96..6b28fc2 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array.dart @@ -20,6 +20,7 @@ class DHTShortArray implements DHTDeleteable { // Fields static const maxElements = 256; + static const minStride = ((1024 * 1024) ~/ ValueData.maxLen) - 1; // 31 // Internal representation refreshed from head record final _DHTShortArrayHead _head; @@ -247,9 +248,6 @@ class DHTShortArray implements DHTDeleteable { /// Runs a closure allowing read-only access to the shortarray Future operate( Future Function(DHTShortArrayReadOperations) closure, - { - transaction: - } ) { if (!isOpen) { throw StateError('short array is not open"'); diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart index 6e46cd9..fcfeb67 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart @@ -2,9 +2,7 @@ part of 'dht_short_array.dart'; class DHTShortArrayHeadLookup { final DHTRecord record; - final int recordSubkey; - final int? seq; DHTShortArrayHeadLookup({ @@ -14,18 +12,8 @@ class DHTShortArrayHeadLookup { }); } -class _DHTShortArrayHead { - //////////////////////////////////////////////////////////////////////////// - - // Head/element mutex to ensure we keep the representation valid - final _headMutex = Mutex(debugLockTimeout: kIsDebugMode ? 60 : null); - - // Subscription to head record internal changes - StreamSubscription? _subscription; - - // Notify closure for external head changes - void Function()? 
onUpdatedHead; - +/// Mutex-locked inner class +class _DHTShortArrayHeadInner { // Head DHT record transactional group final DHTRecordGroup _headGroup; @@ -57,38 +45,45 @@ class _DHTShortArrayHead { // Migration codec static final _migrationCodec = DHTShortArrayMigrationCodec(); - _DHTShortArrayHead({required DHTRecordGroupMember headRecord}) - : _headGroup = DHTRecordGroup( - members: [headRecord], - debugName: '_DHTShortArrayHead', - ), - _linkedMembers = [], - _index = [], - _free = [], - _seqs = [], - _localSeqs = [] { - _calculateStride(); + // The current transaction if we have one + DHTRecordGroupTransaction? _currentTransaction; + + // Number of elements in short array + int get length => _index.length; + + //////////////////////////////////////////////////////////////////////////// + + _DHTShortArrayHeadInner({ + required DHTRecordGroup headGroup, + required int stride, + }) : _headGroup = headGroup, + _stride = stride, + _linkedMembers = [], + _index = [], + _free = [], + _seqs = [], + _localSeqs = [], + _currentTransaction = null { + // } - void _calculateStride() { - switch (_headGroup.single.schema) { - case DHTSchemaDFLT(oCnt: final oCnt): - if (oCnt <= 1) { - throw StateError('Invalid DFLT schema in DHTShortArray'); - } - _stride = oCnt - 1; - case DHTSchemaSMPL(oCnt: final oCnt, members: final members): - if (oCnt != 0 || members.length != 1 || members[0].mCnt <= 1) { - throw StateError('Invalid SMPL schema in DHTShortArray'); - } - _stride = members[0].mCnt - 1; + Future close() async { + if (!_headGroup.isOpen) { + throw StateError('should never close twice'); } - assert(_stride <= DHTShortArray.maxElements, 'stride too long'); + final currentTransaction = _currentTransaction; + _currentTransaction = null; + if (currentTransaction != null) { + DHTRecordPool.instance.log( + '_DHTShortArrayHeadInner(${_headGroup.single.debugName}): ' + 'should not close during transaction', + ); + await currentTransaction.rollback(); + } + await [_headGroup.close(), ..._linkedMembers.map((x) => x.close())].wait; } proto.DHTShortArray _toProto() { - assert(_headMutex.isLocked, 'should be in mutex here'); - final head = proto.DHTShortArray(); head.keys.addAll(_linkedMembers.map((lr) => lr.key.toProto())); head.index = List.of(_index); @@ -98,144 +93,11 @@ class _DHTShortArrayHead { return head; } - RecordKey get recordKey => _headGroup.single.key; - KeyPair? get writer => _headGroup.single.writer; - OwnedDHTRecordPointer? 
get recordPointer => - _headGroup.single.ownedDHTRecordPointer; - - int get length => _index.length; - - bool get isOpen => _headGroup.isDone; - - Future close() async { - await _headMutex.protect(() async { - if (!isOpen) { - throw StateError('should never close twice'); - } - await [_headGroup.close(), ..._linkedMembers.map((x) => x.close())].wait; - }); - } - - /// Returns true if all of the record deletions were processed immediately - /// Returns false if any of the records were marked to be deleted later - Future delete() => _headMutex.protect( - () async => (await [ - _headGroup.delete(), - ..._linkedMembers.map((x) => x.delete()), - ].wait).reduce((value, element) => value && element), - ); - - Future operate(Future Function(_DHTShortArrayHead) closure) => - _headMutex.protect(() => closure(this)); - - Future operateWrite(Future Function(_DHTShortArrayHead) closure) => - _headMutex.protect(() async { - final oldLinkedRecords = List.of(_linkedRecords); - final oldIndex = List.of(_index); - final oldFree = List.of(_free); - final oldSeqs = List.of(_seqs); - try { - final out = await closure(this); - // Write head assuming it has been changed - if (!await _writeHead(allowOffline: false)) { - // Failed to write head means head got overwritten so write should - // be considered failed - throw const DHTExceptionOutdated(); - } - - onUpdatedHead?.call(); - return out; - } on Exception { - // Exception means state needs to be reverted - _linkedRecords = oldLinkedRecords; - _index = oldIndex; - _free = oldFree; - _seqs = oldSeqs; - - rethrow; - } - }); - - Future operateWriteEventual( - Future Function(_DHTShortArrayHead) closure, { - Duration? timeout, - }) { - final timeoutTs = timeout == null - ? null - : Veilid.instance.now().offset(TimestampDuration.fromDuration(timeout)); - - return _headMutex.protect(() async { - late List oldLinkedRecords; - late List oldIndex; - late List oldFree; - late List oldSeqs; - - late T out; - try { - // Iterate until we have a successful element and head write - - do { - // Save off old values each pass of tryWriteHead because the head - // will have changed - oldLinkedRecords = List.of(_linkedRecords); - oldIndex = List.of(_index); - oldFree = List.of(_free); - oldSeqs = List.of(_seqs); - - // Try to do the element write - while (true) { - if (timeoutTs != null) { - final now = Veilid.instance.now(); - if (now >= timeoutTs) { - throw TimeoutException('timeout reached'); - } - } - try { - out = await closure(this); - break; - } on DHTExceptionOutdated { - // Failed to write in closure resets state - _linkedRecords = List.of(oldLinkedRecords); - _index = List.of(oldIndex); - _free = List.of(oldFree); - _seqs = List.of(oldSeqs); - } on Exception { - // Failed to write in closure resets state - _linkedRecords = List.of(oldLinkedRecords); - _index = List.of(oldIndex); - _free = List.of(oldFree); - _seqs = List.of(oldSeqs); - rethrow; - } - } - // Try to do the head write - } while (!await _writeHead(allowOffline: false)); - - onUpdatedHead?.call(); - } on Exception { - // Exception means state needs to be reverted - _linkedRecords = oldLinkedRecords; - _index = oldIndex; - _free = oldFree; - _seqs = oldSeqs; - - rethrow; - } - return out; - }); - } -} - -abstract class DHTShortArrayHeadOperations {} - -class _DHTShortArrayHeadOperations implements DHTShortArrayHeadOperations { - /// Serialize and write out the current head record, possibly updating it - /// if a newer copy is available online. 
Returns true if the write was + /// Serialize and write out the current head record and any other changes, + /// possibly updating it if a newer copy is available online. Returns true if the write was /// successful - Future _writeHead({required bool allowOffline}) async { - assert(_headMutex.isLocked, 'should be in mutex here'); - - final existingHead = await _headRecord.tryWriteMigrated( + Future _writeTransaction() async { + final existingHead = await _currentTransaction!.set(tryWriteMigrated( _migrationCodec, _toProto(), options: SetDHTValueOptions(allowOffline: allowOffline), @@ -558,6 +420,186 @@ class _DHTShortArrayHeadOperations implements DHTShortArrayHeadOperations { _seqs[idx] = newSeq; } } +} + +class _DHTShortArrayHead { + //////////////////////////////////////////////////////////////////////////// + + // Head/element mutex to ensure we keep the representation valid + final _headMutex = Mutex(debugLockTimeout: kIsDebugMode ? 60 : null); + + // Mutex locked inner class + late final _inner; + + // Head record group + final DHTRecordGroupMember _headRecord; + + // Subscription to head record internal changes + StreamSubscription? _subscription; + + // Notify closure for external head changes + void Function()? onUpdatedHead; + + // True if things are still open + bool _isOpen; + + //////////////////////////////////////////////////////////////////////////// + + _DHTShortArrayHead({required DHTRecordGroupMember headRecord}) + : _headRecord = headRecord, + _isOpen = true, + _inner = _DHTShortArrayHeadInner( + headGroup: DHTRecordGroup( + members: [headRecord], + debugName: '_DHTShortArrayHead', + ), + stride: _calculateStride(headRecord), + ); + + static int _calculateStride(DHTRecordGroupMember headRecord) { + late int stride; + switch (headRecord.schema) { + case DHTSchemaDFLT(oCnt: final oCnt): + if (oCnt <= 1) { + throw StateError('Invalid DFLT schema in DHTShortArray'); + } + stride = oCnt - 1; + case DHTSchemaSMPL(oCnt: final oCnt, members: final members): + if (oCnt != 0 || members.length != 1 || members[0].mCnt <= 1) { + throw StateError('Invalid SMPL schema in DHTShortArray'); + } + stride = members[0].mCnt - 1; + } + assert(stride <= DHTShortArray.maxElements, 'stride too long'); + assert(stride >= DHTShortArray.minStride, 'stride too short'); + + return stride; + } + + RecordKey get recordKey => _headRecord.key; + KeyPair? get writer => _headRecord.writer; + OwnedDHTRecordPointer? 
get recordPointer => _headRecord.ownedDHTRecordPointer; + bool get isOpen => _isOpen; + + Future close() async { + await _headMutex.protect(() async { + if (!_isOpen) { + throw StateError('should never close twice'); + } + await _inner.close(); + _isOpen = false; + }); + } + + /// Returns true if all of the record deletions were processed immediately + /// Returns false if any of the records were marked to be deleted later + Future delete() => _headMutex.protect( + () async => (await [ + _headGroup.delete(), + ..._linkedMembers.map((x) => x.delete()), + ].wait).reduce((value, element) => value && element), + ); + + Future operate( + Future Function(_DHTShortArrayHeadOperations) closure, + ) => _headMutex.protect(() => closure(_inner)); + + Future operateWrite(Future Function(_DHTShortArrayHead) closure) => + _headMutex.protect(() async { + final oldLinkedRecords = List.of(_linkedRecords); + final oldIndex = List.of(_index); + final oldFree = List.of(_free); + final oldSeqs = List.of(_seqs); + try { + final out = await closure(this); + // Write head assuming it has been changed + if (!await _writeHead(allowOffline: false)) { + // Failed to write head means head got overwritten so write should + // be considered failed + throw const DHTExceptionOutdated(); + } + + onUpdatedHead?.call(); + return out; + } on Exception { + // Exception means state needs to be reverted + _linkedRecords = oldLinkedRecords; + _index = oldIndex; + _free = oldFree; + _seqs = oldSeqs; + + rethrow; + } + }); + + Future operateWriteEventual( + Future Function(_DHTShortArrayHead) closure, { + Duration? timeout, + }) { + final timeoutTs = timeout == null + ? null + : Veilid.instance.now().offset(TimestampDuration.fromDuration(timeout)); + + return _headMutex.protect(() async { + late List oldLinkedRecords; + late List oldIndex; + late List oldFree; + late List oldSeqs; + + late T out; + try { + // Iterate until we have a successful element and head write + + do { + // Save off old values each pass of tryWriteHead because the head + // will have changed + oldLinkedRecords = List.of(_linkedRecords); + oldIndex = List.of(_index); + oldFree = List.of(_free); + oldSeqs = List.of(_seqs); + + // Try to do the element write + while (true) { + if (timeoutTs != null) { + final now = Veilid.instance.now(); + if (now >= timeoutTs) { + throw TimeoutException('timeout reached'); + } + } + try { + out = await closure(this); + break; + } on DHTExceptionOutdated { + // Failed to write in closure resets state + _linkedRecords = List.of(oldLinkedRecords); + _index = List.of(oldIndex); + _free = List.of(oldFree); + _seqs = List.of(oldSeqs); + } on Exception { + // Failed to write in closure resets state + _linkedRecords = List.of(oldLinkedRecords); + _index = List.of(oldIndex); + _free = List.of(oldFree); + _seqs = List.of(oldSeqs); + rethrow; + } + } + // Try to do the head write + } while (!await _writeHead(allowOffline: false)); + + onUpdatedHead?.call(); + } on Exception { + // Exception means state needs to be reverted + _linkedRecords = oldLinkedRecords; + _index = oldIndex; + _free = oldFree; + _seqs = oldSeqs; + + rethrow; + } + return out; + }); + } ///////////////////////////////////////////////////////////////////////////// // Watch For Updates
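
Appendix: a short usage sketch of the transactional API introduced above. The pool entry point name (createTransaction) and the surrounding setup are assumptions made for illustration; only getBytes, tryWriteBytes, commit, rollback, and isDone come from this patch.

// Usage sketch (illustrative only). Assumes a pool entry point named
// `createTransaction`; the patch shows only the private DHTTransaction._
// constructor, so the public method name is a guess.
import 'dart:typed_data';

import 'package:veilid_support/veilid_support.dart';

Future<void> bumpBothRecords(
    DHTRecordPool pool, DHTRecord recA, DHTRecord recB) async {
  final tx = await pool.createTransaction(
    [recA, recB],
    debugName: 'bumpBothRecords',
  );
  try {
    // Transactional gets default to DHTRecordRefreshMode.network.
    final current = await tx.getBytes(recA.key);

    // tryWriteBytes returns null on success, or the newer value found on
    // the network if the write lost a conflict.
    final conflict = await tx.tryWriteBytes(
      recA.key,
      Uint8List.fromList([...?current, 0x01]),
    );
    if (conflict != null) {
      await tx.rollback();
      return;
    }
    await tx.tryWriteBytes(recB.key, Uint8List.fromList([0x02]));

    // Nothing is applied locally or remotely until commit().
    await tx.commit();
  } finally {
    if (!tx.isDone) {
      await tx.rollback();
    }
  }
}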
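
The refresh-mode handling inside the transactional getBytes reduces to a choice between the local copy, the network (via the transaction), or returning null, driven by the local and network sequence numbers captured when the transaction was opened. A minimal sketch of that decision follows; the helper and its enum are hypothetical and only restate the switch in the patch.

// Hypothetical helper restating the DHTRecordRefreshMode switch from the
// transactional getBytes above. `_Source` is not part of the library.
import 'package:veilid_support/veilid_support.dart';

enum _Source { local, network, none }

_Source _chooseSource(
    DHTRecordRefreshMode mode, int? localSeq, int? networkSeq) {
  switch (mode) {
    case DHTRecordRefreshMode.cached:
      // Prefer the local copy, fall back to the transaction's network get.
      if (localSeq != null) {
        return _Source.local;
      }
      return networkSeq != null ? _Source.network : _Source.none;
    case DHTRecordRefreshMode.local:
      // Never touch the network.
      return localSeq != null ? _Source.local : _Source.none;
    case DHTRecordRefreshMode.network:
      // Always use the transaction if the network has this subkey at all.
      return networkSeq != null ? _Source.network : _Source.none;
    case DHTRecordRefreshMode.update:
      // Only fetch when the network copy is strictly newer than local.
      if (networkSeq == null || (localSeq != null && networkSeq <= localSeq)) {
        return _Source.none;
      }
      return _Source.network;
  }
}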
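
The new DHTShortArray.minStride constant follows from the per-subkey value size limit; assuming ValueData.maxLen is 32 KiB, which the inline `// 31` comment implies, the arithmetic works out as in this sketch. The minus one matches _calculateStride, which reserves subkey 0 of the head record for the serialized head.

// Sketch of the minStride arithmetic, under the assumptions stated above.
void main() {
  const recordBudget = 1024 * 1024; // bytes assumed available per record
  const valueDataMaxLen = 32 * 1024; // assumed value of ValueData.maxLen
  const subkeysPerRecord = recordBudget ~/ valueDataMaxLen; // 32
  const minStride = subkeysPerRecord - 1; // 31, one subkey reserved for head
  assert(minStride == 31);
  print('minStride = $minStride');
}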