allow value changed data to be optional in rpc schema

This commit is contained in:
Christien Rioux 2024-03-31 21:35:52 -04:00
parent 6e1439306a
commit ca9fec75d2
7 changed files with 87 additions and 37 deletions

View file

@ -385,7 +385,7 @@ struct OperationValueChanged @0xd1c59ebdd8cc1bf6 {
subkeys @1 :List(SubkeyRange); # subkey range that changed (up to 512 ranges at a time, if empty this is a watch expiration notice) subkeys @1 :List(SubkeyRange); # subkey range that changed (up to 512 ranges at a time, if empty this is a watch expiration notice)
count @2 :UInt32; # remaining changes left (0 means watch has expired) count @2 :UInt32; # remaining changes left (0 means watch has expired)
watchId @3 :UInt64; # watch id this value change came from watchId @3 :UInt64; # watch id this value change came from
value @4 :SignedValueData; # first value that changed (the rest can be gotten with getvalue) value @4 :SignedValueData; # Optional: first value that changed (the rest can be gotten with getvalue)
} }
struct OperationSupplyBlockQ @0xadbf4c542d749971 { struct OperationSupplyBlockQ @0xadbf4c542d749971 {

View file

@ -22084,4 +22084,4 @@ pub mod operation {
} }
} }
//BUILDHASH:6df0786a4485a9d0c25c177959fe1b2070c2a98884d20267eb70e129be95f2b0 //BUILDHASH:a28970f74fcf7989d47e1457e848e5de33d8b7a4003a0e439c95f442ecf69fd3

View file

@ -9,7 +9,7 @@ pub(in crate::rpc_processor) struct RPCOperationValueChanged {
subkeys: ValueSubkeyRangeSet, subkeys: ValueSubkeyRangeSet,
count: u32, count: u32,
watch_id: u64, watch_id: u64,
value: SignedValueData, value: Option<SignedValueData>,
} }
impl RPCOperationValueChanged { impl RPCOperationValueChanged {
@ -19,7 +19,7 @@ impl RPCOperationValueChanged {
subkeys: ValueSubkeyRangeSet, subkeys: ValueSubkeyRangeSet,
count: u32, count: u32,
watch_id: u64, watch_id: u64,
value: SignedValueData, value: Option<SignedValueData>,
) -> Result<Self, RPCError> { ) -> Result<Self, RPCError> {
if subkeys.ranges_len() > MAX_VALUE_CHANGED_SUBKEY_RANGES_LEN { if subkeys.ranges_len() > MAX_VALUE_CHANGED_SUBKEY_RANGES_LEN {
return Err(RPCError::protocol( return Err(RPCError::protocol(
@ -30,6 +30,11 @@ impl RPCOperationValueChanged {
if watch_id == 0 { if watch_id == 0 {
return Err(RPCError::protocol("ValueChanged needs a nonzero watch id")); return Err(RPCError::protocol("ValueChanged needs a nonzero watch id"));
} }
if subkeys.is_empty() && value.is_some() {
return Err(RPCError::protocol(
"ValueChanged with a value must have subkeys",
));
}
Ok(Self { Ok(Self {
key, key,
@ -44,6 +49,11 @@ impl RPCOperationValueChanged {
if self.watch_id == 0 { if self.watch_id == 0 {
return Err(RPCError::protocol("ValueChanged does not have a valid id")); return Err(RPCError::protocol("ValueChanged does not have a valid id"));
} }
if self.subkeys.is_empty() && self.value.is_some() {
return Err(RPCError::protocol(
"ValueChanged with a value must have subkeys",
));
}
// further validation must be done by storage manager as this is more complicated // further validation must be done by storage manager as this is more complicated
Ok(()) Ok(())
} }
@ -69,12 +79,20 @@ impl RPCOperationValueChanged {
} }
#[allow(dead_code)] #[allow(dead_code)]
pub fn value(&self) -> &SignedValueData { pub fn value(&self) -> Option<&SignedValueData> {
&self.value self.value.as_ref()
} }
#[allow(dead_code)] #[allow(dead_code)]
pub fn destructure(self) -> (TypedKey, ValueSubkeyRangeSet, u32, u64, SignedValueData) { pub fn destructure(
self,
) -> (
TypedKey,
ValueSubkeyRangeSet,
u32,
u64,
Option<SignedValueData>,
) {
( (
self.key, self.key,
self.subkeys, self.subkeys,
@ -113,9 +131,13 @@ impl RPCOperationValueChanged {
subkeys.ranges_insert(vskr.0..=vskr.1); subkeys.ranges_insert(vskr.0..=vskr.1);
} }
let count = reader.get_count(); let count = reader.get_count();
let v_reader = reader.get_value().map_err(RPCError::protocol)?;
let watch_id = reader.get_watch_id(); let watch_id = reader.get_watch_id();
let value = decode_signed_value_data(&v_reader)?; let value = if reader.has_value() {
let v_reader = reader.get_value().map_err(RPCError::protocol)?;
Some(decode_signed_value_data(&v_reader)?)
} else {
None
};
Ok(Self { Ok(Self {
key, key,
@ -147,8 +169,10 @@ impl RPCOperationValueChanged {
builder.set_count(self.count); builder.set_count(self.count);
builder.set_watch_id(self.watch_id); builder.set_watch_id(self.watch_id);
if let Some(value) = &self.value {
let mut v_builder = builder.reborrow().init_value(); let mut v_builder = builder.reborrow().init_value();
encode_signed_value_data(&self.value, &mut v_builder)?; encode_signed_value_data(value, &mut v_builder)?;
}
Ok(()) Ok(())
} }
} }

View file

@ -15,7 +15,7 @@ impl RPCProcessor {
subkeys: ValueSubkeyRangeSet, subkeys: ValueSubkeyRangeSet,
count: u32, count: u32,
watch_id: u64, watch_id: u64,
value: SignedValueData, value: Option<SignedValueData>,
) -> RPCNetworkResult<()> { ) -> RPCNetworkResult<()> {
// Ensure destination is never using a safety route // Ensure destination is never using a safety route
if matches!(dest.get_safety_selection(), SafetySelection::Safe(_)) { if matches!(dest.get_safety_selection(), SafetySelection::Safe(_)) {
@ -63,12 +63,16 @@ impl RPCProcessor {
}; };
if debug_target_enabled!("dht") { if debug_target_enabled!("dht") {
let debug_string_value = format!( let debug_string_value = if let Some(value) = &value {
format!(
" len={} seq={} writer={}", " len={} seq={} writer={}",
value.value_data().data().len(), value.value_data().data().len(),
value.value_data().seq(), value.value_data().seq(),
value.value_data().writer(), value.value_data().writer(),
); )
} else {
"(no value)".to_owned()
};
let debug_string_stmt = format!( let debug_string_stmt = format!(
"IN <== ValueChanged(id={} {} #{:?}+{}{}) from {} <= {}", "IN <== ValueChanged(id={} {} #{:?}+{}{}) from {} <= {}",
@ -91,13 +95,11 @@ impl RPCProcessor {
key, key,
subkeys, subkeys,
count, count,
Arc::new(value), value.map(Arc::new),
inbound_node_id, inbound_node_id,
watch_id, watch_id,
) )
.await .await
.map_err(RPCError::internal)?; .map_err(RPCError::internal)
Ok(NetworkResult::value(()))
} }
} }

View file

@ -41,7 +41,7 @@ struct ValueChangedInfo {
subkeys: ValueSubkeyRangeSet, subkeys: ValueSubkeyRangeSet,
count: u32, count: u32,
watch_id: u64, watch_id: u64,
value: Arc<SignedValueData>, value: Option<Arc<SignedValueData>>,
} }
struct StorageManagerUnlockedInner { struct StorageManagerUnlockedInner {
@ -890,7 +890,7 @@ impl StorageManager {
.map_err(VeilidAPIError::from)?; .map_err(VeilidAPIError::from)?;
network_result_value_or_log!(rpc_processor network_result_value_or_log!(rpc_processor
.rpc_call_value_changed(dest, vc.key, vc.subkeys.clone(), vc.count, vc.watch_id, (*vc.value).clone() ) .rpc_call_value_changed(dest, vc.key, vc.subkeys.clone(), vc.count, vc.watch_id, vc.value.map(|v| (*v).clone()) )
.await .await
.map_err(VeilidAPIError::from)? => [format!(": dest={:?} vc={:?}", dest, vc)] {}); .map_err(VeilidAPIError::from)? => [format!(": dest={:?} vc={:?}", dest, vc)] {});

View file

@ -1211,7 +1211,7 @@ where
subkeys: evci.subkeys, subkeys: evci.subkeys,
count: evci.count, count: evci.count,
watch_id: evci.watch_id, watch_id: evci.watch_id,
value, value: Some(value),
}); });
} }
} }

View file

@ -227,27 +227,27 @@ impl StorageManager {
key: TypedKey, key: TypedKey,
subkeys: ValueSubkeyRangeSet, subkeys: ValueSubkeyRangeSet,
mut count: u32, mut count: u32,
value: Arc<SignedValueData>, value: Option<Arc<SignedValueData>>,
inbound_node_id: TypedKey, inbound_node_id: TypedKey,
watch_id: u64, watch_id: u64,
) -> VeilidAPIResult<()> { ) -> VeilidAPIResult<NetworkResult<()>> {
// Update local record store with new value // Update local record store with new value
let (is_value_seq_newer, opt_update_callback) = { let (is_value_seq_newer, opt_update_callback, value) = {
let mut inner = self.lock().await?; let mut inner = self.lock().await?;
// Don't process update if the record is closed // Don't process update if the record is closed
let Some(opened_record) = inner.opened_records.get_mut(&key) else { let Some(opened_record) = inner.opened_records.get_mut(&key) else {
return Ok(()); return Ok(NetworkResult::value(()));
}; };
// No active watch means no callback // No active watch means no callback
let Some(mut active_watch) = opened_record.active_watch() else { let Some(mut active_watch) = opened_record.active_watch() else {
return Ok(()); return Ok(NetworkResult::value(()));
}; };
// If the watch id doesn't match, then don't process this // If the watch id doesn't match, then don't process this
if active_watch.id != watch_id { if active_watch.id != watch_id {
return Ok(()); return Ok(NetworkResult::value(()));
} }
// If the reporting node is not the same as our watch, don't process the value change // If the reporting node is not the same as our watch, don't process the value change
@ -256,7 +256,7 @@ impl StorageManager {
.node_ids() .node_ids()
.contains(&inbound_node_id) .contains(&inbound_node_id)
{ {
return Ok(()); return Ok(NetworkResult::value(()));
} }
if count > active_watch.count { if count > active_watch.count {
@ -280,13 +280,37 @@ impl StorageManager {
opened_record.set_active_watch(active_watch); opened_record.set_active_watch(active_watch);
} }
// Null out default value
let value = value.filter(|value| *value.value_data() != ValueData::default());
// Set the local value // Set the local value
let mut is_value_seq_newer = false; let mut is_value_seq_newer = false;
if let Some(first_subkey) = subkeys.first() { if let Some(value) = &value {
let Some(first_subkey) = subkeys.first() else {
apibail_internal!("should not have value without first subkey");
};
let last_get_result = inner let last_get_result = inner
.handle_get_local_value(key, first_subkey, false) .handle_get_local_value(key, first_subkey, true)
.await?; .await?;
let descriptor = last_get_result.opt_descriptor.unwrap();
let schema = descriptor.schema()?;
// Validate with schema
if !schema.check_subkey_value_data(
descriptor.owner(),
first_subkey,
value.value_data(),
) {
// Validation failed, ignore this value
// Move to the next node
return Ok(NetworkResult::invalid_message(format!(
"Schema validation failed on subkey {}",
first_subkey
)));
}
// Make sure this value would actually be newer // Make sure this value would actually be newer
is_value_seq_newer = true; is_value_seq_newer = true;
if let Some(last_value) = &last_get_result.opt_value { if let Some(last_value) = &last_get_result.opt_value {
@ -307,7 +331,7 @@ impl StorageManager {
} }
} }
(is_value_seq_newer, inner.update_callback.clone()) (is_value_seq_newer, inner.update_callback.clone(), value)
}; };
// Announce ValueChanged VeilidUpdate // Announce ValueChanged VeilidUpdate
@ -323,7 +347,7 @@ impl StorageManager {
subkeys, subkeys,
count, count,
value: if is_value_seq_newer { value: if is_value_seq_newer {
Some(value.value_data().clone()) Some(value.unwrap().value_data().clone())
} else { } else {
None None
}, },
@ -331,6 +355,6 @@ impl StorageManager {
} }
} }
Ok(()) Ok(NetworkResult::value(()))
} }
} }