diff --git a/veilid-core/src/rpc_processor/destination.rs b/veilid-core/src/rpc_processor/destination.rs index 8d58e6d8..42e9a3cb 100644 --- a/veilid-core/src/rpc_processor/destination.rs +++ b/veilid-core/src/rpc_processor/destination.rs @@ -6,7 +6,7 @@ pub(crate) enum Destination { /// Send to node directly Direct { /// The node to send to - target: NodeRef, + node: NodeRef, /// Require safety route or not safety_selection: SafetySelection, }, @@ -15,7 +15,7 @@ pub(crate) enum Destination { /// The relay to send to relay: NodeRef, /// The final destination the relay should send to - target: NodeRef, + node: NodeRef, /// Require safety route or not safety_selection: SafetySelection, }, @@ -29,15 +29,15 @@ pub(crate) enum Destination { } impl Destination { - pub fn target(&self) -> Option { + pub fn node(&self) -> Option { match self { Destination::Direct { - target, + node: target, safety_selection: _, } => Some(target.clone()), Destination::Relay { relay: _, - target, + node: target, safety_selection: _, } => Some(target.clone()), Destination::PrivateRoute { @@ -46,18 +46,18 @@ impl Destination { } => None, } } - pub fn direct(target: NodeRef) -> Self { - let sequencing = target.sequencing(); + pub fn direct(node: NodeRef) -> Self { + let sequencing = node.sequencing(); Self::Direct { - target, + node, safety_selection: SafetySelection::Unsafe(sequencing), } } - pub fn relay(relay: NodeRef, target: NodeRef) -> Self { - let sequencing = relay.sequencing().max(target.sequencing()); + pub fn relay(relay: NodeRef, node: NodeRef) -> Self { + let sequencing = relay.sequencing().max(node.sequencing()); Self::Relay { relay, - target, + node, safety_selection: SafetySelection::Unsafe(sequencing), } } @@ -71,19 +71,19 @@ impl Destination { pub fn with_safety(self, safety_selection: SafetySelection) -> Self { match self { Destination::Direct { - target, + node, safety_selection: _, } => Self::Direct { - target, + node, safety_selection, }, Destination::Relay { relay, - target, + node, safety_selection: _, } => Self::Relay { relay, - target, + node, safety_selection, }, Destination::PrivateRoute { @@ -99,12 +99,12 @@ impl Destination { pub fn get_safety_selection(&self) -> &SafetySelection { match self { Destination::Direct { - target: _, + node: _, safety_selection, } => safety_selection, Destination::Relay { relay: _, - target: _, + node: _, safety_selection, } => safety_selection, Destination::PrivateRoute { @@ -113,13 +113,31 @@ impl Destination { } => safety_selection, } } + + pub fn get_target(&self) -> Target { + match self { + Destination::Direct { + node, + safety_selection: _, + } + | Destination::Relay { + relay: _, + node, + safety_selection: _, + } => Target::NodeId(node.best_node_id()), + Destination::PrivateRoute { + private_route, + safety_selection: _, + } => Target::PrivateRoute(private_route.public_key.value), + } + } } impl fmt::Display for Destination { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { Destination::Direct { - target, + node, safety_selection, } => { let sr = if matches!(safety_selection, SafetySelection::Safe(_)) { @@ -128,11 +146,11 @@ impl fmt::Display for Destination { "" }; - write!(f, "{}{}", target, sr) + write!(f, "{}{}", node, sr) } Destination::Relay { relay, - target, + node, safety_selection, } => { let sr = if matches!(safety_selection, SafetySelection::Safe(_)) { @@ -141,7 +159,7 @@ impl fmt::Display for Destination { "" }; - write!(f, "{}@{}{}", target, relay, sr) + write!(f, "{}@{}{}", node, relay, sr) } Destination::PrivateRoute { private_route, @@ -160,6 +178,46 @@ impl fmt::Display for Destination { } impl RPCProcessor { + /// Convert a 'Target' into a 'Destination' + pub async fn resolve_target_to_destination( + &self, + target: Target, + safety_selection: SafetySelection, + sequencing: Sequencing, + ) -> Result { + match target { + Target::NodeId(node_id) => { + // Resolve node + let mut nr = match self.resolve_node(node_id, safety_selection).await? { + Some(nr) => nr, + None => { + return Err(RPCError::network("could not resolve node id")); + } + }; + // Apply sequencing to match safety selection + nr.set_sequencing(sequencing); + + Ok(rpc_processor::Destination::Direct { + node: nr, + safety_selection, + }) + } + Target::PrivateRoute(rsid) => { + // Get remote private route + let rss = self.routing_table().route_spec_store(); + + let Some(private_route) = rss.best_remote_private_route(&rsid) else { + return Err(RPCError::network("could not get remote private route")); + }; + + Ok(rpc_processor::Destination::PrivateRoute { + private_route, + safety_selection, + }) + } + } + } + /// Convert the 'Destination' into a 'RespondTo' for a response pub(super) fn get_destination_respond_to( &self, @@ -170,7 +228,7 @@ impl RPCProcessor { match dest { Destination::Direct { - target, + node: target, safety_selection, } => match safety_selection { SafetySelection::Unsafe(_) => { @@ -198,7 +256,7 @@ impl RPCProcessor { }, Destination::Relay { relay, - target, + node: target, safety_selection, } => match safety_selection { SafetySelection::Unsafe(_) => { diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 7c623409..db07a467 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -732,12 +732,12 @@ impl RPCProcessor { // To where are we sending the request match dest { Destination::Direct { - target: ref node_ref, + node: ref node_ref, safety_selection, } | Destination::Relay { relay: ref node_ref, - target: _, + node: _, safety_selection, } => { // Send to a node without a private route @@ -746,7 +746,7 @@ impl RPCProcessor { // Get the actual destination node id accounting for relays let (node_ref, destination_node_ref) = if let Destination::Relay { relay: _, - ref target, + node: ref target, safety_selection: _, } = dest { @@ -854,12 +854,12 @@ impl RPCProcessor { let routing_table = self.routing_table(); let target = match dest { Destination::Direct { - target, + node: target, safety_selection: _, } => target.clone(), Destination::Relay { relay: _, - target, + node: target, safety_selection: _, } => target.clone(), Destination::PrivateRoute { diff --git a/veilid-core/src/rpc_processor/rpc_get_value.rs b/veilid-core/src/rpc_processor/rpc_get_value.rs index b251e97e..148c10f4 100644 --- a/veilid-core/src/rpc_processor/rpc_get_value.rs +++ b/veilid-core/src/rpc_processor/rpc_get_value.rs @@ -35,7 +35,7 @@ impl RPCProcessor { ) ->RPCNetworkResult> { // Ensure destination never has a private route // and get the target noderef so we can validate the response - let Some(target) = dest.target() else { + let Some(target) = dest.node() else { return Err(RPCError::internal( "Never send get value requests over private routes", )); diff --git a/veilid-core/src/rpc_processor/rpc_set_value.rs b/veilid-core/src/rpc_processor/rpc_set_value.rs index 397f3b03..5c9201ab 100644 --- a/veilid-core/src/rpc_processor/rpc_set_value.rs +++ b/veilid-core/src/rpc_processor/rpc_set_value.rs @@ -39,7 +39,7 @@ impl RPCProcessor { ) ->RPCNetworkResult> { // Ensure destination never has a private route // and get the target noderef so we can validate the response - let Some(target) = dest.target() else { + let Some(target) = dest.node() else { return Err(RPCError::internal( "Never send set value requests over private routes", )); diff --git a/veilid-core/src/rpc_processor/rpc_status.rs b/veilid-core/src/rpc_processor/rpc_status.rs index 51832682..e8b48975 100644 --- a/veilid-core/src/rpc_processor/rpc_status.rs +++ b/veilid-core/src/rpc_processor/rpc_status.rs @@ -27,7 +27,7 @@ impl RPCProcessor { SafetySelection::Unsafe(_) => { let (opt_target_nr, routing_domain) = match &dest { Destination::Direct { - target, + node: target, safety_selection: _, } => { let routing_domain = match target.best_routing_domain() { @@ -52,7 +52,7 @@ impl RPCProcessor { } Destination::Relay { relay, - target, + node: target, safety_selection: _, } => { let routing_domain = match relay.best_routing_domain() { @@ -147,7 +147,7 @@ impl RPCProcessor { let mut opt_sender_info = None; match dest { Destination::Direct { - target, + node: target, safety_selection, } => { if matches!(safety_selection, SafetySelection::Unsafe(_)) { @@ -183,7 +183,7 @@ impl RPCProcessor { } Destination::Relay { relay: _, - target: _, + node: _, safety_selection: _, } | Destination::PrivateRoute { diff --git a/veilid-core/src/rpc_processor/rpc_value_changed.rs b/veilid-core/src/rpc_processor/rpc_value_changed.rs index 96c1b1fc..18b31c36 100644 --- a/veilid-core/src/rpc_processor/rpc_value_changed.rs +++ b/veilid-core/src/rpc_processor/rpc_value_changed.rs @@ -2,13 +2,67 @@ use super::*; impl RPCProcessor { #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err))] + // Sends a high level app message + // Can be sent via all methods including relays and routes + #[cfg_attr( + feature = "verbose-tracing", + instrument(level = "trace", skip(self, message), fields(message.len = message.len()), err) + )] + pub async fn rpc_call_value_changed( + self, + dest: Destination, + key: TypedKey, + subkeys: ValueSubkeyRangeSet, + count: u32, + value: SignedValueData, + ) -> RPCNetworkResult<()> { + let value_changed = RPCOperationValueChanged::new(key, subkeys, count, value); + let statement = + RPCStatement::new(RPCStatementDetail::ValueChanged(Box::new(value_changed))); + + // Send the value changed request + self.statement(dest, statement).await + } + pub(crate) async fn process_value_changed(&self, msg: RPCMessage) -> RPCNetworkResult<()> { - // Ignore if disabled - let routing_table = self.routing_table(); - let opi = routing_table.get_own_peer_info(msg.header.routing_domain()); - if !opi.signed_node_info().node_info().has_capability(CAP_DHT) { - return Ok(NetworkResult::service_unavailable("dht is not available")); + // Get the statement + let (_, _, _, kind) = msg.operation.destructure(); + let (key, subkeys, count, value) = match kind { + RPCOperationKind::Statement(s) => match s.destructure() { + RPCStatementDetail::ValueChanged(s) => s.destructure(), + _ => panic!("not a value changed statement"), + }, + _ => panic!("not a statement"), + }; + + #[cfg(feature = "debug-dht")] + { + let debug_string_value = format!( + " len={} seq={} writer={}", + value.value_data().data().len(), + value.value_data().seq(), + value.value_data().writer(), + ); + + let debug_string_stmt = format!( + "IN <== ValueChanged({} #{:?}+{}{}) <= {}", + key, + subkeys, + count, + debug_string_value, + msg.header.direct_sender_node_id() + ); + + log_rpc!(debug "{}", debug_string_stmt); } - Err(RPCError::unimplemented("process_value_changed")) + + // Save the subkey, creating a new record if necessary + let storage_manager = self.storage_manager(); + storage_manager + .inbound_value_changed(key, subkeys, count, value) + .await + .map_err(RPCError::internal)?; + + Ok(NetworkResult::value(())) } } diff --git a/veilid-core/src/rpc_processor/rpc_watch_value.rs b/veilid-core/src/rpc_processor/rpc_watch_value.rs index 7781398e..2861bb67 100644 --- a/veilid-core/src/rpc_processor/rpc_watch_value.rs +++ b/veilid-core/src/rpc_processor/rpc_watch_value.rs @@ -33,7 +33,7 @@ impl RPCProcessor { ) -> RPCNetworkResult> { // Ensure destination never has a private route // and get the target noderef so we can validate the response - let Some(target) = dest.target() else { + let Some(target) = dest.node() else { return Err(RPCError::internal( "Never send watch value requests over private routes", )); @@ -151,12 +151,101 @@ impl RPCProcessor { #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))] pub(crate) async fn process_watch_value_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> { + // Ensure this never came over a private route, safety route is okay though + match &msg.header.detail { + RPCMessageHeaderDetail::Direct(_) | RPCMessageHeaderDetail::SafetyRouted(_) => {} + RPCMessageHeaderDetail::PrivateRouted(_) => { + return Ok(NetworkResult::invalid_message( + "not processing watch value request over private route", + )) + } + } + // Ignore if disabled let routing_table = self.routing_table(); let opi = routing_table.get_own_peer_info(msg.header.routing_domain()); if !opi.signed_node_info().node_info().has_capability(CAP_DHT) { return Ok(NetworkResult::service_unavailable("dht is not available")); } - Err(RPCError::unimplemented("process_watch_value_q")) + + // Get the question + let kind = msg.operation.kind().clone(); + let watch_value_q = match kind { + RPCOperationKind::Question(q) => match q.destructure() { + (_, RPCQuestionDetail::WatchValueQ(q)) => q, + _ => panic!("not a watchvalue question"), + }, + _ => panic!("not a question"), + }; + + // Destructure + let (key, subkeys, expiration, count, opt_watch_signature) = watch_value_q.destructure(); + let opt_watcher = opt_watch_signature.map(|ws| ws.0); + + // Get target for ValueChanged notifications + let dest = network_result_try!(self.get_respond_to_destination(&msg)); + let target = dest.get_target(); + + #[cfg(feature = "debug-dht")] + { + let debug_string = format!( + "IN <=== WatchValueQ({} {}#{:?}@{}+{}) <== {}", + key, + if opt_watcher.is_some() { "+W " } else { "" }, + subkeys, + expiration, + count, + msg.header.direct_sender_node_id() + ); + + log_rpc!(debug "{}", debug_string); + } + + // See if we have this record ourselves, if so, accept the watch + let storage_manager = self.storage_manager(); + let ret_expiration = network_result_try!(storage_manager + .inbound_watch_value( + key, + subkeys, + Timestamp::new(expiration), + count, + target, + opt_watcher + ) + .await + .map_err(RPCError::internal)?); + + // Get the nodes that we know about that are closer to the the key than our own node + let routing_table = self.routing_table(); + let closer_to_key_peers = if ret_expiration.as_u64() == 0 { + network_result_try!(routing_table.find_preferred_peers_closer_to_key(key, vec![CAP_DHT])) + } else { + vec![] + }; + + #[cfg(feature = "debug-dht")] + { + let debug_string_answer = format!( + "IN ===> WatchValueA({} #{} expiration={} peers={}) ==> {}", + key, + subkeys, + ret_expiration, + closer_to_key_peers.len(), + msg.header.direct_sender_node_id() + ); + + log_rpc!(debug "{}", debug_string_answer); + } + + // Make WatchValue answer + let watch_value_a = + RPCOperationWatchValueA::new(ret_expiration.as_u64(), closer_to_key_peers)?; + + // Send GetValue answer + self.answer( + msg, + RPCAnswer::new(RPCAnswerDetail::WatchValueA(Box::new(watch_value_a))), + ) + .await } } diff --git a/veilid-core/src/storage_manager/get_value.rs b/veilid-core/src/storage_manager/get_value.rs index 2243af42..7917938b 100644 --- a/veilid-core/src/storage_manager/get_value.rs +++ b/veilid-core/src/storage_manager/get_value.rs @@ -243,18 +243,26 @@ impl StorageManager { want_descriptor: bool, ) -> VeilidAPIResult> { let mut inner = self.lock().await?; - let res = match inner - .handle_get_remote_value(key, subkey, want_descriptor) - .await - { - Ok(res) => res, - Err(VeilidAPIError::Internal { message }) => { - apibail_internal!(message); - } - Err(e) => { - return Ok(NetworkResult::invalid_message(e)); + + // See if this is a remote or local value + let (_is_local, last_subkey_result) = { + // See if the subkey we are getting has a last known local value + let mut last_subkey_result = inner.handle_get_local_value(key, subkey, true).await?; + // If this is local, it must have a descriptor already + if last_subkey_result.descriptor.is_some() { + if !want_descriptor { + last_subkey_result.descriptor = None; + } + (true, last_subkey_result) + } else { + // See if the subkey we are getting has a last known remote value + let last_subkey_result = inner + .handle_get_remote_value(key, subkey, want_descriptor) + .await?; + (false, last_subkey_result) } }; - Ok(NetworkResult::value(res)) + + Ok(NetworkResult::value(last_subkey_result)) } } diff --git a/veilid-core/src/storage_manager/watch_value.rs b/veilid-core/src/storage_manager/watch_value.rs index c9424ee5..db9e718c 100644 --- a/veilid-core/src/storage_manager/watch_value.rs +++ b/veilid-core/src/storage_manager/watch_value.rs @@ -172,21 +172,39 @@ impl StorageManager { subkeys: ValueSubkeyRangeSet, expiration: Timestamp, count: u32, - // xxx more here - ) -> VeilidAPIResult> { + target: Target, + opt_watcher: Option, + ) -> VeilidAPIResult> { let mut inner = self.lock().await?; - let res = match inner - .handle_watch_remote_value(key, subkeys, expiration, count) - .await - { - Ok(res) => res, - Err(VeilidAPIError::Internal { message }) => { - apibail_internal!(message); - } - Err(e) => { - return Ok(NetworkResult::invalid_message(e)); + + // See if this is a remote or local value + let (_is_local, opt_expiration_ts) = { + // See if the subkey we are watching has a local value + let opt_expiration_ts = inner + .handle_watch_local_value(key, subkeys, expiration, count, target, opt_watcher) + .await?; + if opt_expiration_ts.is_some() { + (true, opt_expiration_ts) + } else { + // See if the subkey we are watching is a remote value + let opt_expiration_ts = inner + .handle_watch_remote_value(key, subkeys, expiration, count, target, opt_watcher) + .await?; + (false, opt_expiration_ts) } }; - Ok(NetworkResult::value(res)) + + Ok(NetworkResult::value(opt_expiration_ts.unwrap_or_default())) + } + + /// Handle a received 'Value Changed' statement + pub async fn inbound_value_changed( + &self, + key: TypedKey, + subkeys: ValueSubkeyRangeSet, + count: u32, + value: SignedValueData, + ) -> VeilidAPIResult<()> { + // } } diff --git a/veilid-core/src/veilid_api/debug.rs b/veilid-core/src/veilid_api/debug.rs index 08eaf1dd..305fbb9e 100644 --- a/veilid-core/src/veilid_api/debug.rs +++ b/veilid-core/src/veilid_api/debug.rs @@ -888,7 +888,7 @@ impl VeilidAPI { match &dest { Destination::Direct { - target, + node: target, safety_selection: _, } => Ok(format!( "Destination: {:#?}\nTarget Entry:\n{}\n", @@ -897,7 +897,7 @@ impl VeilidAPI { )), Destination::Relay { relay, - target, + node: target, safety_selection: _, } => Ok(format!( "Destination: {:#?}\nTarget Entry:\n{}\nRelay Entry:\n{}\n", diff --git a/veilid-core/src/veilid_api/routing_context.rs b/veilid-core/src/veilid_api/routing_context.rs index 5520c6e7..a40df864 100644 --- a/veilid-core/src/veilid_api/routing_context.rs +++ b/veilid-core/src/veilid_api/routing_context.rs @@ -123,40 +123,14 @@ impl RoutingContext { async fn get_destination(&self, target: Target) -> VeilidAPIResult { let rpc_processor = self.api.rpc_processor()?; - - match target { - Target::NodeId(node_id) => { - // Resolve node - let mut nr = match rpc_processor - .resolve_node(node_id, self.unlocked_inner.safety_selection) - .await - { - Ok(Some(nr)) => nr, - Ok(None) => apibail_invalid_target!("could not resolve node id"), - Err(e) => return Err(e.into()), - }; - // Apply sequencing to match safety selection - nr.set_sequencing(self.sequencing()); - - Ok(rpc_processor::Destination::Direct { - target: nr, - safety_selection: self.unlocked_inner.safety_selection, - }) - } - Target::PrivateRoute(rsid) => { - // Get remote private route - let rss = self.api.routing_table()?.route_spec_store(); - - let Some(private_route) = rss.best_remote_private_route(&rsid) else { - apibail_invalid_target!("could not get remote private route"); - }; - - Ok(rpc_processor::Destination::PrivateRoute { - private_route, - safety_selection: self.unlocked_inner.safety_selection, - }) - } - } + rpc_processor + .resolve_target_to_destination( + target, + self.unlocked_inner.safety_selection, + self.sequencing(), + ) + .await + .map_err(VeilidAPIError::invalid_target) } ////////////////////////////////////////////////////////////////