first stab at inspectvalue

This commit is contained in:
Christien Rioux 2024-03-09 22:22:25 -05:00
parent ca65d12836
commit 9bccb59f77
25 changed files with 492 additions and 260 deletions

View File

@ -113,6 +113,11 @@ impl RPCOperationGetValueA {
// Validate descriptor // Validate descriptor
if let Some(descriptor) = &self.descriptor { if let Some(descriptor) = &self.descriptor {
// Ensure the descriptor itself validates
descriptor
.validate(get_value_context.vcrypto.clone())
.map_err(RPCError::protocol)?;
// Ensure descriptor matches last one // Ensure descriptor matches last one
if let Some(last_descriptor) = &get_value_context.last_descriptor { if let Some(last_descriptor) = &get_value_context.last_descriptor {
if descriptor.cmp_no_sig(last_descriptor) != cmp::Ordering::Equal { if descriptor.cmp_no_sig(last_descriptor) != cmp::Ordering::Equal {
@ -121,16 +126,16 @@ impl RPCOperationGetValueA {
)); ));
} }
} }
// Ensure the descriptor itself validates
descriptor
.validate(get_value_context.vcrypto.clone())
.map_err(RPCError::protocol)?;
} }
// Ensure the value validates // Ensure the value validates
if let Some(value) = &self.value { if let Some(value) = &self.value {
// Get descriptor to validate with // Get descriptor to validate with
let Some(descriptor) = self.descriptor.or(get_value_context.last_descriptor) else { let Some(descriptor) = self
.descriptor
.as_ref()
.or(get_value_context.last_descriptor.as_ref())
else {
return Err(RPCError::protocol( return Err(RPCError::protocol(
"no last descriptor, requires a descriptor", "no last descriptor, requires a descriptor",
)); ));

View File

@ -175,6 +175,11 @@ impl RPCOperationInspectValueA {
// Validate descriptor // Validate descriptor
if let Some(descriptor) = &self.descriptor { if let Some(descriptor) = &self.descriptor {
// Ensure the descriptor itself validates
descriptor
.validate(inspect_value_context.vcrypto.clone())
.map_err(RPCError::protocol)?;
// Ensure descriptor matches last one // Ensure descriptor matches last one
if let Some(last_descriptor) = &inspect_value_context.last_descriptor { if let Some(last_descriptor) = &inspect_value_context.last_descriptor {
if descriptor.cmp_no_sig(last_descriptor) != cmp::Ordering::Equal { if descriptor.cmp_no_sig(last_descriptor) != cmp::Ordering::Equal {
@ -183,10 +188,6 @@ impl RPCOperationInspectValueA {
)); ));
} }
} }
// Ensure the descriptor itself validates
descriptor
.validate(inspect_value_context.vcrypto.clone())
.map_err(RPCError::protocol)?;
} }
PeerInfo::validate_vec(&mut self.peers, validate_context.crypto.clone()); PeerInfo::validate_vec(&mut self.peers, validate_context.crypto.clone());

View File

@ -141,13 +141,13 @@ impl RPCOperationSetValueA {
panic!("Wrong context type for SetValueA"); panic!("Wrong context type for SetValueA");
}; };
if let Some(value) = &self.value {
// Ensure the descriptor itself validates // Ensure the descriptor itself validates
set_value_context set_value_context
.descriptor .descriptor
.validate(set_value_context.vcrypto.clone()) .validate(set_value_context.vcrypto.clone())
.map_err(RPCError::protocol)?; .map_err(RPCError::protocol)?;
if let Some(value) = &self.value {
// And the signed value data // And the signed value data
value value
.validate( .validate(

View File

@ -21,9 +21,7 @@ impl RPCOperationValueChanged {
watch_id: u64, watch_id: u64,
value: SignedValueData, value: SignedValueData,
) -> Result<Self, RPCError> { ) -> Result<Self, RPCError> {
let subkeys_len = subkeys.ranges_len() as usize; if subkeys.ranges_len() > MAX_VALUE_CHANGED_SUBKEY_RANGES_LEN {
if subkeys_len > MAX_VALUE_CHANGED_SUBKEY_RANGES_LEN {
return Err(RPCError::protocol( return Err(RPCError::protocol(
"ValueChanged subkey ranges length too long", "ValueChanged subkey ranges length too long",
)); ));

View File

@ -234,7 +234,7 @@ impl RPCProcessor {
let c = self.config.get(); let c = self.config.get();
c.network.dht.set_value_count as usize c.network.dht.set_value_count as usize
}; };
let (subkey_result_value, subkey_result_descriptor) = if closer_to_key_peers.len() >= set_value_count { let (get_result_value, get_result_descriptor) = if closer_to_key_peers.len() >= set_value_count {
// Not close enough // Not close enough
(None, None) (None, None)
} else { } else {
@ -242,15 +242,15 @@ impl RPCProcessor {
// See if we have this record ourselves // See if we have this record ourselves
let storage_manager = self.storage_manager(); let storage_manager = self.storage_manager();
let subkey_result = network_result_try!(storage_manager let get_result = network_result_try!(storage_manager
.inbound_get_value(key, subkey, want_descriptor) .inbound_get_value(key, subkey, want_descriptor)
.await .await
.map_err(RPCError::internal)?); .map_err(RPCError::internal)?);
(subkey_result.value, subkey_result.descriptor) (get_result.opt_value, get_result.opt_descriptor)
}; };
if debug_target_enabled!("dht") { if debug_target_enabled!("dht") {
let debug_string_value = subkey_result_value.as_ref().map(|v| { let debug_string_value = get_result_value.as_ref().map(|v| {
format!(" len={} seq={} writer={}", format!(" len={} seq={} writer={}",
v.value_data().data().len(), v.value_data().data().len(),
v.value_data().seq(), v.value_data().seq(),
@ -263,7 +263,7 @@ impl RPCProcessor {
key, key,
subkey, subkey,
debug_string_value, debug_string_value,
if subkey_result_descriptor.is_some() { if get_result_descriptor.is_some() {
" +desc" " +desc"
} else { } else {
"" ""
@ -277,9 +277,9 @@ impl RPCProcessor {
// Make GetValue answer // Make GetValue answer
let get_value_a = RPCOperationGetValueA::new( let get_value_a = RPCOperationGetValueA::new(
subkey_result_value.map(|x| (*x).clone()), get_result_value.map(|x| (*x).clone()),
closer_to_key_peers, closer_to_key_peers,
subkey_result_descriptor.map(|x| (*x).clone()), get_result_descriptor.map(|x| (*x).clone()),
)?; )?;
// Send GetValue answer // Send GetValue answer

View File

@ -232,7 +232,7 @@ impl RPCProcessor {
.inbound_inspect_value(key, subkeys, want_descriptor) .inbound_inspect_value(key, subkeys, want_descriptor)
.await .await
.map_err(RPCError::internal)?); .map_err(RPCError::internal)?);
(inspect_result.seqs, inspect_result.descriptor) (inspect_result.seqs, inspect_result.opt_descriptor)
}; };
if debug_target_enabled!("dht") { if debug_target_enabled!("dht") {

View File

@ -15,7 +15,7 @@ struct OutboundGetValueContext {
/// The result of the outbound_get_value operation /// The result of the outbound_get_value operation
pub(super) struct OutboundGetValueResult { pub(super) struct OutboundGetValueResult {
/// The subkey that was retrieved /// The subkey that was retrieved
pub subkey_result: SubkeyResult, pub get_result: GetResult,
/// And where it was retrieved from /// And where it was retrieved from
pub value_nodes: Vec<NodeRef>, pub value_nodes: Vec<NodeRef>,
} }
@ -28,7 +28,7 @@ impl StorageManager {
key: TypedKey, key: TypedKey,
subkey: ValueSubkey, subkey: ValueSubkey,
safety_selection: SafetySelection, safety_selection: SafetySelection,
last_subkey_result: SubkeyResult, last_get_result: GetResult,
) -> VeilidAPIResult<OutboundGetValueResult> { ) -> VeilidAPIResult<OutboundGetValueResult> {
let routing_table = rpc_processor.routing_table(); let routing_table = rpc_processor.routing_table();
@ -44,15 +44,15 @@ impl StorageManager {
}; };
// Make do-get-value answer context // Make do-get-value answer context
let schema = if let Some(d) = &last_subkey_result.descriptor { let schema = if let Some(d) = &last_get_result.opt_descriptor {
Some(d.schema()?) Some(d.schema()?)
} else { } else {
None None
}; };
let context = Arc::new(Mutex::new(OutboundGetValueContext { let context = Arc::new(Mutex::new(OutboundGetValueContext {
value: last_subkey_result.value, value: last_get_result.opt_value,
value_nodes: vec![], value_nodes: vec![],
descriptor: last_subkey_result.descriptor.clone(), descriptor: last_get_result.opt_descriptor.clone(),
schema, schema,
})); }));
@ -60,7 +60,7 @@ impl StorageManager {
let call_routine = |next_node: NodeRef| { let call_routine = |next_node: NodeRef| {
let rpc_processor = rpc_processor.clone(); let rpc_processor = rpc_processor.clone();
let context = context.clone(); let context = context.clone();
let last_descriptor = last_subkey_result.descriptor.clone(); let last_descriptor = last_get_result.opt_descriptor.clone();
async move { async move {
let gva = network_result_try!( let gva = network_result_try!(
rpc_processor rpc_processor
@ -184,9 +184,9 @@ impl StorageManager {
log_stor!(debug "GetValue Fanout Timeout Non-Consensus: {}", ctx.value_nodes.len()); log_stor!(debug "GetValue Fanout Timeout Non-Consensus: {}", ctx.value_nodes.len());
} }
Ok(OutboundGetValueResult { Ok(OutboundGetValueResult {
subkey_result: SubkeyResult { get_result: GetResult {
value: ctx.value.clone(), opt_value: ctx.value.clone(),
descriptor: ctx.descriptor.clone(), opt_descriptor: ctx.descriptor.clone(),
}, },
value_nodes: ctx.value_nodes.clone(), value_nodes: ctx.value_nodes.clone(),
}) })
@ -201,9 +201,9 @@ impl StorageManager {
log_stor!(debug "GetValue Fanout Non-Consensus: {}", ctx.value_nodes.len()); log_stor!(debug "GetValue Fanout Non-Consensus: {}", ctx.value_nodes.len());
} }
Ok(OutboundGetValueResult { Ok(OutboundGetValueResult {
subkey_result: SubkeyResult { get_result: GetResult {
value: ctx.value.clone(), opt_value: ctx.value.clone(),
descriptor: ctx.descriptor.clone(), opt_descriptor: ctx.descriptor.clone(),
}, },
value_nodes: ctx.value_nodes.clone(), value_nodes: ctx.value_nodes.clone(),
}) })
@ -218,9 +218,9 @@ impl StorageManager {
log_stor!(debug "GetValue Fanout Exhausted Non-Consensus: {}", ctx.value_nodes.len()); log_stor!(debug "GetValue Fanout Exhausted Non-Consensus: {}", ctx.value_nodes.len());
} }
Ok(OutboundGetValueResult { Ok(OutboundGetValueResult {
subkey_result: SubkeyResult { get_result: GetResult {
value: ctx.value.clone(), opt_value: ctx.value.clone(),
descriptor: ctx.descriptor.clone(), opt_descriptor: ctx.descriptor.clone(),
}, },
value_nodes: ctx.value_nodes.clone(), value_nodes: ctx.value_nodes.clone(),
}) })
@ -240,28 +240,28 @@ impl StorageManager {
key: TypedKey, key: TypedKey,
subkey: ValueSubkey, subkey: ValueSubkey,
want_descriptor: bool, want_descriptor: bool,
) -> VeilidAPIResult<NetworkResult<SubkeyResult>> { ) -> VeilidAPIResult<NetworkResult<GetResult>> {
let mut inner = self.lock().await?; let mut inner = self.lock().await?;
// See if this is a remote or local value // See if this is a remote or local value
let (_is_local, last_subkey_result) = { let (_is_local, last_get_result) = {
// See if the subkey we are getting has a last known local value // See if the subkey we are getting has a last known local value
let mut last_subkey_result = inner.handle_get_local_value(key, subkey, true).await?; let mut last_get_result = inner.handle_get_local_value(key, subkey, true).await?;
// If this is local, it must have a descriptor already // If this is local, it must have a descriptor already
if last_subkey_result.descriptor.is_some() { if last_get_result.opt_descriptor.is_some() {
if !want_descriptor { if !want_descriptor {
last_subkey_result.descriptor = None; last_get_result.opt_descriptor = None;
} }
(true, last_subkey_result) (true, last_get_result)
} else { } else {
// See if the subkey we are getting has a last known remote value // See if the subkey we are getting has a last known remote value
let last_subkey_result = inner let last_get_result = inner
.handle_get_remote_value(key, subkey, want_descriptor) .handle_get_remote_value(key, subkey, want_descriptor)
.await?; .await?;
(false, last_subkey_result) (false, last_get_result)
} }
}; };
Ok(NetworkResult::value(last_subkey_result)) Ok(NetworkResult::value(last_get_result))
} }
} }

View File

@ -1,9 +1,9 @@
use super::*; use super::*;
/// The context of the outbound_get_value operation /// The context of the outbound_get_value operation
struct OutboundGxxx continue here etValueContext { struct OutboundInspectValueContext {
/// The latest value of the subkey, may be the value passed in /// The combined sequence map so far
pub value: Option<Arc<SignedValueData>>, pub seqs: Vec<ValueSeqNum>,
/// The nodes that have returned the value so far (up to the consensus count) /// The nodes that have returned the value so far (up to the consensus count)
pub value_nodes: Vec<NodeRef>, pub value_nodes: Vec<NodeRef>,
/// The descriptor if we got a fresh one or empty if no descriptor was needed /// The descriptor if we got a fresh one or empty if no descriptor was needed
@ -13,26 +13,26 @@ struct OutboundGxxx continue here etValueContext {
} }
/// The result of the outbound_get_value operation /// The result of the outbound_get_value operation
pub(super) struct OutboundGetValueResult { pub(super) struct OutboundInspectValueResult {
/// The subkey that was retrieved /// The subkey that was retrieved
pub subkey_result: SubkeyResult, pub inspect_result: InspectResult,
/// And where it was retrieved from /// And where it was retrieved from
pub value_nodes: Vec<NodeRef>, pub value_nodes: Vec<NodeRef>,
} }
impl StorageManager { impl StorageManager {
/// Perform a 'inspect value' query on the network /// Perform a 'inspect value' query on the network
pub(super) async fn outbound_get_value( pub(super) async fn outbound_inspect_value(
&self, &self,
rpc_processor: RPCProcessor, rpc_processor: RPCProcessor,
key: TypedKey, key: TypedKey,
subkey: ValueSubkey, subkeys: ValueSubkeyRangeSet,
safety_selection: SafetySelection, safety_selection: SafetySelection,
last_subkey_result: SubkeyResult, last_inspect_result: InspectResult,
) -> VeilidAPIResult<OutboundGetValueResult> { ) -> VeilidAPIResult<OutboundInspectValueResult> {
let routing_table = rpc_processor.routing_table(); let routing_table = rpc_processor.routing_table();
// Get the DHT parameters for 'GetValue' // Get the DHT parameters for 'InspectValue' (the same as for 'GetValue')
let (key_count, consensus_count, fanout, timeout_us) = { let (key_count, consensus_count, fanout, timeout_us) = {
let c = self.unlocked_inner.config.get(); let c = self.unlocked_inner.config.get();
( (
@ -43,16 +43,16 @@ impl StorageManager {
) )
}; };
// Make do-get-value answer context // Make do-inspect-value answer context
let schema = if let Some(d) = &last_subkey_result.descriptor { let schema = if let Some(d) = &last_inspect_result.opt_descriptor {
Some(d.schema()?) Some(d.schema()?)
} else { } else {
None None
}; };
let context = Arc::new(Mutex::new(OutboundGetValueContext { let context = Arc::new(Mutex::new(OutboundInspectValueContext {
value: last_subkey_result.value, seqs: last_inspect_result.seqs,
value_nodes: vec![], value_nodes: vec![],
descriptor: last_subkey_result.descriptor.clone(), descriptor: last_inspect_result.opt_descriptor.clone(),
schema, schema,
})); }));
@ -60,23 +60,24 @@ impl StorageManager {
let call_routine = |next_node: NodeRef| { let call_routine = |next_node: NodeRef| {
let rpc_processor = rpc_processor.clone(); let rpc_processor = rpc_processor.clone();
let context = context.clone(); let context = context.clone();
let last_descriptor = last_subkey_result.descriptor.clone(); let last_descriptor = last_inspect_result.opt_descriptor.clone();
let subkeys = subkeys.clone();
async move { async move {
let gva = network_result_try!( let iva = network_result_try!(
rpc_processor rpc_processor
.clone() .clone()
.rpc_call_get_value( .rpc_call_inspect_value(
Destination::direct(next_node.clone()).with_safety(safety_selection), Destination::direct(next_node.clone()).with_safety(safety_selection),
key, key,
subkey, subkeys.clone(),
last_descriptor.map(|x| (*x).clone()), last_descriptor.map(|x| (*x).clone()),
) )
.await? .await?
); );
// Keep the descriptor if we got one. If we had a last_descriptor it will // Keep the descriptor if we got one. If we had a last_descriptor it will
// already be validated by rpc_call_get_value // already be validated by rpc_call_inspect_value
if let Some(descriptor) = gva.answer.descriptor { if let Some(descriptor) = iva.answer.descriptor {
let mut ctx = context.lock(); let mut ctx = context.lock();
if ctx.descriptor.is_none() && ctx.schema.is_none() { if ctx.descriptor.is_none() && ctx.schema.is_none() {
ctx.schema = Some(descriptor.schema().map_err(RPCError::invalid_format)?); ctx.schema = Some(descriptor.schema().map_err(RPCError::invalid_format)?);
@ -85,12 +86,12 @@ impl StorageManager {
} }
// Keep the value if we got one and it is newer and it passes schema validation // Keep the value if we got one and it is newer and it passes schema validation
if let Some(value) = gva.answer.value { if !iva.answer.seqs.is_empty() {
log_stor!(debug "Got value back: len={} seq={}", value.value_data().data().len(), value.value_data().seq()); log_stor!(debug "Got seqs back: len={}", iva.answer.seqs.len());
let mut ctx = context.lock(); let mut ctx = context.lock();
// Ensure we have a schema and descriptor // Ensure we have a schema and descriptor
let (Some(descriptor), Some(schema)) = (&ctx.descriptor, &ctx.schema) else { let (Some(_descriptor), Some(schema)) = (&ctx.descriptor, &ctx.schema) else {
// Got a value but no descriptor for it // Got a value but no descriptor for it
// Move to the next node // Move to the next node
return Ok(NetworkResult::invalid_message( return Ok(NetworkResult::invalid_message(
@ -98,53 +99,57 @@ impl StorageManager {
)); ));
}; };
// Validate with schema // Get number of subkeys from schema and ensure we are getting the
if !schema.check_subkey_value_data( // right number of sequence numbers betwen that and what we asked for
descriptor.owner(), let in_schema_subkeys = subkeys
subkey, .intersect(&ValueSubkeyRangeSet::single_range(0, schema.max_subkey()));
value.value_data(), if iva.answer.seqs.len() != in_schema_subkeys.len() {
) { // Not the right number of sequence numbers
// Validation failed, ignore this value
// Move to the next node // Move to the next node
return Ok(NetworkResult::invalid_message(format!( return Ok(NetworkResult::invalid_message(format!(
"Schema validation failed on subkey {}", "wrong number of seqs returned {} (wanted {})",
subkey iva.answer.seqs.len(),
in_schema_subkeys.len()
))); )));
} }
// If we have a prior value, see if this is a newer sequence number // If we have a prior seqs list, merge in the new seqs
if let Some(prior_value) = &ctx.value { if ctx.seqs.len() == 0 {
let prior_seq = prior_value.value_data().seq(); ctx.seqs = iva.answer.seqs.clone();
let new_seq = value.value_data().seq(); // One node has shown us the newest sequence numbers so far
ctx.value_nodes = vec![next_node];
if new_seq == prior_seq { } else {
// If sequence number is the same, the data should be the same if ctx.seqs.len() != iva.answer.seqs.len() {
if prior_value.value_data() != value.value_data() { return Err(RPCError::internal(
// Move to the next node "seqs list length should always be equal by now",
return Ok(NetworkResult::invalid_message("value data mismatch")); ));
} }
// Increase the consensus count for the existing value let mut newer_seq = false;
for pair in ctx.seqs.iter_mut().zip(iva.answer.seqs.iter()) {
// If the new seq isn't undefined and is better than the old seq (either greater or old is undefined)
// Then take that sequence number and note that we have gotten newer sequence numbers so we keep
// looking for consensus
if *pair.1 != ValueSeqNum::MAX
&& (*pair.0 == ValueSeqNum::MAX || pair.1 > pair.0)
{
newer_seq = true;
*pair.0 = *pair.1;
}
}
if newer_seq {
// One node has shown us the latest sequence numbers so far
ctx.value_nodes = vec![next_node];
} else {
// Increase the consensus count for the seqs list
ctx.value_nodes.push(next_node); ctx.value_nodes.push(next_node);
} else if new_seq > prior_seq {
// If the sequence number is greater, start over with the new value
ctx.value = Some(Arc::new(value));
// One node has shown us this value so far
ctx.value_nodes = vec![next_node];
} else {
// If the sequence number is older, ignore it
} }
} else {
// If we have no prior value, keep it
ctx.value = Some(Arc::new(value));
// One node has shown us this value so far
ctx.value_nodes = vec![next_node];
} }
} }
// Return peers if we have some // Return peers if we have some
log_network_result!(debug "GetValue fanout call returned peers {}", gva.answer.peers.len()); log_network_result!(debug "InspectValue fanout call returned peers {}", iva.answer.peers.len());
Ok(NetworkResult::value(gva.answer.peers)) Ok(NetworkResult::value(iva.answer.peers))
} }
}; };
@ -152,7 +157,7 @@ impl StorageManager {
let check_done = |_closest_nodes: &[NodeRef]| { let check_done = |_closest_nodes: &[NodeRef]| {
// If we have reached sufficient consensus, return done // If we have reached sufficient consensus, return done
let ctx = context.lock(); let ctx = context.lock();
if ctx.value.is_some() if ctx.seqs.len() > 0
&& ctx.descriptor.is_some() && ctx.descriptor.is_some()
&& ctx.value_nodes.len() >= consensus_count && ctx.value_nodes.len() >= consensus_count
{ {
@ -179,14 +184,14 @@ impl StorageManager {
// Return the best answer we've got // Return the best answer we've got
let ctx = context.lock(); let ctx = context.lock();
if ctx.value_nodes.len() >= consensus_count { if ctx.value_nodes.len() >= consensus_count {
log_stor!(debug "GetValue Fanout Timeout Consensus"); log_stor!(debug "InspectValue Fanout Timeout Consensus");
} else { } else {
log_stor!(debug "GetValue Fanout Timeout Non-Consensus: {}", ctx.value_nodes.len()); log_stor!(debug "InspectValue Fanout Timeout Non-Consensus: {}", ctx.value_nodes.len());
} }
Ok(OutboundGetValueResult { Ok(OutboundInspectValueResult {
subkey_result: SubkeyResult { inspect_result: InspectResult {
value: ctx.value.clone(), seqs: ctx.seqs.clone(),
descriptor: ctx.descriptor.clone(), opt_descriptor: ctx.descriptor.clone(),
}, },
value_nodes: ctx.value_nodes.clone(), value_nodes: ctx.value_nodes.clone(),
}) })
@ -196,14 +201,14 @@ impl StorageManager {
// Return the best answer we've got // Return the best answer we've got
let ctx = context.lock(); let ctx = context.lock();
if ctx.value_nodes.len() >= consensus_count { if ctx.value_nodes.len() >= consensus_count {
log_stor!(debug "GetValue Fanout Consensus"); log_stor!(debug "InspectValue Fanout Consensus");
} else { } else {
log_stor!(debug "GetValue Fanout Non-Consensus: {}", ctx.value_nodes.len()); log_stor!(debug "InspectValue Fanout Non-Consensus: {}", ctx.value_nodes.len());
} }
Ok(OutboundGetValueResult { Ok(OutboundInspectValueResult {
subkey_result: SubkeyResult { inspect_result: InspectResult {
value: ctx.value.clone(), seqs: ctx.seqs.clone(),
descriptor: ctx.descriptor.clone(), opt_descriptor: ctx.descriptor.clone(),
}, },
value_nodes: ctx.value_nodes.clone(), value_nodes: ctx.value_nodes.clone(),
}) })
@ -213,14 +218,14 @@ impl StorageManager {
// Return the best answer we've got // Return the best answer we've got
let ctx = context.lock(); let ctx = context.lock();
if ctx.value_nodes.len() >= consensus_count { if ctx.value_nodes.len() >= consensus_count {
log_stor!(debug "GetValue Fanout Exhausted Consensus"); log_stor!(debug "InspectValue Fanout Exhausted Consensus");
} else { } else {
log_stor!(debug "GetValue Fanout Exhausted Non-Consensus: {}", ctx.value_nodes.len()); log_stor!(debug "InspectValue Fanout Exhausted Non-Consensus: {}", ctx.value_nodes.len());
} }
Ok(OutboundGetValueResult { Ok(OutboundInspectValueResult {
subkey_result: SubkeyResult { inspect_result: InspectResult {
value: ctx.value.clone(), seqs: ctx.seqs.clone(),
descriptor: ctx.descriptor.clone(), opt_descriptor: ctx.descriptor.clone(),
}, },
value_nodes: ctx.value_nodes.clone(), value_nodes: ctx.value_nodes.clone(),
}) })
@ -228,40 +233,42 @@ impl StorageManager {
// Failed // Failed
TimeoutOr::Value(Err(e)) => { TimeoutOr::Value(Err(e)) => {
// If we finished with an error, return that // If we finished with an error, return that
log_stor!(debug "GetValue Fanout Error: {}", e); log_stor!(debug "InspectValue Fanout Error: {}", e);
Err(e.into()) Err(e.into())
} }
} }
} }
/// Handle a received 'Get Value' query /// Handle a received 'Inspect Value' query
pub async fn inbound_get_value( pub async fn inbound_inspect_value(
&self, &self,
key: TypedKey, key: TypedKey,
subkey: ValueSubkey, subkeys: ValueSubkeyRangeSet,
want_descriptor: bool, want_descriptor: bool,
) -> VeilidAPIResult<NetworkResult<SubkeyResult>> { ) -> VeilidAPIResult<NetworkResult<InspectResult>> {
let mut inner = self.lock().await?; let mut inner = self.lock().await?;
// See if this is a remote or local value // See if this is a remote or local value
let (_is_local, last_subkey_result) = { let (_is_local, last_get_result) = {
// See if the subkey we are getting has a last known local value // See if the subkey we are getting has a last known local value
let mut last_subkey_result = inner.handle_get_local_value(key, subkey, true).await?; let mut last_inspect_result = inner
.handle_inspect_local_value(key, subkeys.clone(), true)
.await?;
// If this is local, it must have a descriptor already // If this is local, it must have a descriptor already
if last_subkey_result.descriptor.is_some() { if last_inspect_result.opt_descriptor.is_some() {
if !want_descriptor { if !want_descriptor {
last_subkey_result.descriptor = None; last_inspect_result.opt_descriptor = None;
} }
(true, last_subkey_result) (true, last_inspect_result)
} else { } else {
// See if the subkey we are getting has a last known remote value // See if the subkey we are getting has a last known remote value
let last_subkey_result = inner let last_inspect_result = inner
.handle_get_remote_value(key, subkey, want_descriptor) .handle_inspect_remote_value(key, subkeys, want_descriptor)
.await?; .await?;
(false, last_subkey_result) (false, last_inspect_result)
} }
}; };
Ok(NetworkResult::value(last_subkey_result)) Ok(NetworkResult::value(last_get_result))
} }
} }

View File

@ -1,5 +1,6 @@
mod debug; mod debug;
mod get_value; mod get_value;
mod inspect_value;
mod record_store; mod record_store;
mod set_value; mod set_value;
mod storage_manager_inner; mod storage_manager_inner;
@ -204,6 +205,7 @@ impl StorageManager {
safety_selection: SafetySelection, safety_selection: SafetySelection,
) -> VeilidAPIResult<DHTRecordDescriptor> { ) -> VeilidAPIResult<DHTRecordDescriptor> {
let mut inner = self.lock().await?; let mut inner = self.lock().await?;
schema.validate()?;
// Create a new owned local record from scratch // Create a new owned local record from scratch
let (key, owner) = inner let (key, owner) = inner
@ -254,12 +256,12 @@ impl StorageManager {
key, key,
subkey, subkey,
safety_selection, safety_selection,
SubkeyResult::default(), GetResult::default(),
) )
.await?; .await?;
// If we got nothing back, the key wasn't found // If we got nothing back, the key wasn't found
if result.subkey_result.value.is_none() && result.subkey_result.descriptor.is_none() { if result.get_result.opt_value.is_none() && result.get_result.opt_descriptor.is_none() {
// No result // No result
apibail_key_not_found!(key); apibail_key_not_found!(key);
}; };
@ -280,7 +282,7 @@ impl StorageManager {
// Open the new record // Open the new record
inner inner
.open_new_record(key, writer, subkey, result.subkey_result, safety_selection) .open_new_record(key, writer, subkey, result.get_result, safety_selection)
.await .await
} }
@ -359,12 +361,12 @@ impl StorageManager {
}; };
// See if the requested subkey is our local record store // See if the requested subkey is our local record store
let last_subkey_result = inner.handle_get_local_value(key, subkey, true).await?; let last_get_result = inner.handle_get_local_value(key, subkey, true).await?;
// Return the existing value if we have one unless we are forcing a refresh // Return the existing value if we have one unless we are forcing a refresh
if !force_refresh { if !force_refresh {
if let Some(last_subkey_result_value) = last_subkey_result.value { if let Some(last_get_result_value) = last_get_result.opt_value {
return Ok(Some(last_subkey_result_value.value_data().clone())); return Ok(Some(last_get_result_value.value_data().clone()));
} }
} }
@ -373,8 +375,8 @@ impl StorageManager {
// Get rpc processor and drop mutex so we don't block while getting the value from the network // Get rpc processor and drop mutex so we don't block while getting the value from the network
let Some(rpc_processor) = Self::online_ready_inner(&inner) else { let Some(rpc_processor) = Self::online_ready_inner(&inner) else {
// Return the existing value if we have one if we aren't online // Return the existing value if we have one if we aren't online
if let Some(last_subkey_result_value) = last_subkey_result.value { if let Some(last_get_result_value) = last_get_result.opt_value {
return Ok(Some(last_subkey_result_value.value_data().clone())); return Ok(Some(last_get_result_value.value_data().clone()));
} }
apibail_try_again!("offline, try again later"); apibail_try_again!("offline, try again later");
}; };
@ -384,8 +386,8 @@ impl StorageManager {
// May have last descriptor / value // May have last descriptor / value
// Use the safety selection we opened the record with // Use the safety selection we opened the record with
let opt_last_seq = last_subkey_result let opt_last_seq = last_get_result
.value .opt_value
.as_ref() .as_ref()
.map(|v| v.value_data().seq()); .map(|v| v.value_data().seq());
let result = self let result = self
@ -394,12 +396,12 @@ impl StorageManager {
key, key,
subkey, subkey,
safety_selection, safety_selection,
last_subkey_result, last_get_result,
) )
.await?; .await?;
// See if we got a value back // See if we got a value back
let Some(subkey_result_value) = result.subkey_result.value else { let Some(get_result_value) = result.get_result.opt_value else {
// If we got nothing back then we also had nothing beforehand, return nothing // If we got nothing back then we also had nothing beforehand, return nothing
return Ok(None); return Ok(None);
}; };
@ -409,17 +411,17 @@ impl StorageManager {
inner.set_value_nodes(key, result.value_nodes)?; inner.set_value_nodes(key, result.value_nodes)?;
// If we got a new value back then write it to the opened record // If we got a new value back then write it to the opened record
if Some(subkey_result_value.value_data().seq()) != opt_last_seq { if Some(get_result_value.value_data().seq()) != opt_last_seq {
inner inner
.handle_set_local_value( .handle_set_local_value(
key, key,
subkey, subkey,
subkey_result_value.clone(), get_result_value.clone(),
WatchUpdateMode::UpdateAll, WatchUpdateMode::UpdateAll,
) )
.await?; .await?;
} }
Ok(Some(subkey_result_value.value_data().clone())) Ok(Some(get_result_value.value_data().clone()))
} }
/// Set the value of a subkey on an opened local record /// Set the value of a subkey on an opened local record
@ -456,16 +458,16 @@ impl StorageManager {
}; };
// See if the subkey we are modifying has a last known local value // See if the subkey we are modifying has a last known local value
let last_subkey_result = inner.handle_get_local_value(key, subkey, true).await?; let last_get_result = inner.handle_get_local_value(key, subkey, true).await?;
// Get the descriptor and schema for the key // Get the descriptor and schema for the key
let Some(descriptor) = last_subkey_result.descriptor else { let Some(descriptor) = last_get_result.opt_descriptor else {
apibail_generic!("must have a descriptor"); apibail_generic!("must have a descriptor");
}; };
let schema = descriptor.schema()?; let schema = descriptor.schema()?;
// Make new subkey data // Make new subkey data
let value_data = if let Some(last_signed_value_data) = last_subkey_result.value { let value_data = if let Some(last_signed_value_data) = last_get_result.opt_value {
if last_signed_value_data.value_data().data() == data if last_signed_value_data.value_data().data() == data
&& last_signed_value_data.value_data().writer() == &writer.key && last_signed_value_data.value_data().writer() == &writer.key
{ {

View File

@ -61,3 +61,9 @@ impl TryFrom<&[u8]> for SubkeyTableKey {
Ok(SubkeyTableKey { key, subkey }) Ok(SubkeyTableKey { key, subkey })
} }
} }
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct SeqsCacheKey {
pub key: TypedKey,
pub subkeys: ValueSubkeyRangeSet,
}

View File

@ -59,6 +59,8 @@ where
record_index: LruCache<RecordTableKey, Record<D>>, record_index: LruCache<RecordTableKey, Record<D>>,
/// The in-memory cache of commonly accessed subkey data so we don't have to keep hitting the db /// The in-memory cache of commonly accessed subkey data so we don't have to keep hitting the db
subkey_cache: LruCache<SubkeyTableKey, RecordData>, subkey_cache: LruCache<SubkeyTableKey, RecordData>,
/// The in-memory cache of commonly accessed sequence number data so we don't have to keep hitting the db
seqs_cache: LruCache<SeqsCacheKey, Vec<ValueSeqNum>>,
/// Total storage space or subkey data inclusive of structures in memory /// Total storage space or subkey data inclusive of structures in memory
subkey_cache_total_size: LimitedSize<usize>, subkey_cache_total_size: LimitedSize<usize>,
/// Total storage space of records in the tabledb inclusive of subkey data and structures /// Total storage space of records in the tabledb inclusive of subkey data and structures
@ -77,11 +79,20 @@ where
/// The result of the do_get_value_operation /// The result of the do_get_value_operation
#[derive(Default, Debug)] #[derive(Default, Debug)]
pub struct SubkeyResult { pub struct GetResult {
/// The subkey value if we got one /// The subkey value if we got one
pub value: Option<Arc<SignedValueData>>, pub opt_value: Option<Arc<SignedValueData>>,
/// The descriptor if we got a fresh one or empty if no descriptor was needed /// The descriptor if we got a fresh one or empty if no descriptor was needed
pub descriptor: Option<Arc<SignedValueDescriptor>>, pub opt_descriptor: Option<Arc<SignedValueDescriptor>>,
}
/// The result of the do_inspect_value_operation
#[derive(Default, Debug)]
pub struct InspectResult {
/// The sequence map
pub seqs: Vec<ValueSeqNum>,
/// The descriptor if we got a fresh one or empty if no descriptor was needed
pub opt_descriptor: Option<Arc<SignedValueDescriptor>>,
} }
impl<D> RecordStore<D> impl<D> RecordStore<D>
@ -105,6 +116,7 @@ where
subkey_table: None, subkey_table: None,
record_index: LruCache::new(limits.max_records.unwrap_or(usize::MAX)), record_index: LruCache::new(limits.max_records.unwrap_or(usize::MAX)),
subkey_cache: LruCache::new(subkey_cache_size), subkey_cache: LruCache::new(subkey_cache_size),
seqs_cache: LruCache::new(subkey_cache_size),
subkey_cache_total_size: LimitedSize::new( subkey_cache_total_size: LimitedSize::new(
"subkey_cache_total_size", "subkey_cache_total_size",
0, 0,
@ -488,7 +500,7 @@ where
key: TypedKey, key: TypedKey,
subkey: ValueSubkey, subkey: ValueSubkey,
want_descriptor: bool, want_descriptor: bool,
) -> VeilidAPIResult<Option<SubkeyResult>> { ) -> VeilidAPIResult<Option<GetResult>> {
// Get record from index // Get record from index
let Some((subkey_count, has_subkey, opt_descriptor)) = self.with_record(key, |record| { let Some((subkey_count, has_subkey, opt_descriptor)) = self.with_record(key, |record| {
( (
@ -513,9 +525,9 @@ where
// See if we have this subkey stored // See if we have this subkey stored
if !has_subkey { if !has_subkey {
// If not, return no value but maybe with descriptor // If not, return no value but maybe with descriptor
return Ok(Some(SubkeyResult { return Ok(Some(GetResult {
value: None, opt_value: None,
descriptor: opt_descriptor, opt_descriptor,
})); }));
} }
@ -529,9 +541,9 @@ where
if let Some(record_data) = self.subkey_cache.get_mut(&stk) { if let Some(record_data) = self.subkey_cache.get_mut(&stk) {
let out = record_data.signed_value_data().clone(); let out = record_data.signed_value_data().clone();
return Ok(Some(SubkeyResult { return Ok(Some(GetResult {
value: Some(out), opt_value: Some(out),
descriptor: opt_descriptor, opt_descriptor,
})); }));
} }
// If not in cache, try to pull from table store if it is in our stored subkey set // If not in cache, try to pull from table store if it is in our stored subkey set
@ -548,9 +560,9 @@ where
// Add to cache, do nothing with lru out // Add to cache, do nothing with lru out
self.add_to_subkey_cache(stk, record_data); self.add_to_subkey_cache(stk, record_data);
Ok(Some(SubkeyResult { Ok(Some(GetResult {
value: Some(out), opt_value: Some(out),
descriptor: opt_descriptor, opt_descriptor,
})) }))
} }
@ -559,7 +571,7 @@ where
key: TypedKey, key: TypedKey,
subkey: ValueSubkey, subkey: ValueSubkey,
want_descriptor: bool, want_descriptor: bool,
) -> VeilidAPIResult<Option<SubkeyResult>> { ) -> VeilidAPIResult<Option<GetResult>> {
// record from index // record from index
let Some((subkey_count, has_subkey, opt_descriptor)) = self.peek_record(key, |record| { let Some((subkey_count, has_subkey, opt_descriptor)) = self.peek_record(key, |record| {
( (
@ -584,9 +596,9 @@ where
// See if we have this subkey stored // See if we have this subkey stored
if !has_subkey { if !has_subkey {
// If not, return no value but maybe with descriptor // If not, return no value but maybe with descriptor
return Ok(Some(SubkeyResult { return Ok(Some(GetResult {
value: None, opt_value: None,
descriptor: opt_descriptor, opt_descriptor,
})); }));
} }
@ -600,9 +612,9 @@ where
if let Some(record_data) = self.subkey_cache.peek(&stk) { if let Some(record_data) = self.subkey_cache.peek(&stk) {
let out = record_data.signed_value_data().clone(); let out = record_data.signed_value_data().clone();
return Ok(Some(SubkeyResult { return Ok(Some(GetResult {
value: Some(out), opt_value: Some(out),
descriptor: opt_descriptor, opt_descriptor,
})); }));
} }
// If not in cache, try to pull from table store if it is in our stored subkey set // If not in cache, try to pull from table store if it is in our stored subkey set
@ -616,9 +628,9 @@ where
let out = record_data.signed_value_data().clone(); let out = record_data.signed_value_data().clone();
Ok(Some(SubkeyResult { Ok(Some(GetResult {
value: Some(out), opt_value: Some(out),
descriptor: opt_descriptor, opt_descriptor,
})) }))
} }
@ -760,6 +772,84 @@ where
Ok(()) Ok(())
} }
pub async fn inspect_record(
&mut self,
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
want_descriptor: bool,
) -> VeilidAPIResult<Option<InspectResult>> {
// Get record from index
let Some((subkeys, opt_descriptor)) = self.with_record(key, |record| {
// Get number of subkeys from schema and ensure we are getting the
// right number of sequence numbers betwen that and what we asked for
let in_schema_subkeys = subkeys.intersect(&ValueSubkeyRangeSet::single_range(
0,
record.schema().max_subkey(),
));
(
in_schema_subkeys,
if want_descriptor {
Some(record.descriptor().clone())
} else {
None
},
)
}) else {
// Record not available
return Ok(None);
};
// Check if we can return some subkeys
if subkeys.is_empty() {
apibail_invalid_argument!("subkeys set does not overlap schema", "subkeys", subkeys);
}
// See if we have this inspection cached
let sck = SeqsCacheKey {
key,
subkeys: subkeys.clone(),
};
if let Some(seqs) = self.seqs_cache.get(&sck) {
return Ok(Some(InspectResult {
seqs: seqs.clone(),
opt_descriptor,
}));
}
// Get subkey table
let Some(subkey_table) = self.subkey_table.clone() else {
apibail_internal!("record store not initialized");
};
// Build sequence number list to return
let mut seqs = Vec::with_capacity(subkeys.len());
for subkey in subkeys.iter() {
let stk = SubkeyTableKey { key, subkey };
let seq = if let Some(record_data) = self.subkey_cache.peek(&stk) {
record_data.signed_value_data().value_data().seq()
} else {
// If not in cache, try to pull from table store if it is in our stored subkey set
// XXX: This would be better if it didn't have to pull the whole record data to get the seq.
if let Some(record_data) = subkey_table
.load_json::<RecordData>(0, &stk.bytes())
.await
.map_err(VeilidAPIError::internal)?
{
record_data.signed_value_data().value_data().seq()
} else {
// Subkey not written to
ValueSubkey::MAX
}
};
seqs.push(seq)
}
Ok(Some(InspectResult {
seqs,
opt_descriptor,
}))
}
pub async fn _change_existing_watch( pub async fn _change_existing_watch(
&mut self, &mut self,
key: TypedKey, key: TypedKey,
@ -1063,7 +1153,7 @@ where
log_stor!(error "first subkey should exist for value change notification"); log_stor!(error "first subkey should exist for value change notification");
continue; continue;
}; };
let subkey_result = match self.get_subkey(evci.key, first_subkey, false).await { let get_result = match self.get_subkey(evci.key, first_subkey, false).await {
Ok(Some(skr)) => skr, Ok(Some(skr)) => skr,
Ok(None) => { Ok(None) => {
log_stor!(error "subkey should have data for value change notification"); log_stor!(error "subkey should have data for value change notification");
@ -1074,7 +1164,7 @@ where
continue; continue;
} }
}; };
let Some(value) = subkey_result.value else { let Some(value) = get_result.opt_value else {
log_stor!(error "first subkey should have had value for value change notification"); log_stor!(error "first subkey should have had value for value change notification");
continue; continue;
}; };

View File

@ -23,7 +23,7 @@ where
detail: D, detail: D,
) -> VeilidAPIResult<Self> { ) -> VeilidAPIResult<Self> {
let schema = descriptor.schema()?; let schema = descriptor.schema()?;
let subkey_count = schema.subkey_count(); let subkey_count = schema.max_subkey() as usize + 1;
Ok(Self { Ok(Self {
descriptor, descriptor,
subkey_count, subkey_count,

View File

@ -231,21 +231,21 @@ impl StorageManager {
let mut inner = self.lock().await?; let mut inner = self.lock().await?;
// See if this is a remote or local value // See if this is a remote or local value
let (is_local, last_subkey_result) = { let (is_local, last_get_result) = {
// See if the subkey we are modifying has a last known local value // See if the subkey we are modifying has a last known local value
let last_subkey_result = inner.handle_get_local_value(key, subkey, true).await?; let last_get_result = inner.handle_get_local_value(key, subkey, true).await?;
// If this is local, it must have a descriptor already // If this is local, it must have a descriptor already
if last_subkey_result.descriptor.is_some() { if last_get_result.opt_descriptor.is_some() {
(true, last_subkey_result) (true, last_get_result)
} else { } else {
// See if the subkey we are modifying has a last known remote value // See if the subkey we are modifying has a last known remote value
let last_subkey_result = inner.handle_get_remote_value(key, subkey, true).await?; let last_get_result = inner.handle_get_remote_value(key, subkey, true).await?;
(false, last_subkey_result) (false, last_get_result)
} }
}; };
// Make sure this value would actually be newer // Make sure this value would actually be newer
if let Some(last_value) = &last_subkey_result.value { if let Some(last_value) = &last_get_result.opt_value {
if value.value_data().seq() <= last_value.value_data().seq() { if value.value_data().seq() <= last_value.value_data().seq() {
// inbound value is older or equal sequence number than the one we have, just return the one we have // inbound value is older or equal sequence number than the one we have, just return the one we have
return Ok(NetworkResult::value(Some(last_value.clone()))); return Ok(NetworkResult::value(Some(last_value.clone())));
@ -253,7 +253,7 @@ impl StorageManager {
} }
// Get the descriptor and schema for the key // Get the descriptor and schema for the key
let actual_descriptor = match last_subkey_result.descriptor { let actual_descriptor = match last_get_result.opt_descriptor {
Some(last_descriptor) => { Some(last_descriptor) => {
if let Some(descriptor) = descriptor { if let Some(descriptor) = descriptor {
// Descriptor must match last one if it is provided // Descriptor must match last one if it is provided

View File

@ -293,13 +293,12 @@ impl StorageManagerInner {
// Move copy subkey data from remote to local store // Move copy subkey data from remote to local store
for subkey in remote_record.stored_subkeys().iter() { for subkey in remote_record.stored_subkeys().iter() {
let Some(subkey_result) = remote_record_store.get_subkey(key, subkey, false).await? let Some(get_result) = remote_record_store.get_subkey(key, subkey, false).await? else {
else {
// Subkey was missing // Subkey was missing
warn!("Subkey was missing: {} #{}", key, subkey); warn!("Subkey was missing: {} #{}", key, subkey);
continue; continue;
}; };
let Some(subkey_data) = subkey_result.value else { let Some(subkey_data) = get_result.opt_value else {
// Subkey was missing // Subkey was missing
warn!("Subkey data was missing: {} #{}", key, subkey); warn!("Subkey data was missing: {} #{}", key, subkey);
continue; continue;
@ -388,7 +387,7 @@ impl StorageManagerInner {
key: TypedKey, key: TypedKey,
writer: Option<KeyPair>, writer: Option<KeyPair>,
subkey: ValueSubkey, subkey: ValueSubkey,
subkey_result: SubkeyResult, get_result: GetResult,
safety_selection: SafetySelection, safety_selection: SafetySelection,
) -> VeilidAPIResult<DHTRecordDescriptor> { ) -> VeilidAPIResult<DHTRecordDescriptor> {
// Ensure the record is closed // Ensure the record is closed
@ -397,7 +396,7 @@ impl StorageManagerInner {
} }
// Must have descriptor // Must have descriptor
let Some(signed_value_descriptor) = subkey_result.descriptor else { let Some(signed_value_descriptor) = get_result.opt_descriptor else {
// No descriptor for new record, can't store this // No descriptor for new record, can't store this
apibail_generic!("no descriptor"); apibail_generic!("no descriptor");
}; };
@ -434,7 +433,7 @@ impl StorageManagerInner {
local_record_store.new_record(key, record).await?; local_record_store.new_record(key, record).await?;
// If we got a subkey with the getvalue, it has already been validated against the schema, so store it // If we got a subkey with the getvalue, it has already been validated against the schema, so store it
if let Some(signed_value_data) = subkey_result.value { if let Some(signed_value_data) = get_result.opt_value {
// Write subkey to local store // Write subkey to local store
local_record_store local_record_store
.set_subkey(key, subkey, signed_value_data, WatchUpdateMode::NoUpdate) .set_subkey(key, subkey, signed_value_data, WatchUpdateMode::NoUpdate)
@ -513,21 +512,21 @@ impl StorageManagerInner {
key: TypedKey, key: TypedKey,
subkey: ValueSubkey, subkey: ValueSubkey,
want_descriptor: bool, want_descriptor: bool,
) -> VeilidAPIResult<SubkeyResult> { ) -> VeilidAPIResult<GetResult> {
// See if it's in the local record store // See if it's in the local record store
let Some(local_record_store) = self.local_record_store.as_mut() else { let Some(local_record_store) = self.local_record_store.as_mut() else {
apibail_not_initialized!(); apibail_not_initialized!();
}; };
if let Some(subkey_result) = local_record_store if let Some(get_result) = local_record_store
.get_subkey(key, subkey, want_descriptor) .get_subkey(key, subkey, want_descriptor)
.await? .await?
{ {
return Ok(subkey_result); return Ok(get_result);
} }
Ok(SubkeyResult { Ok(GetResult {
value: None, opt_value: None,
descriptor: None, opt_descriptor: None,
}) })
} }
@ -551,26 +550,49 @@ impl StorageManagerInner {
Ok(()) Ok(())
} }
pub(super) async fn handle_inspect_local_value(
&mut self,
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
want_descriptor: bool,
) -> VeilidAPIResult<InspectResult> {
// See if it's in the local record store
let Some(local_record_store) = self.local_record_store.as_mut() else {
apibail_not_initialized!();
};
if let Some(inspect_result) = local_record_store
.inspect_record(key, subkeys, want_descriptor)
.await?
{
return Ok(inspect_result);
}
Ok(InspectResult {
seqs: vec![],
opt_descriptor: None,
})
}
pub(super) async fn handle_get_remote_value( pub(super) async fn handle_get_remote_value(
&mut self, &mut self,
key: TypedKey, key: TypedKey,
subkey: ValueSubkey, subkey: ValueSubkey,
want_descriptor: bool, want_descriptor: bool,
) -> VeilidAPIResult<SubkeyResult> { ) -> VeilidAPIResult<GetResult> {
// See if it's in the remote record store // See if it's in the remote record store
let Some(remote_record_store) = self.remote_record_store.as_mut() else { let Some(remote_record_store) = self.remote_record_store.as_mut() else {
apibail_not_initialized!(); apibail_not_initialized!();
}; };
if let Some(subkey_result) = remote_record_store if let Some(get_result) = remote_record_store
.get_subkey(key, subkey, want_descriptor) .get_subkey(key, subkey, want_descriptor)
.await? .await?
{ {
return Ok(subkey_result); return Ok(get_result);
} }
Ok(SubkeyResult { Ok(GetResult {
value: None, opt_value: None,
descriptor: None, opt_descriptor: None,
}) })
} }
@ -608,6 +630,29 @@ impl StorageManagerInner {
Ok(()) Ok(())
} }
pub(super) async fn handle_inspect_remote_value(
&mut self,
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
want_descriptor: bool,
) -> VeilidAPIResult<InspectResult> {
// See if it's in the local record store
let Some(remote_record_store) = self.remote_record_store.as_mut() else {
apibail_not_initialized!();
};
if let Some(inspect_result) = remote_record_store
.inspect_record(key, subkeys, want_descriptor)
.await?
{
return Ok(inspect_result);
}
Ok(InspectResult {
seqs: vec![],
opt_descriptor: None,
})
}
/// # DHT Key = Hash(ownerKeyKind) of: [ ownerKeyValue, schema ] /// # DHT Key = Hash(ownerKeyKind) of: [ ownerKeyValue, schema ]
fn get_key<D>(vcrypto: CryptoSystemVersion, record: &Record<D>) -> TypedKey fn get_key<D>(vcrypto: CryptoSystemVersion, record: &Record<D>) -> TypedKey
where where

View File

@ -26,19 +26,19 @@ impl StorageManager {
break; break;
}; };
for subkey in osw.subkeys.iter() { for subkey in osw.subkeys.iter() {
let subkey_result = { let get_result = {
let mut inner = self.lock().await?; let mut inner = self.lock().await?;
inner.handle_get_local_value(key, subkey, true).await inner.handle_get_local_value(key, subkey, true).await
}; };
let Ok(subkey_result) = subkey_result else { let Ok(get_result) = get_result else {
log_stor!(debug "Offline subkey write had no subkey result: {}:{}", key, subkey); log_stor!(debug "Offline subkey write had no subkey result: {}:{}", key, subkey);
continue; continue;
}; };
let Some(value) = subkey_result.value else { let Some(value) = get_result.opt_value else {
log_stor!(debug "Offline subkey write had no subkey value: {}:{}", key, subkey); log_stor!(debug "Offline subkey write had no subkey value: {}:{}", key, subkey);
continue; continue;
}; };
let Some(descriptor) = subkey_result.descriptor else { let Some(descriptor) = get_result.opt_descriptor else {
log_stor!(debug "Offline subkey write had no descriptor: {}:{}", key, subkey); log_stor!(debug "Offline subkey write had no descriptor: {}:{}", key, subkey);
continue; continue;
}; };

View File

@ -20,7 +20,10 @@ impl SignedValueDescriptor {
pub fn validate(&self, vcrypto: CryptoSystemVersion) -> VeilidAPIResult<()> { pub fn validate(&self, vcrypto: CryptoSystemVersion) -> VeilidAPIResult<()> {
// validate signature // validate signature
vcrypto.verify(&self.owner, &self.schema_data, &self.signature) vcrypto.verify(&self.owner, &self.schema_data, &self.signature)?;
// validate schema
DHTSchema::try_from(self.schema_data.as_slice())?;
Ok(())
} }
pub fn owner(&self) -> &PublicKey { pub fn owner(&self) -> &PublicKey {

View File

@ -1427,7 +1427,7 @@ impl VeilidAPI {
"dht_schema", "dht_schema",
get_dht_schema, get_dht_schema,
) )
.unwrap_or_else(|_| Ok(DHTSchema::dflt(1)))?; .unwrap_or_else(|_| Ok(DHTSchema::default()))?;
let csv = get_debug_argument_at( let csv = get_debug_argument_at(
&args, &args,

View File

@ -241,6 +241,7 @@ impl RoutingContext {
) -> VeilidAPIResult<DHTRecordDescriptor> { ) -> VeilidAPIResult<DHTRecordDescriptor> {
event!(target: "veilid_api", Level::DEBUG, event!(target: "veilid_api", Level::DEBUG,
"RoutingContext::create_dht_record(self: {:?}, schema: {:?}, kind: {:?})", self, schema, kind); "RoutingContext::create_dht_record(self: {:?}, schema: {:?}, kind: {:?})", self, schema, kind);
schema.validate()?;
let kind = kind.unwrap_or(best_crypto_kind()); let kind = kind.unwrap_or(best_crypto_kind());
Crypto::validate_crypto_kind(kind)?; Crypto::validate_crypto_kind(kind)?;

View File

@ -9,7 +9,7 @@ pub async fn test_dhtrecorddescriptor() {
fix_typedkey(), fix_typedkey(),
fix_cryptokey(), fix_cryptokey(),
Some(fix_cryptokey()), Some(fix_cryptokey()),
DHTSchema::DFLT(DHTSchemaDFLT { o_cnt: 4321 }), DHTSchema::dflt(4321).unwrap(),
); );
let copy = deserialize_json(&serialize_json(&orig)).unwrap(); let copy = deserialize_json(&serialize_json(&orig)).unwrap();

View File

@ -4,7 +4,7 @@ use crate::*;
// dlft // dlft
pub async fn test_dhtschemadflt() { pub async fn test_dhtschemadflt() {
let orig = DHTSchemaDFLT { o_cnt: 9 }; let orig = DHTSchemaDFLT::new(9);
let copy = deserialize_json(&serialize_json(&orig)).unwrap(); let copy = deserialize_json(&serialize_json(&orig)).unwrap();
assert_eq!(orig, copy); assert_eq!(orig, copy);
@ -13,9 +13,10 @@ pub async fn test_dhtschemadflt() {
// mod // mod
pub async fn test_dhtschema() { pub async fn test_dhtschema() {
let orig = DHTSchema::SMPL(DHTSchemaSMPL { let orig = DHTSchema::SMPL(
o_cnt: 91, DHTSchemaSMPL::new(
members: vec![ 91,
vec![
DHTSchemaSMPLMember { DHTSchemaSMPLMember {
m_key: fix_cryptokey(), m_key: fix_cryptokey(),
m_cnt: 5, m_cnt: 5,
@ -25,7 +26,9 @@ pub async fn test_dhtschema() {
m_cnt: 6, m_cnt: 6,
}, },
], ],
}); )
.unwrap(),
);
let copy = deserialize_json(&serialize_json(&orig)).unwrap(); let copy = deserialize_json(&serialize_json(&orig)).unwrap();
assert_eq!(orig, copy); assert_eq!(orig, copy);
@ -44,9 +47,9 @@ pub async fn test_dhtschemasmplmember() {
} }
pub async fn test_dhtschemasmpl() { pub async fn test_dhtschemasmpl() {
let orig = DHTSchemaSMPL { let orig = DHTSchemaSMPL::new(
o_cnt: 91, 91,
members: vec![ vec![
DHTSchemaSMPLMember { DHTSchemaSMPLMember {
m_key: fix_cryptokey(), m_key: fix_cryptokey(),
m_cnt: 8, m_cnt: 8,
@ -56,7 +59,8 @@ pub async fn test_dhtschemasmpl() {
m_cnt: 9, m_cnt: 9,
}, },
], ],
}; )
.unwrap();
let copy = deserialize_json(&serialize_json(&orig)).unwrap(); let copy = deserialize_json(&serialize_json(&orig)).unwrap();
assert_eq!(orig, copy); assert_eq!(orig, copy);

View File

@ -5,13 +5,33 @@ use super::*;
#[cfg_attr(target_arch = "wasm32", derive(Tsify), tsify(from_wasm_abi))] #[cfg_attr(target_arch = "wasm32", derive(Tsify), tsify(from_wasm_abi))]
pub struct DHTSchemaDFLT { pub struct DHTSchemaDFLT {
/// Owner subkey count /// Owner subkey count
pub o_cnt: u16, o_cnt: u16,
} }
impl DHTSchemaDFLT { impl DHTSchemaDFLT {
pub const FCC: [u8; 4] = *b"DFLT"; pub const FCC: [u8; 4] = *b"DFLT";
pub const FIXED_SIZE: usize = 6; pub const FIXED_SIZE: usize = 6;
/// Make a schema
pub fn new(o_cnt: u16) -> VeilidAPIResult<Self> {
let out = Self { o_cnt };
out.validate()?;
Ok(out)
}
/// Validate the data representation
pub fn validate(&self) -> VeilidAPIResult<()> {
if self.o_cnt == 0 {
apibail_invalid_argument!("must have at least one subkey", "o_cnt", self.o_cnt);
}
Ok(())
}
/// Get the owner subkey count
pub fn o_cnt(&self) -> u16 {
self.o_cnt
}
/// Build the data representation of the schema /// Build the data representation of the schema
pub fn compile(&self) -> Vec<u8> { pub fn compile(&self) -> Vec<u8> {
let mut out = Vec::<u8>::with_capacity(Self::FIXED_SIZE); let mut out = Vec::<u8>::with_capacity(Self::FIXED_SIZE);
@ -22,9 +42,9 @@ impl DHTSchemaDFLT {
out out
} }
/// Get the number of subkeys this schema allocates /// Get the maximum subkey this schema allocates
pub fn subkey_count(&self) -> usize { pub fn max_subkey(&self) -> ValueSubkey {
self.o_cnt as usize self.o_cnt as ValueSubkey - 1
} }
/// Get the data size of this schema beyond the size of the structure itself /// Get the data size of this schema beyond the size of the structure itself
pub fn data_size(&self) -> usize { pub fn data_size(&self) -> usize {
@ -72,6 +92,6 @@ impl TryFrom<&[u8]> for DHTSchemaDFLT {
let o_cnt = u16::from_le_bytes(b[4..6].try_into().map_err(VeilidAPIError::internal)?); let o_cnt = u16::from_le_bytes(b[4..6].try_into().map_err(VeilidAPIError::internal)?);
Ok(Self { o_cnt }) Self::new(o_cnt)
} }
} }

View File

@ -16,11 +16,19 @@ pub enum DHTSchema {
} }
impl DHTSchema { impl DHTSchema {
pub fn dflt(o_cnt: u16) -> DHTSchema { pub fn dflt(o_cnt: u16) -> VeilidAPIResult<DHTSchema> {
DHTSchema::DFLT(DHTSchemaDFLT { o_cnt }) Ok(DHTSchema::DFLT(DHTSchemaDFLT::new(o_cnt)?))
}
pub fn smpl(o_cnt: u16, members: Vec<DHTSchemaSMPLMember>) -> VeilidAPIResult<DHTSchema> {
Ok(DHTSchema::SMPL(DHTSchemaSMPL::new(o_cnt, members)?))
}
/// Validate the data representation
pub fn validate(&self) -> VeilidAPIResult<()> {
match self {
DHTSchema::DFLT(d) => d.validate(),
DHTSchema::SMPL(s) => s.validate(),
} }
pub fn smpl(o_cnt: u16, members: Vec<DHTSchemaSMPLMember>) -> DHTSchema {
DHTSchema::SMPL(DHTSchemaSMPL { o_cnt, members })
} }
/// Build the data representation of the schema /// Build the data representation of the schema
@ -31,11 +39,11 @@ impl DHTSchema {
} }
} }
/// Get the number of subkeys this schema allocates /// Get maximum subkey number for this schema
pub fn subkey_count(&self) -> usize { pub fn max_subkey(&self) -> ValueSubkey {
match self { match self {
DHTSchema::DFLT(d) => d.subkey_count(), DHTSchema::DFLT(d) => d.max_subkey(),
DHTSchema::SMPL(s) => s.subkey_count(), DHTSchema::SMPL(s) => s.max_subkey(),
} }
} }
@ -71,7 +79,7 @@ impl DHTSchema {
impl Default for DHTSchema { impl Default for DHTSchema {
fn default() -> Self { fn default() -> Self {
Self::dflt(1) Self::dflt(1).unwrap()
} }
} }

View File

@ -16,15 +16,48 @@ pub struct DHTSchemaSMPLMember {
#[cfg_attr(target_arch = "wasm32", derive(Tsify), tsify(from_wasm_abi))] #[cfg_attr(target_arch = "wasm32", derive(Tsify), tsify(from_wasm_abi))]
pub struct DHTSchemaSMPL { pub struct DHTSchemaSMPL {
/// Owner subkey count /// Owner subkey count
pub o_cnt: u16, o_cnt: u16,
/// Members /// Members
pub members: Vec<DHTSchemaSMPLMember>, members: Vec<DHTSchemaSMPLMember>,
} }
impl DHTSchemaSMPL { impl DHTSchemaSMPL {
pub const FCC: [u8; 4] = *b"SMPL"; pub const FCC: [u8; 4] = *b"SMPL";
pub const FIXED_SIZE: usize = 6; pub const FIXED_SIZE: usize = 6;
/// Make a schema
pub fn new(o_cnt: u16, members: Vec<DHTSchemaSMPLMember>) -> VeilidAPIResult<Self> {
let out = Self { o_cnt, members };
out.validate()?;
Ok(out)
}
/// Validate the data representation
pub fn validate(&self) -> VeilidAPIResult<()> {
let keycount = self
.members
.iter()
.fold(self.o_cnt as usize, |acc, x| acc + (x.m_cnt as usize));
if keycount == 0 {
apibail_invalid_argument!("must have at least one subkey", "keycount", keycount);
}
if keycount > 65535 {
apibail_invalid_argument!("too many subkeys", "keycount", keycount);
}
Ok(())
}
/// Get the owner subkey count
pub fn o_cnt(&self) -> u16 {
self.o_cnt
}
/// Get the members of the schema
pub fn members(&self) -> &[DHTSchemaSMPLMember] {
&self.members
}
/// Build the data representation of the schema /// Build the data representation of the schema
pub fn compile(&self) -> Vec<u8> { pub fn compile(&self) -> Vec<u8> {
let mut out = Vec::<u8>::with_capacity( let mut out = Vec::<u8>::with_capacity(
@ -44,11 +77,13 @@ impl DHTSchemaSMPL {
out out
} }
/// Get the number of subkeys this schema allocates /// Get the maximum subkey this schema allocates
pub fn subkey_count(&self) -> usize { pub fn max_subkey(&self) -> ValueSubkey {
self.members let subkey_count = self
.members
.iter() .iter()
.fold(self.o_cnt as usize, |acc, x| acc + (x.m_cnt as usize)) .fold(self.o_cnt as usize, |acc, x| acc + (x.m_cnt as usize));
(subkey_count - 1) as ValueSubkey
} }
/// Get the data size of this schema beyond the size of the structure itself /// Get the data size of this schema beyond the size of the structure itself
@ -134,6 +169,6 @@ impl TryFrom<&[u8]> for DHTSchemaSMPL {
members.push(DHTSchemaSMPLMember { m_key, m_cnt }); members.push(DHTSchemaSMPLMember { m_key, m_cnt });
} }
Ok(Self { o_cnt, members }) Self::new(o_cnt, members)
} }
} }

View File

@ -2,7 +2,9 @@ use super::*;
use core::ops::{Deref, DerefMut}; use core::ops::{Deref, DerefMut};
use range_set_blaze::*; use range_set_blaze::*;
#[derive(Clone, Default, PartialOrd, PartialEq, Eq, Ord, Serialize, Deserialize, JsonSchema)] #[derive(
Clone, Default, Hash, PartialOrd, PartialEq, Eq, Ord, Serialize, Deserialize, JsonSchema,
)]
#[serde(transparent)] #[serde(transparent)]
pub struct ValueSubkeyRangeSet { pub struct ValueSubkeyRangeSet {
#[serde(with = "serialize_range_set_blaze")] #[serde(with = "serialize_range_set_blaze")]
@ -29,6 +31,11 @@ impl ValueSubkeyRangeSet {
data.insert(value); data.insert(value);
Self { data } Self { data }
} }
pub fn single_range(low: ValueSubkey, high: ValueSubkey) -> Self {
let mut data = RangeSetBlaze::new();
data.ranges_insert(low..=high);
Self { data }
}
pub fn intersect(&self, other: &ValueSubkeyRangeSet) -> ValueSubkeyRangeSet { pub fn intersect(&self, other: &ValueSubkeyRangeSet) -> ValueSubkeyRangeSet {
Self::new_with_data(&self.data & &other.data) Self::new_with_data(&self.data & &other.data)

View File

@ -38,7 +38,7 @@ extension ValidateSMPL on DHTSchemaSMPL {
return true; return true;
} }
int subkeyCount() => members.fold(0, (acc, v) => acc + v.mCnt) + oCnt; int subkeyCount() => members.fold(oCnt, (acc, v) => acc + v.mCnt);
} }
extension Validate on DHTSchema { extension Validate on DHTSchema {