transactions work

This commit is contained in:
Christien Rioux 2025-12-19 23:20:54 -05:00
parent fb6bedebb3
commit b013e5efc9
9 changed files with 542 additions and 239 deletions

View file

@ -76,7 +76,10 @@ void main() {
() => dhtRecordPoolFixture.setUp(defaultKind: cryptoKindVLD0));
tearDownAll(dhtRecordPoolFixture.tearDown);
for (final stride in [256, 16 /*64, 32, 16, 8, 4, 2, 1 */]) {
for (final stride in [
DHTShortArray.maxElements,
DHTShortArray.minStride
]) {
test('dht_short_array:create_stride_$stride',
makeTestDHTShortArrayCreateDelete(stride: stride));
test('dht_short_array:add_stride_$stride',
@ -91,7 +94,10 @@ void main() {
() => dhtRecordPoolFixture.setUp(defaultKind: cryptoKindVLD0));
tearDownAll(dhtRecordPoolFixture.tearDown);
for (final stride in [256, 16 /*64, 32, 16, 8, 4, 2, 1 */]) {
for (final stride in [
DHTShortArray.maxElements,
DHTShortArray.minStride
]) {
test('dht_log:create_stride_$stride',
makeTestDHTLogCreateDelete(stride: stride));
test(

View file

@ -113,7 +113,7 @@ Future<void> testDHTRecordGetSet() async {
// Test get without set
{
final rec = await pool.createRecord(debugName: 'test_get_set 1');
final val = await rec.get();
final val = await rec.getBytes();
await pool.deleteRecord(rec.key);
expect(val, isNull);
await rec.close();
@ -123,10 +123,10 @@ Future<void> testDHTRecordGetSet() async {
{
final rec2 = await pool.createRecord(debugName: 'test_get_set 2');
expect(await rec2.tryWriteBytes(valdata), isNull);
expect(await rec2.get(), equals(valdata));
expect(await rec2.getBytes(), equals(valdata));
// Invalid subkey should throw
await expectLater(
() async => rec2.get(subkey: 1), throwsA(isA<VeilidAPIException>()));
await expectLater(() async => rec2.getBytes(subkey: 1),
throwsA(isA<VeilidAPIException>()));
await rec2.close();
await pool.deleteRecord(rec2.key);
}
@ -135,12 +135,12 @@ Future<void> testDHTRecordGetSet() async {
{
final rec3 = await pool.createRecord(debugName: 'test_get_set 3');
expect(await rec3.tryWriteBytes(valdata), isNull);
expect(await rec3.get(), equals(valdata));
expect(await rec3.getBytes(), equals(valdata));
await rec3.close();
await pool.deleteRecord(rec3.key);
final rec4 =
await pool.openRecordRead(rec3.key, debugName: 'test_get_set 4');
expect(await rec4.get(), equals(valdata));
expect(await rec4.getBytes(), equals(valdata));
await rec4.close();
await pool.deleteRecord(rec4.key);
}

View file

@ -397,7 +397,7 @@ class _DHTLogSpine {
final segment = l.segment;
try {
var subkeyData = await _spineRecord.get(subkey: subkey);
var subkeyData = await _spineRecord.getBytes(subkey: subkey);
subkeyData ??= _makeEmptySubkey();
while (true) {
@ -466,7 +466,7 @@ class _DHTLogSpine {
// See if we have the segment key locally
try {
RecordKey? segmentKey;
var subkeyData = await _spineRecord.get(
var subkeyData = await _spineRecord.getBytes(
subkey: subkey,
refreshMode: DHTRecordRefreshMode.local,
);
@ -475,7 +475,7 @@ class _DHTLogSpine {
}
if (segmentKey == null) {
// If not, try from the network
subkeyData = await _spineRecord.get(
subkeyData = await _spineRecord.getBytes(
subkey: subkey,
refreshMode: DHTRecordRefreshMode.network,
);
@ -684,7 +684,7 @@ class _DHTLogSpine {
}
// Get next subkey if available locally
final data = await _spineRecord.get(
final data = await _spineRecord.getBytes(
subkey: subkey,
refreshMode: DHTRecordRefreshMode.local,
);

View file

@ -22,13 +22,14 @@ class DHTRecordWatchChange extends Equatable {
enum DHTRecordRefreshMode {
/// Return existing subkey values if they exist locally already
/// If not, check the network for a value
/// This is the default refresh mode
/// This is the default refresh mode for non-transactional gets
cached,
/// Return existing subkey values only if they exist locally already
local,
/// Always check the network for a newer subkey value
/// This is the default refresh mode for transactional gets
network,
/// Always check the network for a newer subkey value but only
@ -153,12 +154,12 @@ class DHTRecord implements DHTDeleteable<DHTRecord> {
/// value or always check the network
/// * 'outSeqNum' optionally returns the sequence number of the value being
/// returned if one was returned.
Future<Uint8List?> get({
Future<Uint8List?> getBytes({
int subkey = -1,
CryptoCodec? crypto,
DHTRecordRefreshMode refreshMode = DHTRecordRefreshMode.cached,
Output<int>? outSeqNum,
}) => _wrapStats('get', () async {
}) => _wrapStats('getBytes', () async {
subkey = subkeyOrDefault(subkey);
// Get the last sequence number if we need it
@ -213,7 +214,7 @@ class DHTRecord implements DHTDeleteable<DHTRecord> {
DHTRecordRefreshMode refreshMode = DHTRecordRefreshMode.cached,
Output<int>? outSeqNum,
}) async {
final data = await get(
final data = await getBytes(
subkey: subkey,
crypto: crypto,
refreshMode: refreshMode,
@ -240,7 +241,7 @@ class DHTRecord implements DHTDeleteable<DHTRecord> {
DHTRecordRefreshMode refreshMode = DHTRecordRefreshMode.cached,
Output<int>? outSeqNum,
}) async {
final data = await get(
final data = await getBytes(
subkey: subkey,
crypto: crypto,
refreshMode: refreshMode,
@ -397,7 +398,7 @@ class DHTRecord implements DHTDeleteable<DHTRecord> {
// Get the existing data, do not allow force refresh here
// because if we need a refresh the setDHTValue will fail anyway
var oldValue = await get(
var oldValue = await getBytes(
subkey: subkey,
crypto: crypto,
outSeqNum: outSeqNum,

View file

@ -356,6 +356,9 @@ class DHTRecordGroupTransaction {
}
}
/// Debug name
String get debugName => _inner._dhttx._inner.debugName ?? '';
/// Check if the transaction is done
bool get isDone => _inner._dhttx.isDone;
@ -378,7 +381,7 @@ class DHTRecordGroupTransaction {
}
subkey = member._record.subkeyOrDefault(subkey);
return _inner._dhttx.get(member._record.key, subkey);
return _inner._dhttx.getBytes(member._record.key, subkey);
}
/// Perform a set on a member within the transaction
@ -394,7 +397,7 @@ class DHTRecordGroupTransaction {
subkey = member._record.subkeyOrDefault(subkey);
return _inner._dhttx.set(
return _inner._dhttx.tryWriteBytes(
member._record.key,
subkey,
data,

View file

@ -979,9 +979,18 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
records.map((rec) => rec.key).toList(),
options: options,
);
// Pull the whole sequence number set for all records
final recordInfos = await records
.map(
(x) async =>
(x, await dhttx.inspect(x.key, scope: DHTReportScope.syncGet)),
)
.wait;
return DHTTransaction._(
pool: this,
records: records,
recordInfos: recordInfos,
dhttx: dhttx,
debugName: debugName,
);

View file

@ -1,11 +1,19 @@
part of 'dht_record_pool.dart';
class DHTTransactionRecordInfo {
DHTRecord record;
DHTRecordReport report;
List<ValueSubkeyRange> valueChanges;
DHTTransactionRecordInfo(this.record, this.report, this.valueChanges);
}
class DHTTransactionInner implements Finalize {
DHTRecordPool pool;
List<DHTRecord> records;
VeilidDHTTransaction dhttx;
String? debugName;
DHTTransactionInner(this.pool, this.records, this.dhttx, this.debugName);
final DHTRecordPool pool;
final Map<RecordKey, DHTTransactionRecordInfo> recordInfos;
final VeilidDHTTransaction dhttx;
final String? debugName;
DHTTransactionInner(this.pool, this.recordInfos, this.dhttx, this.debugName);
// Called by finalizer if commit or rollback was forgotten before drop
@override
@ -14,15 +22,13 @@ class DHTTransactionInner implements Finalize {
if (!dhttx.isDone) {
pool.log(
'DHTTransaction($debugName) had to be finalized. '
'Rollback or Commit should be explicit. '
'$dhttx over $records',
'Rollback or Commit should be explicit.',
);
} else if (records.isNotEmpty) {
} else if (recordInfos.isNotEmpty) {
pool.log(
'DHTTransaction($debugName) had to be finalized '
'even though it was done. '
'This should not be a reachable condition. '
'$dhttx over $records',
'This should not be a reachable condition.',
);
}
// Always do this regardless of error type
@ -35,14 +41,10 @@ class DHTTransactionInner implements Finalize {
// Called by commit, rollback, or finalizer to close out the object
Future<void> close() async {
if (!dhttx.isDone) {
pool.log(
'DHTTransaction($debugName) should have been finished before close. '
'$dhttx over $records',
);
await dhttx.rollback();
}
await records.map((x) => x.close()).wait;
records.clear();
await recordInfos.values.map((x) => x.record.close()).wait;
recordInfos.clear();
}
}
@ -55,45 +57,266 @@ class DHTTransaction {
DHTTransaction._({
required DHTRecordPool pool,
required List<DHTRecord> records,
required List<DHTTransactionRecordInfo> recordInfos,
required VeilidDHTTransaction dhttx,
required String? debugName,
}) : _inner = DHTTransactionInner(pool, records, dhttx, debugName) {
}) : _inner = DHTTransactionInner(
pool,
{for (final r in recordInfos) r.record.key: r},
dhttx,
debugName,
) {
// Ref all the records. Unref'd in close()
for (final rec in _inner.records) {
rec.ref();
for (final recordInfo in recordInfos) {
recordInfo.record.ref();
}
// Attach finalizer to ensure things clean up even if the
// user forgets to close()
_finalizer.attach(this, _inner, detach: this);
}
/// Check if the transaction is done
bool get isDone => _inner.dhttx.isDone;
////////////////////////////////////////////////////////////////////////////
// Public Interface
Future<ValueData?> get(RecordKey key, int subkey) =>
_inner.dhttx.get(key, subkey);
/// Check if the transaction is done
bool get isDone => _inner.dhttx.isDone;
Future<ValueData?> set(
/// Get a record's subkey value from this transaction
/// Returns the most recent value data for this subkey or null if this subkey
/// has not yet been written to.
/// * 'refreshMode' determines whether or not to return a locally existing
/// value or always check the network
/// * 'outSeqNum' optionally returns the sequence number of the value being
/// returned if one was returned.
Future<Uint8List?> getBytes(
RecordKey key, {
int subkey = -1,
CryptoCodec? crypto,
DHTRecordRefreshMode refreshMode = DHTRecordRefreshMode.network,
Output<int>? outSeqNum,
}) {
final recordInfo = _inner.recordInfos[key]!;
final record = recordInfo.record;
subkey = record.subkeyOrDefault(subkey);
final lastSeq = _localSubkeySeq(recordInfo, subkey);
final networkSeq = _networkSubkeySeq(recordInfo, subkey);
return _wrapStats(record, 'DHTTransaction::getBytes', () async {
Uint8List data;
int seq;
switch (refreshMode) {
case DHTRecordRefreshMode.cached:
if (lastSeq != null) {
// Available locally, return it
final localValueData = await record.routingContext.getDHTValue(
record.key,
subkey,
);
data = localValueData!.data;
seq = lastSeq;
} else {
// Get it from the transaction
if (networkSeq == null) {
return null;
}
final valueData = (await _inner.dhttx.get(key, subkey))!;
data = valueData.data;
seq = valueData.seq;
}
case DHTRecordRefreshMode.local:
if (lastSeq == null) {
// If it's not available locally already just return null now
return null;
}
// Available locally, return it
final localValueData = await record.routingContext.getDHTValue(
record.key,
subkey,
);
data = localValueData!.data;
seq = lastSeq;
case DHTRecordRefreshMode.network:
// Get it from the transaction
if (networkSeq == null) {
return null;
}
final valueData = (await _inner.dhttx.get(key, subkey))!;
data = valueData.data;
seq = valueData.seq;
case DHTRecordRefreshMode.update:
if (networkSeq == null ||
(lastSeq != null && networkSeq <= lastSeq)) {
// If we're only returning updates then punt now
return null;
}
// Get it from the transaction
final valueData = (await _inner.dhttx.get(key, subkey))!;
data = valueData.data;
seq = valueData.seq;
}
// If we're returning a value, decrypt it
final out = (crypto ?? record._crypto).decrypt(data);
if (outSeqNum != null) {
outSeqNum.save(seq);
}
return out;
});
}
xxx do json and migrated get
/// Attempt to write a byte buffer to a DHTTransaction record subkey
/// If a newer value was found on the network, it is returned
/// If the value was succesfully written, null is returned
Future<Uint8List?> tryWriteBytes(
RecordKey key,
int subkey,
Uint8List data, {
Uint8List newValue, {
int subkey = -1,
CryptoCodec? crypto,
DHTTransactionSetValueOptions? options,
}) => _inner.dhttx.set(key, subkey, data, options: options);
Output<int>? outSeqNum,
}) {
final recordInfo = _inner.recordInfos[key]!;
final record = recordInfo.record;
subkey = record.subkeyOrDefault(subkey);
final lastSeq = _localSubkeySeq(recordInfo, subkey);
return _wrapStats(record, 'DHTTransaction::getBytes', () async {
final encryptedNewValue = await (crypto ?? record._crypto).encrypt(
newValue,
);
final newValueData = await dhtRetryLoop(
() => _inner.dhttx.set(
key,
subkey,
encryptedNewValue,
options: DHTTransactionSetValueOptions(
writer: options?.writer ?? record._writer,
),
),
);
// Record new sequence number
int newSeqNum;
if (newValueData == null) {
newSeqNum = (lastSeq != null ? lastSeq + 1 : 0);
} else {
newSeqNum = newValueData.seq;
}
final isUpdated = newSeqNum != lastSeq;
if (outSeqNum != null) {
outSeqNum.save(newSeqNum);
}
// See if the encrypted data returned is exactly the same
// if so, shortcut and don't bother decrypting it
if (newValueData == null || newValueData.data.equals(encryptedNewValue)) {
if (isUpdated) {
_addValueChange(recordInfo, subkey);
}
return null;
}
// Decrypt value to return it
final decryptedNewValue = await (crypto ?? record._crypto).decrypt(
newValueData.data,
);
if (isUpdated) {
_addValueChange(recordInfo, subkey);
}
return decryptedNewValue;
});
}
/// Like 'set' but with JSON marshal/unmarshal of the value
Future<T?> setJson<T>(
RecordKey key,
T Function(dynamic) fromJson,
T newValue, {
int subkey = -1,
CryptoCodec? crypto,
DHTTransactionSetValueOptions? options,
Output<int>? outSeqNum,
}) =>
tryWriteBytes(
key,
jsonEncodeBytes(newValue),
subkey: subkey,
crypto: crypto,
options: options,
outSeqNum: outSeqNum,
).then((out) {
if (out == null) {
return null;
}
return jsonDecodeBytes(fromJson, out);
});
/// Like 'tryWriteBytes' but with migrated marshal/unmarshal of the value
Future<MigratedValue<T>?> tryWriteMigrated<T>(
MigrationCodec<T> migrationCodec,
T newValue, {
int subkey = -1,
CryptoCodec? crypto,
SetDHTValueOptions? options,
Output<int>? outSeqNum,
}) =>
tryWriteBytes(
migrationCodec.toBytes(newValue),
subkey: subkey,
crypto: crypto,
options: options,
outSeqNum: outSeqNum,
).then((out) {
if (out == null) {
return null;
}
return migrationCodec.fromBytes(out);
});
/// Return the inspection state of a set of subkeys of a record in
/// the DHTTransaction.
/// See Veilid's 'inspectDHTRecord' call for details on how this works
Future<DHTRecordReport> inspect(
RecordKey key, {
List<ValueSubkeyRange>? subkeys,
DHTReportScope scope = DHTReportScope.local,
}) => _inner.dhttx.inspect(key, subkeys: subkeys, scope: scope);
}) async {
final recordInfo = _inner.recordInfos[key]!;
// Shortcut if we already have the report, because it never changes
// until the commit
if ((subkeys == null || subkeys.equals(recordInfo.report.subkeys)) &&
scope == DHTReportScope.syncGet) {
return recordInfo.report;
}
// Get a new report
return _inner.dhttx.inspect(key, subkeys: subkeys, scope: scope);
}
/// Apply all changes locally and remotely for all transactional
/// gets and sets. No changes will be made remotely or locally until this is
/// called. Note that even if you only 'get' values, a commit can still
/// make local changes if the values retrieved from the network are newer
/// than the previous local values.
Future<void> commit() async {
try {
await _inner.dhttx.commit();
// Notify all value changes to local-aware watches
for (final recordInfo in _inner.recordInfos.values) {
recordInfo.record._addValueChange(
local: true,
data: null,
subkeys: recordInfo.valueChanges,
);
}
// Always do this regardless of error type
// ignore: avoid_catches_without_on_clauses
} finally {
@ -102,14 +325,35 @@ class DHTTransaction {
}
}
/// Drop all changes locally and remotely for all transactional
/// gets and sets. No changes will be made remotely or locally if this
/// is called.
Future<void> rollback() async {
try {
await _inner.dhttx.rollback();
// Always do this regardless of error type
// ignore: avoid_catches_without_on_clauses
} finally {
await _inner.records.map((x) => x.close()).wait;
_finalizer.detach(this);
}
await _inner.close();
_finalizer.detach(this);
}
////////////////////////////////////////////////////////////////////////////
// Private Implementation
int? _localSubkeySeq(DHTTransactionRecordInfo recordInfo, int subkey) =>
recordInfo.report.localSeqs[subkey];
int? _networkSubkeySeq(DHTTransactionRecordInfo recordInfo, int subkey) =>
recordInfo.report.networkSeqs[subkey];
void _addValueChange(DHTTransactionRecordInfo recordInfo, int subkey) {
recordInfo.valueChanges = recordInfo.valueChanges.insertSubkey(subkey);
}
Future<T> _wrapStats<T>(
DHTRecord record,
String func,
Future<T> Function() closure,
) => DHTRecordPool.instance._stats.measure(
record.key,
record.debugName,
func,
closure,
);
}

View file

@ -20,6 +20,7 @@ class DHTShortArray implements DHTDeleteable<DHTShortArray> {
// Fields
static const maxElements = 256;
static const minStride = ((1024 * 1024) ~/ ValueData.maxLen) - 1; // 31
// Internal representation refreshed from head record
final _DHTShortArrayHead _head;
@ -247,9 +248,6 @@ class DHTShortArray implements DHTDeleteable<DHTShortArray> {
/// Runs a closure allowing read-only access to the shortarray
Future<T> operate<T>(
Future<T> Function(DHTShortArrayReadOperations) closure,
{
transaction:
}
) {
if (!isOpen) {
throw StateError('short array is not open"');

View file

@ -2,9 +2,7 @@ part of 'dht_short_array.dart';
class DHTShortArrayHeadLookup {
final DHTRecord record;
final int recordSubkey;
final int? seq;
DHTShortArrayHeadLookup({
@ -14,18 +12,8 @@ class DHTShortArrayHeadLookup {
});
}
class _DHTShortArrayHead {
////////////////////////////////////////////////////////////////////////////
// Head/element mutex to ensure we keep the representation valid
final _headMutex = Mutex(debugLockTimeout: kIsDebugMode ? 60 : null);
// Subscription to head record internal changes
StreamSubscription<DHTRecordWatchChange>? _subscription;
// Notify closure for external head changes
void Function()? onUpdatedHead;
/// Mutex-locked inner class
class _DHTShortArrayHeadInner {
// Head DHT record transactional group
final DHTRecordGroup _headGroup;
@ -57,38 +45,45 @@ class _DHTShortArrayHead {
// Migration codec
static final _migrationCodec = DHTShortArrayMigrationCodec();
_DHTShortArrayHead({required DHTRecordGroupMember headRecord})
: _headGroup = DHTRecordGroup(
members: [headRecord],
debugName: '_DHTShortArrayHead',
),
_linkedMembers = [],
_index = [],
_free = [],
_seqs = [],
_localSeqs = [] {
_calculateStride();
// The current transaction if we have one
DHTRecordGroupTransaction? _currentTransaction;
// Number of elements in short array
int get length => _index.length;
////////////////////////////////////////////////////////////////////////////
_DHTShortArrayHeadInner({
required DHTRecordGroup headGroup,
required int stride,
}) : _headGroup = headGroup,
_stride = stride,
_linkedMembers = [],
_index = [],
_free = [],
_seqs = [],
_localSeqs = [],
_currentTransaction = null {
//
}
void _calculateStride() {
switch (_headGroup.single.schema) {
case DHTSchemaDFLT(oCnt: final oCnt):
if (oCnt <= 1) {
throw StateError('Invalid DFLT schema in DHTShortArray');
}
_stride = oCnt - 1;
case DHTSchemaSMPL(oCnt: final oCnt, members: final members):
if (oCnt != 0 || members.length != 1 || members[0].mCnt <= 1) {
throw StateError('Invalid SMPL schema in DHTShortArray');
}
_stride = members[0].mCnt - 1;
Future<void> close() async {
if (!_headGroup.isOpen) {
throw StateError('should never close twice');
}
assert(_stride <= DHTShortArray.maxElements, 'stride too long');
final currentTransaction = _currentTransaction;
_currentTransaction = null;
if (currentTransaction != null) {
DHTRecordPool.instance.log(
'_DHTShortArrayHeadInner(${_headGroup.single.debugName}): '
'should not close during transaction',
);
await currentTransaction.rollback();
}
await [_headGroup.close(), ..._linkedMembers.map((x) => x.close())].wait;
}
proto.DHTShortArray _toProto() {
assert(_headMutex.isLocked, 'should be in mutex here');
final head = proto.DHTShortArray();
head.keys.addAll(_linkedMembers.map((lr) => lr.key.toProto()));
head.index = List.of(_index);
@ -98,144 +93,11 @@ class _DHTShortArrayHead {
return head;
}
RecordKey get recordKey => _headGroup.single.key;
KeyPair? get writer => _headGroup.single.writer;
OwnedDHTRecordPointer? get recordPointer =>
_headGroup.single.ownedDHTRecordPointer;
int get length => _index.length;
bool get isOpen => _headGroup.isDone;
Future<void> close() async {
await _headMutex.protect(() async {
if (!isOpen) {
throw StateError('should never close twice');
}
await [_headGroup.close(), ..._linkedMembers.map((x) => x.close())].wait;
});
}
/// Returns true if all of the record deletions were processed immediately
/// Returns false if any of the records were marked to be deleted later
Future<bool> delete() => _headMutex.protect(
() async => (await [
_headGroup.delete(),
..._linkedMembers.map((x) => x.delete()),
].wait).reduce((value, element) => value && element),
);
Future<T> operate<T>(Future<T> Function(_DHTShortArrayHead) closure) =>
_headMutex.protect(() => closure(this));
Future<T> operateWrite<T>(Future<T> Function(_DHTShortArrayHead) closure) =>
_headMutex.protect(() async {
final oldLinkedRecords = List.of(_linkedRecords);
final oldIndex = List.of(_index);
final oldFree = List.of(_free);
final oldSeqs = List.of(_seqs);
try {
final out = await closure(this);
// Write head assuming it has been changed
if (!await _writeHead(allowOffline: false)) {
// Failed to write head means head got overwritten so write should
// be considered failed
throw const DHTExceptionOutdated();
}
onUpdatedHead?.call();
return out;
} on Exception {
// Exception means state needs to be reverted
_linkedRecords = oldLinkedRecords;
_index = oldIndex;
_free = oldFree;
_seqs = oldSeqs;
rethrow;
}
});
Future<T> operateWriteEventual<T>(
Future<T> Function(_DHTShortArrayHead) closure, {
Duration? timeout,
}) {
final timeoutTs = timeout == null
? null
: Veilid.instance.now().offset(TimestampDuration.fromDuration(timeout));
return _headMutex.protect(() async {
late List<DHTRecord> oldLinkedRecords;
late List<int> oldIndex;
late List<int> oldFree;
late List<int?> oldSeqs;
late T out;
try {
// Iterate until we have a successful element and head write
do {
// Save off old values each pass of tryWriteHead because the head
// will have changed
oldLinkedRecords = List.of(_linkedRecords);
oldIndex = List.of(_index);
oldFree = List.of(_free);
oldSeqs = List.of(_seqs);
// Try to do the element write
while (true) {
if (timeoutTs != null) {
final now = Veilid.instance.now();
if (now >= timeoutTs) {
throw TimeoutException('timeout reached');
}
}
try {
out = await closure(this);
break;
} on DHTExceptionOutdated {
// Failed to write in closure resets state
_linkedRecords = List.of(oldLinkedRecords);
_index = List.of(oldIndex);
_free = List.of(oldFree);
_seqs = List.of(oldSeqs);
} on Exception {
// Failed to write in closure resets state
_linkedRecords = List.of(oldLinkedRecords);
_index = List.of(oldIndex);
_free = List.of(oldFree);
_seqs = List.of(oldSeqs);
rethrow;
}
}
// Try to do the head write
} while (!await _writeHead(allowOffline: false));
onUpdatedHead?.call();
} on Exception {
// Exception means state needs to be reverted
_linkedRecords = oldLinkedRecords;
_index = oldIndex;
_free = oldFree;
_seqs = oldSeqs;
rethrow;
}
return out;
});
}
}
abstract class DHTShortArrayHeadOperations {}
class _DHTShortArrayHeadOperations implements DHTShortArrayHeadOperations {
/// Serialize and write out the current head record, possibly updating it
/// if a newer copy is available online. Returns true if the write was
/// Serialize and write out the current head record and any other changes,
/// possibly updating it if a newer copy is available online. Returns true if the write was
/// successful
Future<bool> _writeHead({required bool allowOffline}) async {
assert(_headMutex.isLocked, 'should be in mutex here');
final existingHead = await _headRecord.tryWriteMigrated(
Future<bool> _writeTransaction() async {
final existingHead = await _currentTransaction!.set(tryWriteMigrated(
_migrationCodec,
_toProto(),
options: SetDHTValueOptions(allowOffline: allowOffline),
@ -558,6 +420,186 @@ class _DHTShortArrayHeadOperations implements DHTShortArrayHeadOperations {
_seqs[idx] = newSeq;
}
}
}
class _DHTShortArrayHead {
////////////////////////////////////////////////////////////////////////////
// Head/element mutex to ensure we keep the representation valid
final _headMutex = Mutex(debugLockTimeout: kIsDebugMode ? 60 : null);
// Mutex locked inner class
late final _inner;
// Head record group
final DHTRecordGroupMember _headRecord;
// Subscription to head record internal changes
StreamSubscription<DHTRecordWatchChange>? _subscription;
// Notify closure for external head changes
void Function()? onUpdatedHead;
// True if things are still open
bool _isOpen;
////////////////////////////////////////////////////////////////////////////
_DHTShortArrayHead({required DHTRecordGroupMember headRecord})
: _headRecord = headRecord,
_isOpen = true,
_inner = _DHTShortArrayHeadInner(
headGroup: DHTRecordGroup(
members: [headRecord],
debugName: '_DHTShortArrayHead',
),
stride: _calculateStride(headRecord),
);
static int _calculateStride(DHTRecordGroupMember headRecord) {
late int stride;
switch (headRecord.schema) {
case DHTSchemaDFLT(oCnt: final oCnt):
if (oCnt <= 1) {
throw StateError('Invalid DFLT schema in DHTShortArray');
}
stride = oCnt - 1;
case DHTSchemaSMPL(oCnt: final oCnt, members: final members):
if (oCnt != 0 || members.length != 1 || members[0].mCnt <= 1) {
throw StateError('Invalid SMPL schema in DHTShortArray');
}
stride = members[0].mCnt - 1;
}
assert(stride <= DHTShortArray.maxElements, 'stride too long');
assert(stride >= DHTShortArray.minStride, 'stride too short');
return stride;
}
RecordKey get recordKey => _headRecord.key;
KeyPair? get writer => _headRecord.writer;
OwnedDHTRecordPointer? get recordPointer => _headRecord.ownedDHTRecordPointer;
bool get isOpen => _isOpen;
Future<void> close() async {
await _headMutex.protect(() async {
if (!_isOpen) {
throw StateError('should never close twice');
}
await _inner.close();
_isOpen = false;
});
}
/// Returns true if all of the record deletions were processed immediately
/// Returns false if any of the records were marked to be deleted later
Future<bool> delete() => _headMutex.protect(
() async => (await [
_headGroup.delete(),
..._linkedMembers.map((x) => x.delete()),
].wait).reduce((value, element) => value && element),
);
Future<T> operate<T>(
Future<T> Function(_DHTShortArrayHeadOperations) closure,
) => _headMutex.protect(() => closure(_inner));
Future<T> operateWrite<T>(Future<T> Function(_DHTShortArrayHead) closure) =>
_headMutex.protect(() async {
final oldLinkedRecords = List.of(_linkedRecords);
final oldIndex = List.of(_index);
final oldFree = List.of(_free);
final oldSeqs = List.of(_seqs);
try {
final out = await closure(this);
// Write head assuming it has been changed
if (!await _writeHead(allowOffline: false)) {
// Failed to write head means head got overwritten so write should
// be considered failed
throw const DHTExceptionOutdated();
}
onUpdatedHead?.call();
return out;
} on Exception {
// Exception means state needs to be reverted
_linkedRecords = oldLinkedRecords;
_index = oldIndex;
_free = oldFree;
_seqs = oldSeqs;
rethrow;
}
});
Future<T> operateWriteEventual<T>(
Future<T> Function(_DHTShortArrayHead) closure, {
Duration? timeout,
}) {
final timeoutTs = timeout == null
? null
: Veilid.instance.now().offset(TimestampDuration.fromDuration(timeout));
return _headMutex.protect(() async {
late List<DHTRecord> oldLinkedRecords;
late List<int> oldIndex;
late List<int> oldFree;
late List<int?> oldSeqs;
late T out;
try {
// Iterate until we have a successful element and head write
do {
// Save off old values each pass of tryWriteHead because the head
// will have changed
oldLinkedRecords = List.of(_linkedRecords);
oldIndex = List.of(_index);
oldFree = List.of(_free);
oldSeqs = List.of(_seqs);
// Try to do the element write
while (true) {
if (timeoutTs != null) {
final now = Veilid.instance.now();
if (now >= timeoutTs) {
throw TimeoutException('timeout reached');
}
}
try {
out = await closure(this);
break;
} on DHTExceptionOutdated {
// Failed to write in closure resets state
_linkedRecords = List.of(oldLinkedRecords);
_index = List.of(oldIndex);
_free = List.of(oldFree);
_seqs = List.of(oldSeqs);
} on Exception {
// Failed to write in closure resets state
_linkedRecords = List.of(oldLinkedRecords);
_index = List.of(oldIndex);
_free = List.of(oldFree);
_seqs = List.of(oldSeqs);
rethrow;
}
}
// Try to do the head write
} while (!await _writeHead(allowOffline: false));
onUpdatedHead?.call();
} on Exception {
// Exception means state needs to be reverted
_linkedRecords = oldLinkedRecords;
_index = oldIndex;
_free = oldFree;
_seqs = oldSeqs;
rethrow;
}
return out;
});
}
/////////////////////////////////////////////////////////////////////////////
// Watch For Updates