diff --git a/veilid-core/proto/veilid.capnp b/veilid-core/proto/veilid.capnp index 344a5128..2fa813b6 100644 --- a/veilid-core/proto/veilid.capnp +++ b/veilid-core/proto/veilid.capnp @@ -359,7 +359,7 @@ struct OperationSetValueQ @0xbac06191ff8bdbc5 { } struct OperationSetValueA @0x9378d0732dc95be2 { - set @0 :Bool; # true if the value was accepted + set @0 :Bool; # true if the set was close enough to be set value @1 :SignedValueData; # optional: the current value at the key if the set seq number was lower or equal to what was there before peers @2 :List(PeerInfo); # returned 'closer peer' information on either success or failure } diff --git a/veilid-core/src/intf/native/protected_store.rs b/veilid-core/src/intf/native/protected_store.rs index 47176ecd..5d1b9a28 100644 --- a/veilid-core/src/intf/native/protected_store.rs +++ b/veilid-core/src/intf/native/protected_store.rs @@ -152,7 +152,7 @@ impl ProtectedStore { pub async fn save_user_secret_rkyv(&self, key: K, value: &T) -> EyreResult where K: AsRef + fmt::Debug, - T: RkyvSerialize>, + T: RkyvSerialize, { let v = to_rkyv(value)?; self.save_user_secret(key, &v).await @@ -175,8 +175,7 @@ impl ProtectedStore { T: RkyvArchive, ::Archived: for<'t> CheckBytes>, - ::Archived: - RkyvDeserialize, + ::Archived: RkyvDeserialize, { let out = self.load_user_secret(key).await?; let b = match out { diff --git a/veilid-core/src/intf/table_db.rs b/veilid-core/src/intf/table_db.rs index 485760f1..f07e7aaa 100644 --- a/veilid-core/src/intf/table_db.rs +++ b/veilid-core/src/intf/table_db.rs @@ -92,7 +92,7 @@ impl TableDB { /// Store a key in rkyv format with a value in a column in the TableDB. Performs a single transaction immediately. pub async fn store_rkyv(&self, col: u32, key: &[u8], value: &T) -> EyreResult<()> where - T: RkyvSerialize>, + T: RkyvSerialize, { let v = to_rkyv(value)?; @@ -127,8 +127,7 @@ impl TableDB { T: RkyvArchive, ::Archived: for<'t> CheckBytes>, - ::Archived: - RkyvDeserialize, + ::Archived: RkyvDeserialize, { let db = self.inner.lock().database.clone(); let out = db.get(col, key).wrap_err("failed to get key")?; @@ -240,7 +239,7 @@ impl TableDBTransaction { /// Store a key in rkyv format with a value in a column in the TableDB pub fn store_rkyv(&self, col: u32, key: &[u8], value: &T) -> EyreResult<()> where - T: RkyvSerialize>, + T: RkyvSerialize, { let v = to_rkyv(value)?; let mut inner = self.inner.lock(); diff --git a/veilid-core/src/lib.rs b/veilid-core/src/lib.rs index 1a446dbb..b3381a5b 100644 --- a/veilid-core/src/lib.rs +++ b/veilid-core/src/lib.rs @@ -45,14 +45,6 @@ use rkyv::{ bytecheck, bytecheck::CheckBytes, de::deserializers::SharedDeserializeMap, with::Skip, Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize, }; -type RkyvSerializer = rkyv::ser::serializers::CompositeSerializer< - rkyv::ser::serializers::AlignedSerializer, - rkyv::ser::serializers::FallbackScratch< - rkyv::ser::serializers::HeapScratch<1024>, - rkyv::ser::serializers::AllocScratch, - >, - rkyv::ser::serializers::SharedSerializeMap, ->; type RkyvDefaultValidator<'t> = rkyv::validation::validators::DefaultValidator<'t>; use serde::*; diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_app_call.rs b/veilid-core/src/rpc_processor/coders/operations/operation_app_call.rs index ae84a767..ef996b95 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_app_call.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_app_call.rs @@ -19,9 +19,9 @@ impl RPCOperationAppCallQ { Ok(()) } - pub fn message(&self) -> &[u8] { - &self.message - } + // pub fn message(&self) -> &[u8] { + // &self.message + // } pub fn destructure(self) -> Vec { self.message @@ -62,9 +62,9 @@ impl RPCOperationAppCallA { Ok(()) } - pub fn message(&self) -> &[u8] { - &self.message - } + // pub fn message(&self) -> &[u8] { + // &self.message + // } pub fn destructure(self) -> Vec { self.message @@ -86,5 +86,4 @@ impl RPCOperationAppCallA { builder.set_message(&self.message); Ok(()) } - } diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_app_message.rs b/veilid-core/src/rpc_processor/coders/operations/operation_app_message.rs index a7456d63..b25ef5f6 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_app_message.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_app_message.rs @@ -19,9 +19,9 @@ impl RPCOperationAppMessage { Ok(()) } - pub fn message(&self) -> &[u8] { - &self.message - } + // pub fn message(&self) -> &[u8] { + // &self.message + // } pub fn destructure(self) -> Vec { self.message } diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_find_node.rs b/veilid-core/src/rpc_processor/coders/operations/operation_find_node.rs index 046cb8dc..607dacbc 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_find_node.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_find_node.rs @@ -15,9 +15,9 @@ impl RPCOperationFindNodeQ { Ok(()) } - pub fn node_id(&self) -> &TypedKey { - &self.node_id - } + // pub fn node_id(&self) -> &TypedKey { + // &self.node_id + // } pub fn destructure(self) -> TypedKey { self.node_id @@ -57,9 +57,9 @@ impl RPCOperationFindNodeA { Ok(()) } - pub fn peers(&self) -> &[PeerInfo] { - &self.peers - } + // pub fn peers(&self) -> &[PeerInfo] { + // &self.peers + // } pub fn destructure(self) -> Vec { self.peers diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs b/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs index e46bf5d3..4261e461 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs @@ -39,15 +39,15 @@ impl RPCOperationGetValueQ { Ok(()) } - pub fn key(&self) -> &TypedKey { - &self.key - } - pub fn subkey(&self) -> ValueSubkey { - self.subkey - } - pub fn want_descriptor(&self) -> bool { - self.want_descriptor - } + // pub fn key(&self) -> &TypedKey { + // &self.key + // } + // pub fn subkey(&self) -> ValueSubkey { + // self.subkey + // } + // pub fn want_descriptor(&self) -> bool { + // self.want_descriptor + // } pub fn destructure(self) -> (TypedKey, ValueSubkey, bool) { (self.key, self.subkey, self.want_descriptor) } @@ -155,15 +155,15 @@ impl RPCOperationGetValueA { Ok(()) } - pub fn value(&self) -> Option<&SignedValueData> { - self.value.as_ref() - } - pub fn peers(&self) -> &[PeerInfo] { - &self.peers - } - pub fn descriptor(&self) -> Option<&SignedValueDescriptor> { - self.descriptor.as_ref() - } + // pub fn value(&self) -> Option<&SignedValueData> { + // self.value.as_ref() + // } + // pub fn peers(&self) -> &[PeerInfo] { + // &self.peers + // } + // pub fn descriptor(&self) -> Option<&SignedValueDescriptor> { + // self.descriptor.as_ref() + // } pub fn destructure( self, ) -> ( diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_return_receipt.rs b/veilid-core/src/rpc_processor/coders/operations/operation_return_receipt.rs index 6edcae84..f049ab1a 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_return_receipt.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_return_receipt.rs @@ -20,9 +20,9 @@ impl RPCOperationReturnReceipt { Ok(()) } - pub fn receipt(&self) -> &[u8] { - &self.receipt - } + // pub fn receipt(&self) -> &[u8] { + // &self.receipt + // } pub fn destructure(self) -> Vec { self.receipt diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_set_value.rs b/veilid-core/src/rpc_processor/coders/operations/operation_set_value.rs index f94095df..c7fa4cf2 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_set_value.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_set_value.rs @@ -5,14 +5,15 @@ const MAX_SET_VALUE_A_PEERS_LEN: usize = 20; #[derive(Clone)] pub struct ValidateSetValueContext { - last_descriptor: Option, - subkey: ValueSubkey, - vcrypto: CryptoSystemVersion, + pub descriptor: SignedValueDescriptor, + pub subkey: ValueSubkey, + pub vcrypto: CryptoSystemVersion, } + impl fmt::Debug for ValidateSetValueContext { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("ValidateSetValueContext") - .field("last_descriptor", &self.last_descriptor) + .field("descriptor", &self.descriptor) .field("subkey", &self.subkey) .field("vcrypto", &self.vcrypto.kind().to_string()) .finish() @@ -45,21 +46,21 @@ impl RPCOperationSetValueQ { Ok(()) } - pub fn key(&self) -> &TypedKey { - &self.key - } + // pub fn key(&self) -> &TypedKey { + // &self.key + // } - pub fn subkey(&self) -> ValueSubkey { - self.subkey - } + // pub fn subkey(&self) -> ValueSubkey { + // self.subkey + // } - pub fn value(&self) -> &SignedValueData { - &self.value - } + // pub fn value(&self) -> &SignedValueData { + // &self.value + // } - pub fn descriptor(&self) -> Option<&SignedValueDescriptor> { - self.descriptor.as_ref() - } + // pub fn descriptor(&self) -> Option<&SignedValueDescriptor> { + // self.descriptor.as_ref() + // } pub fn destructure( self, ) -> ( @@ -137,22 +138,16 @@ impl RPCOperationSetValueA { }; if let Some(value) = &self.value { - // Get descriptor to validate with - let Some(descriptor) = &set_value_context.last_descriptor else { - return Err(RPCError::protocol( - "no last descriptor, requires a descriptor", - )); - }; - // Ensure the descriptor itself validates - descriptor + set_value_context + .descriptor .validate(set_value_context.vcrypto.clone()) .map_err(RPCError::protocol)?; // And the signed value data value .validate( - descriptor.owner(), + set_value_context.descriptor.owner(), set_value_context.subkey, set_value_context.vcrypto.clone(), ) @@ -163,15 +158,15 @@ impl RPCOperationSetValueA { Ok(()) } - pub fn set(&self) -> bool { - self.set - } - pub fn value(&self) -> Option<&SignedValueData> { - self.value.as_ref() - } - pub fn peers(&self) -> &[PeerInfo] { - &self.peers - } + // pub fn set(&self) -> bool { + // self.set + // } + // pub fn value(&self) -> Option<&SignedValueData> { + // self.value.as_ref() + // } + // pub fn peers(&self) -> &[PeerInfo] { + // &self.peers + // } pub fn destructure(self) -> (bool, Option, Vec) { (self.set, self.value, self.peers) } diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_signal.rs b/veilid-core/src/rpc_processor/coders/operations/operation_signal.rs index 29311316..0b5ec38c 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_signal.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_signal.rs @@ -12,9 +12,9 @@ impl RPCOperationSignal { pub fn validate(&mut self, validate_context: &RPCValidateContext) -> Result<(), RPCError> { self.signal_info.validate(validate_context.crypto.clone()) } - pub fn signal_info(&self) -> &SignalInfo { - &self.signal_info - } + // pub fn signal_info(&self) -> &SignalInfo { + // &self.signal_info + // } pub fn destructure(self) -> SignalInfo { self.signal_info } diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_status.rs b/veilid-core/src/rpc_processor/coders/operations/operation_status.rs index 3c9853b4..99cb5985 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_status.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_status.rs @@ -13,9 +13,9 @@ impl RPCOperationStatusQ { Ok(()) } - pub fn node_status(&self) -> Option<&NodeStatus> { - self.node_status.as_ref() - } + // pub fn node_status(&self) -> Option<&NodeStatus> { + // self.node_status.as_ref() + // } pub fn destructure(self) -> Option { self.node_status } @@ -60,12 +60,12 @@ impl RPCOperationStatusA { Ok(()) } - pub fn node_status(&self) -> Option<&NodeStatus> { - self.node_status.as_ref() - } - pub fn sender_info(&self) -> Option<&SenderInfo> { - self.sender_info.as_ref() - } + // pub fn node_status(&self) -> Option<&NodeStatus> { + // self.node_status.as_ref() + // } + // pub fn sender_info(&self) -> Option<&SenderInfo> { + // self.sender_info.as_ref() + // } pub fn destructure(self) -> (Option, Option) { (self.node_status, self.sender_info) } diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_validate_dial_info.rs b/veilid-core/src/rpc_processor/coders/operations/operation_validate_dial_info.rs index 87fac84f..fa76e1bd 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_validate_dial_info.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_validate_dial_info.rs @@ -30,15 +30,15 @@ impl RPCOperationValidateDialInfo { pub fn validate(&mut self, _validate_context: &RPCValidateContext) -> Result<(), RPCError> { Ok(()) } - pub fn dial_info(&self) -> &DialInfo { - &self.dial_info - } - pub fn receipt(&self) -> &[u8] { - &self.receipt - } - pub fn redirect(&self) -> bool { - self.redirect - } + // pub fn dial_info(&self) -> &DialInfo { + // &self.dial_info + // } + // pub fn receipt(&self) -> &[u8] { + // &self.receipt + // } + // pub fn redirect(&self) -> bool { + // self.redirect + // } pub fn destructure(self) -> (DialInfo, Vec, bool) { (self.dial_info, self.receipt, self.redirect) } diff --git a/veilid-core/src/rpc_processor/rpc_find_node.rs b/veilid-core/src/rpc_processor/rpc_find_node.rs index 07dcc1c2..fe2b416f 100644 --- a/veilid-core/src/rpc_processor/rpc_find_node.rs +++ b/veilid-core/src/rpc_processor/rpc_find_node.rs @@ -52,7 +52,9 @@ impl RPCProcessor { }; // Verify peers are in the correct peer scope - for peer_info in find_node_a.peers() { + let peers = find_node_a.destructure(); + + for peer_info in &peers { if !self.filter_node_info(RoutingDomain::PublicInternet, peer_info.signed_node_info()) { return Err(RPCError::invalid_format( "find_node response has invalid peer scope", @@ -60,7 +62,6 @@ impl RPCProcessor { } } - let peers = find_node_a.destructure(); Ok(NetworkResult::value(Answer::new(latency, peers))) } diff --git a/veilid-core/src/rpc_processor/rpc_get_value.rs b/veilid-core/src/rpc_processor/rpc_get_value.rs index 26b45861..7e68b172 100644 --- a/veilid-core/src/rpc_processor/rpc_get_value.rs +++ b/veilid-core/src/rpc_processor/rpc_get_value.rs @@ -45,7 +45,7 @@ impl RPCProcessor { return Err(RPCError::internal("unsupported cryptosystem")); }; - // Send the app call question + // Send the getvalue question let question_context = QuestionContext::GetValue(ValidateGetValueContext { last_descriptor, subkey, @@ -119,10 +119,10 @@ impl RPCProcessor { // See if we have this record ourselves let storage_manager = self.storage_manager(); - let subkey_result = storage_manager - .handle_get_value(key, subkey, want_descriptor) + let subkey_result = network_result_try!(storage_manager + .inbound_get_value(key, subkey, want_descriptor) .await - .map_err(RPCError::internal)?; + .map_err(RPCError::internal)?); // Make GetValue answer let get_value_a = RPCOperationGetValueA::new( diff --git a/veilid-core/src/rpc_processor/rpc_set_value.rs b/veilid-core/src/rpc_processor/rpc_set_value.rs index 2f195412..cf4a61be 100644 --- a/veilid-core/src/rpc_processor/rpc_set_value.rs +++ b/veilid-core/src/rpc_processor/rpc_set_value.rs @@ -1,12 +1,154 @@ use super::*; +#[derive(Clone, Debug)] +pub struct SetValueAnswer { + pub set: bool, + pub value: Option, + pub peers: Vec, +} + impl RPCProcessor { + /// Sends a set value request and wait for response + /// Can be sent via all methods including relays + /// Safety routes may be used, but never private routes. + /// Because this leaks information about the identity of the node itself, + /// replying to this request received over a private route will leak + /// the identity of the node and defeat the private route. + #[instrument(level = "trace", skip(self), ret, err)] + pub async fn rpc_call_set_value( + self, + dest: Destination, + key: TypedKey, + subkey: ValueSubkey, + value: SignedValueData, + descriptor: SignedValueDescriptor, + send_descriptor: bool, + ) -> Result>, RPCError> { + // Ensure destination never has a private route + if matches!( + dest, + Destination::PrivateRoute { + private_route: _, + safety_selection: _ + } + ) { + return Err(RPCError::internal( + "Never send set value requests over private routes", + )); + } + + let set_value_q = RPCOperationSetValueQ::new( + key, + subkey, + value, + if send_descriptor { + Some(descriptor.clone()) + } else { + None + }, + ); + let question = RPCQuestion::new( + network_result_try!(self.get_destination_respond_to(&dest)?), + RPCQuestionDetail::SetValueQ(set_value_q), + ); + let Some(vcrypto) = self.crypto.get(key.kind) else { + return Err(RPCError::internal("unsupported cryptosystem")); + }; + + // Send the setvalue question + let question_context = QuestionContext::SetValue(ValidateSetValueContext { + descriptor, + subkey, + vcrypto, + }); + + let waitable_reply = network_result_try!( + self.question(dest, question, Some(question_context)) + .await? + ); + + // Wait for reply + let (msg, latency) = match self.wait_for_reply(waitable_reply).await? { + TimeoutOr::Timeout => return Ok(NetworkResult::Timeout), + TimeoutOr::Value(v) => v, + }; + + // Get the right answer type + let (_, _, _, kind) = msg.operation.destructure(); + let set_value_a = match kind { + RPCOperationKind::Answer(a) => match a.destructure() { + RPCAnswerDetail::SetValueA(a) => a, + _ => return Err(RPCError::invalid_format("not a setvalue answer")), + }, + _ => return Err(RPCError::invalid_format("not an answer")), + }; + + let (set, value, peers) = set_value_a.destructure(); + + Ok(NetworkResult::value(Answer::new( + latency, + SetValueAnswer { set, value, peers }, + ))) + } + #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err)] pub(crate) async fn process_set_value_q( &self, msg: RPCMessage, ) -> Result, RPCError> { - // tracing::Span::current().record("res", &tracing::field::display(res)); - Err(RPCError::unimplemented("process_set_value_q")) + // Ensure this never came over a private route, safety route is okay though + match &msg.header.detail { + RPCMessageHeaderDetail::Direct(_) | RPCMessageHeaderDetail::SafetyRouted(_) => {} + RPCMessageHeaderDetail::PrivateRouted(_) => { + return Ok(NetworkResult::invalid_message( + "not processing set value request over private route", + )) + } + } + + // Get the question + let kind = msg.operation.kind().clone(); + let set_value_q = match kind { + RPCOperationKind::Question(q) => match q.destructure() { + (_, RPCQuestionDetail::SetValueQ(q)) => q, + _ => panic!("not a setvalue question"), + }, + _ => panic!("not a question"), + }; + + // Destructure + let (key, subkey, value, descriptor) = set_value_q.destructure(); + + // Get the nodes that we know about that are closer to the the key than our own node + let routing_table = self.routing_table(); + let closer_to_key_peers = network_result_try!(routing_table.find_peers_closer_to_key(key)); + + // If there are less than 'set_value_count' peers that are closer, then store here too + let set_value_count = { + let c = self.config.get(); + c.network.dht.set_value_fanout as usize + }; + let (set, new_value) = if closer_to_key_peers.len() >= set_value_count { + // Not close enough + (false, None) + } else { + // Close enough, lets set it + + // Save the subkey, creating a new record if necessary + let storage_manager = self.storage_manager(); + let new_value = network_result_try!(storage_manager + .inbound_set_value(key, subkey, value, descriptor) + .await + .map_err(RPCError::internal)?); + + (true, new_value) + }; + + // Make SetValue answer + let set_value_a = RPCOperationSetValueA::new(set, new_value, closer_to_key_peers)?; + + // Send SetValue answer + self.answer(msg, RPCAnswer::new(RPCAnswerDetail::SetValueA(set_value_a))) + .await } } diff --git a/veilid-core/src/rpc_processor/rpc_status.rs b/veilid-core/src/rpc_processor/rpc_status.rs index cedccf5c..f45ea125 100644 --- a/veilid-core/src/rpc_processor/rpc_status.rs +++ b/veilid-core/src/rpc_processor/rpc_status.rs @@ -96,20 +96,21 @@ impl RPCProcessor { }, _ => return Err(RPCError::invalid_format("not an answer")), }; + let (a_node_status, sender_info) = status_a.destructure(); // Ensure the returned node status is the kind for the routing domain we asked for if let Some(target_nr) = opt_target_nr { - if let Some(node_status) = status_a.node_status() { + if let Some(a_node_status) = a_node_status { match routing_domain { RoutingDomain::PublicInternet => { - if !matches!(node_status, NodeStatus::PublicInternet(_)) { + if !matches!(a_node_status, NodeStatus::PublicInternet(_)) { return Ok(NetworkResult::invalid_message( "node status doesn't match PublicInternet routing domain", )); } } RoutingDomain::LocalNetwork => { - if !matches!(node_status, NodeStatus::LocalNetwork(_)) { + if !matches!(a_node_status, NodeStatus::LocalNetwork(_)) { return Ok(NetworkResult::invalid_message( "node status doesn't match LocalNetwork routing domain", )); @@ -118,7 +119,7 @@ impl RPCProcessor { } // Update latest node status in routing table - target_nr.update_node_status(node_status.clone()); + target_nr.update_node_status(a_node_status.clone()); } } @@ -132,7 +133,7 @@ impl RPCProcessor { safety_selection, } => { if matches!(safety_selection, SafetySelection::Unsafe(_)) { - if let Some(sender_info) = status_a.sender_info() { + if let Some(sender_info) = sender_info { match send_data_kind { SendDataKind::Direct(connection_descriptor) => { // Directly requested status that actually gets sent directly and not over a relay will tell us what our IP address appears as @@ -186,13 +187,15 @@ impl RPCProcessor { msg: RPCMessage, ) -> Result, RPCError> { // Get the question - let status_q = match msg.operation.kind() { - RPCOperationKind::Question(q) => match q.detail() { - RPCQuestionDetail::StatusQ(q) => q, + let kind = msg.operation.kind().clone(); + let status_q = match kind { + RPCOperationKind::Question(q) => match q.destructure() { + (_, RPCQuestionDetail::StatusQ(q)) => q, _ => panic!("not a status question"), }, _ => panic!("not a question"), }; + let q_node_status = status_q.destructure(); let (node_status, sender_info) = match &msg.header.detail { RPCMessageHeaderDetail::Direct(detail) => { @@ -200,17 +203,17 @@ impl RPCProcessor { let routing_domain = detail.routing_domain; // Ensure the node status from the question is the kind for the routing domain we received the request in - if let Some(node_status) = status_q.node_status() { + if let Some(q_node_status) = q_node_status { match routing_domain { RoutingDomain::PublicInternet => { - if !matches!(node_status, NodeStatus::PublicInternet(_)) { + if !matches!(q_node_status, NodeStatus::PublicInternet(_)) { return Ok(NetworkResult::invalid_message( "node status doesn't match PublicInternet routing domain", )); } } RoutingDomain::LocalNetwork => { - if !matches!(node_status, NodeStatus::LocalNetwork(_)) { + if !matches!(q_node_status, NodeStatus::LocalNetwork(_)) { return Ok(NetworkResult::invalid_message( "node status doesn't match LocalNetwork routing domain", )); @@ -221,7 +224,7 @@ impl RPCProcessor { // update node status for the requesting node to our routing table if let Some(sender_nr) = msg.opt_sender_nr.clone() { // Update latest node status in routing table for the statusq sender - sender_nr.update_node_status(node_status.clone()); + sender_nr.update_node_status(q_node_status.clone()); } } diff --git a/veilid-core/src/storage_manager/do_get_value.rs b/veilid-core/src/storage_manager/get_value.rs similarity index 91% rename from veilid-core/src/storage_manager/do_get_value.rs rename to veilid-core/src/storage_manager/get_value.rs index 1e1f0138..7130e674 100644 --- a/veilid-core/src/storage_manager/do_get_value.rs +++ b/veilid-core/src/storage_manager/get_value.rs @@ -15,7 +15,7 @@ struct DoGetValueContext { impl StorageManager { /// Perform a 'get value' query on the network - pub async fn do_get_value( + pub async fn outbound_get_value( &self, rpc_processor: RPCProcessor, key: TypedKey, @@ -116,8 +116,9 @@ impl StorageManager { // Increase the consensus count for the existing value ctx.value_count += 1; } else if new_seq > prior_seq { - // If the sequence number is greater, go with it + // If the sequence number is greater, start over with the new value ctx.value = Some(value); + // One node has show us this value so far ctx.value_count = 1; } else { // If the sequence number is older, ignore it @@ -174,8 +175,17 @@ impl StorageManager { } /// Handle a recieved 'Get Value' query - pub async fn handle_get_value(&self, key: TypedKey, subkey: ValueSubkey, want_descriptor: bool) -> Result { + pub async fn inbound_get_value(&self, key: TypedKey, subkey: ValueSubkey, want_descriptor: bool) -> Result, VeilidAPIError> { let mut inner = self.lock().await?; - inner.handle_get_remote_value(key, subkey, want_descriptor) + let res = match inner.handle_get_remote_value(key, subkey, want_descriptor) { + Ok(res) => res, + Err(VeilidAPIError::Internal { message }) => { + apibail_internal!(message); + }, + Err(e) => { + return Ok(NetworkResult::invalid_message(e)); + }, + }; + Ok(NetworkResult::value(res)) } } diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index 528e339b..5e8b4fb5 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -1,7 +1,8 @@ -mod do_get_value; +mod get_value; mod keys; mod record_store; mod record_store_limits; +mod set_value; mod storage_manager_inner; mod tasks; mod types; @@ -88,8 +89,8 @@ impl StorageManager { #[instrument(level = "debug", skip_all, err)] pub async fn init(&self) -> EyreResult<()> { debug!("startup storage manager"); - let mut inner = self.inner.lock().await; + let mut inner = self.inner.lock().await; inner.init(self.clone()).await?; Ok(()) @@ -175,7 +176,7 @@ impl StorageManager { // Use the safety selection we opened the record with let subkey: ValueSubkey = 0; let subkey_result = self - .do_get_value( + .outbound_get_value( rpc_processor, key, subkey, @@ -224,6 +225,8 @@ impl StorageManager { /// Get the value of a subkey from an opened local record /// may refresh the record, and will if it is forced to or the subkey is not available locally yet + /// Returns Ok(None) if no value was found + /// Returns Ok(Some(value)) is a value was found online or locally pub async fn get_value( &self, key: TypedKey, @@ -263,7 +266,7 @@ impl StorageManager { .as_ref() .map(|v| v.value_data().seq()); let subkey_result = self - .do_get_value( + .outbound_get_value( rpc_processor, key, subkey, @@ -290,6 +293,8 @@ impl StorageManager { /// Set the value of a subkey on an opened local record /// Puts changes to the network immediately and may refresh the record if the there is a newer subkey available online + /// Returns Ok(None) if the value was set + /// Returns Ok(Some(newer value)) if a newer value was found online pub async fn set_value( &self, key: TypedKey, @@ -328,6 +333,7 @@ impl StorageManager { } else { ValueData::new(data, writer.key) }; + let seq = value_data.seq(); // Validate with schema if !schema.check_subkey_value_data(descriptor.owner(), subkey, &value_data) { @@ -343,110 +349,43 @@ impl StorageManager { vcrypto, writer.secret, )?; - let subkey_result = SubkeyResult { - value: Some(signed_value_data), - descriptor: Some(descriptor) - }; // Get rpc processor and drop mutex so we don't block while getting the value from the network let Some(rpc_processor) = inner.rpc_processor.clone() else { // Offline, just write it locally and return immediately inner - .handle_set_local_value(key, subkey, signed_value_data) + .handle_set_local_value(key, subkey, signed_value_data.clone()) .await?; - }; - // Drop the lock for network access - drop(inner); - - // Use the safety selection we opened the record with - let final_subkey_result = self - .do_set_value( - rpc_processor, - key, - subkey, - opened_record.safety_selection(), - subkey_result, - ) - .await?; - - // See if we got a value back - let Some(subkey_result_value) = subkey_result.value else { - // If we got nothing back then we also had nothing beforehand, return nothing - return Ok(None); - }; - - // If we got a new value back then write it to the opened record - if Some(subkey_result_value.value_data().seq()) != opt_last_seq { - let mut inner = self.lock().await?; - inner - .handle_set_local_value(key, subkey, subkey_result_value.clone()) - .await?; - } - Ok(Some(subkey_result_value.into_value_data())) - - - - - - - - - - - // Store subkey locally - inner - .handle_set_local_value(key, subkey, signed_value_data) - .await?; - - // Return the existing value if we have one unless we are forcing a refresh - if !force_refresh { - if let Some(last_subkey_result_value) = last_subkey_result.value { - return Ok(Some(last_subkey_result_value.into_value_data())); - } - } - - // Refresh if we can - - // Get rpc processor and drop mutex so we don't block while getting the value from the network - let Some(rpc_processor) = inner.rpc_processor.clone() else { - // Offline, try again later - apibail_try_again!(); + // Add to offline writes to flush + inner.offline_subkey_writes.entry(key).and_modify(|x| { x.insert(subkey); } ).or_insert(ValueSubkeyRangeSet::single(subkey)); + return Ok(Some(signed_value_data.into_value_data())) }; // Drop the lock for network access drop(inner); - // May have last descriptor / value // Use the safety selection we opened the record with - let opt_last_seq = last_subkey_result - .value - .as_ref() - .map(|v| v.value_data().seq()); - let subkey_result = self - .do_get_value( + + let final_signed_value_data = self + .outbound_set_value( rpc_processor, key, subkey, opened_record.safety_selection(), - last_subkey_result, + signed_value_data, + descriptor, ) .await?; - // See if we got a value back - let Some(subkey_result_value) = subkey_result.value else { - // If we got nothing back then we also had nothing beforehand, return nothing - return Ok(None); - }; - // If we got a new value back then write it to the opened record - if Some(subkey_result_value.value_data().seq()) != opt_last_seq { + if final_signed_value_data.value_data().seq() != seq { let mut inner = self.lock().await?; inner - .handle_set_local_value(key, subkey, subkey_result_value.clone()) + .handle_set_local_value(key, subkey, final_signed_value_data.clone()) .await?; } - Ok(Some(subkey_result_value.into_value_data())) + Ok(Some(final_signed_value_data.into_value_data())) } pub async fn watch_values( diff --git a/veilid-core/src/storage_manager/record_store.rs b/veilid-core/src/storage_manager/record_store.rs index c6f8858a..d37e1d3b 100644 --- a/veilid-core/src/storage_manager/record_store.rs +++ b/veilid-core/src/storage_manager/record_store.rs @@ -9,9 +9,9 @@ use hashlink::LruCache; pub struct RecordStore where - D: Clone + RkyvArchive + RkyvSerialize, + D: Clone + RkyvArchive + RkyvSerialize, for<'t> ::Archived: CheckBytes>, - ::Archived: RkyvDeserialize, + ::Archived: RkyvDeserialize, { table_store: TableStore, name: String, @@ -41,9 +41,9 @@ pub struct SubkeyResult { impl RecordStore where - D: Clone + RkyvArchive + RkyvSerialize, + D: Clone + RkyvArchive + RkyvSerialize, for<'t> ::Archived: CheckBytes>, - ::Archived: RkyvDeserialize, + ::Archived: RkyvDeserialize, { pub fn new(table_store: TableStore, name: &str, limits: RecordStoreLimits) -> Self { let subkey_cache_size = limits.subkey_cache_size as usize; @@ -421,7 +421,11 @@ where ) -> Result<(), VeilidAPIError> { // Check size limit for data if signed_value_data.value_data().data().len() > self.limits.max_subkey_size { - return Err(VeilidAPIError::generic("record subkey too large")); + apibail_invalid_argument!( + "record subkey too large", + "signed_value_data.value_data.data.len", + signed_value_data.value_data().data().len() + ); } // Get record from index diff --git a/veilid-core/src/storage_manager/set_value.rs b/veilid-core/src/storage_manager/set_value.rs new file mode 100644 index 00000000..22afcc8a --- /dev/null +++ b/veilid-core/src/storage_manager/set_value.rs @@ -0,0 +1,225 @@ +use super::*; + +/// The context of the do_get_value operation +struct DoSetValueContext { + /// The latest value of the subkey, may be the value passed in + pub value: SignedValueData, + /// The consensus count for the value we have received + pub value_count: usize, + /// The parsed schema from the descriptor if we have one + pub schema: DHTSchema, +} + +impl StorageManager { + + /// Perform a 'set value' query on the network + pub async fn outbound_set_value( + &self, + rpc_processor: RPCProcessor, + key: TypedKey, + subkey: ValueSubkey, + safety_selection: SafetySelection, + value: SignedValueData, + descriptor: SignedValueDescriptor, + ) -> Result { + let routing_table = rpc_processor.routing_table(); + + // Get the DHT parameters for 'SetValue' + let (key_count, consensus_count, fanout, timeout_us) = { + let c = self.unlocked_inner.config.get(); + ( + c.network.dht.max_find_node_count as usize, + c.network.dht.set_value_count as usize, + c.network.dht.set_value_fanout as usize, + TimestampDuration::from(ms_to_us(c.network.dht.set_value_timeout_ms)), + ) + }; + + // Make do-set-value answer context + let schema = descriptor.schema()?; + let context = Arc::new(Mutex::new(DoSetValueContext { + value, + value_count: 0, + schema, + })); + + // Routine to call to generate fanout + let call_routine = |next_node: NodeRef| { + let rpc_processor = rpc_processor.clone(); + let context = context.clone(); + let descriptor = descriptor.clone(); + async move { + + let send_descriptor = true; // xxx check if next_node needs the descriptor or not + + // get most recent value to send + let value = { + let ctx = context.lock(); + ctx.value.clone() + }; + + // send across the wire + let vres = rpc_processor + .clone() + .rpc_call_set_value( + Destination::direct(next_node).with_safety(safety_selection), + key, + subkey, + value, + descriptor.clone(), + send_descriptor, + ) + .await?; + let sva = network_result_value_or_log!(vres => { + // Any other failures, just try the next node + return Ok(None); + }); + + // If the node was close enough to possibly set the value + if sva.answer.set { + let mut ctx = context.lock(); + + // Keep the value if we got one and it is newer and it passes schema validation + if let Some(value) = sva.answer.value { + + // Validate with schema + if !ctx.schema.check_subkey_value_data( + descriptor.owner(), + subkey, + value.value_data(), + ) { + // Validation failed, ignore this value + // Move to the next node + return Ok(None); + } + + // We have a prior value, ensure this is a newer sequence number + let prior_seq = ctx.value.value_data().seq(); + let new_seq = value.value_data().seq(); + if new_seq > prior_seq { + // If the sequence number is greater, keep it + ctx.value = value; + // One node has show us this value so far + ctx.value_count = 1; + } else { + // If the sequence number is older, or an equal sequence number, + // node should have not returned a value here. + // Skip this node's closer list because it is misbehaving + return Ok(None); + } + } + else + { + // It was set on this node and no newer value was found and returned, + // so increase our consensus count + ctx.value_count += 1; + } + } + + // Return peers if we have some + Ok(Some(sva.answer.peers)) + } + }; + + // Routine to call to check if we're done at each step + let check_done = |_closest_nodes: &[NodeRef]| { + // If we have reached sufficient consensus, return done + let ctx = context.lock(); + if ctx.value_count >= consensus_count { + return Some(()); + } + None + }; + + // Call the fanout + let fanout_call = FanoutCall::new( + routing_table.clone(), + key, + key_count, + fanout, + timeout_us, + call_routine, + check_done, + ); + + match fanout_call.run().await { + // If we don't finish in the timeout (too much time passed checking for consensus) + TimeoutOr::Timeout | + // If we finished with consensus (enough nodes returning the same value) + TimeoutOr::Value(Ok(Some(()))) | + // If we finished without consensus (ran out of nodes before getting consensus) + TimeoutOr::Value(Ok(None)) => { + // Return the best answer we've got + let ctx = context.lock(); + Ok(ctx.value.clone()) + } + // Failed + TimeoutOr::Value(Err(e)) => { + // If we finished with an error, return that + Err(e.into()) + } + } + } + + /// Handle a recieved 'Set Value' query + /// Returns a None if the value passed in was set + /// Returns a Some(current value) if the value was older and the current value was kept + pub async fn inbound_set_value(&self, key: TypedKey, subkey: ValueSubkey, value: SignedValueData, descriptor: Option) -> Result>, VeilidAPIError> { + let mut inner = self.lock().await?; + + // See if the subkey we are modifying has a last known local value + let last_subkey_result = inner.handle_get_local_value(key, subkey, true)?; + + // Make sure this value would actually be newer + if let Some(last_value) = &last_subkey_result.value { + if value.value_data().seq() < last_value.value_data().seq() { + // inbound value is older than the one we have, just return the one we have + return Ok(NetworkResult::value(Some(last_value.clone()))); + } + } + + // Get the descriptor and schema for the key + let actual_descriptor = match last_subkey_result.descriptor { + Some(last_descriptor) => { + if let Some(descriptor) = descriptor { + // Descriptor must match last one if it is provided + if descriptor.cmp_no_sig(&last_descriptor) != cmp::Ordering::Equal { + return Ok(NetworkResult::invalid_message("setvalue descriptor does not match last descriptor")); + } + } else { + // Descriptor was not provided always go with last descriptor + } + last_descriptor + } + None => { + if let Some(descriptor) = descriptor { + descriptor + } else { + // No descriptor + return Ok(NetworkResult::invalid_message("descriptor must be provided")); + } + } + }; + let Ok(schema) = actual_descriptor.schema() else { + return Ok(NetworkResult::invalid_message("invalid schema")); + }; + + // Validate new value with schema + if !schema.check_subkey_value_data(actual_descriptor.owner(), subkey, value.value_data()) { + // Validation failed, ignore this value + return Ok(NetworkResult::invalid_message("failed schema validation")); + } + + // Do the set and return no new value + match inner.handle_set_remote_value(key, subkey, value, actual_descriptor).await { + Ok(()) => {}, + Err(VeilidAPIError::Internal { message }) => { + apibail_internal!(message); + }, + Err(e) => { + return Ok(NetworkResult::invalid_message(e)); + }, + } + Ok(NetworkResult::value(None)) + } +} diff --git a/veilid-core/src/storage_manager/storage_manager_inner.rs b/veilid-core/src/storage_manager/storage_manager_inner.rs index 149feb6f..58f0e46c 100644 --- a/veilid-core/src/storage_manager/storage_manager_inner.rs +++ b/veilid-core/src/storage_manager/storage_manager_inner.rs @@ -120,10 +120,14 @@ impl StorageManagerInner { // Final flush on record stores if let Some(mut local_record_store) = self.local_record_store.take() { - local_record_store.tick().await; + if let Err(e) = local_record_store.tick().await { + log_stor!(error "termination local record store tick failed: {}", e); + } } if let Some(mut remote_record_store) = self.remote_record_store.take() { - remote_record_store.tick().await; + if let Err(e) = remote_record_store.tick().await { + log_stor!(error "termination remote record store tick failed: {}", e); + } } // Save metadata @@ -142,7 +146,7 @@ impl StorageManagerInner { async fn save_metadata(&mut self) -> EyreResult<()>{ if let Some(metadata_db) = &self.metadata_db { let tx = metadata_db.transact(); - tx.store_rkyv(0, b"offline_subkey_writes", &self.offline_subkey_writes); + tx.store_rkyv(0, b"offline_subkey_writes", &self.offline_subkey_writes)?; tx.commit().await.wrap_err("failed to commit")? } Ok(()) @@ -155,8 +159,6 @@ impl StorageManagerInner { Ok(()) } - write offline subkey write flush background task or make a ticket for it and get back to it after the rest of set value - pub async fn create_new_owned_local_record( &mut self, kind: CryptoKind, @@ -386,12 +388,23 @@ impl StorageManagerInner { key: TypedKey, subkey: ValueSubkey, signed_value_data: SignedValueData, + signed_value_descriptor: SignedValueDescriptor, ) -> Result<(), VeilidAPIError> { // See if it's in the remote record store let Some(remote_record_store) = self.remote_record_store.as_mut() else { apibail_not_initialized!(); }; + // See if we have a remote record already or not + if remote_record_store.with_record(key, |_|{}).is_none() { + // record didn't exist, make it + let cur_ts = get_aligned_timestamp(); + let remote_record_detail = RemoteRecordDetail { }; + let record = + Record::::new(cur_ts, signed_value_descriptor, remote_record_detail)?; + remote_record_store.new_record(key, record).await? + }; + // Write subkey to remote store remote_record_store .set_subkey(key, subkey, signed_value_data) @@ -403,9 +416,9 @@ impl StorageManagerInner { /// # DHT Key = Hash(ownerKeyKind) of: [ ownerKeyValue, schema ] fn get_key(vcrypto: CryptoSystemVersion, record: &Record) -> TypedKey where - D: Clone + RkyvArchive + RkyvSerialize, + D: Clone + RkyvArchive + RkyvSerialize, for<'t> ::Archived: CheckBytes>, - ::Archived: RkyvDeserialize, + ::Archived: RkyvDeserialize, { let compiled = record.descriptor().schema_data(); let mut hash_data = Vec::::with_capacity(PUBLIC_KEY_LENGTH + 4 + compiled.len()); diff --git a/veilid-core/src/storage_manager/types/local_record_detail.rs b/veilid-core/src/storage_manager/types/local_record_detail.rs index e632e749..9f16ba80 100644 --- a/veilid-core/src/storage_manager/types/local_record_detail.rs +++ b/veilid-core/src/storage_manager/types/local_record_detail.rs @@ -7,6 +7,6 @@ use super::*; #[archive_attr(repr(C), derive(CheckBytes))] pub struct LocalRecordDetail { /// The last 'safety selection' used when creating/opening this record. - /// Even when closed, this safety selection applies to republication attempts by the system. + /// Even when closed, this safety selection applies to re-publication attempts by the system. pub safety_selection: SafetySelection, } diff --git a/veilid-core/src/storage_manager/types/record.rs b/veilid-core/src/storage_manager/types/record.rs index eab24579..68afa013 100644 --- a/veilid-core/src/storage_manager/types/record.rs +++ b/veilid-core/src/storage_manager/types/record.rs @@ -6,9 +6,9 @@ use super::*; #[archive_attr(repr(C), derive(CheckBytes))] pub struct Record where - D: Clone + RkyvArchive + RkyvSerialize, + D: Clone + RkyvArchive + RkyvSerialize, for<'t> ::Archived: CheckBytes>, - ::Archived: RkyvDeserialize, + ::Archived: RkyvDeserialize, { descriptor: SignedValueDescriptor, subkey_count: usize, @@ -19,9 +19,9 @@ where impl Record where - D: Clone + RkyvArchive + RkyvSerialize, + D: Clone + RkyvArchive + RkyvSerialize, for<'t> ::Archived: CheckBytes>, - ::Archived: RkyvDeserialize, + ::Archived: RkyvDeserialize, { pub fn new( cur_ts: Timestamp, diff --git a/veilid-core/src/veilid_api/serialize_helpers/mod.rs b/veilid-core/src/veilid_api/serialize_helpers/mod.rs index edd6c968..5a3805fd 100644 --- a/veilid-core/src/veilid_api/serialize_helpers/mod.rs +++ b/veilid-core/src/veilid_api/serialize_helpers/mod.rs @@ -1,8 +1,9 @@ mod rkyv_enum_set; mod rkyv_range_set_blaze; pub mod serialize_arc; -pub mod serialize_range_set_blaze; mod serialize_json; +pub mod serialize_range_set_blaze; +mod veilid_rkyv; use super::*; use core::fmt::Debug; @@ -10,28 +11,4 @@ use core::fmt::Debug; pub use rkyv_enum_set::*; pub use rkyv_range_set_blaze::*; pub use serialize_json::*; - -pub fn to_rkyv(v: &T) -> EyreResult> -where - T: RkyvSerialize>, -{ - Ok(rkyv::to_bytes::(v) - .wrap_err("failed to freeze object")? - .to_vec()) -} - -pub fn from_rkyv(v: Vec) -> EyreResult -where - T: RkyvArchive, - ::Archived: - for<'t> CheckBytes>, - ::Archived: - rkyv::Deserialize, -{ - match rkyv::from_bytes::(&v) { - Ok(v) => Ok(v), - Err(e) => { - bail!("failed to deserialize frozen object: {}", e); - } - } -} +pub use veilid_rkyv::*; diff --git a/veilid-core/src/veilid_api/serialize_helpers/rkyv_range_set_blaze.rs b/veilid-core/src/veilid_api/serialize_helpers/rkyv_range_set_blaze.rs index 6a2c4736..67388e4a 100644 --- a/veilid-core/src/veilid_api/serialize_helpers/rkyv_range_set_blaze.rs +++ b/veilid-core/src/veilid_api/serialize_helpers/rkyv_range_set_blaze.rs @@ -52,16 +52,16 @@ where D: rkyv::Fallible + ?Sized, T: rkyv::Archive + Integer, rkyv::Archived: rkyv::Deserialize, - // D::Error: From, // xxx this doesn't work + D::Error: From, { fn deserialize_with( field: &rkyv::Archived>, deserializer: &mut D, ) -> Result, D::Error> { let mut out = RangeSetBlaze::::new(); - // if field.len() % 2 == 1 { - // return Err("invalid range set length".to_owned().into()); - // } + if field.len() % 2 == 1 { + return Err("invalid range set length".to_owned().into()); + } let f = field.as_slice(); for i in 0..field.len() / 2 { let l: T = f[i * 2].deserialize(deserializer)?; diff --git a/veilid-core/src/veilid_api/serialize_helpers/veilid_rkyv.rs b/veilid-core/src/veilid_api/serialize_helpers/veilid_rkyv.rs new file mode 100644 index 00000000..e7658c95 --- /dev/null +++ b/veilid-core/src/veilid_api/serialize_helpers/veilid_rkyv.rs @@ -0,0 +1,151 @@ +use super::*; +use rkyv::ser::Serializer; + +/////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub struct VeilidRkyvSerializer { + inner: S, +} + +impl VeilidRkyvSerializer { + pub fn into_inner(self) -> S { + self.inner + } +} + +impl rkyv::Fallible for VeilidRkyvSerializer { + type Error = VeilidRkyvError; +} + +impl rkyv::ser::ScratchSpace for VeilidRkyvSerializer { + unsafe fn push_scratch( + &mut self, + layout: core::alloc::Layout, + ) -> Result, Self::Error> { + self.inner + .push_scratch(layout) + .map_err(VeilidRkyvError::Inner) + } + unsafe fn pop_scratch( + &mut self, + ptr: core::ptr::NonNull, + layout: core::alloc::Layout, + ) -> Result<(), Self::Error> { + self.inner + .pop_scratch(ptr, layout) + .map_err(VeilidRkyvError::Inner) + } +} + +impl rkyv::ser::Serializer for VeilidRkyvSerializer { + #[inline] + fn pos(&self) -> usize { + self.inner.pos() + } + + #[inline] + fn write(&mut self, bytes: &[u8]) -> Result<(), Self::Error> { + self.inner.write(bytes).map_err(VeilidRkyvError::Inner) + } +} + +impl Default for VeilidRkyvSerializer { + fn default() -> Self { + Self { + inner: S::default(), + } + } +} + +pub type DefaultVeilidRkyvSerializer = + VeilidRkyvSerializer>; + +/////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[derive(Debug, Default)] +pub struct VeilidSharedDeserializeMap { + inner: SharedDeserializeMap, +} + +impl VeilidSharedDeserializeMap { + #[inline] + pub fn new() -> Self { + Self { + inner: SharedDeserializeMap::new(), + } + } +} +impl rkyv::Fallible for VeilidSharedDeserializeMap { + type Error = VeilidRkyvError; +} + +impl rkyv::de::SharedDeserializeRegistry for VeilidSharedDeserializeMap { + fn get_shared_ptr(&mut self, ptr: *const u8) -> Option<&dyn rkyv::de::SharedPointer> { + self.inner.get_shared_ptr(ptr) + } + + fn add_shared_ptr( + &mut self, + ptr: *const u8, + shared: Box, + ) -> Result<(), Self::Error> { + self.inner + .add_shared_ptr(ptr, shared) + .map_err(VeilidRkyvError::Inner) + } +} + +/////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[derive(Debug)] +pub enum VeilidRkyvError { + Inner(E), + StringError(String), +} + +impl From for VeilidRkyvError { + fn from(s: String) -> Self { + Self::StringError(s) + } +} + +impl fmt::Display for VeilidRkyvError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + VeilidRkyvError::Inner(e) => write!(f, "Inner: {}", e), + VeilidRkyvError::StringError(s) => write!(f, "StringError: {}", s), + } + } +} + +impl std::error::Error for VeilidRkyvError {} + +/////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub fn to_rkyv(value: &T) -> EyreResult> +where + T: RkyvSerialize, +{ + let mut serializer = DefaultVeilidRkyvSerializer::default(); + serializer + .serialize_value(value) + .wrap_err("failed to serialize object")?; + Ok(serializer + .into_inner() + .into_serializer() + .into_inner() + .to_vec()) +} + +pub fn from_rkyv(bytes: Vec) -> EyreResult +where + T: RkyvArchive, + ::Archived: + for<'t> CheckBytes>, + ::Archived: RkyvDeserialize, +{ + rkyv::check_archived_root::(&bytes) + .map_err(|e| eyre!("checkbytes failed: {}", e))? + .deserialize(&mut VeilidSharedDeserializeMap::default()) + .map_err(|e| eyre!("failed to deserialize: {}", e)) +} diff --git a/veilid-core/src/veilid_api/types/dht/value_subkey_range_set.rs b/veilid-core/src/veilid_api/types/dht/value_subkey_range_set.rs index 5cac4c93..3dd40f67 100644 --- a/veilid-core/src/veilid_api/types/dht/value_subkey_range_set.rs +++ b/veilid-core/src/veilid_api/types/dht/value_subkey_range_set.rs @@ -29,6 +29,11 @@ impl ValueSubkeyRangeSet { data: Default::default(), } } + pub fn single(value: ValueSubkey) -> Self { + let mut data = RangeSetBlaze::new(); + data.insert(value); + Self { data } + } } impl Deref for ValueSubkeyRangeSet {