diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs b/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs index b0740651..f56a3a42 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs @@ -113,6 +113,11 @@ impl RPCOperationGetValueA { // Validate descriptor if let Some(descriptor) = &self.descriptor { + // Ensure the descriptor itself validates + descriptor + .validate(get_value_context.vcrypto.clone()) + .map_err(RPCError::protocol)?; + // Ensure descriptor matches last one if let Some(last_descriptor) = &get_value_context.last_descriptor { if descriptor.cmp_no_sig(last_descriptor) != cmp::Ordering::Equal { @@ -121,16 +126,16 @@ impl RPCOperationGetValueA { )); } } - // Ensure the descriptor itself validates - descriptor - .validate(get_value_context.vcrypto.clone()) - .map_err(RPCError::protocol)?; } // Ensure the value validates if let Some(value) = &self.value { // Get descriptor to validate with - let Some(descriptor) = self.descriptor.or(get_value_context.last_descriptor) else { + let Some(descriptor) = self + .descriptor + .as_ref() + .or(get_value_context.last_descriptor.as_ref()) + else { return Err(RPCError::protocol( "no last descriptor, requires a descriptor", )); diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_inspect_value.rs b/veilid-core/src/rpc_processor/coders/operations/operation_inspect_value.rs index 34340e65..b00b6319 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_inspect_value.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_inspect_value.rs @@ -175,6 +175,11 @@ impl RPCOperationInspectValueA { // Validate descriptor if let Some(descriptor) = &self.descriptor { + // Ensure the descriptor itself validates + descriptor + .validate(inspect_value_context.vcrypto.clone()) + .map_err(RPCError::protocol)?; + // Ensure descriptor matches last one if let Some(last_descriptor) = &inspect_value_context.last_descriptor { if descriptor.cmp_no_sig(last_descriptor) != cmp::Ordering::Equal { @@ -183,10 +188,6 @@ impl RPCOperationInspectValueA { )); } } - // Ensure the descriptor itself validates - descriptor - .validate(inspect_value_context.vcrypto.clone()) - .map_err(RPCError::protocol)?; } PeerInfo::validate_vec(&mut self.peers, validate_context.crypto.clone()); diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_set_value.rs b/veilid-core/src/rpc_processor/coders/operations/operation_set_value.rs index 154124bb..45d00643 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_set_value.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_set_value.rs @@ -141,13 +141,13 @@ impl RPCOperationSetValueA { panic!("Wrong context type for SetValueA"); }; - if let Some(value) = &self.value { - // Ensure the descriptor itself validates - set_value_context - .descriptor - .validate(set_value_context.vcrypto.clone()) - .map_err(RPCError::protocol)?; + // Ensure the descriptor itself validates + set_value_context + .descriptor + .validate(set_value_context.vcrypto.clone()) + .map_err(RPCError::protocol)?; + if let Some(value) = &self.value { // And the signed value data value .validate( diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_value_changed.rs b/veilid-core/src/rpc_processor/coders/operations/operation_value_changed.rs index 393f29e8..3340ce2f 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_value_changed.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_value_changed.rs @@ -21,9 +21,7 @@ impl RPCOperationValueChanged { watch_id: u64, value: SignedValueData, ) -> Result { - let subkeys_len = subkeys.ranges_len() as usize; - - if subkeys_len > MAX_VALUE_CHANGED_SUBKEY_RANGES_LEN { + if subkeys.ranges_len() > MAX_VALUE_CHANGED_SUBKEY_RANGES_LEN { return Err(RPCError::protocol( "ValueChanged subkey ranges length too long", )); diff --git a/veilid-core/src/rpc_processor/rpc_get_value.rs b/veilid-core/src/rpc_processor/rpc_get_value.rs index 982091fb..94d71306 100644 --- a/veilid-core/src/rpc_processor/rpc_get_value.rs +++ b/veilid-core/src/rpc_processor/rpc_get_value.rs @@ -234,7 +234,7 @@ impl RPCProcessor { let c = self.config.get(); c.network.dht.set_value_count as usize }; - let (subkey_result_value, subkey_result_descriptor) = if closer_to_key_peers.len() >= set_value_count { + let (get_result_value, get_result_descriptor) = if closer_to_key_peers.len() >= set_value_count { // Not close enough (None, None) } else { @@ -242,15 +242,15 @@ impl RPCProcessor { // See if we have this record ourselves let storage_manager = self.storage_manager(); - let subkey_result = network_result_try!(storage_manager + let get_result = network_result_try!(storage_manager .inbound_get_value(key, subkey, want_descriptor) .await .map_err(RPCError::internal)?); - (subkey_result.value, subkey_result.descriptor) + (get_result.opt_value, get_result.opt_descriptor) }; if debug_target_enabled!("dht") { - let debug_string_value = subkey_result_value.as_ref().map(|v| { + let debug_string_value = get_result_value.as_ref().map(|v| { format!(" len={} seq={} writer={}", v.value_data().data().len(), v.value_data().seq(), @@ -263,7 +263,7 @@ impl RPCProcessor { key, subkey, debug_string_value, - if subkey_result_descriptor.is_some() { + if get_result_descriptor.is_some() { " +desc" } else { "" @@ -277,9 +277,9 @@ impl RPCProcessor { // Make GetValue answer let get_value_a = RPCOperationGetValueA::new( - subkey_result_value.map(|x| (*x).clone()), + get_result_value.map(|x| (*x).clone()), closer_to_key_peers, - subkey_result_descriptor.map(|x| (*x).clone()), + get_result_descriptor.map(|x| (*x).clone()), )?; // Send GetValue answer diff --git a/veilid-core/src/rpc_processor/rpc_inspect_value.rs b/veilid-core/src/rpc_processor/rpc_inspect_value.rs index 3cd98f55..011d8646 100644 --- a/veilid-core/src/rpc_processor/rpc_inspect_value.rs +++ b/veilid-core/src/rpc_processor/rpc_inspect_value.rs @@ -31,7 +31,7 @@ impl RPCProcessor { key: TypedKey, subkeys: ValueSubkeyRangeSet, last_descriptor: Option, - ) ->RPCNetworkResult> { + ) -> RPCNetworkResult> { // Ensure destination never has a private route // and get the target noderef so we can validate the response let Some(target) = dest.node() else { @@ -232,7 +232,7 @@ impl RPCProcessor { .inbound_inspect_value(key, subkeys, want_descriptor) .await .map_err(RPCError::internal)?); - (inspect_result.seqs, inspect_result.descriptor) + (inspect_result.seqs, inspect_result.opt_descriptor) }; if debug_target_enabled!("dht") { diff --git a/veilid-core/src/storage_manager/get_value.rs b/veilid-core/src/storage_manager/get_value.rs index 4cfb7ede..6cb1d140 100644 --- a/veilid-core/src/storage_manager/get_value.rs +++ b/veilid-core/src/storage_manager/get_value.rs @@ -15,7 +15,7 @@ struct OutboundGetValueContext { /// The result of the outbound_get_value operation pub(super) struct OutboundGetValueResult { /// The subkey that was retrieved - pub subkey_result: SubkeyResult, + pub get_result: GetResult, /// And where it was retrieved from pub value_nodes: Vec, } @@ -28,7 +28,7 @@ impl StorageManager { key: TypedKey, subkey: ValueSubkey, safety_selection: SafetySelection, - last_subkey_result: SubkeyResult, + last_get_result: GetResult, ) -> VeilidAPIResult { let routing_table = rpc_processor.routing_table(); @@ -44,15 +44,15 @@ impl StorageManager { }; // Make do-get-value answer context - let schema = if let Some(d) = &last_subkey_result.descriptor { + let schema = if let Some(d) = &last_get_result.opt_descriptor { Some(d.schema()?) } else { None }; let context = Arc::new(Mutex::new(OutboundGetValueContext { - value: last_subkey_result.value, + value: last_get_result.opt_value, value_nodes: vec![], - descriptor: last_subkey_result.descriptor.clone(), + descriptor: last_get_result.opt_descriptor.clone(), schema, })); @@ -60,7 +60,7 @@ impl StorageManager { let call_routine = |next_node: NodeRef| { let rpc_processor = rpc_processor.clone(); let context = context.clone(); - let last_descriptor = last_subkey_result.descriptor.clone(); + let last_descriptor = last_get_result.opt_descriptor.clone(); async move { let gva = network_result_try!( rpc_processor @@ -184,9 +184,9 @@ impl StorageManager { log_stor!(debug "GetValue Fanout Timeout Non-Consensus: {}", ctx.value_nodes.len()); } Ok(OutboundGetValueResult { - subkey_result: SubkeyResult { - value: ctx.value.clone(), - descriptor: ctx.descriptor.clone(), + get_result: GetResult { + opt_value: ctx.value.clone(), + opt_descriptor: ctx.descriptor.clone(), }, value_nodes: ctx.value_nodes.clone(), }) @@ -201,9 +201,9 @@ impl StorageManager { log_stor!(debug "GetValue Fanout Non-Consensus: {}", ctx.value_nodes.len()); } Ok(OutboundGetValueResult { - subkey_result: SubkeyResult { - value: ctx.value.clone(), - descriptor: ctx.descriptor.clone(), + get_result: GetResult { + opt_value: ctx.value.clone(), + opt_descriptor: ctx.descriptor.clone(), }, value_nodes: ctx.value_nodes.clone(), }) @@ -218,9 +218,9 @@ impl StorageManager { log_stor!(debug "GetValue Fanout Exhausted Non-Consensus: {}", ctx.value_nodes.len()); } Ok(OutboundGetValueResult { - subkey_result: SubkeyResult { - value: ctx.value.clone(), - descriptor: ctx.descriptor.clone(), + get_result: GetResult { + opt_value: ctx.value.clone(), + opt_descriptor: ctx.descriptor.clone(), }, value_nodes: ctx.value_nodes.clone(), }) @@ -240,28 +240,28 @@ impl StorageManager { key: TypedKey, subkey: ValueSubkey, want_descriptor: bool, - ) -> VeilidAPIResult> { + ) -> VeilidAPIResult> { let mut inner = self.lock().await?; // See if this is a remote or local value - let (_is_local, last_subkey_result) = { + let (_is_local, last_get_result) = { // See if the subkey we are getting has a last known local value - let mut last_subkey_result = inner.handle_get_local_value(key, subkey, true).await?; + let mut last_get_result = inner.handle_get_local_value(key, subkey, true).await?; // If this is local, it must have a descriptor already - if last_subkey_result.descriptor.is_some() { + if last_get_result.opt_descriptor.is_some() { if !want_descriptor { - last_subkey_result.descriptor = None; + last_get_result.opt_descriptor = None; } - (true, last_subkey_result) + (true, last_get_result) } else { // See if the subkey we are getting has a last known remote value - let last_subkey_result = inner + let last_get_result = inner .handle_get_remote_value(key, subkey, want_descriptor) .await?; - (false, last_subkey_result) + (false, last_get_result) } }; - Ok(NetworkResult::value(last_subkey_result)) + Ok(NetworkResult::value(last_get_result)) } } diff --git a/veilid-core/src/storage_manager/inspect_value.rs b/veilid-core/src/storage_manager/inspect_value.rs index 8ab19361..685f4d6f 100644 --- a/veilid-core/src/storage_manager/inspect_value.rs +++ b/veilid-core/src/storage_manager/inspect_value.rs @@ -1,9 +1,9 @@ use super::*; /// The context of the outbound_get_value operation -struct OutboundGxxx continue here etValueContext { - /// The latest value of the subkey, may be the value passed in - pub value: Option>, +struct OutboundInspectValueContext { + /// The combined sequence map so far + pub seqs: Vec, /// The nodes that have returned the value so far (up to the consensus count) pub value_nodes: Vec, /// The descriptor if we got a fresh one or empty if no descriptor was needed @@ -13,26 +13,26 @@ struct OutboundGxxx continue here etValueContext { } /// The result of the outbound_get_value operation -pub(super) struct OutboundGetValueResult { +pub(super) struct OutboundInspectValueResult { /// The subkey that was retrieved - pub subkey_result: SubkeyResult, + pub inspect_result: InspectResult, /// And where it was retrieved from pub value_nodes: Vec, } impl StorageManager { /// Perform a 'inspect value' query on the network - pub(super) async fn outbound_get_value( + pub(super) async fn outbound_inspect_value( &self, rpc_processor: RPCProcessor, key: TypedKey, - subkey: ValueSubkey, + subkeys: ValueSubkeyRangeSet, safety_selection: SafetySelection, - last_subkey_result: SubkeyResult, - ) -> VeilidAPIResult { + last_inspect_result: InspectResult, + ) -> VeilidAPIResult { let routing_table = rpc_processor.routing_table(); - // Get the DHT parameters for 'GetValue' + // Get the DHT parameters for 'InspectValue' (the same as for 'GetValue') let (key_count, consensus_count, fanout, timeout_us) = { let c = self.unlocked_inner.config.get(); ( @@ -43,16 +43,16 @@ impl StorageManager { ) }; - // Make do-get-value answer context - let schema = if let Some(d) = &last_subkey_result.descriptor { + // Make do-inspect-value answer context + let schema = if let Some(d) = &last_inspect_result.opt_descriptor { Some(d.schema()?) } else { None }; - let context = Arc::new(Mutex::new(OutboundGetValueContext { - value: last_subkey_result.value, + let context = Arc::new(Mutex::new(OutboundInspectValueContext { + seqs: last_inspect_result.seqs, value_nodes: vec![], - descriptor: last_subkey_result.descriptor.clone(), + descriptor: last_inspect_result.opt_descriptor.clone(), schema, })); @@ -60,23 +60,24 @@ impl StorageManager { let call_routine = |next_node: NodeRef| { let rpc_processor = rpc_processor.clone(); let context = context.clone(); - let last_descriptor = last_subkey_result.descriptor.clone(); + let last_descriptor = last_inspect_result.opt_descriptor.clone(); + let subkeys = subkeys.clone(); async move { - let gva = network_result_try!( + let iva = network_result_try!( rpc_processor .clone() - .rpc_call_get_value( + .rpc_call_inspect_value( Destination::direct(next_node.clone()).with_safety(safety_selection), key, - subkey, + subkeys.clone(), last_descriptor.map(|x| (*x).clone()), ) .await? ); // Keep the descriptor if we got one. If we had a last_descriptor it will - // already be validated by rpc_call_get_value - if let Some(descriptor) = gva.answer.descriptor { + // already be validated by rpc_call_inspect_value + if let Some(descriptor) = iva.answer.descriptor { let mut ctx = context.lock(); if ctx.descriptor.is_none() && ctx.schema.is_none() { ctx.schema = Some(descriptor.schema().map_err(RPCError::invalid_format)?); @@ -85,12 +86,12 @@ impl StorageManager { } // Keep the value if we got one and it is newer and it passes schema validation - if let Some(value) = gva.answer.value { - log_stor!(debug "Got value back: len={} seq={}", value.value_data().data().len(), value.value_data().seq()); + if !iva.answer.seqs.is_empty() { + log_stor!(debug "Got seqs back: len={}", iva.answer.seqs.len()); let mut ctx = context.lock(); // Ensure we have a schema and descriptor - let (Some(descriptor), Some(schema)) = (&ctx.descriptor, &ctx.schema) else { + let (Some(_descriptor), Some(schema)) = (&ctx.descriptor, &ctx.schema) else { // Got a value but no descriptor for it // Move to the next node return Ok(NetworkResult::invalid_message( @@ -98,53 +99,57 @@ impl StorageManager { )); }; - // Validate with schema - if !schema.check_subkey_value_data( - descriptor.owner(), - subkey, - value.value_data(), - ) { - // Validation failed, ignore this value + // Get number of subkeys from schema and ensure we are getting the + // right number of sequence numbers betwen that and what we asked for + let in_schema_subkeys = subkeys + .intersect(&ValueSubkeyRangeSet::single_range(0, schema.max_subkey())); + if iva.answer.seqs.len() != in_schema_subkeys.len() { + // Not the right number of sequence numbers // Move to the next node return Ok(NetworkResult::invalid_message(format!( - "Schema validation failed on subkey {}", - subkey + "wrong number of seqs returned {} (wanted {})", + iva.answer.seqs.len(), + in_schema_subkeys.len() ))); } - // If we have a prior value, see if this is a newer sequence number - if let Some(prior_value) = &ctx.value { - let prior_seq = prior_value.value_data().seq(); - let new_seq = value.value_data().seq(); - - if new_seq == prior_seq { - // If sequence number is the same, the data should be the same - if prior_value.value_data() != value.value_data() { - // Move to the next node - return Ok(NetworkResult::invalid_message("value data mismatch")); + // If we have a prior seqs list, merge in the new seqs + if ctx.seqs.len() == 0 { + ctx.seqs = iva.answer.seqs.clone(); + // One node has shown us the newest sequence numbers so far + ctx.value_nodes = vec![next_node]; + } else { + if ctx.seqs.len() != iva.answer.seqs.len() { + return Err(RPCError::internal( + "seqs list length should always be equal by now", + )); + } + let mut newer_seq = false; + for pair in ctx.seqs.iter_mut().zip(iva.answer.seqs.iter()) { + // If the new seq isn't undefined and is better than the old seq (either greater or old is undefined) + // Then take that sequence number and note that we have gotten newer sequence numbers so we keep + // looking for consensus + if *pair.1 != ValueSeqNum::MAX + && (*pair.0 == ValueSeqNum::MAX || pair.1 > pair.0) + { + newer_seq = true; + *pair.0 = *pair.1; } - // Increase the consensus count for the existing value - ctx.value_nodes.push(next_node); - } else if new_seq > prior_seq { - // If the sequence number is greater, start over with the new value - ctx.value = Some(Arc::new(value)); - // One node has shown us this value so far + } + if newer_seq { + // One node has shown us the latest sequence numbers so far ctx.value_nodes = vec![next_node]; } else { - // If the sequence number is older, ignore it + // Increase the consensus count for the seqs list + ctx.value_nodes.push(next_node); } - } else { - // If we have no prior value, keep it - ctx.value = Some(Arc::new(value)); - // One node has shown us this value so far - ctx.value_nodes = vec![next_node]; } } // Return peers if we have some - log_network_result!(debug "GetValue fanout call returned peers {}", gva.answer.peers.len()); + log_network_result!(debug "InspectValue fanout call returned peers {}", iva.answer.peers.len()); - Ok(NetworkResult::value(gva.answer.peers)) + Ok(NetworkResult::value(iva.answer.peers)) } }; @@ -152,7 +157,7 @@ impl StorageManager { let check_done = |_closest_nodes: &[NodeRef]| { // If we have reached sufficient consensus, return done let ctx = context.lock(); - if ctx.value.is_some() + if ctx.seqs.len() > 0 && ctx.descriptor.is_some() && ctx.value_nodes.len() >= consensus_count { @@ -179,14 +184,14 @@ impl StorageManager { // Return the best answer we've got let ctx = context.lock(); if ctx.value_nodes.len() >= consensus_count { - log_stor!(debug "GetValue Fanout Timeout Consensus"); + log_stor!(debug "InspectValue Fanout Timeout Consensus"); } else { - log_stor!(debug "GetValue Fanout Timeout Non-Consensus: {}", ctx.value_nodes.len()); + log_stor!(debug "InspectValue Fanout Timeout Non-Consensus: {}", ctx.value_nodes.len()); } - Ok(OutboundGetValueResult { - subkey_result: SubkeyResult { - value: ctx.value.clone(), - descriptor: ctx.descriptor.clone(), + Ok(OutboundInspectValueResult { + inspect_result: InspectResult { + seqs: ctx.seqs.clone(), + opt_descriptor: ctx.descriptor.clone(), }, value_nodes: ctx.value_nodes.clone(), }) @@ -196,14 +201,14 @@ impl StorageManager { // Return the best answer we've got let ctx = context.lock(); if ctx.value_nodes.len() >= consensus_count { - log_stor!(debug "GetValue Fanout Consensus"); + log_stor!(debug "InspectValue Fanout Consensus"); } else { - log_stor!(debug "GetValue Fanout Non-Consensus: {}", ctx.value_nodes.len()); + log_stor!(debug "InspectValue Fanout Non-Consensus: {}", ctx.value_nodes.len()); } - Ok(OutboundGetValueResult { - subkey_result: SubkeyResult { - value: ctx.value.clone(), - descriptor: ctx.descriptor.clone(), + Ok(OutboundInspectValueResult { + inspect_result: InspectResult { + seqs: ctx.seqs.clone(), + opt_descriptor: ctx.descriptor.clone(), }, value_nodes: ctx.value_nodes.clone(), }) @@ -213,14 +218,14 @@ impl StorageManager { // Return the best answer we've got let ctx = context.lock(); if ctx.value_nodes.len() >= consensus_count { - log_stor!(debug "GetValue Fanout Exhausted Consensus"); + log_stor!(debug "InspectValue Fanout Exhausted Consensus"); } else { - log_stor!(debug "GetValue Fanout Exhausted Non-Consensus: {}", ctx.value_nodes.len()); + log_stor!(debug "InspectValue Fanout Exhausted Non-Consensus: {}", ctx.value_nodes.len()); } - Ok(OutboundGetValueResult { - subkey_result: SubkeyResult { - value: ctx.value.clone(), - descriptor: ctx.descriptor.clone(), + Ok(OutboundInspectValueResult { + inspect_result: InspectResult { + seqs: ctx.seqs.clone(), + opt_descriptor: ctx.descriptor.clone(), }, value_nodes: ctx.value_nodes.clone(), }) @@ -228,40 +233,42 @@ impl StorageManager { // Failed TimeoutOr::Value(Err(e)) => { // If we finished with an error, return that - log_stor!(debug "GetValue Fanout Error: {}", e); + log_stor!(debug "InspectValue Fanout Error: {}", e); Err(e.into()) } } } - /// Handle a received 'Get Value' query - pub async fn inbound_get_value( + /// Handle a received 'Inspect Value' query + pub async fn inbound_inspect_value( &self, key: TypedKey, - subkey: ValueSubkey, + subkeys: ValueSubkeyRangeSet, want_descriptor: bool, - ) -> VeilidAPIResult> { + ) -> VeilidAPIResult> { let mut inner = self.lock().await?; // See if this is a remote or local value - let (_is_local, last_subkey_result) = { + let (_is_local, last_get_result) = { // See if the subkey we are getting has a last known local value - let mut last_subkey_result = inner.handle_get_local_value(key, subkey, true).await?; + let mut last_inspect_result = inner + .handle_inspect_local_value(key, subkeys.clone(), true) + .await?; // If this is local, it must have a descriptor already - if last_subkey_result.descriptor.is_some() { + if last_inspect_result.opt_descriptor.is_some() { if !want_descriptor { - last_subkey_result.descriptor = None; + last_inspect_result.opt_descriptor = None; } - (true, last_subkey_result) + (true, last_inspect_result) } else { // See if the subkey we are getting has a last known remote value - let last_subkey_result = inner - .handle_get_remote_value(key, subkey, want_descriptor) + let last_inspect_result = inner + .handle_inspect_remote_value(key, subkeys, want_descriptor) .await?; - (false, last_subkey_result) + (false, last_inspect_result) } }; - Ok(NetworkResult::value(last_subkey_result)) + Ok(NetworkResult::value(last_get_result)) } } diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index c9e28341..90a054f1 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -1,5 +1,6 @@ mod debug; mod get_value; +mod inspect_value; mod record_store; mod set_value; mod storage_manager_inner; @@ -204,6 +205,7 @@ impl StorageManager { safety_selection: SafetySelection, ) -> VeilidAPIResult { let mut inner = self.lock().await?; + schema.validate()?; // Create a new owned local record from scratch let (key, owner) = inner @@ -254,12 +256,12 @@ impl StorageManager { key, subkey, safety_selection, - SubkeyResult::default(), + GetResult::default(), ) .await?; // If we got nothing back, the key wasn't found - if result.subkey_result.value.is_none() && result.subkey_result.descriptor.is_none() { + if result.get_result.opt_value.is_none() && result.get_result.opt_descriptor.is_none() { // No result apibail_key_not_found!(key); }; @@ -280,7 +282,7 @@ impl StorageManager { // Open the new record inner - .open_new_record(key, writer, subkey, result.subkey_result, safety_selection) + .open_new_record(key, writer, subkey, result.get_result, safety_selection) .await } @@ -359,12 +361,12 @@ impl StorageManager { }; // See if the requested subkey is our local record store - let last_subkey_result = inner.handle_get_local_value(key, subkey, true).await?; + let last_get_result = inner.handle_get_local_value(key, subkey, true).await?; // Return the existing value if we have one unless we are forcing a refresh if !force_refresh { - if let Some(last_subkey_result_value) = last_subkey_result.value { - return Ok(Some(last_subkey_result_value.value_data().clone())); + if let Some(last_get_result_value) = last_get_result.opt_value { + return Ok(Some(last_get_result_value.value_data().clone())); } } @@ -373,8 +375,8 @@ impl StorageManager { // Get rpc processor and drop mutex so we don't block while getting the value from the network let Some(rpc_processor) = Self::online_ready_inner(&inner) else { // Return the existing value if we have one if we aren't online - if let Some(last_subkey_result_value) = last_subkey_result.value { - return Ok(Some(last_subkey_result_value.value_data().clone())); + if let Some(last_get_result_value) = last_get_result.opt_value { + return Ok(Some(last_get_result_value.value_data().clone())); } apibail_try_again!("offline, try again later"); }; @@ -384,8 +386,8 @@ impl StorageManager { // May have last descriptor / value // Use the safety selection we opened the record with - let opt_last_seq = last_subkey_result - .value + let opt_last_seq = last_get_result + .opt_value .as_ref() .map(|v| v.value_data().seq()); let result = self @@ -394,12 +396,12 @@ impl StorageManager { key, subkey, safety_selection, - last_subkey_result, + last_get_result, ) .await?; // See if we got a value back - let Some(subkey_result_value) = result.subkey_result.value else { + let Some(get_result_value) = result.get_result.opt_value else { // If we got nothing back then we also had nothing beforehand, return nothing return Ok(None); }; @@ -409,17 +411,17 @@ impl StorageManager { inner.set_value_nodes(key, result.value_nodes)?; // If we got a new value back then write it to the opened record - if Some(subkey_result_value.value_data().seq()) != opt_last_seq { + if Some(get_result_value.value_data().seq()) != opt_last_seq { inner .handle_set_local_value( key, subkey, - subkey_result_value.clone(), + get_result_value.clone(), WatchUpdateMode::UpdateAll, ) .await?; } - Ok(Some(subkey_result_value.value_data().clone())) + Ok(Some(get_result_value.value_data().clone())) } /// Set the value of a subkey on an opened local record @@ -456,16 +458,16 @@ impl StorageManager { }; // See if the subkey we are modifying has a last known local value - let last_subkey_result = inner.handle_get_local_value(key, subkey, true).await?; + let last_get_result = inner.handle_get_local_value(key, subkey, true).await?; // Get the descriptor and schema for the key - let Some(descriptor) = last_subkey_result.descriptor else { + let Some(descriptor) = last_get_result.opt_descriptor else { apibail_generic!("must have a descriptor"); }; let schema = descriptor.schema()?; // Make new subkey data - let value_data = if let Some(last_signed_value_data) = last_subkey_result.value { + let value_data = if let Some(last_signed_value_data) = last_get_result.opt_value { if last_signed_value_data.value_data().data() == data && last_signed_value_data.value_data().writer() == &writer.key { diff --git a/veilid-core/src/storage_manager/record_store/keys.rs b/veilid-core/src/storage_manager/record_store/keys.rs index 547e4aa9..3a78c7ad 100644 --- a/veilid-core/src/storage_manager/record_store/keys.rs +++ b/veilid-core/src/storage_manager/record_store/keys.rs @@ -61,3 +61,9 @@ impl TryFrom<&[u8]> for SubkeyTableKey { Ok(SubkeyTableKey { key, subkey }) } } + +#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub struct SeqsCacheKey { + pub key: TypedKey, + pub subkeys: ValueSubkeyRangeSet, +} diff --git a/veilid-core/src/storage_manager/record_store/mod.rs b/veilid-core/src/storage_manager/record_store/mod.rs index 6f343b26..91427200 100644 --- a/veilid-core/src/storage_manager/record_store/mod.rs +++ b/veilid-core/src/storage_manager/record_store/mod.rs @@ -59,6 +59,8 @@ where record_index: LruCache>, /// The in-memory cache of commonly accessed subkey data so we don't have to keep hitting the db subkey_cache: LruCache, + /// The in-memory cache of commonly accessed sequence number data so we don't have to keep hitting the db + seqs_cache: LruCache>, /// Total storage space or subkey data inclusive of structures in memory subkey_cache_total_size: LimitedSize, /// Total storage space of records in the tabledb inclusive of subkey data and structures @@ -77,11 +79,20 @@ where /// The result of the do_get_value_operation #[derive(Default, Debug)] -pub struct SubkeyResult { +pub struct GetResult { /// The subkey value if we got one - pub value: Option>, + pub opt_value: Option>, /// The descriptor if we got a fresh one or empty if no descriptor was needed - pub descriptor: Option>, + pub opt_descriptor: Option>, +} + +/// The result of the do_inspect_value_operation +#[derive(Default, Debug)] +pub struct InspectResult { + /// The sequence map + pub seqs: Vec, + /// The descriptor if we got a fresh one or empty if no descriptor was needed + pub opt_descriptor: Option>, } impl RecordStore @@ -105,6 +116,7 @@ where subkey_table: None, record_index: LruCache::new(limits.max_records.unwrap_or(usize::MAX)), subkey_cache: LruCache::new(subkey_cache_size), + seqs_cache: LruCache::new(subkey_cache_size), subkey_cache_total_size: LimitedSize::new( "subkey_cache_total_size", 0, @@ -488,7 +500,7 @@ where key: TypedKey, subkey: ValueSubkey, want_descriptor: bool, - ) -> VeilidAPIResult> { + ) -> VeilidAPIResult> { // Get record from index let Some((subkey_count, has_subkey, opt_descriptor)) = self.with_record(key, |record| { ( @@ -513,9 +525,9 @@ where // See if we have this subkey stored if !has_subkey { // If not, return no value but maybe with descriptor - return Ok(Some(SubkeyResult { - value: None, - descriptor: opt_descriptor, + return Ok(Some(GetResult { + opt_value: None, + opt_descriptor, })); } @@ -529,9 +541,9 @@ where if let Some(record_data) = self.subkey_cache.get_mut(&stk) { let out = record_data.signed_value_data().clone(); - return Ok(Some(SubkeyResult { - value: Some(out), - descriptor: opt_descriptor, + return Ok(Some(GetResult { + opt_value: Some(out), + opt_descriptor, })); } // If not in cache, try to pull from table store if it is in our stored subkey set @@ -548,9 +560,9 @@ where // Add to cache, do nothing with lru out self.add_to_subkey_cache(stk, record_data); - Ok(Some(SubkeyResult { - value: Some(out), - descriptor: opt_descriptor, + Ok(Some(GetResult { + opt_value: Some(out), + opt_descriptor, })) } @@ -559,7 +571,7 @@ where key: TypedKey, subkey: ValueSubkey, want_descriptor: bool, - ) -> VeilidAPIResult> { + ) -> VeilidAPIResult> { // record from index let Some((subkey_count, has_subkey, opt_descriptor)) = self.peek_record(key, |record| { ( @@ -584,9 +596,9 @@ where // See if we have this subkey stored if !has_subkey { // If not, return no value but maybe with descriptor - return Ok(Some(SubkeyResult { - value: None, - descriptor: opt_descriptor, + return Ok(Some(GetResult { + opt_value: None, + opt_descriptor, })); } @@ -600,9 +612,9 @@ where if let Some(record_data) = self.subkey_cache.peek(&stk) { let out = record_data.signed_value_data().clone(); - return Ok(Some(SubkeyResult { - value: Some(out), - descriptor: opt_descriptor, + return Ok(Some(GetResult { + opt_value: Some(out), + opt_descriptor, })); } // If not in cache, try to pull from table store if it is in our stored subkey set @@ -616,9 +628,9 @@ where let out = record_data.signed_value_data().clone(); - Ok(Some(SubkeyResult { - value: Some(out), - descriptor: opt_descriptor, + Ok(Some(GetResult { + opt_value: Some(out), + opt_descriptor, })) } @@ -760,6 +772,84 @@ where Ok(()) } + pub async fn inspect_record( + &mut self, + key: TypedKey, + subkeys: ValueSubkeyRangeSet, + want_descriptor: bool, + ) -> VeilidAPIResult> { + // Get record from index + let Some((subkeys, opt_descriptor)) = self.with_record(key, |record| { + // Get number of subkeys from schema and ensure we are getting the + // right number of sequence numbers betwen that and what we asked for + let in_schema_subkeys = subkeys.intersect(&ValueSubkeyRangeSet::single_range( + 0, + record.schema().max_subkey(), + )); + ( + in_schema_subkeys, + if want_descriptor { + Some(record.descriptor().clone()) + } else { + None + }, + ) + }) else { + // Record not available + return Ok(None); + }; + + // Check if we can return some subkeys + if subkeys.is_empty() { + apibail_invalid_argument!("subkeys set does not overlap schema", "subkeys", subkeys); + } + + // See if we have this inspection cached + let sck = SeqsCacheKey { + key, + subkeys: subkeys.clone(), + }; + if let Some(seqs) = self.seqs_cache.get(&sck) { + return Ok(Some(InspectResult { + seqs: seqs.clone(), + opt_descriptor, + })); + } + + // Get subkey table + let Some(subkey_table) = self.subkey_table.clone() else { + apibail_internal!("record store not initialized"); + }; + + // Build sequence number list to return + let mut seqs = Vec::with_capacity(subkeys.len()); + for subkey in subkeys.iter() { + let stk = SubkeyTableKey { key, subkey }; + let seq = if let Some(record_data) = self.subkey_cache.peek(&stk) { + record_data.signed_value_data().value_data().seq() + } else { + // If not in cache, try to pull from table store if it is in our stored subkey set + // XXX: This would be better if it didn't have to pull the whole record data to get the seq. + if let Some(record_data) = subkey_table + .load_json::(0, &stk.bytes()) + .await + .map_err(VeilidAPIError::internal)? + { + record_data.signed_value_data().value_data().seq() + } else { + // Subkey not written to + ValueSubkey::MAX + } + }; + seqs.push(seq) + } + + Ok(Some(InspectResult { + seqs, + opt_descriptor, + })) + } + pub async fn _change_existing_watch( &mut self, key: TypedKey, @@ -1063,7 +1153,7 @@ where log_stor!(error "first subkey should exist for value change notification"); continue; }; - let subkey_result = match self.get_subkey(evci.key, first_subkey, false).await { + let get_result = match self.get_subkey(evci.key, first_subkey, false).await { Ok(Some(skr)) => skr, Ok(None) => { log_stor!(error "subkey should have data for value change notification"); @@ -1074,7 +1164,7 @@ where continue; } }; - let Some(value) = subkey_result.value else { + let Some(value) = get_result.opt_value else { log_stor!(error "first subkey should have had value for value change notification"); continue; }; diff --git a/veilid-core/src/storage_manager/record_store/record.rs b/veilid-core/src/storage_manager/record_store/record.rs index 5e33d508..4a93cdd1 100644 --- a/veilid-core/src/storage_manager/record_store/record.rs +++ b/veilid-core/src/storage_manager/record_store/record.rs @@ -23,7 +23,7 @@ where detail: D, ) -> VeilidAPIResult { let schema = descriptor.schema()?; - let subkey_count = schema.subkey_count(); + let subkey_count = schema.max_subkey() as usize + 1; Ok(Self { descriptor, subkey_count, diff --git a/veilid-core/src/storage_manager/set_value.rs b/veilid-core/src/storage_manager/set_value.rs index de8c7579..b18b2d78 100644 --- a/veilid-core/src/storage_manager/set_value.rs +++ b/veilid-core/src/storage_manager/set_value.rs @@ -231,21 +231,21 @@ impl StorageManager { let mut inner = self.lock().await?; // See if this is a remote or local value - let (is_local, last_subkey_result) = { + let (is_local, last_get_result) = { // See if the subkey we are modifying has a last known local value - let last_subkey_result = inner.handle_get_local_value(key, subkey, true).await?; + let last_get_result = inner.handle_get_local_value(key, subkey, true).await?; // If this is local, it must have a descriptor already - if last_subkey_result.descriptor.is_some() { - (true, last_subkey_result) + if last_get_result.opt_descriptor.is_some() { + (true, last_get_result) } else { // See if the subkey we are modifying has a last known remote value - let last_subkey_result = inner.handle_get_remote_value(key, subkey, true).await?; - (false, last_subkey_result) + let last_get_result = inner.handle_get_remote_value(key, subkey, true).await?; + (false, last_get_result) } }; // Make sure this value would actually be newer - if let Some(last_value) = &last_subkey_result.value { + if let Some(last_value) = &last_get_result.opt_value { if value.value_data().seq() <= last_value.value_data().seq() { // inbound value is older or equal sequence number than the one we have, just return the one we have return Ok(NetworkResult::value(Some(last_value.clone()))); @@ -253,7 +253,7 @@ impl StorageManager { } // Get the descriptor and schema for the key - let actual_descriptor = match last_subkey_result.descriptor { + let actual_descriptor = match last_get_result.opt_descriptor { Some(last_descriptor) => { if let Some(descriptor) = descriptor { // Descriptor must match last one if it is provided diff --git a/veilid-core/src/storage_manager/storage_manager_inner.rs b/veilid-core/src/storage_manager/storage_manager_inner.rs index 3205329c..cc971252 100644 --- a/veilid-core/src/storage_manager/storage_manager_inner.rs +++ b/veilid-core/src/storage_manager/storage_manager_inner.rs @@ -293,13 +293,12 @@ impl StorageManagerInner { // Move copy subkey data from remote to local store for subkey in remote_record.stored_subkeys().iter() { - let Some(subkey_result) = remote_record_store.get_subkey(key, subkey, false).await? - else { + let Some(get_result) = remote_record_store.get_subkey(key, subkey, false).await? else { // Subkey was missing warn!("Subkey was missing: {} #{}", key, subkey); continue; }; - let Some(subkey_data) = subkey_result.value else { + let Some(subkey_data) = get_result.opt_value else { // Subkey was missing warn!("Subkey data was missing: {} #{}", key, subkey); continue; @@ -388,7 +387,7 @@ impl StorageManagerInner { key: TypedKey, writer: Option, subkey: ValueSubkey, - subkey_result: SubkeyResult, + get_result: GetResult, safety_selection: SafetySelection, ) -> VeilidAPIResult { // Ensure the record is closed @@ -397,7 +396,7 @@ impl StorageManagerInner { } // Must have descriptor - let Some(signed_value_descriptor) = subkey_result.descriptor else { + let Some(signed_value_descriptor) = get_result.opt_descriptor else { // No descriptor for new record, can't store this apibail_generic!("no descriptor"); }; @@ -434,7 +433,7 @@ impl StorageManagerInner { local_record_store.new_record(key, record).await?; // If we got a subkey with the getvalue, it has already been validated against the schema, so store it - if let Some(signed_value_data) = subkey_result.value { + if let Some(signed_value_data) = get_result.opt_value { // Write subkey to local store local_record_store .set_subkey(key, subkey, signed_value_data, WatchUpdateMode::NoUpdate) @@ -513,21 +512,21 @@ impl StorageManagerInner { key: TypedKey, subkey: ValueSubkey, want_descriptor: bool, - ) -> VeilidAPIResult { + ) -> VeilidAPIResult { // See if it's in the local record store let Some(local_record_store) = self.local_record_store.as_mut() else { apibail_not_initialized!(); }; - if let Some(subkey_result) = local_record_store + if let Some(get_result) = local_record_store .get_subkey(key, subkey, want_descriptor) .await? { - return Ok(subkey_result); + return Ok(get_result); } - Ok(SubkeyResult { - value: None, - descriptor: None, + Ok(GetResult { + opt_value: None, + opt_descriptor: None, }) } @@ -551,26 +550,49 @@ impl StorageManagerInner { Ok(()) } + pub(super) async fn handle_inspect_local_value( + &mut self, + key: TypedKey, + subkeys: ValueSubkeyRangeSet, + want_descriptor: bool, + ) -> VeilidAPIResult { + // See if it's in the local record store + let Some(local_record_store) = self.local_record_store.as_mut() else { + apibail_not_initialized!(); + }; + if let Some(inspect_result) = local_record_store + .inspect_record(key, subkeys, want_descriptor) + .await? + { + return Ok(inspect_result); + } + + Ok(InspectResult { + seqs: vec![], + opt_descriptor: None, + }) + } + pub(super) async fn handle_get_remote_value( &mut self, key: TypedKey, subkey: ValueSubkey, want_descriptor: bool, - ) -> VeilidAPIResult { + ) -> VeilidAPIResult { // See if it's in the remote record store let Some(remote_record_store) = self.remote_record_store.as_mut() else { apibail_not_initialized!(); }; - if let Some(subkey_result) = remote_record_store + if let Some(get_result) = remote_record_store .get_subkey(key, subkey, want_descriptor) .await? { - return Ok(subkey_result); + return Ok(get_result); } - Ok(SubkeyResult { - value: None, - descriptor: None, + Ok(GetResult { + opt_value: None, + opt_descriptor: None, }) } @@ -608,6 +630,29 @@ impl StorageManagerInner { Ok(()) } + pub(super) async fn handle_inspect_remote_value( + &mut self, + key: TypedKey, + subkeys: ValueSubkeyRangeSet, + want_descriptor: bool, + ) -> VeilidAPIResult { + // See if it's in the local record store + let Some(remote_record_store) = self.remote_record_store.as_mut() else { + apibail_not_initialized!(); + }; + if let Some(inspect_result) = remote_record_store + .inspect_record(key, subkeys, want_descriptor) + .await? + { + return Ok(inspect_result); + } + + Ok(InspectResult { + seqs: vec![], + opt_descriptor: None, + }) + } + /// # DHT Key = Hash(ownerKeyKind) of: [ ownerKeyValue, schema ] fn get_key(vcrypto: CryptoSystemVersion, record: &Record) -> TypedKey where diff --git a/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs b/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs index 93a4d55e..cd31dfdc 100644 --- a/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs +++ b/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs @@ -26,19 +26,19 @@ impl StorageManager { break; }; for subkey in osw.subkeys.iter() { - let subkey_result = { + let get_result = { let mut inner = self.lock().await?; inner.handle_get_local_value(key, subkey, true).await }; - let Ok(subkey_result) = subkey_result else { + let Ok(get_result) = get_result else { log_stor!(debug "Offline subkey write had no subkey result: {}:{}", key, subkey); - continue; + continue; }; - let Some(value) = subkey_result.value else { + let Some(value) = get_result.opt_value else { log_stor!(debug "Offline subkey write had no subkey value: {}:{}", key, subkey); continue; }; - let Some(descriptor) = subkey_result.descriptor else { + let Some(descriptor) = get_result.opt_descriptor else { log_stor!(debug "Offline subkey write had no descriptor: {}:{}", key, subkey); continue; }; diff --git a/veilid-core/src/storage_manager/types/signed_value_descriptor.rs b/veilid-core/src/storage_manager/types/signed_value_descriptor.rs index a1c07615..2a7b8428 100644 --- a/veilid-core/src/storage_manager/types/signed_value_descriptor.rs +++ b/veilid-core/src/storage_manager/types/signed_value_descriptor.rs @@ -20,7 +20,10 @@ impl SignedValueDescriptor { pub fn validate(&self, vcrypto: CryptoSystemVersion) -> VeilidAPIResult<()> { // validate signature - vcrypto.verify(&self.owner, &self.schema_data, &self.signature) + vcrypto.verify(&self.owner, &self.schema_data, &self.signature)?; + // validate schema + DHTSchema::try_from(self.schema_data.as_slice())?; + Ok(()) } pub fn owner(&self) -> &PublicKey { diff --git a/veilid-core/src/veilid_api/debug.rs b/veilid-core/src/veilid_api/debug.rs index 4cdd0c96..77fd4b6b 100644 --- a/veilid-core/src/veilid_api/debug.rs +++ b/veilid-core/src/veilid_api/debug.rs @@ -1427,7 +1427,7 @@ impl VeilidAPI { "dht_schema", get_dht_schema, ) - .unwrap_or_else(|_| Ok(DHTSchema::dflt(1)))?; + .unwrap_or_else(|_| Ok(DHTSchema::default()))?; let csv = get_debug_argument_at( &args, diff --git a/veilid-core/src/veilid_api/routing_context.rs b/veilid-core/src/veilid_api/routing_context.rs index eeba9db8..d5601021 100644 --- a/veilid-core/src/veilid_api/routing_context.rs +++ b/veilid-core/src/veilid_api/routing_context.rs @@ -241,6 +241,7 @@ impl RoutingContext { ) -> VeilidAPIResult { event!(target: "veilid_api", Level::DEBUG, "RoutingContext::create_dht_record(self: {:?}, schema: {:?}, kind: {:?})", self, schema, kind); + schema.validate()?; let kind = kind.unwrap_or(best_crypto_kind()); Crypto::validate_crypto_kind(kind)?; diff --git a/veilid-core/src/veilid_api/tests/test_types_dht.rs b/veilid-core/src/veilid_api/tests/test_types_dht.rs index 73f32eda..a8b5aa13 100644 --- a/veilid-core/src/veilid_api/tests/test_types_dht.rs +++ b/veilid-core/src/veilid_api/tests/test_types_dht.rs @@ -9,7 +9,7 @@ pub async fn test_dhtrecorddescriptor() { fix_typedkey(), fix_cryptokey(), Some(fix_cryptokey()), - DHTSchema::DFLT(DHTSchemaDFLT { o_cnt: 4321 }), + DHTSchema::dflt(4321).unwrap(), ); let copy = deserialize_json(&serialize_json(&orig)).unwrap(); diff --git a/veilid-core/src/veilid_api/tests/test_types_dht_schema.rs b/veilid-core/src/veilid_api/tests/test_types_dht_schema.rs index 582d4040..885ac144 100644 --- a/veilid-core/src/veilid_api/tests/test_types_dht_schema.rs +++ b/veilid-core/src/veilid_api/tests/test_types_dht_schema.rs @@ -4,7 +4,7 @@ use crate::*; // dlft pub async fn test_dhtschemadflt() { - let orig = DHTSchemaDFLT { o_cnt: 9 }; + let orig = DHTSchemaDFLT::new(9); let copy = deserialize_json(&serialize_json(&orig)).unwrap(); assert_eq!(orig, copy); @@ -13,19 +13,22 @@ pub async fn test_dhtschemadflt() { // mod pub async fn test_dhtschema() { - let orig = DHTSchema::SMPL(DHTSchemaSMPL { - o_cnt: 91, - members: vec![ - DHTSchemaSMPLMember { - m_key: fix_cryptokey(), - m_cnt: 5, - }, - DHTSchemaSMPLMember { - m_key: fix_cryptokey(), - m_cnt: 6, - }, - ], - }); + let orig = DHTSchema::SMPL( + DHTSchemaSMPL::new( + 91, + vec![ + DHTSchemaSMPLMember { + m_key: fix_cryptokey(), + m_cnt: 5, + }, + DHTSchemaSMPLMember { + m_key: fix_cryptokey(), + m_cnt: 6, + }, + ], + ) + .unwrap(), + ); let copy = deserialize_json(&serialize_json(&orig)).unwrap(); assert_eq!(orig, copy); @@ -44,9 +47,9 @@ pub async fn test_dhtschemasmplmember() { } pub async fn test_dhtschemasmpl() { - let orig = DHTSchemaSMPL { - o_cnt: 91, - members: vec![ + let orig = DHTSchemaSMPL::new( + 91, + vec![ DHTSchemaSMPLMember { m_key: fix_cryptokey(), m_cnt: 8, @@ -56,7 +59,8 @@ pub async fn test_dhtschemasmpl() { m_cnt: 9, }, ], - }; + ) + .unwrap(); let copy = deserialize_json(&serialize_json(&orig)).unwrap(); assert_eq!(orig, copy); diff --git a/veilid-core/src/veilid_api/types/dht/schema/dflt.rs b/veilid-core/src/veilid_api/types/dht/schema/dflt.rs index 25c5bf9b..bb595a23 100644 --- a/veilid-core/src/veilid_api/types/dht/schema/dflt.rs +++ b/veilid-core/src/veilid_api/types/dht/schema/dflt.rs @@ -5,13 +5,33 @@ use super::*; #[cfg_attr(target_arch = "wasm32", derive(Tsify), tsify(from_wasm_abi))] pub struct DHTSchemaDFLT { /// Owner subkey count - pub o_cnt: u16, + o_cnt: u16, } impl DHTSchemaDFLT { pub const FCC: [u8; 4] = *b"DFLT"; pub const FIXED_SIZE: usize = 6; + /// Make a schema + pub fn new(o_cnt: u16) -> VeilidAPIResult { + let out = Self { o_cnt }; + out.validate()?; + Ok(out) + } + + /// Validate the data representation + pub fn validate(&self) -> VeilidAPIResult<()> { + if self.o_cnt == 0 { + apibail_invalid_argument!("must have at least one subkey", "o_cnt", self.o_cnt); + } + Ok(()) + } + + /// Get the owner subkey count + pub fn o_cnt(&self) -> u16 { + self.o_cnt + } + /// Build the data representation of the schema pub fn compile(&self) -> Vec { let mut out = Vec::::with_capacity(Self::FIXED_SIZE); @@ -22,9 +42,9 @@ impl DHTSchemaDFLT { out } - /// Get the number of subkeys this schema allocates - pub fn subkey_count(&self) -> usize { - self.o_cnt as usize + /// Get the maximum subkey this schema allocates + pub fn max_subkey(&self) -> ValueSubkey { + self.o_cnt as ValueSubkey - 1 } /// Get the data size of this schema beyond the size of the structure itself pub fn data_size(&self) -> usize { @@ -72,6 +92,6 @@ impl TryFrom<&[u8]> for DHTSchemaDFLT { let o_cnt = u16::from_le_bytes(b[4..6].try_into().map_err(VeilidAPIError::internal)?); - Ok(Self { o_cnt }) + Self::new(o_cnt) } } diff --git a/veilid-core/src/veilid_api/types/dht/schema/mod.rs b/veilid-core/src/veilid_api/types/dht/schema/mod.rs index 42213240..78138b89 100644 --- a/veilid-core/src/veilid_api/types/dht/schema/mod.rs +++ b/veilid-core/src/veilid_api/types/dht/schema/mod.rs @@ -16,11 +16,19 @@ pub enum DHTSchema { } impl DHTSchema { - pub fn dflt(o_cnt: u16) -> DHTSchema { - DHTSchema::DFLT(DHTSchemaDFLT { o_cnt }) + pub fn dflt(o_cnt: u16) -> VeilidAPIResult { + Ok(DHTSchema::DFLT(DHTSchemaDFLT::new(o_cnt)?)) } - pub fn smpl(o_cnt: u16, members: Vec) -> DHTSchema { - DHTSchema::SMPL(DHTSchemaSMPL { o_cnt, members }) + pub fn smpl(o_cnt: u16, members: Vec) -> VeilidAPIResult { + Ok(DHTSchema::SMPL(DHTSchemaSMPL::new(o_cnt, members)?)) + } + + /// Validate the data representation + pub fn validate(&self) -> VeilidAPIResult<()> { + match self { + DHTSchema::DFLT(d) => d.validate(), + DHTSchema::SMPL(s) => s.validate(), + } } /// Build the data representation of the schema @@ -31,11 +39,11 @@ impl DHTSchema { } } - /// Get the number of subkeys this schema allocates - pub fn subkey_count(&self) -> usize { + /// Get maximum subkey number for this schema + pub fn max_subkey(&self) -> ValueSubkey { match self { - DHTSchema::DFLT(d) => d.subkey_count(), - DHTSchema::SMPL(s) => s.subkey_count(), + DHTSchema::DFLT(d) => d.max_subkey(), + DHTSchema::SMPL(s) => s.max_subkey(), } } @@ -71,7 +79,7 @@ impl DHTSchema { impl Default for DHTSchema { fn default() -> Self { - Self::dflt(1) + Self::dflt(1).unwrap() } } diff --git a/veilid-core/src/veilid_api/types/dht/schema/smpl.rs b/veilid-core/src/veilid_api/types/dht/schema/smpl.rs index 93093eec..ab3fac23 100644 --- a/veilid-core/src/veilid_api/types/dht/schema/smpl.rs +++ b/veilid-core/src/veilid_api/types/dht/schema/smpl.rs @@ -16,15 +16,48 @@ pub struct DHTSchemaSMPLMember { #[cfg_attr(target_arch = "wasm32", derive(Tsify), tsify(from_wasm_abi))] pub struct DHTSchemaSMPL { /// Owner subkey count - pub o_cnt: u16, + o_cnt: u16, /// Members - pub members: Vec, + members: Vec, } impl DHTSchemaSMPL { pub const FCC: [u8; 4] = *b"SMPL"; pub const FIXED_SIZE: usize = 6; + /// Make a schema + pub fn new(o_cnt: u16, members: Vec) -> VeilidAPIResult { + let out = Self { o_cnt, members }; + out.validate()?; + Ok(out) + } + + /// Validate the data representation + pub fn validate(&self) -> VeilidAPIResult<()> { + let keycount = self + .members + .iter() + .fold(self.o_cnt as usize, |acc, x| acc + (x.m_cnt as usize)); + + if keycount == 0 { + apibail_invalid_argument!("must have at least one subkey", "keycount", keycount); + } + if keycount > 65535 { + apibail_invalid_argument!("too many subkeys", "keycount", keycount); + } + Ok(()) + } + + /// Get the owner subkey count + pub fn o_cnt(&self) -> u16 { + self.o_cnt + } + + /// Get the members of the schema + pub fn members(&self) -> &[DHTSchemaSMPLMember] { + &self.members + } + /// Build the data representation of the schema pub fn compile(&self) -> Vec { let mut out = Vec::::with_capacity( @@ -44,11 +77,13 @@ impl DHTSchemaSMPL { out } - /// Get the number of subkeys this schema allocates - pub fn subkey_count(&self) -> usize { - self.members + /// Get the maximum subkey this schema allocates + pub fn max_subkey(&self) -> ValueSubkey { + let subkey_count = self + .members .iter() - .fold(self.o_cnt as usize, |acc, x| acc + (x.m_cnt as usize)) + .fold(self.o_cnt as usize, |acc, x| acc + (x.m_cnt as usize)); + (subkey_count - 1) as ValueSubkey } /// Get the data size of this schema beyond the size of the structure itself @@ -134,6 +169,6 @@ impl TryFrom<&[u8]> for DHTSchemaSMPL { members.push(DHTSchemaSMPLMember { m_key, m_cnt }); } - Ok(Self { o_cnt, members }) + Self::new(o_cnt, members) } } diff --git a/veilid-core/src/veilid_api/types/dht/value_subkey_range_set.rs b/veilid-core/src/veilid_api/types/dht/value_subkey_range_set.rs index c769826a..8282f3b5 100644 --- a/veilid-core/src/veilid_api/types/dht/value_subkey_range_set.rs +++ b/veilid-core/src/veilid_api/types/dht/value_subkey_range_set.rs @@ -2,7 +2,9 @@ use super::*; use core::ops::{Deref, DerefMut}; use range_set_blaze::*; -#[derive(Clone, Default, PartialOrd, PartialEq, Eq, Ord, Serialize, Deserialize, JsonSchema)] +#[derive( + Clone, Default, Hash, PartialOrd, PartialEq, Eq, Ord, Serialize, Deserialize, JsonSchema, +)] #[serde(transparent)] pub struct ValueSubkeyRangeSet { #[serde(with = "serialize_range_set_blaze")] @@ -29,6 +31,11 @@ impl ValueSubkeyRangeSet { data.insert(value); Self { data } } + pub fn single_range(low: ValueSubkey, high: ValueSubkey) -> Self { + let mut data = RangeSetBlaze::new(); + data.ranges_insert(low..=high); + Self { data } + } pub fn intersect(&self, other: &ValueSubkeyRangeSet) -> ValueSubkeyRangeSet { Self::new_with_data(&self.data & &other.data) diff --git a/veilid-flutter/lib/routing_context.dart b/veilid-flutter/lib/routing_context.dart index 92a9818f..2c6cfa7f 100644 --- a/veilid-flutter/lib/routing_context.dart +++ b/veilid-flutter/lib/routing_context.dart @@ -38,7 +38,7 @@ extension ValidateSMPL on DHTSchemaSMPL { return true; } - int subkeyCount() => members.fold(0, (acc, v) => acc + v.mCnt) + oCnt; + int subkeyCount() => members.fold(oCnt, (acc, v) => acc + v.mCnt); } extension Validate on DHTSchema {