From 6e1439306a29d6daa34b3cdd3ec64cbdaac4b2dd Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Sun, 31 Mar 2024 16:34:12 -0400 Subject: [PATCH] make valuechanged update no longer happen when value hasn't changed or is older --- veilid-core/src/storage_manager/set_value.rs | 2 +- .../tasks/check_active_watches.rs | 2 +- .../src/storage_manager/watch_value.rs | 68 ++++++++++------ veilid-core/src/veilid_api/tests/fixtures.rs | 2 +- .../src/veilid_api/types/veilid_state.rs | 2 +- .../example/integration_test/fixtures.dart | 17 +++- .../example/integration_test/test_dht.dart | 63 ++++++++++----- veilid-flutter/lib/veilid_state.dart | 2 +- veilid-flutter/lib/veilid_state.freezed.dart | 78 ++++++++++--------- veilid-flutter/lib/veilid_state.g.dart | 4 +- veilid-python/tests/test_dht.py | 44 ++++++----- veilid-python/veilid/schema/RecvMessage.json | 12 ++- veilid-python/veilid/state.py | 6 +- 13 files changed, 190 insertions(+), 112 deletions(-) diff --git a/veilid-core/src/storage_manager/set_value.rs b/veilid-core/src/storage_manager/set_value.rs index f5bb3550..f21c6375 100644 --- a/veilid-core/src/storage_manager/set_value.rs +++ b/veilid-core/src/storage_manager/set_value.rs @@ -221,7 +221,7 @@ impl StorageManager { // Make sure this value would actually be newer if let Some(last_value) = &last_get_result.opt_value { if value.value_data().seq() <= last_value.value_data().seq() { - // inbound value is older or equal sequence number than the one we have, just return the one we have + // inbound value is older than or equal to the sequence number that we have, just return the one we have return Ok(NetworkResult::value(Some(last_value.clone()))); } } diff --git a/veilid-core/src/storage_manager/tasks/check_active_watches.rs b/veilid-core/src/storage_manager/tasks/check_active_watches.rs index da49621a..2da4da81 100644 --- a/veilid-core/src/storage_manager/tasks/check_active_watches.rs +++ b/veilid-core/src/storage_manager/tasks/check_active_watches.rs @@ -59,7 +59,7 @@ impl StorageManager { key: *k, subkeys: ValueSubkeyRangeSet::new(), count: 0, - value: ValueData::default(), + value: None, }))); } } diff --git a/veilid-core/src/storage_manager/watch_value.rs b/veilid-core/src/storage_manager/watch_value.rs index 1aa1b6f3..3db4a86d 100644 --- a/veilid-core/src/storage_manager/watch_value.rs +++ b/veilid-core/src/storage_manager/watch_value.rs @@ -232,7 +232,7 @@ impl StorageManager { watch_id: u64, ) -> VeilidAPIResult<()> { // Update local record store with new value - let (res, opt_update_callback) = { + let (is_value_seq_newer, opt_update_callback) = { let mut inner = self.lock().await?; // Don't process update if the record is closed @@ -281,32 +281,56 @@ impl StorageManager { } // Set the local value - let res = if let Some(first_subkey) = subkeys.first() { - inner - .handle_set_local_value( - key, - first_subkey, - value.clone(), - WatchUpdateMode::NoUpdate, - ) - .await - } else { - VeilidAPIResult::Ok(()) - }; + let mut is_value_seq_newer = false; + if let Some(first_subkey) = subkeys.first() { + let last_get_result = inner + .handle_get_local_value(key, first_subkey, false) + .await?; - (res, inner.update_callback.clone()) + // Make sure this value would actually be newer + is_value_seq_newer = true; + if let Some(last_value) = &last_get_result.opt_value { + if value.value_data().seq() <= last_value.value_data().seq() { + // inbound value is older than or equal to the sequence number that we have, just return the one we have + is_value_seq_newer = false; + } + } + if is_value_seq_newer { + inner + .handle_set_local_value( + key, + first_subkey, + value.clone(), + WatchUpdateMode::NoUpdate, + ) + .await?; + } + } + + (is_value_seq_newer, inner.update_callback.clone()) }; // Announce ValueChanged VeilidUpdate - if let Some(update_callback) = opt_update_callback { - update_callback(VeilidUpdate::ValueChange(Box::new(VeilidValueChange { - key, - subkeys, - count, - value: value.value_data().clone(), - }))); + // * if the value in the update had a newer sequence number + // * if more than a single subkeys has changed + // * if the count was zero meaning cancelled + + let do_update = is_value_seq_newer || subkeys.len() > 1 || count == 0; + if do_update { + if let Some(update_callback) = opt_update_callback { + update_callback(VeilidUpdate::ValueChange(Box::new(VeilidValueChange { + key, + subkeys, + count, + value: if is_value_seq_newer { + Some(value.value_data().clone()) + } else { + None + }, + }))); + } } - res + Ok(()) } } diff --git a/veilid-core/src/veilid_api/tests/fixtures.rs b/veilid-core/src/veilid_api/tests/fixtures.rs index c85e5109..df99841a 100644 --- a/veilid-core/src/veilid_api/tests/fixtures.rs +++ b/veilid-core/src/veilid_api/tests/fixtures.rs @@ -210,6 +210,6 @@ pub fn fix_veilidvaluechange() -> VeilidValueChange { key: fix_typedkey(), subkeys: ValueSubkeyRangeSet::new(), count: 5, - value: ValueData::new_with_seq(23, b"ValueData".to_vec(), fix_cryptokey()).unwrap(), + value: Some(ValueData::new_with_seq(23, b"ValueData".to_vec(), fix_cryptokey()).unwrap()), } } diff --git a/veilid-core/src/veilid_api/types/veilid_state.rs b/veilid-core/src/veilid_api/types/veilid_state.rs index b6947690..188ff71c 100644 --- a/veilid-core/src/veilid_api/types/veilid_state.rs +++ b/veilid-core/src/veilid_api/types/veilid_state.rs @@ -101,7 +101,7 @@ pub struct VeilidValueChange { pub key: TypedKey, pub subkeys: ValueSubkeyRangeSet, pub count: u32, - pub value: ValueData, + pub value: Option, } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] diff --git a/veilid-flutter/example/integration_test/fixtures.dart b/veilid-flutter/example/integration_test/fixtures.dart index 703e9a00..ef69e819 100644 --- a/veilid-flutter/example/integration_test/fixtures.dart +++ b/veilid-flutter/example/integration_test/fixtures.dart @@ -19,37 +19,50 @@ class DefaultFixture { assert(_veilidUpdateStream == null, 'should not set up fixture twice'); + final ignoreLogTargetsStr = + // ignore: do_not_use_environment + const String.fromEnvironment('IGNORE_LOG_TARGETS').trim(); + final ignoreLogTargets = ignoreLogTargetsStr.isEmpty + ? [] + : ignoreLogTargetsStr.split(',').map((e) => e.trim()).toList(); + final Map platformConfigJson; if (kIsWeb) { - const platformConfig = VeilidWASMConfig( + final platformConfig = VeilidWASMConfig( logging: VeilidWASMConfigLogging( performance: VeilidWASMConfigLoggingPerformance( enabled: true, level: VeilidConfigLogLevel.debug, logsInTimings: true, logsInConsole: false, + ignoreLogTargets: ignoreLogTargets, ), api: VeilidWASMConfigLoggingApi( enabled: true, level: VeilidConfigLogLevel.info, + ignoreLogTargets: ignoreLogTargets, ))); platformConfigJson = platformConfig.toJson(); } else { - const platformConfig = VeilidFFIConfig( + final platformConfig = VeilidFFIConfig( logging: VeilidFFIConfigLogging( terminal: VeilidFFIConfigLoggingTerminal( enabled: false, level: VeilidConfigLogLevel.debug, + ignoreLogTargets: ignoreLogTargets, ), otlp: VeilidFFIConfigLoggingOtlp( enabled: false, level: VeilidConfigLogLevel.trace, grpcEndpoint: 'localhost:4317', serviceName: 'Veilid Tests', + ignoreLogTargets: ignoreLogTargets, ), api: VeilidFFIConfigLoggingApi( enabled: true, + // level: VeilidConfigLogLevel.debug, level: VeilidConfigLogLevel.info, + ignoreLogTargets: ignoreLogTargets, ))); platformConfigJson = platformConfig.toJson(); } diff --git a/veilid-flutter/example/integration_test/test_dht.dart b/veilid-flutter/example/integration_test/test_dht.dart index 795ff83a..dbe462a3 100644 --- a/veilid-flutter/example/integration_test/test_dht.dart +++ b/veilid-flutter/example/integration_test/test_dht.dart @@ -225,14 +225,15 @@ Future testOpenWriterDHTValue() async { } Future testWatchDHTValues(Stream updateStream) async { - final valueChangeQueue = StreamController(); + final valueChangeQueue = + StreamController.broadcast(); final valueChangeSubscription = updateStream.listen((update) { if (update is VeilidUpdateValueChange) { // print("valuechange: " + update.toString()); valueChangeQueue.sink.add(update); } }); - final valueChangeQueueIterator = StreamIterator(valueChangeQueue.stream); + var valueChangeQueueIterator = StreamIterator(valueChangeQueue.stream); try { // Make two routing contexts, one with and one without safety @@ -262,6 +263,25 @@ Future testWatchDHTValues(Stream updateStream) async { // Now set the subkey and trigger an update expect(await rcSet.setDHTValue(rec.key, 3, utf8.encode("BLAH")), isNull); + // Now we should NOT get an update because the update + // is the same as our local copy + if (await valueChangeQueueIterator + .moveNext() + .timeout(const Duration(seconds: 5), onTimeout: () { + return false; + })) { + fail("should not have a change"); + } + valueChangeQueueIterator = StreamIterator(valueChangeQueue.stream); + + // Now set multiple subkeys and trigger an update + expect( + await [ + rcSet.setDHTValue(rec.key, 3, utf8.encode("BLAH BLAH")), + rcSet.setDHTValue(rec.key, 4, utf8.encode("BZORT")) + ].wait, + equals([null, null])); + // Wait for the update await valueChangeQueueIterator .moveNext() @@ -271,13 +291,10 @@ Future testWatchDHTValues(Stream updateStream) async { // Verify the update expect(valueChangeQueueIterator.current.key, equals(rec.key)); - expect(valueChangeQueueIterator.current.count, equals(0xFFFFFFFE)); + expect(valueChangeQueueIterator.current.count, equals(0xFFFFFFFD)); expect(valueChangeQueueIterator.current.subkeys, - equals([ValueSubkeyRange.single(3)])); - expect(valueChangeQueueIterator.current.value.seq, equals(1)); - expect(valueChangeQueueIterator.current.value.data, - equals(utf8.encode("BLAH"))); - expect(valueChangeQueueIterator.current.value.writer, equals(rec.owner)); + equals([ValueSubkeyRange.make(3, 4)])); + expect(valueChangeQueueIterator.current.value, isNull); // Reopen without closing to change routing context and not lose watch rec = await rcWatch.openDHTRecord(rec.key, writer: rec.ownerKeyPair()); @@ -291,9 +308,13 @@ Future testWatchDHTValues(Stream updateStream) async { // Reopen without closing to change routing context and not lose watch rec = await rcSet.openDHTRecord(rec.key, writer: rec.ownerKeyPair()); - // Change our subkey - expect(await rcSet.setDHTValue(rec.key, 3, utf8.encode("BLAH BLAH BLAH")), - isNull); + // Now set multiple subkeys and trigger an update + expect( + await [ + rcSet.setDHTValue(rec.key, 3, utf8.encode("BLAH BLAH BLAH")), + rcSet.setDHTValue(rec.key, 5, utf8.encode("BZORT BZORT")) + ].wait, + equals([null, null])); // Wait for the update await valueChangeQueueIterator @@ -302,15 +323,12 @@ Future testWatchDHTValues(Stream updateStream) async { fail("should have a change"); }); - // Verify the update + // Verify the update came back but we don't get a new value because the sequence number is the same expect(valueChangeQueueIterator.current.key, equals(rec.key)); - expect(valueChangeQueueIterator.current.count, equals(0xFFFFFFFD)); + expect(valueChangeQueueIterator.current.count, equals(0xFFFFFFFC)); expect(valueChangeQueueIterator.current.subkeys, - equals([ValueSubkeyRange.single(3)])); - expect(valueChangeQueueIterator.current.value.seq, equals(2)); - expect(valueChangeQueueIterator.current.value.data, - equals(utf8.encode("BLAH BLAH BLAH"))); - expect(valueChangeQueueIterator.current.value.writer, equals(rec.owner)); + equals([ValueSubkeyRange.single(3), ValueSubkeyRange.single(5)])); + expect(valueChangeQueueIterator.current.value, isNull); // Reopen without closing to change routing context and not lose watch rec = await rcWatch.openDHTRecord(rec.key, writer: rec.ownerKeyPair()); @@ -324,8 +342,13 @@ Future testWatchDHTValues(Stream updateStream) async { // Reopen without closing to change routing context and not lose watch rec = await rcSet.openDHTRecord(rec.key, writer: rec.ownerKeyPair()); - // Set the value without a watch - expect(await rcSet.setDHTValue(rec.key, 3, utf8.encode("BLAH")), isNull); + // Now set multiple subkeys and trigger an update + expect( + await [ + rcSet.setDHTValue(rec.key, 3, utf8.encode("BLAH BLAH BLAH BLAH")), + rcSet.setDHTValue(rec.key, 5, utf8.encode("BZORT BZORT BZORT")) + ].wait, + equals([null, null])); // Now we should NOT get an update if (await valueChangeQueueIterator diff --git a/veilid-flutter/lib/veilid_state.dart b/veilid-flutter/lib/veilid_state.dart index f45cd07e..3aa8f468 100644 --- a/veilid-flutter/lib/veilid_state.dart +++ b/veilid-flutter/lib/veilid_state.dart @@ -174,7 +174,7 @@ sealed class VeilidUpdate with _$VeilidUpdate { required TypedKey key, required List subkeys, required int count, - required ValueData value, + required ValueData? value, }) = VeilidUpdateValueChange; factory VeilidUpdate.fromJson(dynamic json) => diff --git a/veilid-flutter/lib/veilid_state.freezed.dart b/veilid-flutter/lib/veilid_state.freezed.dart index 4bba1f1d..cd23c8d4 100644 --- a/veilid-flutter/lib/veilid_state.freezed.dart +++ b/veilid-flutter/lib/veilid_state.freezed.dart @@ -1360,7 +1360,7 @@ mixin _$VeilidUpdate { List deadRoutes, List deadRemoteRoutes) routeChange, required TResult Function(Typed key, - List subkeys, int count, ValueData value) + List subkeys, int count, ValueData? value) valueChange, }) => throw _privateConstructorUsedError; @@ -1388,7 +1388,7 @@ mixin _$VeilidUpdate { TResult? Function(List deadRoutes, List deadRemoteRoutes)? routeChange, TResult? Function(Typed key, - List subkeys, int count, ValueData value)? + List subkeys, int count, ValueData? value)? valueChange, }) => throw _privateConstructorUsedError; @@ -1416,7 +1416,7 @@ mixin _$VeilidUpdate { TResult Function(List deadRoutes, List deadRemoteRoutes)? routeChange, TResult Function(Typed key, - List subkeys, int count, ValueData value)? + List subkeys, int count, ValueData? value)? valueChange, required TResult orElse(), }) => @@ -1598,7 +1598,7 @@ class _$VeilidLogImpl implements VeilidLog { List deadRoutes, List deadRemoteRoutes) routeChange, required TResult Function(Typed key, - List subkeys, int count, ValueData value) + List subkeys, int count, ValueData? value) valueChange, }) { return log(logLevel, message, backtrace); @@ -1629,7 +1629,7 @@ class _$VeilidLogImpl implements VeilidLog { TResult? Function(List deadRoutes, List deadRemoteRoutes)? routeChange, TResult? Function(Typed key, - List subkeys, int count, ValueData value)? + List subkeys, int count, ValueData? value)? valueChange, }) { return log?.call(logLevel, message, backtrace); @@ -1660,7 +1660,7 @@ class _$VeilidLogImpl implements VeilidLog { TResult Function(List deadRoutes, List deadRemoteRoutes)? routeChange, TResult Function(Typed key, - List subkeys, int count, ValueData value)? + List subkeys, int count, ValueData? value)? valueChange, required TResult orElse(), }) { @@ -1867,7 +1867,7 @@ class _$VeilidAppMessageImpl implements VeilidAppMessage { List deadRoutes, List deadRemoteRoutes) routeChange, required TResult Function(Typed key, - List subkeys, int count, ValueData value) + List subkeys, int count, ValueData? value) valueChange, }) { return appMessage(message, sender, routeId); @@ -1898,7 +1898,7 @@ class _$VeilidAppMessageImpl implements VeilidAppMessage { TResult? Function(List deadRoutes, List deadRemoteRoutes)? routeChange, TResult? Function(Typed key, - List subkeys, int count, ValueData value)? + List subkeys, int count, ValueData? value)? valueChange, }) { return appMessage?.call(message, sender, routeId); @@ -1929,7 +1929,7 @@ class _$VeilidAppMessageImpl implements VeilidAppMessage { TResult Function(List deadRoutes, List deadRemoteRoutes)? routeChange, TResult Function(Typed key, - List subkeys, int count, ValueData value)? + List subkeys, int count, ValueData? value)? valueChange, required TResult orElse(), }) { @@ -2146,7 +2146,7 @@ class _$VeilidAppCallImpl implements VeilidAppCall { List deadRoutes, List deadRemoteRoutes) routeChange, required TResult Function(Typed key, - List subkeys, int count, ValueData value) + List subkeys, int count, ValueData? value) valueChange, }) { return appCall(message, callId, sender, routeId); @@ -2177,7 +2177,7 @@ class _$VeilidAppCallImpl implements VeilidAppCall { TResult? Function(List deadRoutes, List deadRemoteRoutes)? routeChange, TResult? Function(Typed key, - List subkeys, int count, ValueData value)? + List subkeys, int count, ValueData? value)? valueChange, }) { return appCall?.call(message, callId, sender, routeId); @@ -2208,7 +2208,7 @@ class _$VeilidAppCallImpl implements VeilidAppCall { TResult Function(List deadRoutes, List deadRemoteRoutes)? routeChange, TResult Function(Typed key, - List subkeys, int count, ValueData value)? + List subkeys, int count, ValueData? value)? valueChange, required TResult orElse(), }) { @@ -2421,7 +2421,7 @@ class _$VeilidUpdateAttachmentImpl implements VeilidUpdateAttachment { List deadRoutes, List deadRemoteRoutes) routeChange, required TResult Function(Typed key, - List subkeys, int count, ValueData value) + List subkeys, int count, ValueData? value) valueChange, }) { return attachment(state, publicInternetReady, localNetworkReady); @@ -2452,7 +2452,7 @@ class _$VeilidUpdateAttachmentImpl implements VeilidUpdateAttachment { TResult? Function(List deadRoutes, List deadRemoteRoutes)? routeChange, TResult? Function(Typed key, - List subkeys, int count, ValueData value)? + List subkeys, int count, ValueData? value)? valueChange, }) { return attachment?.call(state, publicInternetReady, localNetworkReady); @@ -2483,7 +2483,7 @@ class _$VeilidUpdateAttachmentImpl implements VeilidUpdateAttachment { TResult Function(List deadRoutes, List deadRemoteRoutes)? routeChange, TResult Function(Typed key, - List subkeys, int count, ValueData value)? + List subkeys, int count, ValueData? value)? valueChange, required TResult orElse(), }) { @@ -2702,7 +2702,7 @@ class _$VeilidUpdateNetworkImpl implements VeilidUpdateNetwork { List deadRoutes, List deadRemoteRoutes) routeChange, required TResult Function(Typed key, - List subkeys, int count, ValueData value) + List subkeys, int count, ValueData? value) valueChange, }) { return network(started, bpsDown, bpsUp, peers); @@ -2733,7 +2733,7 @@ class _$VeilidUpdateNetworkImpl implements VeilidUpdateNetwork { TResult? Function(List deadRoutes, List deadRemoteRoutes)? routeChange, TResult? Function(Typed key, - List subkeys, int count, ValueData value)? + List subkeys, int count, ValueData? value)? valueChange, }) { return network?.call(started, bpsDown, bpsUp, peers); @@ -2764,7 +2764,7 @@ class _$VeilidUpdateNetworkImpl implements VeilidUpdateNetwork { TResult Function(List deadRoutes, List deadRemoteRoutes)? routeChange, TResult Function(Typed key, - List subkeys, int count, ValueData value)? + List subkeys, int count, ValueData? value)? valueChange, required TResult orElse(), }) { @@ -2958,7 +2958,7 @@ class _$VeilidUpdateConfigImpl implements VeilidUpdateConfig { List deadRoutes, List deadRemoteRoutes) routeChange, required TResult Function(Typed key, - List subkeys, int count, ValueData value) + List subkeys, int count, ValueData? value) valueChange, }) { return config(this.config); @@ -2989,7 +2989,7 @@ class _$VeilidUpdateConfigImpl implements VeilidUpdateConfig { TResult? Function(List deadRoutes, List deadRemoteRoutes)? routeChange, TResult? Function(Typed key, - List subkeys, int count, ValueData value)? + List subkeys, int count, ValueData? value)? valueChange, }) { return config?.call(this.config); @@ -3020,7 +3020,7 @@ class _$VeilidUpdateConfigImpl implements VeilidUpdateConfig { TResult Function(List deadRoutes, List deadRemoteRoutes)? routeChange, TResult Function(Typed key, - List subkeys, int count, ValueData value)? + List subkeys, int count, ValueData? value)? valueChange, required TResult orElse(), }) { @@ -3230,7 +3230,7 @@ class _$VeilidUpdateRouteChangeImpl implements VeilidUpdateRouteChange { List deadRoutes, List deadRemoteRoutes) routeChange, required TResult Function(Typed key, - List subkeys, int count, ValueData value) + List subkeys, int count, ValueData? value) valueChange, }) { return routeChange(deadRoutes, deadRemoteRoutes); @@ -3261,7 +3261,7 @@ class _$VeilidUpdateRouteChangeImpl implements VeilidUpdateRouteChange { TResult? Function(List deadRoutes, List deadRemoteRoutes)? routeChange, TResult? Function(Typed key, - List subkeys, int count, ValueData value)? + List subkeys, int count, ValueData? value)? valueChange, }) { return routeChange?.call(deadRoutes, deadRemoteRoutes); @@ -3292,7 +3292,7 @@ class _$VeilidUpdateRouteChangeImpl implements VeilidUpdateRouteChange { TResult Function(List deadRoutes, List deadRemoteRoutes)? routeChange, TResult Function(Typed key, - List subkeys, int count, ValueData value)? + List subkeys, int count, ValueData? value)? valueChange, required TResult orElse(), }) { @@ -3386,9 +3386,9 @@ abstract class _$$VeilidUpdateValueChangeImplCopyWith<$Res> { {Typed key, List subkeys, int count, - ValueData value}); + ValueData? value}); - $ValueDataCopyWith<$Res> get value; + $ValueDataCopyWith<$Res>? get value; } /// @nodoc @@ -3406,7 +3406,7 @@ class __$$VeilidUpdateValueChangeImplCopyWithImpl<$Res> Object? key = null, Object? subkeys = null, Object? count = null, - Object? value = null, + Object? value = freezed, }) { return _then(_$VeilidUpdateValueChangeImpl( key: null == key @@ -3421,17 +3421,21 @@ class __$$VeilidUpdateValueChangeImplCopyWithImpl<$Res> ? _value.count : count // ignore: cast_nullable_to_non_nullable as int, - value: null == value + value: freezed == value ? _value.value : value // ignore: cast_nullable_to_non_nullable - as ValueData, + as ValueData?, )); } @override @pragma('vm:prefer-inline') - $ValueDataCopyWith<$Res> get value { - return $ValueDataCopyWith<$Res>(_value.value, (value) { + $ValueDataCopyWith<$Res>? get value { + if (_value.value == null) { + return null; + } + + return $ValueDataCopyWith<$Res>(_value.value!, (value) { return _then(_value.copyWith(value: value)); }); } @@ -3465,7 +3469,7 @@ class _$VeilidUpdateValueChangeImpl implements VeilidUpdateValueChange { @override final int count; @override - final ValueData value; + final ValueData? value; @JsonKey(name: 'kind') final String $type; @@ -3526,7 +3530,7 @@ class _$VeilidUpdateValueChangeImpl implements VeilidUpdateValueChange { List deadRoutes, List deadRemoteRoutes) routeChange, required TResult Function(Typed key, - List subkeys, int count, ValueData value) + List subkeys, int count, ValueData? value) valueChange, }) { return valueChange(key, subkeys, count, value); @@ -3557,7 +3561,7 @@ class _$VeilidUpdateValueChangeImpl implements VeilidUpdateValueChange { TResult? Function(List deadRoutes, List deadRemoteRoutes)? routeChange, TResult? Function(Typed key, - List subkeys, int count, ValueData value)? + List subkeys, int count, ValueData? value)? valueChange, }) { return valueChange?.call(key, subkeys, count, value); @@ -3588,7 +3592,7 @@ class _$VeilidUpdateValueChangeImpl implements VeilidUpdateValueChange { TResult Function(List deadRoutes, List deadRemoteRoutes)? routeChange, TResult Function(Typed key, - List subkeys, int count, ValueData value)? + List subkeys, int count, ValueData? value)? valueChange, required TResult orElse(), }) { @@ -3660,7 +3664,7 @@ abstract class VeilidUpdateValueChange implements VeilidUpdate { {required final Typed key, required final List subkeys, required final int count, - required final ValueData value}) = _$VeilidUpdateValueChangeImpl; + required final ValueData? value}) = _$VeilidUpdateValueChangeImpl; factory VeilidUpdateValueChange.fromJson(Map json) = _$VeilidUpdateValueChangeImpl.fromJson; @@ -3668,7 +3672,7 @@ abstract class VeilidUpdateValueChange implements VeilidUpdate { Typed get key; List get subkeys; int get count; - ValueData get value; + ValueData? get value; @JsonKey(ignore: true) _$$VeilidUpdateValueChangeImplCopyWith<_$VeilidUpdateValueChangeImpl> get copyWith => throw _privateConstructorUsedError; diff --git a/veilid-flutter/lib/veilid_state.g.dart b/veilid-flutter/lib/veilid_state.g.dart index 4f454466..11453507 100644 --- a/veilid-flutter/lib/veilid_state.g.dart +++ b/veilid-flutter/lib/veilid_state.g.dart @@ -255,7 +255,7 @@ _$VeilidUpdateValueChangeImpl _$$VeilidUpdateValueChangeImplFromJson( .map(ValueSubkeyRange.fromJson) .toList(), count: json['count'] as int, - value: ValueData.fromJson(json['value']), + value: json['value'] == null ? null : ValueData.fromJson(json['value']), $type: json['kind'] as String?, ); @@ -265,7 +265,7 @@ Map _$$VeilidUpdateValueChangeImplToJson( 'key': instance.key.toJson(), 'subkeys': instance.subkeys.map((e) => e.toJson()).toList(), 'count': instance.count, - 'value': instance.value.toJson(), + 'value': instance.value?.toJson(), 'kind': instance.$type, }; diff --git a/veilid-python/tests/test_dht.py b/veilid-python/tests/test_dht.py index 5346e229..58447c9e 100644 --- a/veilid-python/tests/test_dht.py +++ b/veilid-python/tests/test_dht.py @@ -247,17 +247,27 @@ async def test_watch_dht_values(): # Now set the subkey and trigger an update vd = await rcSet.set_dht_value(rec.key, 3, b"BLAH") assert vd == None + + # Now we should NOT get an update because the update is the same as our local copy + update = None + try: + update = await asyncio.wait_for(value_change_queue.get(), timeout=5) + except asyncio.TimeoutError: + pass + assert update == None + + # Now set multiple subkeys and trigger an update + vd = await asyncio.gather(*[rcSet.set_dht_value(rec.key, 3, b"BLAH BLAH"), rcSet.set_dht_value(rec.key, 4, b"BZORT")]) + assert vd == [None, None] # Wait for the update upd = await asyncio.wait_for(value_change_queue.get(), timeout=5) - # Verify the update + # Verify the update came back but we don't get a new value because the sequence number is the same assert upd.detail.key == rec.key - assert upd.detail.count == 0xFFFFFFFE - assert upd.detail.subkeys == [(3,3)] - assert upd.detail.value.seq == 1 - assert upd.detail.value.data == b"BLAH" - assert upd.detail.value.writer == rec.owner + assert upd.detail.count == 0xFFFFFFFD + assert upd.detail.subkeys == [(3, 4)] + assert upd.detail.value == None # Reopen without closing to change routing context and not lose watch rec = await rcWatch.open_dht_record(rec.key, rec.owner_key_pair()) @@ -269,20 +279,18 @@ async def test_watch_dht_values(): # Reopen without closing to change routing context and not lose watch rec = await rcSet.open_dht_record(rec.key, rec.owner_key_pair()) - # Change our subkey - vd = await rcSet.set_dht_value(rec.key, 3, b"BLAH BLAH BLAH") - assert vd == None + # Now set multiple subkeys and trigger an update + vd = await asyncio.gather(*[rcSet.set_dht_value(rec.key, 3, b"BLAH BLAH BLAH"), rcSet.set_dht_value(rec.key, 5, b"BZORT BZORT")]) + assert vd == [None, None] # Wait for the update upd = await asyncio.wait_for(value_change_queue.get(), timeout=5) - # Verify the update + # Verify the update came back but we don't get a new value because the sequence number is the same assert upd.detail.key == rec.key - assert upd.detail.count == 0xFFFFFFFD - assert upd.detail.subkeys == [(3,3)] - assert upd.detail.value.seq == 2 - assert upd.detail.value.data == b"BLAH BLAH BLAH" - assert upd.detail.value.writer == rec.owner + assert upd.detail.count == 0xFFFFFFFC + assert upd.detail.subkeys == [(3, 3), (5, 5)] + assert upd.detail.value == None # Reopen without closing to change routing context and not lose watch rec = await rcWatch.open_dht_record(rec.key, rec.owner_key_pair()) @@ -294,9 +302,9 @@ async def test_watch_dht_values(): # Reopen without closing to change routing context and not lose watch rec = await rcSet.open_dht_record(rec.key, rec.owner_key_pair()) - # Set the value without a watch - vd = await rcSet.set_dht_value(rec.key, 3, b"BLAH") - assert vd == None + # Now set multiple subkeys + vd = await asyncio.gather(*[rcSet.set_dht_value(rec.key, 3, b"BLAH BLAH BLAH BLAH"), rcSet.set_dht_value(rec.key, 5, b"BZORT BZORT BZORT")]) + assert vd == [None, None] # Now we should NOT get an update update = None diff --git a/veilid-python/veilid/schema/RecvMessage.json b/veilid-python/veilid/schema/RecvMessage.json index fe92e9a9..1ee1ea81 100644 --- a/veilid-python/veilid/schema/RecvMessage.json +++ b/veilid-python/veilid/schema/RecvMessage.json @@ -2673,8 +2673,7 @@ "count", "key", "kind", - "subkeys", - "value" + "subkeys" ], "properties": { "count": { @@ -2712,7 +2711,14 @@ } }, "value": { - "$ref": "#/definitions/ValueData" + "anyOf": [ + { + "$ref": "#/definitions/ValueData" + }, + { + "type": "null" + } + ] } } }, diff --git a/veilid-python/veilid/state.py b/veilid-python/veilid/state.py index df273647..1a86e1be 100644 --- a/veilid-python/veilid/state.py +++ b/veilid-python/veilid/state.py @@ -358,9 +358,9 @@ class VeilidValueChange: key: TypedKey subkeys: list[tuple[ValueSubkey, ValueSubkey]] count: int - value: ValueData + value: Optional[ValueData] - def __init__(self, key: TypedKey, subkeys: list[tuple[ValueSubkey, ValueSubkey]], count: int, value: ValueData): + def __init__(self, key: TypedKey, subkeys: list[tuple[ValueSubkey, ValueSubkey]], count: int, value: Optional[ValueData]): self.key = key self.subkeys = subkeys self.count = count @@ -373,7 +373,7 @@ class VeilidValueChange: TypedKey(j["key"]), [(p[0], p[1]) for p in j["subkeys"]], j["count"], - ValueData.from_json(j["value"]), + None if j["value"] is None else ValueData.from_json(j["value"]), )