Merge branch 'valuechanged-optional' into 'main'

ValueChanged Optional

See merge request veilid/veilid!264
This commit is contained in:
Christien Rioux 2024-04-01 02:14:21 +00:00
commit cdedf37ade
19 changed files with 272 additions and 144 deletions

View File

@ -385,7 +385,7 @@ struct OperationValueChanged @0xd1c59ebdd8cc1bf6 {
subkeys @1 :List(SubkeyRange); # subkey range that changed (up to 512 ranges at a time, if empty this is a watch expiration notice) subkeys @1 :List(SubkeyRange); # subkey range that changed (up to 512 ranges at a time, if empty this is a watch expiration notice)
count @2 :UInt32; # remaining changes left (0 means watch has expired) count @2 :UInt32; # remaining changes left (0 means watch has expired)
watchId @3 :UInt64; # watch id this value change came from watchId @3 :UInt64; # watch id this value change came from
value @4 :SignedValueData; # first value that changed (the rest can be gotten with getvalue) value @4 :SignedValueData; # Optional: first value that changed (the rest can be gotten with getvalue)
} }
struct OperationSupplyBlockQ @0xadbf4c542d749971 { struct OperationSupplyBlockQ @0xadbf4c542d749971 {

View File

@ -22084,4 +22084,4 @@ pub mod operation {
} }
} }
//BUILDHASH:6df0786a4485a9d0c25c177959fe1b2070c2a98884d20267eb70e129be95f2b0 //BUILDHASH:a28970f74fcf7989d47e1457e848e5de33d8b7a4003a0e439c95f442ecf69fd3

View File

@ -9,7 +9,7 @@ pub(in crate::rpc_processor) struct RPCOperationValueChanged {
subkeys: ValueSubkeyRangeSet, subkeys: ValueSubkeyRangeSet,
count: u32, count: u32,
watch_id: u64, watch_id: u64,
value: SignedValueData, value: Option<SignedValueData>,
} }
impl RPCOperationValueChanged { impl RPCOperationValueChanged {
@ -19,7 +19,7 @@ impl RPCOperationValueChanged {
subkeys: ValueSubkeyRangeSet, subkeys: ValueSubkeyRangeSet,
count: u32, count: u32,
watch_id: u64, watch_id: u64,
value: SignedValueData, value: Option<SignedValueData>,
) -> Result<Self, RPCError> { ) -> Result<Self, RPCError> {
if subkeys.ranges_len() > MAX_VALUE_CHANGED_SUBKEY_RANGES_LEN { if subkeys.ranges_len() > MAX_VALUE_CHANGED_SUBKEY_RANGES_LEN {
return Err(RPCError::protocol( return Err(RPCError::protocol(
@ -30,6 +30,11 @@ impl RPCOperationValueChanged {
if watch_id == 0 { if watch_id == 0 {
return Err(RPCError::protocol("ValueChanged needs a nonzero watch id")); return Err(RPCError::protocol("ValueChanged needs a nonzero watch id"));
} }
if subkeys.is_empty() && value.is_some() {
return Err(RPCError::protocol(
"ValueChanged with a value must have subkeys",
));
}
Ok(Self { Ok(Self {
key, key,
@ -44,6 +49,11 @@ impl RPCOperationValueChanged {
if self.watch_id == 0 { if self.watch_id == 0 {
return Err(RPCError::protocol("ValueChanged does not have a valid id")); return Err(RPCError::protocol("ValueChanged does not have a valid id"));
} }
if self.subkeys.is_empty() && self.value.is_some() {
return Err(RPCError::protocol(
"ValueChanged with a value must have subkeys",
));
}
// further validation must be done by storage manager as this is more complicated // further validation must be done by storage manager as this is more complicated
Ok(()) Ok(())
} }
@ -69,12 +79,20 @@ impl RPCOperationValueChanged {
} }
#[allow(dead_code)] #[allow(dead_code)]
pub fn value(&self) -> &SignedValueData { pub fn value(&self) -> Option<&SignedValueData> {
&self.value self.value.as_ref()
} }
#[allow(dead_code)] #[allow(dead_code)]
pub fn destructure(self) -> (TypedKey, ValueSubkeyRangeSet, u32, u64, SignedValueData) { pub fn destructure(
self,
) -> (
TypedKey,
ValueSubkeyRangeSet,
u32,
u64,
Option<SignedValueData>,
) {
( (
self.key, self.key,
self.subkeys, self.subkeys,
@ -113,9 +131,13 @@ impl RPCOperationValueChanged {
subkeys.ranges_insert(vskr.0..=vskr.1); subkeys.ranges_insert(vskr.0..=vskr.1);
} }
let count = reader.get_count(); let count = reader.get_count();
let v_reader = reader.get_value().map_err(RPCError::protocol)?;
let watch_id = reader.get_watch_id(); let watch_id = reader.get_watch_id();
let value = decode_signed_value_data(&v_reader)?; let value = if reader.has_value() {
let v_reader = reader.get_value().map_err(RPCError::protocol)?;
Some(decode_signed_value_data(&v_reader)?)
} else {
None
};
Ok(Self { Ok(Self {
key, key,
@ -147,8 +169,10 @@ impl RPCOperationValueChanged {
builder.set_count(self.count); builder.set_count(self.count);
builder.set_watch_id(self.watch_id); builder.set_watch_id(self.watch_id);
if let Some(value) = &self.value {
let mut v_builder = builder.reborrow().init_value(); let mut v_builder = builder.reborrow().init_value();
encode_signed_value_data(&self.value, &mut v_builder)?; encode_signed_value_data(value, &mut v_builder)?;
}
Ok(()) Ok(())
} }
} }

View File

@ -15,7 +15,7 @@ impl RPCProcessor {
subkeys: ValueSubkeyRangeSet, subkeys: ValueSubkeyRangeSet,
count: u32, count: u32,
watch_id: u64, watch_id: u64,
value: SignedValueData, value: Option<SignedValueData>,
) -> RPCNetworkResult<()> { ) -> RPCNetworkResult<()> {
// Ensure destination is never using a safety route // Ensure destination is never using a safety route
if matches!(dest.get_safety_selection(), SafetySelection::Safe(_)) { if matches!(dest.get_safety_selection(), SafetySelection::Safe(_)) {
@ -63,12 +63,16 @@ impl RPCProcessor {
}; };
if debug_target_enabled!("dht") { if debug_target_enabled!("dht") {
let debug_string_value = format!( let debug_string_value = if let Some(value) = &value {
format!(
" len={} seq={} writer={}", " len={} seq={} writer={}",
value.value_data().data().len(), value.value_data().data().len(),
value.value_data().seq(), value.value_data().seq(),
value.value_data().writer(), value.value_data().writer(),
); )
} else {
"(no value)".to_owned()
};
let debug_string_stmt = format!( let debug_string_stmt = format!(
"IN <== ValueChanged(id={} {} #{:?}+{}{}) from {} <= {}", "IN <== ValueChanged(id={} {} #{:?}+{}{}) from {} <= {}",
@ -91,13 +95,11 @@ impl RPCProcessor {
key, key,
subkeys, subkeys,
count, count,
Arc::new(value), value.map(Arc::new),
inbound_node_id, inbound_node_id,
watch_id, watch_id,
) )
.await .await
.map_err(RPCError::internal)?; .map_err(RPCError::internal)
Ok(NetworkResult::value(()))
} }
} }

View File

@ -41,7 +41,7 @@ struct ValueChangedInfo {
subkeys: ValueSubkeyRangeSet, subkeys: ValueSubkeyRangeSet,
count: u32, count: u32,
watch_id: u64, watch_id: u64,
value: Arc<SignedValueData>, value: Option<Arc<SignedValueData>>,
} }
struct StorageManagerUnlockedInner { struct StorageManagerUnlockedInner {
@ -890,7 +890,7 @@ impl StorageManager {
.map_err(VeilidAPIError::from)?; .map_err(VeilidAPIError::from)?;
network_result_value_or_log!(rpc_processor network_result_value_or_log!(rpc_processor
.rpc_call_value_changed(dest, vc.key, vc.subkeys.clone(), vc.count, vc.watch_id, (*vc.value).clone() ) .rpc_call_value_changed(dest, vc.key, vc.subkeys.clone(), vc.count, vc.watch_id, vc.value.map(|v| (*v).clone()) )
.await .await
.map_err(VeilidAPIError::from)? => [format!(": dest={:?} vc={:?}", dest, vc)] {}); .map_err(VeilidAPIError::from)? => [format!(": dest={:?} vc={:?}", dest, vc)] {});

View File

@ -1211,7 +1211,7 @@ where
subkeys: evci.subkeys, subkeys: evci.subkeys,
count: evci.count, count: evci.count,
watch_id: evci.watch_id, watch_id: evci.watch_id,
value, value: Some(value),
}); });
} }
} }

View File

@ -221,7 +221,7 @@ impl StorageManager {
// Make sure this value would actually be newer // Make sure this value would actually be newer
if let Some(last_value) = &last_get_result.opt_value { if let Some(last_value) = &last_get_result.opt_value {
if value.value_data().seq() <= last_value.value_data().seq() { if value.value_data().seq() <= last_value.value_data().seq() {
// inbound value is older or equal sequence number than the one we have, just return the one we have // inbound value is older than or equal to the sequence number that we have, just return the one we have
return Ok(NetworkResult::value(Some(last_value.clone()))); return Ok(NetworkResult::value(Some(last_value.clone())));
} }
} }

View File

@ -59,7 +59,7 @@ impl StorageManager {
key: *k, key: *k,
subkeys: ValueSubkeyRangeSet::new(), subkeys: ValueSubkeyRangeSet::new(),
count: 0, count: 0,
value: ValueData::default(), value: None,
}))); })));
} }
} }

View File

@ -227,27 +227,27 @@ impl StorageManager {
key: TypedKey, key: TypedKey,
subkeys: ValueSubkeyRangeSet, subkeys: ValueSubkeyRangeSet,
mut count: u32, mut count: u32,
value: Arc<SignedValueData>, value: Option<Arc<SignedValueData>>,
inbound_node_id: TypedKey, inbound_node_id: TypedKey,
watch_id: u64, watch_id: u64,
) -> VeilidAPIResult<()> { ) -> VeilidAPIResult<NetworkResult<()>> {
// Update local record store with new value // Update local record store with new value
let (res, opt_update_callback) = { let (is_value_seq_newer, opt_update_callback, value) = {
let mut inner = self.lock().await?; let mut inner = self.lock().await?;
// Don't process update if the record is closed // Don't process update if the record is closed
let Some(opened_record) = inner.opened_records.get_mut(&key) else { let Some(opened_record) = inner.opened_records.get_mut(&key) else {
return Ok(()); return Ok(NetworkResult::value(()));
}; };
// No active watch means no callback // No active watch means no callback
let Some(mut active_watch) = opened_record.active_watch() else { let Some(mut active_watch) = opened_record.active_watch() else {
return Ok(()); return Ok(NetworkResult::value(()));
}; };
// If the watch id doesn't match, then don't process this // If the watch id doesn't match, then don't process this
if active_watch.id != watch_id { if active_watch.id != watch_id {
return Ok(()); return Ok(NetworkResult::value(()));
} }
// If the reporting node is not the same as our watch, don't process the value change // If the reporting node is not the same as our watch, don't process the value change
@ -256,7 +256,7 @@ impl StorageManager {
.node_ids() .node_ids()
.contains(&inbound_node_id) .contains(&inbound_node_id)
{ {
return Ok(()); return Ok(NetworkResult::value(()));
} }
if count > active_watch.count { if count > active_watch.count {
@ -280,8 +280,46 @@ impl StorageManager {
opened_record.set_active_watch(active_watch); opened_record.set_active_watch(active_watch);
} }
// Null out default value
let value = value.filter(|value| *value.value_data() != ValueData::default());
// Set the local value // Set the local value
let res = if let Some(first_subkey) = subkeys.first() { let mut is_value_seq_newer = false;
if let Some(value) = &value {
let Some(first_subkey) = subkeys.first() else {
apibail_internal!("should not have value without first subkey");
};
let last_get_result = inner
.handle_get_local_value(key, first_subkey, true)
.await?;
let descriptor = last_get_result.opt_descriptor.unwrap();
let schema = descriptor.schema()?;
// Validate with schema
if !schema.check_subkey_value_data(
descriptor.owner(),
first_subkey,
value.value_data(),
) {
// Validation failed, ignore this value
// Move to the next node
return Ok(NetworkResult::invalid_message(format!(
"Schema validation failed on subkey {}",
first_subkey
)));
}
// Make sure this value would actually be newer
is_value_seq_newer = true;
if let Some(last_value) = &last_get_result.opt_value {
if value.value_data().seq() <= last_value.value_data().seq() {
// inbound value is older than or equal to the sequence number that we have, just return the one we have
is_value_seq_newer = false;
}
}
if is_value_seq_newer {
inner inner
.handle_set_local_value( .handle_set_local_value(
key, key,
@ -289,24 +327,34 @@ impl StorageManager {
value.clone(), value.clone(),
WatchUpdateMode::NoUpdate, WatchUpdateMode::NoUpdate,
) )
.await .await?;
} else { }
VeilidAPIResult::Ok(()) }
};
(res, inner.update_callback.clone()) (is_value_seq_newer, inner.update_callback.clone(), value)
}; };
// Announce ValueChanged VeilidUpdate // Announce ValueChanged VeilidUpdate
// * if the value in the update had a newer sequence number
// * if more than a single subkeys has changed
// * if the count was zero meaning cancelled
let do_update = is_value_seq_newer || subkeys.len() > 1 || count == 0;
if do_update {
if let Some(update_callback) = opt_update_callback { if let Some(update_callback) = opt_update_callback {
update_callback(VeilidUpdate::ValueChange(Box::new(VeilidValueChange { update_callback(VeilidUpdate::ValueChange(Box::new(VeilidValueChange {
key, key,
subkeys, subkeys,
count, count,
value: value.value_data().clone(), value: if is_value_seq_newer {
Some(value.unwrap().value_data().clone())
} else {
None
},
}))); })));
} }
}
res Ok(NetworkResult::value(()))
} }
} }

View File

@ -210,6 +210,6 @@ pub fn fix_veilidvaluechange() -> VeilidValueChange {
key: fix_typedkey(), key: fix_typedkey(),
subkeys: ValueSubkeyRangeSet::new(), subkeys: ValueSubkeyRangeSet::new(),
count: 5, count: 5,
value: ValueData::new_with_seq(23, b"ValueData".to_vec(), fix_cryptokey()).unwrap(), value: Some(ValueData::new_with_seq(23, b"ValueData".to_vec(), fix_cryptokey()).unwrap()),
} }
} }

View File

@ -101,7 +101,7 @@ pub struct VeilidValueChange {
pub key: TypedKey, pub key: TypedKey,
pub subkeys: ValueSubkeyRangeSet, pub subkeys: ValueSubkeyRangeSet,
pub count: u32, pub count: u32,
pub value: ValueData, pub value: Option<ValueData>,
} }
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]

View File

@ -19,37 +19,50 @@ class DefaultFixture {
assert(_veilidUpdateStream == null, 'should not set up fixture twice'); assert(_veilidUpdateStream == null, 'should not set up fixture twice');
final ignoreLogTargetsStr =
// ignore: do_not_use_environment
const String.fromEnvironment('IGNORE_LOG_TARGETS').trim();
final ignoreLogTargets = ignoreLogTargetsStr.isEmpty
? <String>[]
: ignoreLogTargetsStr.split(',').map((e) => e.trim()).toList();
final Map<String, dynamic> platformConfigJson; final Map<String, dynamic> platformConfigJson;
if (kIsWeb) { if (kIsWeb) {
const platformConfig = VeilidWASMConfig( final platformConfig = VeilidWASMConfig(
logging: VeilidWASMConfigLogging( logging: VeilidWASMConfigLogging(
performance: VeilidWASMConfigLoggingPerformance( performance: VeilidWASMConfigLoggingPerformance(
enabled: true, enabled: true,
level: VeilidConfigLogLevel.debug, level: VeilidConfigLogLevel.debug,
logsInTimings: true, logsInTimings: true,
logsInConsole: false, logsInConsole: false,
ignoreLogTargets: ignoreLogTargets,
), ),
api: VeilidWASMConfigLoggingApi( api: VeilidWASMConfigLoggingApi(
enabled: true, enabled: true,
level: VeilidConfigLogLevel.info, level: VeilidConfigLogLevel.info,
ignoreLogTargets: ignoreLogTargets,
))); )));
platformConfigJson = platformConfig.toJson(); platformConfigJson = platformConfig.toJson();
} else { } else {
const platformConfig = VeilidFFIConfig( final platformConfig = VeilidFFIConfig(
logging: VeilidFFIConfigLogging( logging: VeilidFFIConfigLogging(
terminal: VeilidFFIConfigLoggingTerminal( terminal: VeilidFFIConfigLoggingTerminal(
enabled: false, enabled: false,
level: VeilidConfigLogLevel.debug, level: VeilidConfigLogLevel.debug,
ignoreLogTargets: ignoreLogTargets,
), ),
otlp: VeilidFFIConfigLoggingOtlp( otlp: VeilidFFIConfigLoggingOtlp(
enabled: false, enabled: false,
level: VeilidConfigLogLevel.trace, level: VeilidConfigLogLevel.trace,
grpcEndpoint: 'localhost:4317', grpcEndpoint: 'localhost:4317',
serviceName: 'Veilid Tests', serviceName: 'Veilid Tests',
ignoreLogTargets: ignoreLogTargets,
), ),
api: VeilidFFIConfigLoggingApi( api: VeilidFFIConfigLoggingApi(
enabled: true, enabled: true,
// level: VeilidConfigLogLevel.debug,
level: VeilidConfigLogLevel.info, level: VeilidConfigLogLevel.info,
ignoreLogTargets: ignoreLogTargets,
))); )));
platformConfigJson = platformConfig.toJson(); platformConfigJson = platformConfig.toJson();
} }

View File

@ -225,14 +225,15 @@ Future<void> testOpenWriterDHTValue() async {
} }
Future<void> testWatchDHTValues(Stream<VeilidUpdate> updateStream) async { Future<void> testWatchDHTValues(Stream<VeilidUpdate> updateStream) async {
final valueChangeQueue = StreamController<VeilidUpdateValueChange>(); final valueChangeQueue =
StreamController<VeilidUpdateValueChange>.broadcast();
final valueChangeSubscription = updateStream.listen((update) { final valueChangeSubscription = updateStream.listen((update) {
if (update is VeilidUpdateValueChange) { if (update is VeilidUpdateValueChange) {
// print("valuechange: " + update.toString()); // print("valuechange: " + update.toString());
valueChangeQueue.sink.add(update); valueChangeQueue.sink.add(update);
} }
}); });
final valueChangeQueueIterator = StreamIterator(valueChangeQueue.stream); var valueChangeQueueIterator = StreamIterator(valueChangeQueue.stream);
try { try {
// Make two routing contexts, one with and one without safety // Make two routing contexts, one with and one without safety
@ -262,6 +263,25 @@ Future<void> testWatchDHTValues(Stream<VeilidUpdate> updateStream) async {
// Now set the subkey and trigger an update // Now set the subkey and trigger an update
expect(await rcSet.setDHTValue(rec.key, 3, utf8.encode("BLAH")), isNull); expect(await rcSet.setDHTValue(rec.key, 3, utf8.encode("BLAH")), isNull);
// Now we should NOT get an update because the update
// is the same as our local copy
if (await valueChangeQueueIterator
.moveNext()
.timeout(const Duration(seconds: 5), onTimeout: () {
return false;
})) {
fail("should not have a change");
}
valueChangeQueueIterator = StreamIterator(valueChangeQueue.stream);
// Now set multiple subkeys and trigger an update
expect(
await [
rcSet.setDHTValue(rec.key, 3, utf8.encode("BLAH BLAH")),
rcSet.setDHTValue(rec.key, 4, utf8.encode("BZORT"))
].wait,
equals([null, null]));
// Wait for the update // Wait for the update
await valueChangeQueueIterator await valueChangeQueueIterator
.moveNext() .moveNext()
@ -271,13 +291,10 @@ Future<void> testWatchDHTValues(Stream<VeilidUpdate> updateStream) async {
// Verify the update // Verify the update
expect(valueChangeQueueIterator.current.key, equals(rec.key)); expect(valueChangeQueueIterator.current.key, equals(rec.key));
expect(valueChangeQueueIterator.current.count, equals(0xFFFFFFFE)); expect(valueChangeQueueIterator.current.count, equals(0xFFFFFFFD));
expect(valueChangeQueueIterator.current.subkeys, expect(valueChangeQueueIterator.current.subkeys,
equals([ValueSubkeyRange.single(3)])); equals([ValueSubkeyRange.make(3, 4)]));
expect(valueChangeQueueIterator.current.value.seq, equals(1)); expect(valueChangeQueueIterator.current.value, isNull);
expect(valueChangeQueueIterator.current.value.data,
equals(utf8.encode("BLAH")));
expect(valueChangeQueueIterator.current.value.writer, equals(rec.owner));
// Reopen without closing to change routing context and not lose watch // Reopen without closing to change routing context and not lose watch
rec = await rcWatch.openDHTRecord(rec.key, writer: rec.ownerKeyPair()); rec = await rcWatch.openDHTRecord(rec.key, writer: rec.ownerKeyPair());
@ -291,9 +308,13 @@ Future<void> testWatchDHTValues(Stream<VeilidUpdate> updateStream) async {
// Reopen without closing to change routing context and not lose watch // Reopen without closing to change routing context and not lose watch
rec = await rcSet.openDHTRecord(rec.key, writer: rec.ownerKeyPair()); rec = await rcSet.openDHTRecord(rec.key, writer: rec.ownerKeyPair());
// Change our subkey // Now set multiple subkeys and trigger an update
expect(await rcSet.setDHTValue(rec.key, 3, utf8.encode("BLAH BLAH BLAH")), expect(
isNull); await [
rcSet.setDHTValue(rec.key, 3, utf8.encode("BLAH BLAH BLAH")),
rcSet.setDHTValue(rec.key, 5, utf8.encode("BZORT BZORT"))
].wait,
equals([null, null]));
// Wait for the update // Wait for the update
await valueChangeQueueIterator await valueChangeQueueIterator
@ -302,15 +323,12 @@ Future<void> testWatchDHTValues(Stream<VeilidUpdate> updateStream) async {
fail("should have a change"); fail("should have a change");
}); });
// Verify the update // Verify the update came back but we don't get a new value because the sequence number is the same
expect(valueChangeQueueIterator.current.key, equals(rec.key)); expect(valueChangeQueueIterator.current.key, equals(rec.key));
expect(valueChangeQueueIterator.current.count, equals(0xFFFFFFFD)); expect(valueChangeQueueIterator.current.count, equals(0xFFFFFFFC));
expect(valueChangeQueueIterator.current.subkeys, expect(valueChangeQueueIterator.current.subkeys,
equals([ValueSubkeyRange.single(3)])); equals([ValueSubkeyRange.single(3), ValueSubkeyRange.single(5)]));
expect(valueChangeQueueIterator.current.value.seq, equals(2)); expect(valueChangeQueueIterator.current.value, isNull);
expect(valueChangeQueueIterator.current.value.data,
equals(utf8.encode("BLAH BLAH BLAH")));
expect(valueChangeQueueIterator.current.value.writer, equals(rec.owner));
// Reopen without closing to change routing context and not lose watch // Reopen without closing to change routing context and not lose watch
rec = await rcWatch.openDHTRecord(rec.key, writer: rec.ownerKeyPair()); rec = await rcWatch.openDHTRecord(rec.key, writer: rec.ownerKeyPair());
@ -324,8 +342,13 @@ Future<void> testWatchDHTValues(Stream<VeilidUpdate> updateStream) async {
// Reopen without closing to change routing context and not lose watch // Reopen without closing to change routing context and not lose watch
rec = await rcSet.openDHTRecord(rec.key, writer: rec.ownerKeyPair()); rec = await rcSet.openDHTRecord(rec.key, writer: rec.ownerKeyPair());
// Set the value without a watch // Now set multiple subkeys and trigger an update
expect(await rcSet.setDHTValue(rec.key, 3, utf8.encode("BLAH")), isNull); expect(
await [
rcSet.setDHTValue(rec.key, 3, utf8.encode("BLAH BLAH BLAH BLAH")),
rcSet.setDHTValue(rec.key, 5, utf8.encode("BZORT BZORT BZORT"))
].wait,
equals([null, null]));
// Now we should NOT get an update // Now we should NOT get an update
if (await valueChangeQueueIterator if (await valueChangeQueueIterator

View File

@ -174,7 +174,7 @@ sealed class VeilidUpdate with _$VeilidUpdate {
required TypedKey key, required TypedKey key,
required List<ValueSubkeyRange> subkeys, required List<ValueSubkeyRange> subkeys,
required int count, required int count,
required ValueData value, required ValueData? value,
}) = VeilidUpdateValueChange; }) = VeilidUpdateValueChange;
factory VeilidUpdate.fromJson(dynamic json) => factory VeilidUpdate.fromJson(dynamic json) =>

View File

@ -1360,7 +1360,7 @@ mixin _$VeilidUpdate {
List<String> deadRoutes, List<String> deadRemoteRoutes) List<String> deadRoutes, List<String> deadRemoteRoutes)
routeChange, routeChange,
required TResult Function(Typed<FixedEncodedString43> key, required TResult Function(Typed<FixedEncodedString43> key,
List<ValueSubkeyRange> subkeys, int count, ValueData value) List<ValueSubkeyRange> subkeys, int count, ValueData? value)
valueChange, valueChange,
}) => }) =>
throw _privateConstructorUsedError; throw _privateConstructorUsedError;
@ -1388,7 +1388,7 @@ mixin _$VeilidUpdate {
TResult? Function(List<String> deadRoutes, List<String> deadRemoteRoutes)? TResult? Function(List<String> deadRoutes, List<String> deadRemoteRoutes)?
routeChange, routeChange,
TResult? Function(Typed<FixedEncodedString43> key, TResult? Function(Typed<FixedEncodedString43> key,
List<ValueSubkeyRange> subkeys, int count, ValueData value)? List<ValueSubkeyRange> subkeys, int count, ValueData? value)?
valueChange, valueChange,
}) => }) =>
throw _privateConstructorUsedError; throw _privateConstructorUsedError;
@ -1416,7 +1416,7 @@ mixin _$VeilidUpdate {
TResult Function(List<String> deadRoutes, List<String> deadRemoteRoutes)? TResult Function(List<String> deadRoutes, List<String> deadRemoteRoutes)?
routeChange, routeChange,
TResult Function(Typed<FixedEncodedString43> key, TResult Function(Typed<FixedEncodedString43> key,
List<ValueSubkeyRange> subkeys, int count, ValueData value)? List<ValueSubkeyRange> subkeys, int count, ValueData? value)?
valueChange, valueChange,
required TResult orElse(), required TResult orElse(),
}) => }) =>
@ -1598,7 +1598,7 @@ class _$VeilidLogImpl implements VeilidLog {
List<String> deadRoutes, List<String> deadRemoteRoutes) List<String> deadRoutes, List<String> deadRemoteRoutes)
routeChange, routeChange,
required TResult Function(Typed<FixedEncodedString43> key, required TResult Function(Typed<FixedEncodedString43> key,
List<ValueSubkeyRange> subkeys, int count, ValueData value) List<ValueSubkeyRange> subkeys, int count, ValueData? value)
valueChange, valueChange,
}) { }) {
return log(logLevel, message, backtrace); return log(logLevel, message, backtrace);
@ -1629,7 +1629,7 @@ class _$VeilidLogImpl implements VeilidLog {
TResult? Function(List<String> deadRoutes, List<String> deadRemoteRoutes)? TResult? Function(List<String> deadRoutes, List<String> deadRemoteRoutes)?
routeChange, routeChange,
TResult? Function(Typed<FixedEncodedString43> key, TResult? Function(Typed<FixedEncodedString43> key,
List<ValueSubkeyRange> subkeys, int count, ValueData value)? List<ValueSubkeyRange> subkeys, int count, ValueData? value)?
valueChange, valueChange,
}) { }) {
return log?.call(logLevel, message, backtrace); return log?.call(logLevel, message, backtrace);
@ -1660,7 +1660,7 @@ class _$VeilidLogImpl implements VeilidLog {
TResult Function(List<String> deadRoutes, List<String> deadRemoteRoutes)? TResult Function(List<String> deadRoutes, List<String> deadRemoteRoutes)?
routeChange, routeChange,
TResult Function(Typed<FixedEncodedString43> key, TResult Function(Typed<FixedEncodedString43> key,
List<ValueSubkeyRange> subkeys, int count, ValueData value)? List<ValueSubkeyRange> subkeys, int count, ValueData? value)?
valueChange, valueChange,
required TResult orElse(), required TResult orElse(),
}) { }) {
@ -1867,7 +1867,7 @@ class _$VeilidAppMessageImpl implements VeilidAppMessage {
List<String> deadRoutes, List<String> deadRemoteRoutes) List<String> deadRoutes, List<String> deadRemoteRoutes)
routeChange, routeChange,
required TResult Function(Typed<FixedEncodedString43> key, required TResult Function(Typed<FixedEncodedString43> key,
List<ValueSubkeyRange> subkeys, int count, ValueData value) List<ValueSubkeyRange> subkeys, int count, ValueData? value)
valueChange, valueChange,
}) { }) {
return appMessage(message, sender, routeId); return appMessage(message, sender, routeId);
@ -1898,7 +1898,7 @@ class _$VeilidAppMessageImpl implements VeilidAppMessage {
TResult? Function(List<String> deadRoutes, List<String> deadRemoteRoutes)? TResult? Function(List<String> deadRoutes, List<String> deadRemoteRoutes)?
routeChange, routeChange,
TResult? Function(Typed<FixedEncodedString43> key, TResult? Function(Typed<FixedEncodedString43> key,
List<ValueSubkeyRange> subkeys, int count, ValueData value)? List<ValueSubkeyRange> subkeys, int count, ValueData? value)?
valueChange, valueChange,
}) { }) {
return appMessage?.call(message, sender, routeId); return appMessage?.call(message, sender, routeId);
@ -1929,7 +1929,7 @@ class _$VeilidAppMessageImpl implements VeilidAppMessage {
TResult Function(List<String> deadRoutes, List<String> deadRemoteRoutes)? TResult Function(List<String> deadRoutes, List<String> deadRemoteRoutes)?
routeChange, routeChange,
TResult Function(Typed<FixedEncodedString43> key, TResult Function(Typed<FixedEncodedString43> key,
List<ValueSubkeyRange> subkeys, int count, ValueData value)? List<ValueSubkeyRange> subkeys, int count, ValueData? value)?
valueChange, valueChange,
required TResult orElse(), required TResult orElse(),
}) { }) {
@ -2146,7 +2146,7 @@ class _$VeilidAppCallImpl implements VeilidAppCall {
List<String> deadRoutes, List<String> deadRemoteRoutes) List<String> deadRoutes, List<String> deadRemoteRoutes)
routeChange, routeChange,
required TResult Function(Typed<FixedEncodedString43> key, required TResult Function(Typed<FixedEncodedString43> key,
List<ValueSubkeyRange> subkeys, int count, ValueData value) List<ValueSubkeyRange> subkeys, int count, ValueData? value)
valueChange, valueChange,
}) { }) {
return appCall(message, callId, sender, routeId); return appCall(message, callId, sender, routeId);
@ -2177,7 +2177,7 @@ class _$VeilidAppCallImpl implements VeilidAppCall {
TResult? Function(List<String> deadRoutes, List<String> deadRemoteRoutes)? TResult? Function(List<String> deadRoutes, List<String> deadRemoteRoutes)?
routeChange, routeChange,
TResult? Function(Typed<FixedEncodedString43> key, TResult? Function(Typed<FixedEncodedString43> key,
List<ValueSubkeyRange> subkeys, int count, ValueData value)? List<ValueSubkeyRange> subkeys, int count, ValueData? value)?
valueChange, valueChange,
}) { }) {
return appCall?.call(message, callId, sender, routeId); return appCall?.call(message, callId, sender, routeId);
@ -2208,7 +2208,7 @@ class _$VeilidAppCallImpl implements VeilidAppCall {
TResult Function(List<String> deadRoutes, List<String> deadRemoteRoutes)? TResult Function(List<String> deadRoutes, List<String> deadRemoteRoutes)?
routeChange, routeChange,
TResult Function(Typed<FixedEncodedString43> key, TResult Function(Typed<FixedEncodedString43> key,
List<ValueSubkeyRange> subkeys, int count, ValueData value)? List<ValueSubkeyRange> subkeys, int count, ValueData? value)?
valueChange, valueChange,
required TResult orElse(), required TResult orElse(),
}) { }) {
@ -2421,7 +2421,7 @@ class _$VeilidUpdateAttachmentImpl implements VeilidUpdateAttachment {
List<String> deadRoutes, List<String> deadRemoteRoutes) List<String> deadRoutes, List<String> deadRemoteRoutes)
routeChange, routeChange,
required TResult Function(Typed<FixedEncodedString43> key, required TResult Function(Typed<FixedEncodedString43> key,
List<ValueSubkeyRange> subkeys, int count, ValueData value) List<ValueSubkeyRange> subkeys, int count, ValueData? value)
valueChange, valueChange,
}) { }) {
return attachment(state, publicInternetReady, localNetworkReady); return attachment(state, publicInternetReady, localNetworkReady);
@ -2452,7 +2452,7 @@ class _$VeilidUpdateAttachmentImpl implements VeilidUpdateAttachment {
TResult? Function(List<String> deadRoutes, List<String> deadRemoteRoutes)? TResult? Function(List<String> deadRoutes, List<String> deadRemoteRoutes)?
routeChange, routeChange,
TResult? Function(Typed<FixedEncodedString43> key, TResult? Function(Typed<FixedEncodedString43> key,
List<ValueSubkeyRange> subkeys, int count, ValueData value)? List<ValueSubkeyRange> subkeys, int count, ValueData? value)?
valueChange, valueChange,
}) { }) {
return attachment?.call(state, publicInternetReady, localNetworkReady); return attachment?.call(state, publicInternetReady, localNetworkReady);
@ -2483,7 +2483,7 @@ class _$VeilidUpdateAttachmentImpl implements VeilidUpdateAttachment {
TResult Function(List<String> deadRoutes, List<String> deadRemoteRoutes)? TResult Function(List<String> deadRoutes, List<String> deadRemoteRoutes)?
routeChange, routeChange,
TResult Function(Typed<FixedEncodedString43> key, TResult Function(Typed<FixedEncodedString43> key,
List<ValueSubkeyRange> subkeys, int count, ValueData value)? List<ValueSubkeyRange> subkeys, int count, ValueData? value)?
valueChange, valueChange,
required TResult orElse(), required TResult orElse(),
}) { }) {
@ -2702,7 +2702,7 @@ class _$VeilidUpdateNetworkImpl implements VeilidUpdateNetwork {
List<String> deadRoutes, List<String> deadRemoteRoutes) List<String> deadRoutes, List<String> deadRemoteRoutes)
routeChange, routeChange,
required TResult Function(Typed<FixedEncodedString43> key, required TResult Function(Typed<FixedEncodedString43> key,
List<ValueSubkeyRange> subkeys, int count, ValueData value) List<ValueSubkeyRange> subkeys, int count, ValueData? value)
valueChange, valueChange,
}) { }) {
return network(started, bpsDown, bpsUp, peers); return network(started, bpsDown, bpsUp, peers);
@ -2733,7 +2733,7 @@ class _$VeilidUpdateNetworkImpl implements VeilidUpdateNetwork {
TResult? Function(List<String> deadRoutes, List<String> deadRemoteRoutes)? TResult? Function(List<String> deadRoutes, List<String> deadRemoteRoutes)?
routeChange, routeChange,
TResult? Function(Typed<FixedEncodedString43> key, TResult? Function(Typed<FixedEncodedString43> key,
List<ValueSubkeyRange> subkeys, int count, ValueData value)? List<ValueSubkeyRange> subkeys, int count, ValueData? value)?
valueChange, valueChange,
}) { }) {
return network?.call(started, bpsDown, bpsUp, peers); return network?.call(started, bpsDown, bpsUp, peers);
@ -2764,7 +2764,7 @@ class _$VeilidUpdateNetworkImpl implements VeilidUpdateNetwork {
TResult Function(List<String> deadRoutes, List<String> deadRemoteRoutes)? TResult Function(List<String> deadRoutes, List<String> deadRemoteRoutes)?
routeChange, routeChange,
TResult Function(Typed<FixedEncodedString43> key, TResult Function(Typed<FixedEncodedString43> key,
List<ValueSubkeyRange> subkeys, int count, ValueData value)? List<ValueSubkeyRange> subkeys, int count, ValueData? value)?
valueChange, valueChange,
required TResult orElse(), required TResult orElse(),
}) { }) {
@ -2958,7 +2958,7 @@ class _$VeilidUpdateConfigImpl implements VeilidUpdateConfig {
List<String> deadRoutes, List<String> deadRemoteRoutes) List<String> deadRoutes, List<String> deadRemoteRoutes)
routeChange, routeChange,
required TResult Function(Typed<FixedEncodedString43> key, required TResult Function(Typed<FixedEncodedString43> key,
List<ValueSubkeyRange> subkeys, int count, ValueData value) List<ValueSubkeyRange> subkeys, int count, ValueData? value)
valueChange, valueChange,
}) { }) {
return config(this.config); return config(this.config);
@ -2989,7 +2989,7 @@ class _$VeilidUpdateConfigImpl implements VeilidUpdateConfig {
TResult? Function(List<String> deadRoutes, List<String> deadRemoteRoutes)? TResult? Function(List<String> deadRoutes, List<String> deadRemoteRoutes)?
routeChange, routeChange,
TResult? Function(Typed<FixedEncodedString43> key, TResult? Function(Typed<FixedEncodedString43> key,
List<ValueSubkeyRange> subkeys, int count, ValueData value)? List<ValueSubkeyRange> subkeys, int count, ValueData? value)?
valueChange, valueChange,
}) { }) {
return config?.call(this.config); return config?.call(this.config);
@ -3020,7 +3020,7 @@ class _$VeilidUpdateConfigImpl implements VeilidUpdateConfig {
TResult Function(List<String> deadRoutes, List<String> deadRemoteRoutes)? TResult Function(List<String> deadRoutes, List<String> deadRemoteRoutes)?
routeChange, routeChange,
TResult Function(Typed<FixedEncodedString43> key, TResult Function(Typed<FixedEncodedString43> key,
List<ValueSubkeyRange> subkeys, int count, ValueData value)? List<ValueSubkeyRange> subkeys, int count, ValueData? value)?
valueChange, valueChange,
required TResult orElse(), required TResult orElse(),
}) { }) {
@ -3230,7 +3230,7 @@ class _$VeilidUpdateRouteChangeImpl implements VeilidUpdateRouteChange {
List<String> deadRoutes, List<String> deadRemoteRoutes) List<String> deadRoutes, List<String> deadRemoteRoutes)
routeChange, routeChange,
required TResult Function(Typed<FixedEncodedString43> key, required TResult Function(Typed<FixedEncodedString43> key,
List<ValueSubkeyRange> subkeys, int count, ValueData value) List<ValueSubkeyRange> subkeys, int count, ValueData? value)
valueChange, valueChange,
}) { }) {
return routeChange(deadRoutes, deadRemoteRoutes); return routeChange(deadRoutes, deadRemoteRoutes);
@ -3261,7 +3261,7 @@ class _$VeilidUpdateRouteChangeImpl implements VeilidUpdateRouteChange {
TResult? Function(List<String> deadRoutes, List<String> deadRemoteRoutes)? TResult? Function(List<String> deadRoutes, List<String> deadRemoteRoutes)?
routeChange, routeChange,
TResult? Function(Typed<FixedEncodedString43> key, TResult? Function(Typed<FixedEncodedString43> key,
List<ValueSubkeyRange> subkeys, int count, ValueData value)? List<ValueSubkeyRange> subkeys, int count, ValueData? value)?
valueChange, valueChange,
}) { }) {
return routeChange?.call(deadRoutes, deadRemoteRoutes); return routeChange?.call(deadRoutes, deadRemoteRoutes);
@ -3292,7 +3292,7 @@ class _$VeilidUpdateRouteChangeImpl implements VeilidUpdateRouteChange {
TResult Function(List<String> deadRoutes, List<String> deadRemoteRoutes)? TResult Function(List<String> deadRoutes, List<String> deadRemoteRoutes)?
routeChange, routeChange,
TResult Function(Typed<FixedEncodedString43> key, TResult Function(Typed<FixedEncodedString43> key,
List<ValueSubkeyRange> subkeys, int count, ValueData value)? List<ValueSubkeyRange> subkeys, int count, ValueData? value)?
valueChange, valueChange,
required TResult orElse(), required TResult orElse(),
}) { }) {
@ -3386,9 +3386,9 @@ abstract class _$$VeilidUpdateValueChangeImplCopyWith<$Res> {
{Typed<FixedEncodedString43> key, {Typed<FixedEncodedString43> key,
List<ValueSubkeyRange> subkeys, List<ValueSubkeyRange> subkeys,
int count, int count,
ValueData value}); ValueData? value});
$ValueDataCopyWith<$Res> get value; $ValueDataCopyWith<$Res>? get value;
} }
/// @nodoc /// @nodoc
@ -3406,7 +3406,7 @@ class __$$VeilidUpdateValueChangeImplCopyWithImpl<$Res>
Object? key = null, Object? key = null,
Object? subkeys = null, Object? subkeys = null,
Object? count = null, Object? count = null,
Object? value = null, Object? value = freezed,
}) { }) {
return _then(_$VeilidUpdateValueChangeImpl( return _then(_$VeilidUpdateValueChangeImpl(
key: null == key key: null == key
@ -3421,17 +3421,21 @@ class __$$VeilidUpdateValueChangeImplCopyWithImpl<$Res>
? _value.count ? _value.count
: count // ignore: cast_nullable_to_non_nullable : count // ignore: cast_nullable_to_non_nullable
as int, as int,
value: null == value value: freezed == value
? _value.value ? _value.value
: value // ignore: cast_nullable_to_non_nullable : value // ignore: cast_nullable_to_non_nullable
as ValueData, as ValueData?,
)); ));
} }
@override @override
@pragma('vm:prefer-inline') @pragma('vm:prefer-inline')
$ValueDataCopyWith<$Res> get value { $ValueDataCopyWith<$Res>? get value {
return $ValueDataCopyWith<$Res>(_value.value, (value) { if (_value.value == null) {
return null;
}
return $ValueDataCopyWith<$Res>(_value.value!, (value) {
return _then(_value.copyWith(value: value)); return _then(_value.copyWith(value: value));
}); });
} }
@ -3465,7 +3469,7 @@ class _$VeilidUpdateValueChangeImpl implements VeilidUpdateValueChange {
@override @override
final int count; final int count;
@override @override
final ValueData value; final ValueData? value;
@JsonKey(name: 'kind') @JsonKey(name: 'kind')
final String $type; final String $type;
@ -3526,7 +3530,7 @@ class _$VeilidUpdateValueChangeImpl implements VeilidUpdateValueChange {
List<String> deadRoutes, List<String> deadRemoteRoutes) List<String> deadRoutes, List<String> deadRemoteRoutes)
routeChange, routeChange,
required TResult Function(Typed<FixedEncodedString43> key, required TResult Function(Typed<FixedEncodedString43> key,
List<ValueSubkeyRange> subkeys, int count, ValueData value) List<ValueSubkeyRange> subkeys, int count, ValueData? value)
valueChange, valueChange,
}) { }) {
return valueChange(key, subkeys, count, value); return valueChange(key, subkeys, count, value);
@ -3557,7 +3561,7 @@ class _$VeilidUpdateValueChangeImpl implements VeilidUpdateValueChange {
TResult? Function(List<String> deadRoutes, List<String> deadRemoteRoutes)? TResult? Function(List<String> deadRoutes, List<String> deadRemoteRoutes)?
routeChange, routeChange,
TResult? Function(Typed<FixedEncodedString43> key, TResult? Function(Typed<FixedEncodedString43> key,
List<ValueSubkeyRange> subkeys, int count, ValueData value)? List<ValueSubkeyRange> subkeys, int count, ValueData? value)?
valueChange, valueChange,
}) { }) {
return valueChange?.call(key, subkeys, count, value); return valueChange?.call(key, subkeys, count, value);
@ -3588,7 +3592,7 @@ class _$VeilidUpdateValueChangeImpl implements VeilidUpdateValueChange {
TResult Function(List<String> deadRoutes, List<String> deadRemoteRoutes)? TResult Function(List<String> deadRoutes, List<String> deadRemoteRoutes)?
routeChange, routeChange,
TResult Function(Typed<FixedEncodedString43> key, TResult Function(Typed<FixedEncodedString43> key,
List<ValueSubkeyRange> subkeys, int count, ValueData value)? List<ValueSubkeyRange> subkeys, int count, ValueData? value)?
valueChange, valueChange,
required TResult orElse(), required TResult orElse(),
}) { }) {
@ -3660,7 +3664,7 @@ abstract class VeilidUpdateValueChange implements VeilidUpdate {
{required final Typed<FixedEncodedString43> key, {required final Typed<FixedEncodedString43> key,
required final List<ValueSubkeyRange> subkeys, required final List<ValueSubkeyRange> subkeys,
required final int count, required final int count,
required final ValueData value}) = _$VeilidUpdateValueChangeImpl; required final ValueData? value}) = _$VeilidUpdateValueChangeImpl;
factory VeilidUpdateValueChange.fromJson(Map<String, dynamic> json) = factory VeilidUpdateValueChange.fromJson(Map<String, dynamic> json) =
_$VeilidUpdateValueChangeImpl.fromJson; _$VeilidUpdateValueChangeImpl.fromJson;
@ -3668,7 +3672,7 @@ abstract class VeilidUpdateValueChange implements VeilidUpdate {
Typed<FixedEncodedString43> get key; Typed<FixedEncodedString43> get key;
List<ValueSubkeyRange> get subkeys; List<ValueSubkeyRange> get subkeys;
int get count; int get count;
ValueData get value; ValueData? get value;
@JsonKey(ignore: true) @JsonKey(ignore: true)
_$$VeilidUpdateValueChangeImplCopyWith<_$VeilidUpdateValueChangeImpl> _$$VeilidUpdateValueChangeImplCopyWith<_$VeilidUpdateValueChangeImpl>
get copyWith => throw _privateConstructorUsedError; get copyWith => throw _privateConstructorUsedError;

View File

@ -255,7 +255,7 @@ _$VeilidUpdateValueChangeImpl _$$VeilidUpdateValueChangeImplFromJson(
.map(ValueSubkeyRange.fromJson) .map(ValueSubkeyRange.fromJson)
.toList(), .toList(),
count: json['count'] as int, count: json['count'] as int,
value: ValueData.fromJson(json['value']), value: json['value'] == null ? null : ValueData.fromJson(json['value']),
$type: json['kind'] as String?, $type: json['kind'] as String?,
); );
@ -265,7 +265,7 @@ Map<String, dynamic> _$$VeilidUpdateValueChangeImplToJson(
'key': instance.key.toJson(), 'key': instance.key.toJson(),
'subkeys': instance.subkeys.map((e) => e.toJson()).toList(), 'subkeys': instance.subkeys.map((e) => e.toJson()).toList(),
'count': instance.count, 'count': instance.count,
'value': instance.value.toJson(), 'value': instance.value?.toJson(),
'kind': instance.$type, 'kind': instance.$type,
}; };

View File

@ -248,16 +248,26 @@ async def test_watch_dht_values():
vd = await rcSet.set_dht_value(rec.key, 3, b"BLAH") vd = await rcSet.set_dht_value(rec.key, 3, b"BLAH")
assert vd == None assert vd == None
# Now we should NOT get an update because the update is the same as our local copy
update = None
try:
update = await asyncio.wait_for(value_change_queue.get(), timeout=5)
except asyncio.TimeoutError:
pass
assert update == None
# Now set multiple subkeys and trigger an update
vd = await asyncio.gather(*[rcSet.set_dht_value(rec.key, 3, b"BLAH BLAH"), rcSet.set_dht_value(rec.key, 4, b"BZORT")])
assert vd == [None, None]
# Wait for the update # Wait for the update
upd = await asyncio.wait_for(value_change_queue.get(), timeout=5) upd = await asyncio.wait_for(value_change_queue.get(), timeout=5)
# Verify the update # Verify the update came back but we don't get a new value because the sequence number is the same
assert upd.detail.key == rec.key assert upd.detail.key == rec.key
assert upd.detail.count == 0xFFFFFFFE assert upd.detail.count == 0xFFFFFFFD
assert upd.detail.subkeys == [(3,3)] assert upd.detail.subkeys == [(3, 4)]
assert upd.detail.value.seq == 1 assert upd.detail.value == None
assert upd.detail.value.data == b"BLAH"
assert upd.detail.value.writer == rec.owner
# Reopen without closing to change routing context and not lose watch # Reopen without closing to change routing context and not lose watch
rec = await rcWatch.open_dht_record(rec.key, rec.owner_key_pair()) rec = await rcWatch.open_dht_record(rec.key, rec.owner_key_pair())
@ -269,20 +279,18 @@ async def test_watch_dht_values():
# Reopen without closing to change routing context and not lose watch # Reopen without closing to change routing context and not lose watch
rec = await rcSet.open_dht_record(rec.key, rec.owner_key_pair()) rec = await rcSet.open_dht_record(rec.key, rec.owner_key_pair())
# Change our subkey # Now set multiple subkeys and trigger an update
vd = await rcSet.set_dht_value(rec.key, 3, b"BLAH BLAH BLAH") vd = await asyncio.gather(*[rcSet.set_dht_value(rec.key, 3, b"BLAH BLAH BLAH"), rcSet.set_dht_value(rec.key, 5, b"BZORT BZORT")])
assert vd == None assert vd == [None, None]
# Wait for the update # Wait for the update
upd = await asyncio.wait_for(value_change_queue.get(), timeout=5) upd = await asyncio.wait_for(value_change_queue.get(), timeout=5)
# Verify the update # Verify the update came back but we don't get a new value because the sequence number is the same
assert upd.detail.key == rec.key assert upd.detail.key == rec.key
assert upd.detail.count == 0xFFFFFFFD assert upd.detail.count == 0xFFFFFFFC
assert upd.detail.subkeys == [(3,3)] assert upd.detail.subkeys == [(3, 3), (5, 5)]
assert upd.detail.value.seq == 2 assert upd.detail.value == None
assert upd.detail.value.data == b"BLAH BLAH BLAH"
assert upd.detail.value.writer == rec.owner
# Reopen without closing to change routing context and not lose watch # Reopen without closing to change routing context and not lose watch
rec = await rcWatch.open_dht_record(rec.key, rec.owner_key_pair()) rec = await rcWatch.open_dht_record(rec.key, rec.owner_key_pair())
@ -294,9 +302,9 @@ async def test_watch_dht_values():
# Reopen without closing to change routing context and not lose watch # Reopen without closing to change routing context and not lose watch
rec = await rcSet.open_dht_record(rec.key, rec.owner_key_pair()) rec = await rcSet.open_dht_record(rec.key, rec.owner_key_pair())
# Set the value without a watch # Now set multiple subkeys
vd = await rcSet.set_dht_value(rec.key, 3, b"BLAH") vd = await asyncio.gather(*[rcSet.set_dht_value(rec.key, 3, b"BLAH BLAH BLAH BLAH"), rcSet.set_dht_value(rec.key, 5, b"BZORT BZORT BZORT")])
assert vd == None assert vd == [None, None]
# Now we should NOT get an update # Now we should NOT get an update
update = None update = None

View File

@ -2673,8 +2673,7 @@
"count", "count",
"key", "key",
"kind", "kind",
"subkeys", "subkeys"
"value"
], ],
"properties": { "properties": {
"count": { "count": {
@ -2712,7 +2711,14 @@
} }
}, },
"value": { "value": {
"anyOf": [
{
"$ref": "#/definitions/ValueData" "$ref": "#/definitions/ValueData"
},
{
"type": "null"
}
]
} }
} }
}, },

View File

@ -358,9 +358,9 @@ class VeilidValueChange:
key: TypedKey key: TypedKey
subkeys: list[tuple[ValueSubkey, ValueSubkey]] subkeys: list[tuple[ValueSubkey, ValueSubkey]]
count: int count: int
value: ValueData value: Optional[ValueData]
def __init__(self, key: TypedKey, subkeys: list[tuple[ValueSubkey, ValueSubkey]], count: int, value: ValueData): def __init__(self, key: TypedKey, subkeys: list[tuple[ValueSubkey, ValueSubkey]], count: int, value: Optional[ValueData]):
self.key = key self.key = key
self.subkeys = subkeys self.subkeys = subkeys
self.count = count self.count = count
@ -373,7 +373,7 @@ class VeilidValueChange:
TypedKey(j["key"]), TypedKey(j["key"]),
[(p[0], p[1]) for p in j["subkeys"]], [(p[0], p[1]) for p in j["subkeys"]],
j["count"], j["count"],
ValueData.from_json(j["value"]), None if j["value"] is None else ValueData.from_json(j["value"]),
) )