diff --git a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record.dart b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record.dart index 4e632fc..d632b58 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record.dart @@ -511,7 +511,7 @@ class DHTRecord implements DHTDeleteable { key, subkeys: [ValueSubkeyRange.single(subkey)], ); - return rr.localSeqs.firstOrNull ?? emptySeq; + return rr.localSeqs.firstOrNull; } void _addValueChange( @@ -566,6 +566,4 @@ class DHTRecord implements DHTDeleteable { int _openCount; StreamController? _watchController; _WatchState? _watchState; - - static const int emptySeq = 0xFFFFFFFF; } diff --git a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart index e3c9abe..3ee9adc 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_record/dht_record_pool.dart @@ -875,7 +875,7 @@ class DHTRecordPool with TableDBBackedJson { if (fsc == null) { return null; } - final newerSubkeys = currentReport.newerSubkeys; + final newerSubkeys = currentReport.newerOnlineSubkeys; final valueData = await dhtctx.getDHTValue(openedRecordKey, fsc.subkey, forceRefresh: true); @@ -887,7 +887,7 @@ class DHTRecordPool with TableDBBackedJson { log('inspect returned a newer seq than get: ${valueData.seq} < $fsc'); } - if (valueData.seq > fsc.oldSeq && valueData.seq != DHTRecord.emptySeq) { + if (fsc.oldSeq == null || valueData.seq > fsc.oldSeq!) { processRemoteValueChange(VeilidUpdateValueChange( key: openedRecordKey, subkeys: newerSubkeys, diff --git a/packages/veilid_support/lib/dht_support/src/dht_record/extensions.dart b/packages/veilid_support/lib/dht_support/src/dht_record/extensions.dart index e62403e..b0da9e3 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_record/extensions.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_record/extensions.dart @@ -1,15 +1,14 @@ import 'package:veilid/veilid.dart'; -import 'dht_record_pool.dart'; class DHTSeqChange { const DHTSeqChange(this.subkey, this.oldSeq, this.newSeq); final int subkey; - final int oldSeq; + final int? oldSeq; final int newSeq; } extension DHTReportReportExt on DHTRecordReport { - List get newerSubkeys { + List get newerOnlineSubkeys { if (networkSeqs.isEmpty || localSeqs.isEmpty || subkeys.isEmpty) { return []; } @@ -19,8 +18,10 @@ extension DHTReportReportExt on DHTRecordReport { var i = 0; for (final skr in subkeys) { for (var sk = skr.low; sk <= skr.high; sk++) { - if (networkSeqs[i] > localSeqs[i] && - networkSeqs[i] != DHTRecord.emptySeq) { + final nseq = networkSeqs[i]; + final lseq = localSeqs[i]; + + if (nseq != null && (lseq == null || nseq > lseq)) { if (currentSubkeys.isNotEmpty && currentSubkeys.last.high == (sk - 1)) { currentSubkeys.add(ValueSubkeyRange( @@ -29,6 +30,7 @@ extension DHTReportReportExt on DHTRecordReport { currentSubkeys.add(ValueSubkeyRange.single(sk)); } } + i++; } } @@ -44,9 +46,11 @@ extension DHTReportReportExt on DHTRecordReport { var i = 0; for (final skr in subkeys) { for (var sk = skr.low; sk <= skr.high; sk++) { - if (networkSeqs[i] > localSeqs[i] && - networkSeqs[i] != DHTRecord.emptySeq) { - return DHTSeqChange(sk, localSeqs[i], networkSeqs[i]); + final nseq = networkSeqs[i]; + final lseq = localSeqs[i]; + + if (nseq != null && (lseq == null || nseq > lseq)) { + return DHTSeqChange(sk, lseq, nseq); } i++; } diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart index b0cc41b..66b5baa 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_head.dart @@ -5,7 +5,7 @@ class DHTShortArrayHeadLookup { {required this.record, required this.recordSubkey, required this.seq}); final DHTRecord record; final int recordSubkey; - final int seq; + final int? seq; } class _DHTShortArrayHead { @@ -41,7 +41,7 @@ class _DHTShortArrayHead { final head = proto.DHTShortArray(); head.keys.addAll(_linkedRecords.map((lr) => lr.key.toProto())); head.index = List.of(_index); - head.seqs.addAll(_seqs); + head.seqs.addAll(_seqs.map((x) => x ?? 0xFFFFFFFF)); // Do not serialize free list, it gets recreated // Do not serialize local seqs, they are only locally relevant return head; @@ -70,10 +70,7 @@ class _DHTShortArrayHead { Future delete() => _headMutex.protect(_headRecord.delete); Future operate(Future Function(_DHTShortArrayHead) closure) async => - // ignore: prefer_expression_function_bodies - _headMutex.protect(() async { - return closure(this); - }); + _headMutex.protect(() async => closure(this)); Future operateWrite( Future Function(_DHTShortArrayHead) closure) async => @@ -115,7 +112,7 @@ class _DHTShortArrayHead { late List oldLinkedRecords; late List oldIndex; late List oldFree; - late List oldSeqs; + late List oldSeqs; late T out; try { @@ -197,7 +194,8 @@ class _DHTShortArrayHead { // Get the set of new linked keys and validate it final updatedLinkedKeys = head.keys.map((p) => p.toVeilid()).toList(); final updatedIndex = List.of(head.index); - final updatedSeqs = List.of(head.seqs); + final updatedSeqs = + List.of(head.seqs.map((x) => x == 0xFFFFFFFF ? null : x)); final updatedFree = _makeFreeList(updatedLinkedKeys, updatedIndex); // See which records are actually new @@ -333,7 +331,7 @@ class _DHTShortArrayHead { } Future lookupIndex(int idx, bool allowCreate) async { - final seq = idx < _seqs.length ? _seqs[idx] : DHTRecord.emptySeq; + final seq = idx < _seqs.length ? _seqs[idx] : null; final recordNumber = idx ~/ _stride; final record = await _getOrCreateLinkedRecord(recordNumber, allowCreate); final recordSubkey = (idx % _stride) + ((recordNumber == 0) ? 1 : 0); @@ -445,18 +443,18 @@ class _DHTShortArrayHead { // If our local sequence number is unknown or hasnt been written yet // then a normal DHT operation is going to pull from the network anyway - if (_localSeqs.length < idx || _localSeqs[idx] == DHTRecord.emptySeq) { + if (_localSeqs.length < idx || _localSeqs[idx] == null) { return false; } // If the remote sequence number record is unknown or hasnt been written // at this index yet, then we also do not refresh at this time as it // is the first time the index is being written to - if (_seqs.length < idx || _seqs[idx] == DHTRecord.emptySeq) { + if (_seqs.length < idx || _seqs[idx] == null) { return false; } - return _localSeqs[idx] < _seqs[idx]; + return _localSeqs[idx]! < _seqs[idx]!; } /// Update the sequence number for a particular index in @@ -466,12 +464,12 @@ class _DHTShortArrayHead { final idx = _index[pos]; while (_localSeqs.length <= idx) { - _localSeqs.add(DHTRecord.emptySeq); + _localSeqs.add(null); } _localSeqs[idx] = newSeq; if (write) { while (_seqs.length <= idx) { - _seqs.add(DHTRecord.emptySeq); + _seqs.add(null); } _seqs[idx] = newSeq; } @@ -555,7 +553,7 @@ class _DHTShortArrayHead { // The sequence numbers of each subkey. // Index is by subkey number not by element index. // (n-1 for head record and then the next n for linked records) - List _seqs; + List _seqs; // The local sequence numbers for each subkey. - List _localSeqs; + List _localSeqs; } diff --git a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_write.dart b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_write.dart index 1705bc0..fa3b1c6 100644 --- a/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_write.dart +++ b/packages/veilid_support/lib/dht_support/src/dht_short_array/dht_short_array_write.dart @@ -129,7 +129,7 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead final outSeqNum = Output(); - final result = lookup.seq == DHTRecord.emptySeq + final result = lookup.seq == null ? null : await lookup.record.get(subkey: lookup.recordSubkey); @@ -163,7 +163,7 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead final lookup = await _head.lookupPosition(pos, true); final outSeqNumRead = Output(); - final oldValue = lookup.seq == DHTRecord.emptySeq + final oldValue = lookup.seq == null ? null : await lookup.record .get(subkey: lookup.recordSubkey, outSeqNum: outSeqNumRead);