better handling of subkey sequence numbers

This commit is contained in:
Christien Rioux 2024-04-24 22:43:42 -04:00
parent 64d0019e6e
commit 0b4de3ad13
9 changed files with 185 additions and 57 deletions

View File

@@ -379,6 +379,7 @@ class SingleContactMessagesCubit extends Cubit<SingleContactMessagesState> {
.toIList();
// Emit the rendered state
emit(AsyncValue.data(renderedState));
}

View File

@@ -158,9 +158,17 @@ class ChatComponent extends StatelessWidget {
// Convert protobuf messages to chat messages
final chatMessages = <types.Message>[];
final tsSet = <String>{};
for (final message in messages) {
final chatMessage = messageToChatMessage(message);
chatMessages.insert(0, chatMessage);
if (!tsSet.add(chatMessage.id)) {
// ignore: avoid_print
print('duplicate id found: ${chatMessage.id}:\n'
'Messages:\n$messages\n'
'ChatMessages:\n$chatMessages');
assert(false, 'should not have duplicate id');
}
}
return DefaultTextStyle(

View File

@@ -9,6 +9,7 @@ import 'package:loggy/loggy.dart';
import 'package:veilid_support/veilid_support.dart';
import '../veilid_processor/views/developer.dart';
import 'responsive.dart';
import 'state_logger.dart';
String wrapWithLogColor(LogLevel? level, String text) {
@@ -111,7 +112,9 @@ class CallbackPrinter extends LoggyPrinter {
@override
void onLog(LogRecord record) {
final out = record.pretty();
debugPrint(out);
if (isDesktop) {
debugPrintSynchronously(out);
}
globalDebugTerminal.write('$out\n'.replaceAll('\n', '\r\n'));
callback?.call(record);
}

View File

@@ -8,6 +8,7 @@ const Map<String, LogLevel> _blocChangeLogLevels = {
'ActiveConversationsBlocMapCubit': LogLevel.off,
'DHTShortArrayCubit<Message>': LogLevel.off,
'PersistentQueueCubit<Message>': LogLevel.off,
'SingleContactMessagesCubit': LogLevel.off,
};
const Map<String, LogLevel> _blocCreateCloseLogLevels = {};
const Map<String, LogLevel> _blocErrorLogLevels = {};

View File

@@ -104,7 +104,8 @@ class DHTRecord {
{int subkey = -1,
DHTRecordCrypto? crypto,
bool forceRefresh = false,
bool onlyUpdates = false}) async {
bool onlyUpdates = false,
Output<int>? outSeqNum}) async {
subkey = subkeyOrDefault(subkey);
final valueData = await _routingContext.getDHTValue(key, subkey,
forceRefresh: forceRefresh);
@@ -116,6 +117,9 @@ class DHTRecord {
return null;
}
final out = (crypto ?? _crypto).decrypt(valueData.data, subkey);
if (outSeqNum != null) {
outSeqNum.save(valueData.seq);
}
_sharedDHTRecordData.subkeySeqCache[subkey] = valueData.seq;
return out;
}
@@ -124,12 +128,14 @@ class DHTRecord {
{int subkey = -1,
DHTRecordCrypto? crypto,
bool forceRefresh = false,
bool onlyUpdates = false}) async {
bool onlyUpdates = false,
Output<int>? outSeqNum}) async {
final data = await get(
subkey: subkey,
crypto: crypto,
forceRefresh: forceRefresh,
onlyUpdates: onlyUpdates);
onlyUpdates: onlyUpdates,
outSeqNum: outSeqNum);
if (data == null) {
return null;
}
@@ -141,12 +147,14 @@ class DHTRecord {
{int subkey = -1,
DHTRecordCrypto? crypto,
bool forceRefresh = false,
bool onlyUpdates = false}) async {
bool onlyUpdates = false,
Output<int>? outSeqNum}) async {
final data = await get(
subkey: subkey,
crypto: crypto,
forceRefresh: forceRefresh,
onlyUpdates: onlyUpdates);
onlyUpdates: onlyUpdates,
outSeqNum: outSeqNum);
if (data == null) {
return null;
}
@@ -154,7 +162,10 @@ class DHTRecord {
}
Future<Uint8List?> tryWriteBytes(Uint8List newValue,
{int subkey = -1, DHTRecordCrypto? crypto, KeyPair? writer}) async {
{int subkey = -1,
DHTRecordCrypto? crypto,
KeyPair? writer,
Output<int>? outSeqNum}) async {
subkey = subkeyOrDefault(subkey);
final lastSeq = _sharedDHTRecordData.subkeySeqCache[subkey];
final encryptedNewValue =
@@ -175,6 +186,9 @@ class DHTRecord {
// Record new sequence number
final isUpdated = newValueData.seq != lastSeq;
if (isUpdated && outSeqNum != null) {
outSeqNum.save(newValueData.seq);
}
_sharedDHTRecordData.subkeySeqCache[subkey] = newValueData.seq;
// See if the encrypted data returned is exactly the same
@@ -197,7 +211,10 @@ class DHTRecord {
}
Future<void> eventualWriteBytes(Uint8List newValue,
{int subkey = -1, DHTRecordCrypto? crypto, KeyPair? writer}) async {
{int subkey = -1,
DHTRecordCrypto? crypto,
KeyPair? writer,
Output<int>? outSeqNum}) async {
subkey = subkeyOrDefault(subkey);
final lastSeq = _sharedDHTRecordData.subkeySeqCache[subkey];
final encryptedNewValue =
@@ -222,6 +239,9 @@ class DHTRecord {
}
// Record new sequence number
if (outSeqNum != null) {
outSeqNum.save(newValueData.seq);
}
_sharedDHTRecordData.subkeySeqCache[subkey] = newValueData.seq;
// The encrypted data returned should be exactly the same
@@ -239,12 +259,14 @@ class DHTRecord {
Future<Uint8List> Function(Uint8List? oldValue) update,
{int subkey = -1,
DHTRecordCrypto? crypto,
KeyPair? writer}) async {
KeyPair? writer,
Output<int>? outSeqNum}) async {
subkey = subkeyOrDefault(subkey);
// Get the existing data, do not allow force refresh here
// because if we need a refresh the setDHTValue will fail anyway
var oldValue = await get(subkey: subkey, crypto: crypto);
var oldValue =
await get(subkey: subkey, crypto: crypto, outSeqNum: outSeqNum);
do {
// Update the data
@@ -252,16 +274,22 @@ class DHTRecord {
// Try to write it back to the network
oldValue = await tryWriteBytes(updatedValue,
subkey: subkey, crypto: crypto, writer: writer);
subkey: subkey, crypto: crypto, writer: writer, outSeqNum: outSeqNum);
// Repeat update if newer data on the network was found
} while (oldValue != null);
}
Future<T?> tryWriteJson<T>(T Function(dynamic) fromJson, T newValue,
{int subkey = -1, DHTRecordCrypto? crypto, KeyPair? writer}) =>
{int subkey = -1,
DHTRecordCrypto? crypto,
KeyPair? writer,
Output<int>? outSeqNum}) =>
tryWriteBytes(jsonEncodeBytes(newValue),
subkey: subkey, crypto: crypto, writer: writer)
subkey: subkey,
crypto: crypto,
writer: writer,
outSeqNum: outSeqNum)
.then((out) {
if (out == null) {
return null;
@@ -271,9 +299,15 @@ class DHTRecord {
Future<T?> tryWriteProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer, T newValue,
{int subkey = -1, DHTRecordCrypto? crypto, KeyPair? writer}) =>
{int subkey = -1,
DHTRecordCrypto? crypto,
KeyPair? writer,
Output<int>? outSeqNum}) =>
tryWriteBytes(newValue.writeToBuffer(),
subkey: subkey, crypto: crypto, writer: writer)
subkey: subkey,
crypto: crypto,
writer: writer,
outSeqNum: outSeqNum)
.then((out) {
if (out == null) {
return null;
@@ -282,26 +316,38 @@ class DHTRecord {
});
Future<void> eventualWriteJson<T>(T newValue,
{int subkey = -1, DHTRecordCrypto? crypto, KeyPair? writer}) =>
{int subkey = -1,
DHTRecordCrypto? crypto,
KeyPair? writer,
Output<int>? outSeqNum}) =>
eventualWriteBytes(jsonEncodeBytes(newValue),
subkey: subkey, crypto: crypto, writer: writer);
subkey: subkey, crypto: crypto, writer: writer, outSeqNum: outSeqNum);
Future<void> eventualWriteProtobuf<T extends GeneratedMessage>(T newValue,
{int subkey = -1, DHTRecordCrypto? crypto, KeyPair? writer}) =>
{int subkey = -1,
DHTRecordCrypto? crypto,
KeyPair? writer,
Output<int>? outSeqNum}) =>
eventualWriteBytes(newValue.writeToBuffer(),
subkey: subkey, crypto: crypto, writer: writer);
subkey: subkey, crypto: crypto, writer: writer, outSeqNum: outSeqNum);
Future<void> eventualUpdateJson<T>(
T Function(dynamic) fromJson, Future<T> Function(T?) update,
{int subkey = -1, DHTRecordCrypto? crypto, KeyPair? writer}) =>
{int subkey = -1,
DHTRecordCrypto? crypto,
KeyPair? writer,
Output<int>? outSeqNum}) =>
eventualUpdateBytes(jsonUpdate(fromJson, update),
subkey: subkey, crypto: crypto, writer: writer);
subkey: subkey, crypto: crypto, writer: writer, outSeqNum: outSeqNum);
Future<void> eventualUpdateProtobuf<T extends GeneratedMessage>(
T Function(List<int>) fromBuffer, Future<T> Function(T?) update,
{int subkey = -1, DHTRecordCrypto? crypto, KeyPair? writer}) =>
{int subkey = -1,
DHTRecordCrypto? crypto,
KeyPair? writer,
Output<int>? outSeqNum}) =>
eventualUpdateBytes(protobufUpdate(fromBuffer, update),
subkey: subkey, crypto: crypto, writer: writer);
subkey: subkey, crypto: crypto, writer: writer, outSeqNum: outSeqNum);
Future<void> watch(
{List<ValueSubkeyRange>? subkeys,

View File

@@ -10,6 +10,9 @@ import 'package:protobuf/protobuf.dart';
import '../../../../veilid_support.dart';
export 'package:fast_immutable_collections/fast_immutable_collections.dart'
show Output;
part 'dht_record_pool.freezed.dart';
part 'dht_record_pool.g.dart';
part 'dht_record.dart';
@@ -17,6 +20,10 @@ part 'dht_record.dart';
const int watchBackoffMultiplier = 2;
const int watchBackoffMax = 30;
const int? defaultWatchDurationSecs = null; // 600
const int watchRenewalNumerator = 4;
const int watchRenewalDenominator = 5;
typedef DHTRecordPoolLogger = void Function(String message);
/// Record pool that managed DHTRecords and allows for tagged deletion
@@ -56,14 +63,17 @@ class WatchState extends Equatable {
{required this.subkeys,
required this.expiration,
required this.count,
this.realExpiration});
this.realExpiration,
this.renewalTime});
final List<ValueSubkeyRange>? subkeys;
final Timestamp? expiration;
final int? count;
final Timestamp? realExpiration;
final Timestamp? renewalTime;
@override
List<Object?> get props => [subkeys, expiration, count, realExpiration];
List<Object?> get props =>
[subkeys, expiration, count, realExpiration, renewalTime];
}
/// Data shared amongst all DHTRecord instances
@@ -77,6 +87,7 @@ class SharedDHTRecordData {
VeilidRoutingContext defaultRoutingContext;
Map<int, int> subkeySeqCache = {};
bool needsWatchStateUpdate = false;
WatchState? unionWatchState;
bool deleteOnClose = false;
}
@@ -616,6 +627,7 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
int? totalCount;
Timestamp? maxExpiration;
List<ValueSubkeyRange>? allSubkeys;
Timestamp? earliestRenewalTime;
var noExpiration = false;
var everySubkey = false;
@@ -648,6 +660,15 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
} else {
everySubkey = true;
}
final wsRenewalTime = ws.renewalTime;
if (wsRenewalTime != null) {
earliestRenewalTime = earliestRenewalTime == null
? wsRenewalTime
: Timestamp(
value: (wsRenewalTime.value < earliestRenewalTime.value
? wsRenewalTime.value
: earliestRenewalTime.value));
}
}
}
if (noExpiration) {
@@ -661,11 +682,14 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
}
return WatchState(
subkeys: allSubkeys, expiration: maxExpiration, count: totalCount);
subkeys: allSubkeys,
expiration: maxExpiration,
count: totalCount,
renewalTime: earliestRenewalTime);
}
void _updateWatchRealExpirations(
Iterable<DHTRecord> records, Timestamp realExpiration) {
void _updateWatchRealExpirations(Iterable<DHTRecord> records,
Timestamp realExpiration, Timestamp renewalTime) {
for (final rec in records) {
final ws = rec.watchState;
if (ws != null) {
@@ -673,7 +697,8 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
subkeys: ws.subkeys,
expiration: ws.expiration,
count: ws.count,
realExpiration: realExpiration);
realExpiration: realExpiration,
renewalTime: renewalTime);
}
}
}
@@ -689,6 +714,7 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
}
_inTick = true;
_tickCount = 0;
final now = veilid.now();
try {
final allSuccess = await _mutex.protect(() async {
@@ -700,12 +726,24 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
final openedRecordInfo = kv.value;
final dhtctx = openedRecordInfo.shared.defaultRoutingContext;
if (openedRecordInfo.shared.needsWatchStateUpdate) {
final watchState =
var wantsWatchStateUpdate =
openedRecordInfo.shared.needsWatchStateUpdate;
// Check if we have reached renewal time for the watch
if (openedRecordInfo.shared.unionWatchState != null &&
openedRecordInfo.shared.unionWatchState!.renewalTime != null &&
now.value >
openedRecordInfo.shared.unionWatchState!.renewalTime!.value) {
wantsWatchStateUpdate = true;
}
if (wantsWatchStateUpdate) {
// Update union watch state
final unionWatchState = openedRecordInfo.shared.unionWatchState =
_collectUnionWatchState(openedRecordInfo.records);
// Apply watch changes for record
if (watchState == null) {
if (unionWatchState == null) {
unord.add(() async {
// Record needs watch cancel
var success = false;
@@ -727,26 +765,39 @@ class DHTRecordPool with TableDBBackedJson<DHTRecordPoolAllocations> {
// Record needs new watch
var success = false;
try {
final subkeys = watchState.subkeys?.toList();
final count = watchState.count;
final expiration = watchState.expiration;
final subkeys = unionWatchState.subkeys?.toList();
final count = unionWatchState.count;
final expiration = unionWatchState.expiration;
final now = veilid.now();
final realExpiration = await dhtctx.watchDHTValues(
openedRecordKey,
subkeys: watchState.subkeys?.toList(),
count: watchState.count,
expiration: watchState.expiration);
subkeys: unionWatchState.subkeys?.toList(),
count: unionWatchState.count,
expiration: unionWatchState.expiration ??
(defaultWatchDurationSecs == null
? null
: veilid.now().offset(
TimestampDuration.fromMillis(
defaultWatchDurationSecs! * 1000))));
final expirationDuration = realExpiration.diff(now);
final renewalTime = now.offset(TimestampDuration(
value: expirationDuration.value *
BigInt.from(watchRenewalNumerator) ~/
BigInt.from(watchRenewalDenominator)));
log('watchDHTValues: key=$openedRecordKey, subkeys=$subkeys, '
'count=$count, expiration=$expiration, '
'realExpiration=$realExpiration, '
'renewalTime=$renewalTime, '
'debugNames=${openedRecordInfo.debugNames}');
// Update watch states with real expiration
if (realExpiration.value != BigInt.zero) {
openedRecordInfo.shared.needsWatchStateUpdate = false;
_updateWatchRealExpirations(
openedRecordInfo.records, realExpiration);
openedRecordInfo.records, realExpiration, renewalTime);
success = true;
}
} on VeilidAPIException catch (e) {

View File

@@ -424,21 +424,18 @@ class _DHTShortArrayHead {
/// Update the sequence number for a particular index in
/// our local sequence number list.
/// If a write is happening, update the network copy as well.
Future<void> updatePositionSeq(int pos, bool write) async {
void updatePositionSeq(int pos, bool write, int newSeq) {
final idx = _index[pos];
final lookup = await lookupIndex(idx);
final report = await lookup.record
.inspect(subkeys: [ValueSubkeyRange.single(lookup.recordSubkey)]);
while (_localSeqs.length <= idx) {
_localSeqs.add(0xFFFFFFFF);
}
_localSeqs[idx] = report.localSeqs[0];
_localSeqs[idx] = newSeq;
if (write) {
while (_seqs.length <= idx) {
_seqs.add(0xFFFFFFFF);
}
_seqs[idx] = report.localSeqs[0];
_seqs[idx] = newSeq;
}
}

View File

@@ -74,9 +74,14 @@ class _DHTShortArrayRead implements DHTShortArrayRead {
final lookup = await _head.lookupPosition(pos);
final refresh = forceRefresh || _head.positionNeedsRefresh(pos);
final out =
lookup.record.get(subkey: lookup.recordSubkey, forceRefresh: refresh);
await _head.updatePositionSeq(pos, false);
final outSeqNum = Output<int>();
final out = lookup.record.get(
subkey: lookup.recordSubkey,
forceRefresh: refresh,
outSeqNum: outSeqNum);
if (outSeqNum.value != null) {
_head.updatePositionSeq(pos, false, outSeqNum.value!);
}
return out;
}

View File

@@ -110,9 +110,6 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead
return false;
}
// Get sequence number written
await _head.updatePositionSeq(pos, true);
return true;
}
@@ -127,9 +124,6 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead
return false;
}
// Get sequence number written
await _head.updatePositionSeq(pos, true);
return true;
}
@@ -153,9 +147,17 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead
throw IndexError.withLength(pos, _head.length);
}
final lookup = await _head.lookupPosition(pos);
final outSeqNum = Output<int>();
final result = lookup.seq == 0xFFFFFFFF
? null
: await lookup.record.get(subkey: lookup.recordSubkey);
if (outSeqNum.value != null) {
_head.updatePositionSeq(pos, false, outSeqNum.value!);
}
if (result == null) {
throw StateError('Element does not exist');
}
@@ -175,11 +177,25 @@ class _DHTShortArrayWrite extends _DHTShortArrayRead
throw IndexError.withLength(pos, _head.length);
}
final lookup = await _head.lookupPosition(pos);
final outSeqNum = Output<int>();
final oldValue = lookup.seq == 0xFFFFFFFF
? null
: await lookup.record.get(subkey: lookup.recordSubkey);
final result = await lookup.record
.tryWriteBytes(newValue, subkey: lookup.recordSubkey);
: await lookup.record
.get(subkey: lookup.recordSubkey, outSeqNum: outSeqNum);
if (outSeqNum.value != null) {
_head.updatePositionSeq(pos, false, outSeqNum.value!);
}
final result = await lookup.record.tryWriteBytes(newValue,
subkey: lookup.recordSubkey, outSeqNum: outSeqNum);
if (outSeqNum.value != null) {
_head.updatePositionSeq(pos, true, outSeqNum.value!);
}
if (result != null) {
// A result coming back means the element was overwritten already
return (result, false);