diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_inspect_value.rs b/veilid-core/src/rpc_processor/coders/operations/operation_inspect_value.rs
index b00b6319..26ccec60 100644
--- a/veilid-core/src/rpc_processor/coders/operations/operation_inspect_value.rs
+++ b/veilid-core/src/rpc_processor/coders/operations/operation_inspect_value.rs
@@ -1,8 +1,8 @@
 use super::*;
 use crate::storage_manager::SignedValueDescriptor;
 
-const MAX_INSPECT_VALUE_Q_SUBKEYS_LEN: usize = 512;
-const MAX_INSPECT_VALUE_A_SEQS_LEN: usize = MAX_INSPECT_VALUE_Q_SUBKEYS_LEN;
+const MAX_INSPECT_VALUE_Q_SUBKEY_RANGES_LEN: usize = 512;
+const MAX_INSPECT_VALUE_A_SEQS_LEN: usize = 512;
 const MAX_INSPECT_VALUE_A_PEERS_LEN: usize = 20;
 
 #[derive(Clone)]
@@ -34,12 +34,6 @@ impl RPCOperationInspectValueQ {
         subkeys: ValueSubkeyRangeSet,
         want_descriptor: bool,
     ) -> Result<Self, RPCError> {
-        // Needed because RangeSetBlaze uses different types here all the time
-        #[allow(clippy::unnecessary_cast)]
-        let subkeys_len = subkeys.len() as usize;
-        if subkeys_len > MAX_INSPECT_VALUE_Q_SUBKEYS_LEN {
-            return Err(RPCError::protocol("InspectValue subkeys length too long"));
-        }
         Ok(Self {
             key,
             subkeys,
@@ -70,7 +64,7 @@ impl RPCOperationInspectValueQ {
         let key = decode_typed_key(&k_reader)?;
         let sk_reader = reader.get_subkeys().map_err(RPCError::protocol)?;
         // Maximum number of ranges that can hold the maximum number of subkeys is one subkey per range
-        if sk_reader.len() as usize > MAX_INSPECT_VALUE_Q_SUBKEYS_LEN {
+        if sk_reader.len() as usize > MAX_INSPECT_VALUE_Q_SUBKEY_RANGES_LEN {
             return Err(RPCError::protocol("InspectValueQ too many subkey ranges"));
         }
         let mut subkeys = ValueSubkeyRangeSet::new();
@@ -88,11 +82,6 @@ impl RPCOperationInspectValueQ {
             }
             subkeys.ranges_insert(vskr.0..=vskr.1);
         }
-        // Needed because RangeSetBlaze uses different types here all the time
-        #[allow(clippy::unnecessary_cast)]
-        if subkeys.len() as usize > MAX_INSPECT_VALUE_Q_SUBKEYS_LEN {
-            return Err(RPCError::protocol("InspectValueQ too many subkey ranges"));
-        }
         let want_descriptor = reader.reborrow().get_want_descriptor();
 
         Ok(Self {
@@ -218,6 +207,11 @@ impl RPCOperationInspectValueA {
     ) -> Result<Self, RPCError> {
         let seqs = if reader.has_seqs() {
             let seqs_reader = reader.get_seqs().map_err(RPCError::protocol)?;
+            if seqs_reader.len() as usize > MAX_INSPECT_VALUE_A_SEQS_LEN {
+                return Err(RPCError::protocol(
+                    "decoded InspectValueA seqs length too long",
+                ));
+            }
             let Some(seqs) = seqs_reader.as_slice().map(|s| s.to_vec()) else {
                 return Err(RPCError::protocol("invalid decoded InspectValueA seqs"));
             };
diff --git a/veilid-core/src/rpc_processor/fanout_call.rs b/veilid-core/src/rpc_processor/fanout_call.rs
index edcadbac..ed0725db 100644
--- a/veilid-core/src/rpc_processor/fanout_call.rs
+++ b/veilid-core/src/rpc_processor/fanout_call.rs
@@ -8,6 +8,19 @@ where
     result: Option<Result<R, RPCError>>,
 }
 
+#[derive(Debug, Copy, Clone)]
+pub(crate) enum FanoutResultKind {
+    Timeout,
+    Finished,
+    Exhausted,
+}
+
+#[derive(Debug, Clone)]
+pub(crate) struct FanoutResult {
+    pub kind: FanoutResultKind,
+    pub value_nodes: Vec<NodeRef>,
+}
+
 pub(crate) type FanoutCallReturnType = RPCNetworkResult<Vec<PeerInfo>>;
 pub(crate) type FanoutNodeInfoFilter = Arc<dyn Fn(&[TypedKey], &NodeInfo) -> bool + Send + Sync>;
 
diff --git a/veilid-core/src/storage_manager/get_value.rs b/veilid-core/src/storage_manager/get_value.rs
index 6cb1d140..1e608074 100644
--- a/veilid-core/src/storage_manager/get_value.rs
+++ b/veilid-core/src/storage_manager/get_value.rs
@@ -14,10 +14,12 @@ struct OutboundGetValueContext {
 
 /// The result of the outbound_get_value operation
 pub(super) struct OutboundGetValueResult {
+    /// Fanout result
+    pub fanout_result: FanoutResult,
+    /// Consensus count for this operation,
+    pub consensus_count: usize,
     /// The subkey that was retrieved
     pub get_result: GetResult,
-    /// And where it was retrieved from
-    pub value_nodes: Vec<NodeRef>,
 }
 
 impl StorageManager {
@@ -79,7 +81,13 @@ impl StorageManager {
                 if let Some(descriptor) = gva.answer.descriptor {
                     let mut ctx = context.lock();
                     if ctx.descriptor.is_none() && ctx.schema.is_none() {
-                        ctx.schema = Some(descriptor.schema().map_err(RPCError::invalid_format)?);
+                        let schema = match descriptor.schema() {
+                            Ok(v) => v,
+                            Err(e) => {
+                                return Ok(NetworkResult::invalid_message(e));
+                            }
+                        };
+                        ctx.schema = Some(schema);
                         ctx.descriptor = Some(Arc::new(descriptor));
                     }
                 }
@@ -173,65 +181,36 @@ impl StorageManager {
             check_done,
         );
 
-        match fanout_call.run(vec![]).await {
+        let kind = match fanout_call.run(vec![]).await {
             // If we don't finish in the timeout (too much time passed checking for consensus)
-            TimeoutOr::Timeout => {
-                // Return the best answer we've got
-                let ctx = context.lock();
-                if ctx.value_nodes.len() >= consensus_count {
-                    log_stor!(debug "GetValue Fanout Timeout Consensus");
-                } else {
-                    log_stor!(debug "GetValue Fanout Timeout Non-Consensus: {}", ctx.value_nodes.len());
-                }
-                Ok(OutboundGetValueResult {
-                    get_result: GetResult {
-                        opt_value: ctx.value.clone(),
-                        opt_descriptor: ctx.descriptor.clone(),
-                    },
-                    value_nodes: ctx.value_nodes.clone(),
-                })
-            }
-            // If we finished with consensus (enough nodes returning the same value)
-            TimeoutOr::Value(Ok(Some(()))) => {
-                // Return the best answer we've got
-                let ctx = context.lock();
-                if ctx.value_nodes.len() >= consensus_count {
-                    log_stor!(debug "GetValue Fanout Consensus");
-                } else {
-                    log_stor!(debug "GetValue Fanout Non-Consensus: {}", ctx.value_nodes.len());
-                }
-                Ok(OutboundGetValueResult {
-                    get_result: GetResult {
-                        opt_value: ctx.value.clone(),
-                        opt_descriptor: ctx.descriptor.clone(),
-                    },
-                    value_nodes: ctx.value_nodes.clone(),
-                })
-            }
-            // If we finished without consensus (ran out of nodes before getting consensus)
-            TimeoutOr::Value(Ok(None)) => {
-                // Return the best answer we've got
-                let ctx = context.lock();
-                if ctx.value_nodes.len() >= consensus_count {
-                    log_stor!(debug "GetValue Fanout Exhausted Consensus");
-                } else {
-                    log_stor!(debug "GetValue Fanout Exhausted Non-Consensus: {}", ctx.value_nodes.len());
-                }
-                Ok(OutboundGetValueResult {
-                    get_result: GetResult {
-                        opt_value: ctx.value.clone(),
-                        opt_descriptor: ctx.descriptor.clone(),
-                    },
-                    value_nodes: ctx.value_nodes.clone(),
-                })
-            }
+            TimeoutOr::Timeout => FanoutResultKind::Timeout,
+            // If we finished with or without consensus (enough nodes returning the same value)
+            TimeoutOr::Value(Ok(Some(()))) => FanoutResultKind::Finished,
+            // If we ran out of nodes before getting consensus
+            TimeoutOr::Value(Ok(None)) => FanoutResultKind::Exhausted,
             // Failed
             TimeoutOr::Value(Err(e)) => {
                 // If we finished with an error, return that
                 log_stor!(debug "GetValue Fanout Error: {}", e);
-                Err(e.into())
+                return Err(e.into());
             }
-        }
+        };
+
+        let ctx = context.lock();
+        let fanout_result = FanoutResult {
+            kind,
+            value_nodes: ctx.value_nodes.clone(),
+        };
+        log_stor!(debug "GetValue Fanout: {:?}", fanout_result);
+
+        Ok(OutboundGetValueResult {
+            fanout_result,
+            consensus_count,
+            get_result: GetResult {
+                opt_value: ctx.value.clone(),
+                opt_descriptor: ctx.descriptor.clone(),
+            },
+        })
     }
 
     /// Handle a received 'Get Value' query
diff --git a/veilid-core/src/storage_manager/inspect_value.rs b/veilid-core/src/storage_manager/inspect_value.rs
index 48033d56..079e221c 100644
--- a/veilid-core/src/storage_manager/inspect_value.rs
+++ b/veilid-core/src/storage_manager/inspect_value.rs
@@ -1,23 +1,53 @@
 use super::*;
 
+/// The fully parsed descriptor
+struct DescriptorInfo {
+    /// The descriptor itself
+    descriptor: Arc<SignedValueDescriptor>,
+
+    /// The in-schema subkeys that overlap the inspected range
+    subkeys: ValueSubkeyRangeSet,
+}
+
+impl DescriptorInfo {
+    pub fn new(
+        descriptor: Arc<SignedValueDescriptor>,
+        subkeys: &ValueSubkeyRangeSet,
+    ) -> VeilidAPIResult<Self> {
+        let schema = descriptor.schema().map_err(RPCError::invalid_format)?;
+        let subkeys = subkeys.intersect(&ValueSubkeyRangeSet::single_range(0, schema.max_subkey()));
+
+        Ok(Self {
+            descriptor,
+            subkeys,
+        })
+    }
+}
+
+/// Info tracked per subkey
+struct SubkeySeqCount {
+    /// The newest sequence number found for a subkey
+    pub seq: ValueSeqNum,
+    /// The nodes that have returned the value so far (up to the consensus count)
+    pub value_nodes: Vec<NodeRef>,
+}
+
 /// The context of the outbound_get_value operation
 struct OutboundInspectValueContext {
-    /// The combined sequence map so far
-    pub seqs: Vec<ValueSeqNum>,
-    /// The nodes that have returned the value so far (up to the consensus count)
-    pub value_nodes: Vec<NodeRef>,
+    /// The combined sequence numbers and result counts so far
+    pub seqcounts: Vec<SubkeySeqCount>,
     /// The descriptor if we got a fresh one or empty if no descriptor was needed
-    pub descriptor: Option<Arc<SignedValueDescriptor>>,
-    /// The parsed schema from the descriptor if we have one
-    pub schema: Option<DHTSchema>,
+    pub opt_descriptor_info: Option<DescriptorInfo>,
 }
 
 /// The result of the outbound_get_value operation
 pub(super) struct OutboundInspectValueResult {
-    /// The subkey that was retrieved
+    /// Fanout results for each subkey
+    pub fanout_results: Vec<FanoutResult>,
+    /// Required count for consensus for this operation,
+    pub consensus_count: usize,
+    /// The inspection that was retrieved
     pub inspect_result: InspectResult,
-    /// And where it was retrieved from
-    pub value_nodes: Vec<NodeRef>,
 }
 
 impl StorageManager {
@@ -28,39 +58,62 @@ impl StorageManager {
         key: TypedKey,
         subkeys: ValueSubkeyRangeSet,
         safety_selection: SafetySelection,
-        last_inspect_result: InspectResult,
+        mut local_inspect_result: InspectResult,
+        use_set_scope: bool,
     ) -> VeilidAPIResult<OutboundInspectValueResult> {
         let routing_table = rpc_processor.routing_table();
 
-        // Get the DHT parameters for 'InspectValue' (the same as for 'GetValue')
+        // Get the DHT parameters for 'InspectValue'
+        // Can use either 'get scope' or 'set scope' depending on the purpose of the inspection
         let (key_count, consensus_count, fanout, timeout_us) = {
             let c = self.unlocked_inner.config.get();
-            (
-                c.network.dht.max_find_node_count as usize,
-                c.network.dht.get_value_count as usize,
-                c.network.dht.get_value_fanout as usize,
-                TimestampDuration::from(ms_to_us(c.network.dht.get_value_timeout_ms)),
-            )
+
+            if use_set_scope {
+                // If we're simulating a set, increase the previous sequence number we have by 1
+                for seq in &mut local_inspect_result.seqs {
+                    *seq += 1;
+                }
+
+                (
+                    c.network.dht.max_find_node_count as usize,
+                    c.network.dht.set_value_count as usize,
+                    c.network.dht.set_value_fanout as usize,
+                    TimestampDuration::from(ms_to_us(c.network.dht.set_value_timeout_ms)),
+                )
+            } else {
+                (
+                    c.network.dht.max_find_node_count as usize,
+                    c.network.dht.get_value_count as usize,
+                    c.network.dht.get_value_fanout as usize,
+                    TimestampDuration::from(ms_to_us(c.network.dht.get_value_timeout_ms)),
+                )
+            }
         };
 
         // Make do-inspect-value answer context
-        let schema = if let Some(d) = &last_inspect_result.opt_descriptor {
-            Some(d.schema()?)
+        let opt_descriptor_info = if let Some(descriptor) = &local_inspect_result.opt_descriptor {
+            Some(DescriptorInfo::new(descriptor.clone(), &subkeys)?)
         } else {
            None
         };
+
         let context = Arc::new(Mutex::new(OutboundInspectValueContext {
-            seqs: last_inspect_result.seqs,
-            value_nodes: vec![],
-            descriptor: last_inspect_result.opt_descriptor.clone(),
-            schema,
+            seqcounts: local_inspect_result
+                .seqs
+                .iter()
+                .map(|s| SubkeySeqCount {
+                    seq: *s,
+                    value_nodes: vec![],
+                })
+                .collect(),
+            opt_descriptor_info,
         }));
 
         // Routine to call to generate fanout
         let call_routine = |next_node: NodeRef| {
             let rpc_processor = rpc_processor.clone();
             let context = context.clone();
-            let last_descriptor = last_inspect_result.opt_descriptor.clone();
+            let opt_descriptor = local_inspect_result.opt_descriptor.clone();
             let subkeys = subkeys.clone();
             async move {
                 let iva = network_result_try!(
@@ -70,7 +123,7 @@ impl StorageManager {
                         Destination::direct(next_node.clone()).with_safety(safety_selection),
                         key,
                         subkeys.clone(),
-                        last_descriptor.map(|x| (*x).clone()),
+                        opt_descriptor.map(|x| (*x).clone()),
                     )
                     .await?
                 );
@@ -79,9 +132,15 @@ impl StorageManager {
                 // already be validated by rpc_call_inspect_value
                 if let Some(descriptor) = iva.answer.descriptor {
                     let mut ctx = context.lock();
-                    if ctx.descriptor.is_none() && ctx.schema.is_none() {
-                        ctx.schema = Some(descriptor.schema().map_err(RPCError::invalid_format)?);
-                        ctx.descriptor = Some(Arc::new(descriptor));
+                    if ctx.opt_descriptor_info.is_none() {
+                        let descriptor_info =
+                            match DescriptorInfo::new(Arc::new(descriptor.clone()), &subkeys) {
+                                Ok(v) => v,
+                                Err(e) => {
+                                    return Ok(NetworkResult::invalid_message(e));
+                                }
+                            };
+                        ctx.opt_descriptor_info = Some(descriptor_info);
                     }
                 }
 
@@ -90,59 +149,72 @@ impl StorageManager {
                 log_stor!(debug "Got seqs back: len={}", iva.answer.seqs.len());
 
                 let mut ctx = context.lock();
 
-                // Ensure we have a schema and descriptor
+                // Ensure we have a schema and descriptor etc
-                let (Some(_descriptor), Some(schema)) = (&ctx.descriptor, &ctx.schema) else {
+                let Some(descriptor_info) = &ctx.opt_descriptor_info else {
                     // Got a value but no descriptor for it
                     // Move to the next node
                     return Ok(NetworkResult::invalid_message(
-                        "Got value with no descriptor",
+                        "Got inspection with no descriptor",
                     ));
                 };
 
                 // Get number of subkeys from schema and ensure we are getting the
                 // right number of sequence numbers betwen that and what we asked for
-                let in_schema_subkeys = subkeys
-                    .intersect(&ValueSubkeyRangeSet::single_range(0, schema.max_subkey()));
-                if iva.answer.seqs.len() != in_schema_subkeys.len() {
+                if iva.answer.seqs.len() != descriptor_info.subkeys.len() {
                     // Not the right number of sequence numbers
                     // Move to the next node
                     return Ok(NetworkResult::invalid_message(format!(
                         "wrong number of seqs returned {} (wanted {})",
                         iva.answer.seqs.len(),
-                        in_schema_subkeys.len()
+                        descriptor_info.subkeys.len()
                     )));
                 }
 
                 // If we have a prior seqs list, merge in the new seqs
-                if ctx.seqs.is_empty() {
-                    ctx.seqs = iva.answer.seqs.clone();
-                    // One node has shown us the newest sequence numbers so far
-                    ctx.value_nodes = vec![next_node];
+                if ctx.seqcounts.is_empty() {
+                    ctx.seqcounts = iva
+                        .answer
+                        .seqs
+                        .iter()
+                        .map(|s| SubkeySeqCount {
+                            seq: *s,
+                            // One node has shown us the newest sequence numbers so far
+                            value_nodes: vec![next_node.clone()],
+                        })
+                        .collect();
                 } else {
-                    if ctx.seqs.len() != iva.answer.seqs.len() {
+                    if ctx.seqcounts.len() != iva.answer.seqs.len() {
                        return Err(RPCError::internal(
                            "seqs list length should always be equal by now",
                        ));
                    }
-                    let mut newer_seq = false;
-                    for pair in ctx.seqs.iter_mut().zip(iva.answer.seqs.iter()) {
+                    for pair in ctx.seqcounts.iter_mut().zip(iva.answer.seqs.iter()) {
+                        let ctx_seqcnt = pair.0;
+                        let answer_seq = *pair.1;
+
+                        // If we already have consensus for this subkey, don't bother updating it any more
+                        // While we may find a better sequence number if we keep looking, this does not mimic the behavior
+                        // of get and set unless we stop here
+                        if ctx_seqcnt.value_nodes.len() >= consensus_count {
+                            continue;
+                        }
+
                         // If the new seq isn't undefined and is better than the old seq (either greater or old is undefined)
                         // Then take that sequence number and note that we have gotten newer sequence numbers so we keep
                         // looking for consensus
-                        if *pair.1 != ValueSeqNum::MAX
-                            && (*pair.0 == ValueSeqNum::MAX || pair.1 > pair.0)
-                        {
-                            newer_seq = true;
-                            *pair.0 = *pair.1;
+                        // If the sequence number matches the old sequence number, then we keep the value node for reference later
+                        if answer_seq != ValueSeqNum::MAX {
+                            if ctx_seqcnt.seq == ValueSeqNum::MAX || answer_seq > ctx_seqcnt.seq
+                            {
+                                // One node has shown us the latest sequence numbers so far
+                                ctx_seqcnt.seq = answer_seq;
+                                ctx_seqcnt.value_nodes = vec![next_node.clone()];
+                            } else if answer_seq == ctx_seqcnt.seq {
+                                // Keep the nodes that showed us the latest values
+                                ctx_seqcnt.value_nodes.push(next_node.clone());
+                            }
                         }
                     }
-                    if newer_seq {
-                        // One node has shown us the latest sequence numbers so far
-                        ctx.value_nodes = vec![next_node];
-                    } else {
-                        // Increase the consensus count for the seqs list
-                        ctx.value_nodes.push(next_node);
-                    }
                 }
             }
@@ -155,12 +227,16 @@ impl StorageManager {
 
         // Routine to call to check if we're done at each step
         let check_done = |_closest_nodes: &[NodeRef]| {
-            // If we have reached sufficient consensus, return done
+            // If we have reached sufficient consensus on all subkeys, return done
             let ctx = context.lock();
-            if !ctx.seqs.is_empty()
-                && ctx.descriptor.is_some()
-                && ctx.value_nodes.len() >= consensus_count
-            {
+            let mut has_consensus = true;
+            for cs in ctx.seqcounts.iter() {
+                if cs.value_nodes.len() < consensus_count {
+                    has_consensus = false;
+                    break;
+                }
+            }
+            if !ctx.seqcounts.is_empty() && ctx.opt_descriptor_info.is_some() && has_consensus {
                 return Some(());
             }
             None
@@ -178,65 +254,54 @@ impl StorageManager {
             check_done,
         );
 
-        match fanout_call.run(vec![]).await {
+        let kind = match fanout_call.run(vec![]).await {
             // If we don't finish in the timeout (too much time passed checking for consensus)
-            TimeoutOr::Timeout => {
-                // Return the best answer we've got
-                let ctx = context.lock();
-                if ctx.value_nodes.len() >= consensus_count {
-                    log_stor!(debug "InspectValue Fanout Timeout Consensus");
-                } else {
-                    log_stor!(debug "InspectValue Fanout Timeout Non-Consensus: {}", ctx.value_nodes.len());
-                }
-                Ok(OutboundInspectValueResult {
-                    inspect_result: InspectResult {
-                        seqs: ctx.seqs.clone(),
-                        opt_descriptor: ctx.descriptor.clone(),
-                    },
-                    value_nodes: ctx.value_nodes.clone(),
-                })
-            }
-            // If we finished with consensus (enough nodes returning the same value)
-            TimeoutOr::Value(Ok(Some(()))) => {
-                // Return the best answer we've got
-                let ctx = context.lock();
-                if ctx.value_nodes.len() >= consensus_count {
-                    log_stor!(debug "InspectValue Fanout Consensus");
-                } else {
-                    log_stor!(debug "InspectValue Fanout Non-Consensus: {}", ctx.value_nodes.len());
-                }
-                Ok(OutboundInspectValueResult {
-                    inspect_result: InspectResult {
-                        seqs: ctx.seqs.clone(),
-                        opt_descriptor: ctx.descriptor.clone(),
-                    },
-                    value_nodes: ctx.value_nodes.clone(),
-                })
-            }
-            // If we finished without consensus (ran out of nodes before getting consensus)
-            TimeoutOr::Value(Ok(None)) => {
-                // Return the best answer we've got
-                let ctx = context.lock();
-                if ctx.value_nodes.len() >= consensus_count {
-                    log_stor!(debug "InspectValue Fanout Exhausted Consensus");
-                } else {
-                    log_stor!(debug "InspectValue Fanout Exhausted Non-Consensus: {}", ctx.value_nodes.len());
-                }
-                Ok(OutboundInspectValueResult {
-                    inspect_result: InspectResult {
-                        seqs: ctx.seqs.clone(),
-                        opt_descriptor: ctx.descriptor.clone(),
-                    },
-                    value_nodes: ctx.value_nodes.clone(),
-                })
-            }
+            TimeoutOr::Timeout => FanoutResultKind::Timeout,
+            // If we finished with or without consensus (enough nodes returning the same value)
+            TimeoutOr::Value(Ok(Some(()))) => FanoutResultKind::Finished,
+            // If we ran out of nodes before getting consensus
+            TimeoutOr::Value(Ok(None)) => FanoutResultKind::Exhausted,
             // Failed
             TimeoutOr::Value(Err(e)) => {
                 // If we finished with an error, return that
                 log_stor!(debug "InspectValue Fanout Error: {}", e);
-                Err(e.into())
+                return Err(e.into());
            }
-        }
+        };
+
+        let ctx = context.lock();
+        let mut fanout_results = vec![];
+        for cs in &ctx.seqcounts {
+            let has_consensus = cs.value_nodes.len() > consensus_count;
+            let fanout_result = FanoutResult {
+                kind: if has_consensus {
+                    FanoutResultKind::Finished
+                } else {
+                    kind
+                },
+                value_nodes: cs.value_nodes.clone(),
+            };
+            fanout_results.push(fanout_result);
+        }
+
+        log_stor!(debug "InspectValue Fanout ({:?}): {:?}", kind, fanout_results.iter().map(|fr| (fr.kind, fr.value_nodes.len())).collect::<Vec<_>>());
+
+        Ok(OutboundInspectValueResult {
+            fanout_results,
+            consensus_count,
+            inspect_result: InspectResult {
+                subkeys: ctx
+                    .opt_descriptor_info
+                    .as_ref()
+                    .map(|d| d.subkeys.clone())
+                    .unwrap_or_default(),
+                seqs: ctx.seqcounts.iter().map(|cs| cs.seq).collect(),
+                opt_descriptor: ctx
+                    .opt_descriptor_info
+                    .as_ref()
+                    .map(|d| d.descriptor.clone()),
+            },
+        })
     }
 
     /// Handle a received 'Inspect Value' query
diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs
index 90a054f1..1a83d49f 100644
--- a/veilid-core/src/storage_manager/mod.rs
+++ b/veilid-core/src/storage_manager/mod.rs
@@ -408,7 +408,7 @@ impl StorageManager {
 
         // Keep the list of nodes that returned a value for later reference
         let mut inner = self.lock().await?;
-        inner.set_value_nodes(key, result.value_nodes)?;
+        inner.set_value_nodes(key, result.fanout_result.value_nodes)?;
 
         // If we got a new value back then write it to the opened record
         if Some(get_result_value.value_data().seq()) != opt_last_seq {
@@ -541,7 +541,7 @@ impl StorageManager {
 
         // Keep the list of nodes that returned a value for later reference
         let mut inner = self.lock().await?;
-        inner.set_value_nodes(key, result.value_nodes)?;
+        inner.set_value_nodes(key, result.fanout_result.value_nodes)?;
 
         // Return the new value if it differs from what was asked to set
         if result.signed_value_data.value_data() != signed_value_data.value_data() {
@@ -732,6 +732,71 @@ impl StorageManager {
         Ok(true)
     }
 
+    /// Inspect an opened DHT record for its subkey sequence numbers
+    pub async fn inspect_record(
+        &self,
+        key: TypedKey,
+        subkeys: ValueSubkeyRangeSet,
+        scope: DHTReportScope,
+    ) -> VeilidAPIResult<DHTRecordReport> {
+        let mut inner = self.lock().await?;
+        let safety_selection = {
+            let Some(opened_record) = inner.opened_records.get(&key) else {
+                apibail_generic!("record not open");
not open"); + }; + opened_record.safety_selection() + }; + + // See if the requested record is our local record store + let local_inspect_result = inner + .handle_inspect_local_value(key, subkeys.clone(), true) + .await?; + + // If this is the maximum scope we're interested in, return the report + if matches!(scope, DHTReportScope::Local) { + return Ok(DHTRecordReport::new( + local_inspect_result.subkeys, + local_inspect_result.seqs, + )); + } + + // Get rpc processor and drop mutex so we don't block while getting the value from the network + let Some(rpc_processor) = Self::online_ready_inner(&inner) else { + apibail_try_again!("offline, try again later"); + }; + + // Drop the lock for network access + drop(inner); + + // Get the inspect record report from the network + let result = self + .outbound_inspect_value( + rpc_processor, + key, + subkeys, + safety_selection, + local_inspect_result, + matches!(scope, DHTReportScope::NetworkSet), + ) + .await?; + + // See if we got a seqs list back + if result.inspect_result.seqs.is_empty() { + // If we got nothing back then we also had nothing beforehand, return nothing + return Ok(DHTRecordReport::default()); + }; + + // Keep the list of nodes that returned a value for later reference + // xxx switch 'value nodes' to keeping ranges of subkeys per node + // let mut inner = self.lock().await?; + // inner.set_value_nodes(key, result.fanout_results.value_nodes)?; + + Ok(DHTRecordReport::new( + result.inspect_result.subkeys, + result.inspect_result.seqs, + )) + } + // Send single value change out to the network #[instrument(level = "trace", skip(self), err)] async fn send_value_change(&self, vc: ValueChangedInfo) -> VeilidAPIResult<()> { diff --git a/veilid-core/src/storage_manager/record_store/mod.rs b/veilid-core/src/storage_manager/record_store/mod.rs index 91427200..7b59613a 100644 --- a/veilid-core/src/storage_manager/record_store/mod.rs +++ b/veilid-core/src/storage_manager/record_store/mod.rs @@ -89,6 +89,8 @@ pub struct GetResult { /// The result of the do_inspect_value_operation #[derive(Default, Debug)] pub struct InspectResult { + /// The actual in-schema subkey range being reported on + pub subkeys: ValueSubkeyRangeSet, /// The sequence map pub seqs: Vec, /// The descriptor if we got a fresh one or empty if no descriptor was needed @@ -811,6 +813,7 @@ where }; if let Some(seqs) = self.seqs_cache.get(&sck) { return Ok(Some(InspectResult { + subkeys, seqs: seqs.clone(), opt_descriptor, })); @@ -845,6 +848,7 @@ where } Ok(Some(InspectResult { + subkeys, seqs, opt_descriptor, })) diff --git a/veilid-core/src/storage_manager/set_value.rs b/veilid-core/src/storage_manager/set_value.rs index b18b2d78..a0b85817 100644 --- a/veilid-core/src/storage_manager/set_value.rs +++ b/veilid-core/src/storage_manager/set_value.rs @@ -14,10 +14,12 @@ struct OutboundSetValueContext { /// The result of the outbound_set_value operation pub(super) struct OutboundSetValueResult { + /// Fanout result + pub fanout_result: FanoutResult, + /// Consensus count for this operation, + pub consensus_count: usize, /// The value that was set pub signed_value_data: Arc, - /// And where it was set to - pub value_nodes: Vec, } impl StorageManager { @@ -164,57 +166,32 @@ impl StorageManager { check_done, ); - match fanout_call.run(vec![]).await { + let kind = match fanout_call.run(vec![]).await { // If we don't finish in the timeout (too much time passed checking for consensus) - TimeoutOr::Timeout => { - // Return the best answer we've got - let ctx = context.lock(); - if 
ctx.value_nodes.len() >= consensus_count { - log_stor!(debug "SetValue Fanout Timeout Consensus"); - } else { - log_stor!(debug "SetValue Fanout Timeout Non-Consensus: {}", ctx.value_nodes.len()); - } - - Ok(OutboundSetValueResult { - signed_value_data: ctx.value.clone(), - value_nodes: ctx.value_nodes.clone(), - }) - } + TimeoutOr::Timeout => FanoutResultKind::Timeout, // If we finished with or without consensus (enough nodes returning the same value) - TimeoutOr::Value(Ok(Some(()))) => { - // Return the best answer we've got - let ctx = context.lock(); - if ctx.value_nodes.len() >= consensus_count { - log_stor!(debug "SetValue Fanout Consensus"); - } else { - log_stor!(debug "SetValue Fanout Non-Consensus: {}", ctx.value_nodes.len()); - } - Ok(OutboundSetValueResult { - signed_value_data: ctx.value.clone(), - value_nodes: ctx.value_nodes.clone(), - }) - } + TimeoutOr::Value(Ok(Some(()))) => FanoutResultKind::Finished, // If we ran out of nodes before getting consensus) - TimeoutOr::Value(Ok(None)) => { - // Return the best answer we've got - let ctx = context.lock(); - if ctx.value_nodes.len() >= consensus_count { - log_stor!(debug "SetValue Fanout Exhausted Consensus"); - } else { - log_stor!(debug "SetValue Fanout Exhausted Non-Consensus: {}", ctx.value_nodes.len()); - } - Ok(OutboundSetValueResult { - signed_value_data: ctx.value.clone(), - value_nodes: ctx.value_nodes.clone(), - }) - } + TimeoutOr::Value(Ok(None)) => FanoutResultKind::Exhausted, // Failed TimeoutOr::Value(Err(e)) => { // If we finished with an error, return that log_stor!(debug "SetValue Fanout Error: {}", e); - Err(e.into()) + return Err(e.into()); } - } + }; + let ctx = context.lock(); + let fanout_result = FanoutResult { + kind, + value_nodes: ctx.value_nodes.clone(), + }; + log_stor!(debug "SetValue Fanout: {:?}", fanout_result); + + Ok(OutboundSetValueResult { + fanout_result, + consensus_count, + signed_value_data: ctx.value.clone(), + }) } /// Handle a received 'Set Value' query diff --git a/veilid-core/src/storage_manager/storage_manager_inner.rs b/veilid-core/src/storage_manager/storage_manager_inner.rs index cc971252..c51fbe0f 100644 --- a/veilid-core/src/storage_manager/storage_manager_inner.rs +++ b/veilid-core/src/storage_manager/storage_manager_inner.rs @@ -568,6 +568,7 @@ impl StorageManagerInner { } Ok(InspectResult { + subkeys: ValueSubkeyRangeSet::new(), seqs: vec![], opt_descriptor: None, }) @@ -648,6 +649,7 @@ impl StorageManagerInner { } Ok(InspectResult { + subkeys: ValueSubkeyRangeSet::new(), seqs: vec![], opt_descriptor: None, }) diff --git a/veilid-core/src/veilid_api/routing_context.rs b/veilid-core/src/veilid_api/routing_context.rs index d5601021..1760f02b 100644 --- a/veilid-core/src/veilid_api/routing_context.rs +++ b/veilid-core/src/veilid_api/routing_context.rs @@ -410,6 +410,35 @@ impl RoutingContext { storage_manager.cancel_watch_values(key, subkeys).await } + /// Inspects a DHT record for subkey state. + /// + /// * `key` is the record key to watch. it must first be opened for reading or writing. + /// * `subkeys` is the the range of subkeys to inspect. The range must not exceed 512 discrete non-overlapping or adjacent subranges. + /// If no range is specified, this is equivalent to inspecting the entire range of subkeys. In total, the list of subkeys returned will be truncated at 512 elements. 
+ /// * `scope` is what kind of range the inspection has: + /// If scope is set to DHTReportScope::Local, results will be only for a locally stored record + /// If scope is set to DHTReportScope::NetworkGet, results will be as if each subkey were get over the network as a GetValue + /// If scope is set to DHTReportScope::NetworkSet, results will be as if each subkey were set over the network as a SetValue + /// + /// This is useful for checking if you should push new subkeys to the network, or retrieve the current state of a record from the network + /// to see what needs updating locally. + /// + /// Returns a DHTRecordReport with the subkey ranges that were returned that overlapped the schema, and sequence numbers for each of the subkeys in the range. + #[instrument(target = "veilid_api", level = "debug", ret, err)] + pub async fn inspect_dht_record( + &self, + key: TypedKey, + subkeys: ValueSubkeyRangeSet, + scope: DHTReportScope, + ) -> VeilidAPIResult { + event!(target: "veilid_api", Level::DEBUG, + "RoutingContext::inspect_dht_record(self: {:?}, key: {:?}, subkeys: {:?}, scope: {:?})", self, key, subkeys, scope); + + Crypto::validate_crypto_kind(key.kind)?; + let storage_manager = self.api.storage_manager()?; + storage_manager.inspect_record(key, subkeys, scope).await + } + /////////////////////////////////// /// Block Store diff --git a/veilid-core/src/veilid_api/types/dht/dht_record_report.rs b/veilid-core/src/veilid_api/types/dht/dht_record_report.rs new file mode 100644 index 00000000..f07ccfd5 --- /dev/null +++ b/veilid-core/src/veilid_api/types/dht/dht_record_report.rs @@ -0,0 +1,46 @@ +use super::*; + +/// DHT Record Report +#[derive( + Debug, Default, Clone, PartialOrd, Ord, PartialEq, Eq, Serialize, Deserialize, JsonSchema, +)] +#[cfg_attr( + target_arch = "wasm32", + derive(Tsify), + tsify(from_wasm_abi, into_wasm_abi) +)] +pub struct DHTRecordReport { + /// The actual subkey range within the schema being reported on + /// This may be a subset of the requested range if it exceeds the schema limits + /// or has more than 512 subkeys + subkeys: ValueSubkeyRangeSet, + /// The sequence numbers of each subkey requested from a DHT Record + seqs: Vec, +} +from_impl_to_jsvalue!(DHTRecordReport); + +impl DHTRecordReport { + pub fn new(subkeys: ValueSubkeyRangeSet, seqs: Vec) -> Self { + Self { subkeys, seqs } + } + + pub fn seqs(&self) -> &[ValueSeqNum] { + &self.seqs + } +} + +/// DHT Record Report Scope +#[derive( + Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, JsonSchema, +)] +#[cfg_attr(target_arch = "wasm32", derive(Tsify), tsify(from_wasm_abi, namespace))] +pub enum DHTReportScope { + Local = 0, + NetworkGet = 1, + NetworkSet = 2, +} +impl Default for DHTReportScope { + fn default() -> Self { + Self::Local + } +} diff --git a/veilid-core/src/veilid_api/types/dht/mod.rs b/veilid-core/src/veilid_api/types/dht/mod.rs index bd7b93eb..d5c9b4cc 100644 --- a/veilid-core/src/veilid_api/types/dht/mod.rs +++ b/veilid-core/src/veilid_api/types/dht/mod.rs @@ -1,4 +1,5 @@ mod dht_record_descriptor; +mod dht_record_report; mod schema; mod value_data; mod value_subkey_range_set; @@ -6,6 +7,7 @@ mod value_subkey_range_set; use super::*; pub use dht_record_descriptor::*; +pub use dht_record_report::*; pub use schema::*; pub use value_data::*; pub use value_subkey_range_set::*;
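Below is a minimal usage sketch of the `inspect_dht_record` API added above; it is not part of the patch. It assumes a `veilid_core::RoutingContext` whose record `key` has already been opened with `open_dht_record`, and that both reports cover the same subkey range; the helper name and staleness heuristic are illustrative only. It compares a `Local` report against a `NetworkGet` report using the same `ValueSeqNum::MAX` "undefined" convention as the fanout merge code in `inspect_value.rs`, and passes an empty `ValueSubkeyRangeSet` to inspect the whole record, per the doc comment above.

```rust
use veilid_core::{
    DHTReportScope, RoutingContext, TypedKey, ValueSeqNum, ValueSubkeyRangeSet, VeilidAPIResult,
};

/// Indices (within the reported subkey range) whose network copy is newer than the local copy.
async fn stale_subkey_indexes(rc: &RoutingContext, key: TypedKey) -> VeilidAPIResult<Vec<usize>> {
    // An empty range set asks for the whole in-schema subkey range (see the doc comment above)
    let all_subkeys = ValueSubkeyRangeSet::new();

    // Local view: sequence numbers from this node's record store only
    let local = rc
        .inspect_dht_record(key, all_subkeys.clone(), DHTReportScope::Local)
        .await?;
    // Network view: sequence numbers as a GetValue-scoped fanout sees them
    let network = rc
        .inspect_dht_record(key, all_subkeys, DHTReportScope::NetworkGet)
        .await?;

    // ValueSeqNum::MAX means "no value seen", matching the fanout merge logic above.
    // A subkey that is defined on the network but missing or older locally is stale
    // and could be refreshed with get_dht_value().
    let mut stale = Vec::new();
    for (idx, (local_seq, net_seq)) in local.seqs().iter().zip(network.seqs()).enumerate() {
        if *net_seq != ValueSeqNum::MAX && (*local_seq == ValueSeqNum::MAX || net_seq > local_seq) {
            stale.push(idx);
        }
    }
    Ok(stale)
}
```

The `NetworkSet` scope runs the same probe with set-scope fanout parameters and locally incremented sequence numbers (the `use_set_scope` branch in `outbound_inspect_value`), which is the variant to use when deciding whether to push local subkeys out with `set_dht_value`.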