update handling of nulls in inspect results

This commit is contained in:
Christien Rioux 2025-04-25 17:13:58 -04:00
parent 933a22122a
commit 8194a79ce4
5 changed files with 31 additions and 31 deletions

View file

@ -511,7 +511,7 @@ class DHTRecord implements DHTDeleteable<DHTRecord> {
key,
subkeys: [ValueSubkeyRange.single(subkey)],
);
return rr.localSeqs.firstOrNull ?? emptySeq;
return rr.localSeqs.firstOrNull;
}
void _addValueChange(
@ -566,6 +566,4 @@ class DHTRecord implements DHTDeleteable<DHTRecord> {
int _openCount;
StreamController<DHTRecordWatchChange>? _watchController;
_WatchState? _watchState;
static const int emptySeq = 0xFFFFFFFF;
}

View file

@ -875,7 +875,7 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
if (fsc == null) {
return null;
}
final newerSubkeys = currentReport.newerSubkeys;
final newerSubkeys = currentReport.newerOnlineSubkeys;
final valueData = await dhtctx.getDHTValue(openedRecordKey, fsc.subkey,
forceRefresh: true);
@ -887,7 +887,7 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
log('inspect returned a newer seq than get: ${valueData.seq} < $fsc');
}
if (valueData.seq > fsc.oldSeq && valueData.seq != DHTRecord.emptySeq) {
if (fsc.oldSeq == null || valueData.seq > fsc.oldSeq!) {
processRemoteValueChange(VeilidUpdateValueChange(
key: openedRecordKey,
subkeys: newerSubkeys,

View file

@ -1,15 +1,14 @@
import 'package:veilid/veilid.dart';
import 'dht_record_pool.dart';
class DHTSeqChange {
const DHTSeqChange(this.subkey, this.oldSeq, this.newSeq);
final int subkey;
final int oldSeq;
final int? oldSeq;
final int newSeq;
}
extension DHTReportReportExt on DHTRecordReport {
List<ValueSubkeyRange> get newerSubkeys {
List<ValueSubkeyRange> get newerOnlineSubkeys {
if (networkSeqs.isEmpty || localSeqs.isEmpty || subkeys.isEmpty) {
return [];
}
@ -19,8 +18,10 @@ extension DHTReportReportExt on DHTRecordReport {
var i = 0;
for (final skr in subkeys) {
for (var sk = skr.low; sk <= skr.high; sk++) {
if (networkSeqs[i] > localSeqs[i] &&
networkSeqs[i] != DHTRecord.emptySeq) {
final nseq = networkSeqs[i];
final lseq = localSeqs[i];
if (nseq != null && (lseq == null || nseq > lseq)) {
if (currentSubkeys.isNotEmpty &&
currentSubkeys.last.high == (sk - 1)) {
currentSubkeys.add(ValueSubkeyRange(
@ -29,6 +30,7 @@ extension DHTReportReportExt on DHTRecordReport {
currentSubkeys.add(ValueSubkeyRange.single(sk));
}
}
i++;
}
}
@ -44,9 +46,11 @@ extension DHTReportReportExt on DHTRecordReport {
var i = 0;
for (final skr in subkeys) {
for (var sk = skr.low; sk <= skr.high; sk++) {
if (networkSeqs[i] > localSeqs[i] &&
networkSeqs[i] != DHTRecord.emptySeq) {
return DHTSeqChange(sk, localSeqs[i], networkSeqs[i]);
final nseq = networkSeqs[i];
final lseq = localSeqs[i];
if (nseq != null && (lseq == null || nseq > lseq)) {
return DHTSeqChange(sk, lseq, nseq);
}
i++;
}

View file

@ -5,7 +5,7 @@ class DHTShortArrayHeadLookup {
{required this.record, required this.recordSubkey, required this.seq});
final DHTRecord record;
final int recordSubkey;
final int seq;
final int? seq;
}
class _DHTShortArrayHead {
@ -41,7 +41,7 @@ class _DHTShortArrayHead {
final head = proto.DHTShortArray();
head.keys.addAll(_linkedRecords.map((lr) => lr.key.toProto()));
head.index = List.of(_index);
head.seqs.addAll(_seqs);
head.seqs.addAll(_seqs.map((x) => x ?? 0xFFFFFFFF));
// Do not serialize free list, it gets recreated
// Do not serialize local seqs, they are only locally relevant
return head;
@ -70,10 +70,7 @@ class _DHTShortArrayHead {
Future<bool> delete() => _headMutex.protect(_headRecord.delete);
Future<T> operate<T>(Future<T> Function(_DHTShortArrayHead) closure) async =>
// ignore: prefer_expression_function_bodies
_headMutex.protect(() async {
return closure(this);
});
_headMutex.protect(() async => closure(this));
Future<T> operateWrite<T>(
Future<T> Function(_DHTShortArrayHead) closure) async =>
@ -115,7 +112,7 @@ class _DHTShortArrayHead {
late List<DHTRecord> oldLinkedRecords;
late List<int> oldIndex;
late List<int> oldFree;
late List<int> oldSeqs;
late List<int?> oldSeqs;
late T out;
try {
@ -197,7 +194,8 @@ class _DHTShortArrayHead {
// Get the set of new linked keys and validate it
final updatedLinkedKeys = head.keys.map((p) => p.toVeilid()).toList();
final updatedIndex = List.of(head.index);
final updatedSeqs = List.of(head.seqs);
final updatedSeqs =
List.of(head.seqs.map((x) => x == 0xFFFFFFFF ? null : x));
final updatedFree = _makeFreeList(updatedLinkedKeys, updatedIndex);
// See which records are actually new
@ -333,7 +331,7 @@ class _DHTShortArrayHead {
}
Future<DHTShortArrayHeadLookup> lookupIndex(int idx, bool allowCreate) async {
final seq = idx < _seqs.length ? _seqs[idx] : DHTRecord.emptySeq;
final seq = idx < _seqs.length ? _seqs[idx] : null;
final recordNumber = idx ~/ _stride;
final record = await _getOrCreateLinkedRecord(recordNumber, allowCreate);
final recordSubkey = (idx % _stride) + ((recordNumber == 0) ? 1 : 0);
@ -445,18 +443,18 @@ class _DHTShortArrayHead {
// If our local sequence number is unknown or hasnt been written yet
// then a normal DHT operation is going to pull from the network anyway
if (_localSeqs.length < idx || _localSeqs[idx] == DHTRecord.emptySeq) {
if (_localSeqs.length < idx || _localSeqs[idx] == null) {
return false;
}
// If the remote sequence number record is unknown or hasnt been written
// at this index yet, then we also do not refresh at this time as it
// is the first time the index is being written to
if (_seqs.length < idx || _seqs[idx] == DHTRecord.emptySeq) {
if (_seqs.length < idx || _seqs[idx] == null) {
return false;
}
return _localSeqs[idx] < _seqs[idx];
return _localSeqs[idx]! < _seqs[idx]!;
}
/// Update the sequence number for a particular index in
@ -466,12 +464,12 @@ class _DHTShortArrayHead {
final idx = _index[pos];
while (_localSeqs.length <= idx) {
_localSeqs.add(DHTRecord.emptySeq);
_localSeqs.add(null);
}
_localSeqs[idx] = newSeq;
if (write) {
while (_seqs.length <= idx) {
_seqs.add(DHTRecord.emptySeq);
_seqs.add(null);
}
_seqs[idx] = newSeq;
}
@ -555,7 +553,7 @@ class _DHTShortArrayHead {
// The sequence numbers of each subkey.
// Index is by subkey number not by element index.
// (n-1 for head record and then the next n for linked records)
List<int> _seqs;
List<int?> _seqs;
// The local sequence numbers for each subkey.
List<int> _localSeqs;
List<int?> _localSeqs;
}

View file

@ -129,7 +129,7 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead
final outSeqNum = Output<int>();
final result = lookup.seq == DHTRecord.emptySeq
final result = lookup.seq == null
? null
: await lookup.record.get(subkey: lookup.recordSubkey);
@ -163,7 +163,7 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead
final lookup = await _head.lookupPosition(pos, true);
final outSeqNumRead = Output<int>();
final oldValue = lookup.seq == DHTRecord.emptySeq
final oldValue = lookup.seq == null
? null
: await lookup.record
.get(subkey: lookup.recordSubkey, outSeqNum: outSeqNumRead);