diff --git a/veilid-core/proto/veilid.capnp b/veilid-core/proto/veilid.capnp index 9ea6dea3..19158696 100644 --- a/veilid-core/proto/veilid.capnp +++ b/veilid-core/proto/veilid.capnp @@ -385,7 +385,7 @@ struct OperationValueChanged @0xd1c59ebdd8cc1bf6 { subkeys @1 :List(SubkeyRange); # subkey range that changed (up to 512 ranges at a time, if empty this is a watch expiration notice) count @2 :UInt32; # remaining changes left (0 means watch has expired) watchId @3 :UInt64; # watch id this value change came from - value @4 :SignedValueData; # first value that changed (the rest can be gotten with getvalue) + value @4 :SignedValueData; # Optional: first value that changed (the rest can be gotten with getvalue) } struct OperationSupplyBlockQ @0xadbf4c542d749971 { diff --git a/veilid-core/proto/veilid_capnp.rs b/veilid-core/proto/veilid_capnp.rs index 64ed7421..b887ac54 100644 --- a/veilid-core/proto/veilid_capnp.rs +++ b/veilid-core/proto/veilid_capnp.rs @@ -22084,4 +22084,4 @@ pub mod operation { } } -//BUILDHASH:6df0786a4485a9d0c25c177959fe1b2070c2a98884d20267eb70e129be95f2b0 +//BUILDHASH:a28970f74fcf7989d47e1457e848e5de33d8b7a4003a0e439c95f442ecf69fd3 diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_value_changed.rs b/veilid-core/src/rpc_processor/coders/operations/operation_value_changed.rs index 3340ce2f..34fe48ea 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_value_changed.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_value_changed.rs @@ -9,7 +9,7 @@ pub(in crate::rpc_processor) struct RPCOperationValueChanged { subkeys: ValueSubkeyRangeSet, count: u32, watch_id: u64, - value: SignedValueData, + value: Option, } impl RPCOperationValueChanged { @@ -19,7 +19,7 @@ impl RPCOperationValueChanged { subkeys: ValueSubkeyRangeSet, count: u32, watch_id: u64, - value: SignedValueData, + value: Option, ) -> Result { if subkeys.ranges_len() > MAX_VALUE_CHANGED_SUBKEY_RANGES_LEN { return Err(RPCError::protocol( @@ -30,6 +30,11 @@ impl RPCOperationValueChanged { if watch_id == 0 { return Err(RPCError::protocol("ValueChanged needs a nonzero watch id")); } + if subkeys.is_empty() && value.is_some() { + return Err(RPCError::protocol( + "ValueChanged with a value must have subkeys", + )); + } Ok(Self { key, @@ -44,6 +49,11 @@ impl RPCOperationValueChanged { if self.watch_id == 0 { return Err(RPCError::protocol("ValueChanged does not have a valid id")); } + if self.subkeys.is_empty() && self.value.is_some() { + return Err(RPCError::protocol( + "ValueChanged with a value must have subkeys", + )); + } // further validation must be done by storage manager as this is more complicated Ok(()) } @@ -69,12 +79,20 @@ impl RPCOperationValueChanged { } #[allow(dead_code)] - pub fn value(&self) -> &SignedValueData { - &self.value + pub fn value(&self) -> Option<&SignedValueData> { + self.value.as_ref() } #[allow(dead_code)] - pub fn destructure(self) -> (TypedKey, ValueSubkeyRangeSet, u32, u64, SignedValueData) { + pub fn destructure( + self, + ) -> ( + TypedKey, + ValueSubkeyRangeSet, + u32, + u64, + Option, + ) { ( self.key, self.subkeys, @@ -113,9 +131,13 @@ impl RPCOperationValueChanged { subkeys.ranges_insert(vskr.0..=vskr.1); } let count = reader.get_count(); - let v_reader = reader.get_value().map_err(RPCError::protocol)?; let watch_id = reader.get_watch_id(); - let value = decode_signed_value_data(&v_reader)?; + let value = if reader.has_value() { + let v_reader = reader.get_value().map_err(RPCError::protocol)?; + Some(decode_signed_value_data(&v_reader)?) + } else { + None + }; Ok(Self { key, @@ -147,8 +169,10 @@ impl RPCOperationValueChanged { builder.set_count(self.count); builder.set_watch_id(self.watch_id); - let mut v_builder = builder.reborrow().init_value(); - encode_signed_value_data(&self.value, &mut v_builder)?; + if let Some(value) = &self.value { + let mut v_builder = builder.reborrow().init_value(); + encode_signed_value_data(value, &mut v_builder)?; + } Ok(()) } } diff --git a/veilid-core/src/rpc_processor/rpc_value_changed.rs b/veilid-core/src/rpc_processor/rpc_value_changed.rs index 8a5aa114..0fe59913 100644 --- a/veilid-core/src/rpc_processor/rpc_value_changed.rs +++ b/veilid-core/src/rpc_processor/rpc_value_changed.rs @@ -15,7 +15,7 @@ impl RPCProcessor { subkeys: ValueSubkeyRangeSet, count: u32, watch_id: u64, - value: SignedValueData, + value: Option, ) -> RPCNetworkResult<()> { // Ensure destination is never using a safety route if matches!(dest.get_safety_selection(), SafetySelection::Safe(_)) { @@ -63,12 +63,16 @@ impl RPCProcessor { }; if debug_target_enabled!("dht") { - let debug_string_value = format!( - " len={} seq={} writer={}", - value.value_data().data().len(), - value.value_data().seq(), - value.value_data().writer(), - ); + let debug_string_value = if let Some(value) = &value { + format!( + " len={} seq={} writer={}", + value.value_data().data().len(), + value.value_data().seq(), + value.value_data().writer(), + ) + } else { + "(no value)".to_owned() + }; let debug_string_stmt = format!( "IN <== ValueChanged(id={} {} #{:?}+{}{}) from {} <= {}", @@ -91,13 +95,11 @@ impl RPCProcessor { key, subkeys, count, - Arc::new(value), + value.map(Arc::new), inbound_node_id, watch_id, ) .await - .map_err(RPCError::internal)?; - - Ok(NetworkResult::value(())) + .map_err(RPCError::internal) } } diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index 79724517..761010b4 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -41,7 +41,7 @@ struct ValueChangedInfo { subkeys: ValueSubkeyRangeSet, count: u32, watch_id: u64, - value: Arc, + value: Option>, } struct StorageManagerUnlockedInner { @@ -890,7 +890,7 @@ impl StorageManager { .map_err(VeilidAPIError::from)?; network_result_value_or_log!(rpc_processor - .rpc_call_value_changed(dest, vc.key, vc.subkeys.clone(), vc.count, vc.watch_id, (*vc.value).clone() ) + .rpc_call_value_changed(dest, vc.key, vc.subkeys.clone(), vc.count, vc.watch_id, vc.value.map(|v| (*v).clone()) ) .await .map_err(VeilidAPIError::from)? => [format!(": dest={:?} vc={:?}", dest, vc)] {}); diff --git a/veilid-core/src/storage_manager/record_store/mod.rs b/veilid-core/src/storage_manager/record_store/mod.rs index 1214c6f2..3e30cced 100644 --- a/veilid-core/src/storage_manager/record_store/mod.rs +++ b/veilid-core/src/storage_manager/record_store/mod.rs @@ -1211,7 +1211,7 @@ where subkeys: evci.subkeys, count: evci.count, watch_id: evci.watch_id, - value, + value: Some(value), }); } } diff --git a/veilid-core/src/storage_manager/watch_value.rs b/veilid-core/src/storage_manager/watch_value.rs index 3db4a86d..86b3440d 100644 --- a/veilid-core/src/storage_manager/watch_value.rs +++ b/veilid-core/src/storage_manager/watch_value.rs @@ -227,27 +227,27 @@ impl StorageManager { key: TypedKey, subkeys: ValueSubkeyRangeSet, mut count: u32, - value: Arc, + value: Option>, inbound_node_id: TypedKey, watch_id: u64, - ) -> VeilidAPIResult<()> { + ) -> VeilidAPIResult> { // Update local record store with new value - let (is_value_seq_newer, opt_update_callback) = { + let (is_value_seq_newer, opt_update_callback, value) = { let mut inner = self.lock().await?; // Don't process update if the record is closed let Some(opened_record) = inner.opened_records.get_mut(&key) else { - return Ok(()); + return Ok(NetworkResult::value(())); }; // No active watch means no callback let Some(mut active_watch) = opened_record.active_watch() else { - return Ok(()); + return Ok(NetworkResult::value(())); }; // If the watch id doesn't match, then don't process this if active_watch.id != watch_id { - return Ok(()); + return Ok(NetworkResult::value(())); } // If the reporting node is not the same as our watch, don't process the value change @@ -256,7 +256,7 @@ impl StorageManager { .node_ids() .contains(&inbound_node_id) { - return Ok(()); + return Ok(NetworkResult::value(())); } if count > active_watch.count { @@ -280,13 +280,37 @@ impl StorageManager { opened_record.set_active_watch(active_watch); } + // Null out default value + let value = value.filter(|value| *value.value_data() != ValueData::default()); + // Set the local value let mut is_value_seq_newer = false; - if let Some(first_subkey) = subkeys.first() { + if let Some(value) = &value { + let Some(first_subkey) = subkeys.first() else { + apibail_internal!("should not have value without first subkey"); + }; + let last_get_result = inner - .handle_get_local_value(key, first_subkey, false) + .handle_get_local_value(key, first_subkey, true) .await?; + let descriptor = last_get_result.opt_descriptor.unwrap(); + let schema = descriptor.schema()?; + + // Validate with schema + if !schema.check_subkey_value_data( + descriptor.owner(), + first_subkey, + value.value_data(), + ) { + // Validation failed, ignore this value + // Move to the next node + return Ok(NetworkResult::invalid_message(format!( + "Schema validation failed on subkey {}", + first_subkey + ))); + } + // Make sure this value would actually be newer is_value_seq_newer = true; if let Some(last_value) = &last_get_result.opt_value { @@ -307,7 +331,7 @@ impl StorageManager { } } - (is_value_seq_newer, inner.update_callback.clone()) + (is_value_seq_newer, inner.update_callback.clone(), value) }; // Announce ValueChanged VeilidUpdate @@ -323,7 +347,7 @@ impl StorageManager { subkeys, count, value: if is_value_seq_newer { - Some(value.value_data().clone()) + Some(value.unwrap().value_data().clone()) } else { None }, @@ -331,6 +355,6 @@ impl StorageManager { } } - Ok(()) + Ok(NetworkResult::value(())) } }