diff --git a/veilid-core/proto/veilid.capnp b/veilid-core/proto/veilid.capnp index 97904483..b3c19e91 100644 --- a/veilid-core/proto/veilid.capnp +++ b/veilid-core/proto/veilid.capnp @@ -249,7 +249,7 @@ struct SignedNodeInfo { } struct SenderInfo { - socketAddress @0 :SocketAddress; # socket address was available for peer + socketAddress @0 :SocketAddress; # socket address that for the sending peer } struct PeerInfo { @@ -265,12 +265,12 @@ struct RoutedOperation { } struct OperationStatusQ { - nodeStatus @0 :NodeStatus; # node status update about the statusq sender + nodeStatus @0 :NodeStatus; # Optional: node status update about the statusq sender } struct OperationStatusA { - nodeStatus @0 :NodeStatus; # returned node status - senderInfo @1 :SenderInfo; # info about StatusQ sender from the perspective of the replier + nodeStatus @0 :NodeStatus; # Optional: returned node status + senderInfo @1 :SenderInfo; # Optional: info about StatusQ sender from the perspective of the replier } struct OperationValidateDialInfo { diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index 8ae5256f..a7166f5e 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -783,6 +783,26 @@ impl NetworkManager { .await } + /// Process a received safety receipt + #[instrument(level = "trace", skip(self, receipt_data), ret)] + pub async fn handle_safety_receipt>( + &self, + receipt_data: R, + ) -> NetworkResult<()> { + let receipt_manager = self.receipt_manager(); + + let receipt = match Receipt::from_signed_data(receipt_data.as_ref()) { + Err(e) => { + return NetworkResult::invalid_message(e.to_string()); + } + Ok(v) => v, + }; + + receipt_manager + .handle_receipt(receipt, ReceiptReturned::Safety) + .await + } + /// Process a received private receipt #[instrument(level = "trace", skip(self, receipt_data), ret)] pub async fn handle_private_receipt>( @@ -1025,7 +1045,8 @@ impl NetworkManager { // Wait for the return receipt let inbound_nr = match eventual_value.await.take_value().unwrap() { ReceiptEvent::ReturnedPrivate { private_route: _ } - | ReceiptEvent::ReturnedOutOfBand => { + | ReceiptEvent::ReturnedOutOfBand + | ReceiptEvent::ReturnedSafety => { return Ok(NetworkResult::invalid_message( "reverse connect receipt should be returned in-band", )); @@ -1127,7 +1148,8 @@ impl NetworkManager { // Wait for the return receipt let inbound_nr = match eventual_value.await.take_value().unwrap() { ReceiptEvent::ReturnedPrivate { private_route: _ } - | ReceiptEvent::ReturnedOutOfBand => { + | ReceiptEvent::ReturnedOutOfBand + | ReceiptEvent::ReturnedSafety => { return Ok(NetworkResult::invalid_message( "hole punch receipt should be returned in-band", )); diff --git a/veilid-core/src/network_manager/native/network_class_discovery.rs b/veilid-core/src/network_manager/native/network_class_discovery.rs index cd4989ce..c24ec733 100644 --- a/veilid-core/src/network_manager/native/network_class_discovery.rs +++ b/veilid-core/src/network_manager/native/network_class_discovery.rs @@ -79,7 +79,7 @@ impl DiscoveryContext { async fn request_public_address(&self, node_ref: NodeRef) -> Option { let rpc = self.routing_table.rpc_processor(); - let res = network_result_value_or_log!(debug match rpc.rpc_call_status(node_ref.clone()).await { + let res = network_result_value_or_log!(debug match rpc.rpc_call_status(Destination::direct(node_ref.clone())).await { Ok(v) => v, Err(e) => { log_net!(error @@ -98,7 +98,7 @@ impl DiscoveryContext { node_ref, res.answer ); - res.answer.socket_address + res.answer.map(|si| si.socket_address) } // find fast peers with a particular address type, and ask them to tell us what our external address is diff --git a/veilid-core/src/network_manager/tasks.rs b/veilid-core/src/network_manager/tasks.rs index ddcc065d..a12d6018 100644 --- a/veilid-core/src/network_manager/tasks.rs +++ b/veilid-core/src/network_manager/tasks.rs @@ -343,7 +343,7 @@ impl NetworkManager { &self, cur_ts: u64, unord: &mut FuturesUnordered< - SendPinBoxFuture>, RPCError>>, + SendPinBoxFuture>>, RPCError>>, >, ) -> EyreResult<()> { let rpc = self.rpc_processor(); @@ -394,7 +394,7 @@ impl NetworkManager { nr.filtered_clone(NodeRefFilter::new().with_dial_info_filter(dif)); log_net!("--> Keepalive ping to {:?}", nr_filtered); unord.push( - async move { rpc.rpc_call_status(nr_filtered).await } + async move { rpc.rpc_call_status(Destination::direct(nr_filtered)).await } .instrument(Span::current()) .boxed(), ); @@ -408,7 +408,7 @@ impl NetworkManager { if !did_pings { let rpc = rpc.clone(); unord.push( - async move { rpc.rpc_call_status(nr).await } + async move { rpc.rpc_call_status(Destination::direct(nr)).await } .instrument(Span::current()) .boxed(), ); @@ -425,7 +425,7 @@ impl NetworkManager { &self, cur_ts: u64, unord: &mut FuturesUnordered< - SendPinBoxFuture>, RPCError>>, + SendPinBoxFuture>>, RPCError>>, >, ) -> EyreResult<()> { let rpc = self.rpc_processor(); @@ -440,7 +440,7 @@ impl NetworkManager { // Just do a single ping with the best protocol for all the nodes unord.push( - async move { rpc.rpc_call_status(nr).await } + async move { rpc.rpc_call_status(Destination::direct(nr)).await } .instrument(Span::current()) .boxed(), ); diff --git a/veilid-core/src/receipt_manager.rs b/veilid-core/src/receipt_manager.rs index 4ea815e8..3e9971a3 100644 --- a/veilid-core/src/receipt_manager.rs +++ b/veilid-core/src/receipt_manager.rs @@ -11,6 +11,7 @@ use xx::*; pub enum ReceiptEvent { ReturnedOutOfBand, ReturnedInBand { inbound_noderef: NodeRef }, + ReturnedSafety, ReturnedPrivate { private_route: DHTKey }, Expired, Cancelled, @@ -20,6 +21,7 @@ pub enum ReceiptEvent { pub enum ReceiptReturned { OutOfBand, InBand { inbound_noderef: NodeRef }, + Safety, Private { private_route: DHTKey }, } @@ -412,6 +414,7 @@ impl ReceiptManager { match receipt_returned { ReceiptReturned::OutOfBand => "OutOfBand".to_owned(), ReceiptReturned::InBand { ref inbound_noderef } => format!("InBand({})", inbound_noderef), + ReceiptReturned::Safety => "Safety".to_owned(), ReceiptReturned::Private { ref private_route } => format!("Private({})", private_route), }, if extra_data.is_empty() { @@ -445,6 +448,7 @@ impl ReceiptManager { // Get the receipt event to return let receipt_event = match receipt_returned { ReceiptReturned::OutOfBand => ReceiptEvent::ReturnedOutOfBand, + ReceiptReturned::Safety => ReceiptEvent::ReturnedSafety, ReceiptReturned::InBand { ref inbound_noderef, } => ReceiptEvent::ReturnedInBand { diff --git a/veilid-core/src/routing_table/route_spec_store.rs b/veilid-core/src/routing_table/route_spec_store.rs index 844e5a21..d99a5620 100644 --- a/veilid-core/src/routing_table/route_spec_store.rs +++ b/veilid-core/src/routing_table/route_spec_store.rs @@ -626,7 +626,7 @@ impl RouteSpecStore { signatures: &[DHTSignature], data: &[u8], last_hop_id: DHTKey, - ) -> EyreResult> { + ) -> EyreResult> { let inner = &*self.inner.lock(); let rsd = Self::detail(inner, &public_key).ok_or_else(|| eyre!("route does not exist"))?; @@ -656,12 +656,12 @@ impl RouteSpecStore { // We got the correct signatures, return a key ans Ok(Some(( rsd.secret_key, - SafetySelection::Safe(SafetySpec { + SafetySpec { preferred_route: Some(*public_key), hop_count: rsd.hops.len(), stability: rsd.stability, sequencing: rsd.sequencing, - }), + }, ))) } diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_status.rs b/veilid-core/src/rpc_processor/coders/operations/operation_status.rs index 7f4d3da7..71ed2117 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_status.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_status.rs @@ -3,42 +3,59 @@ use rpc_processor::*; #[derive(Debug, Clone)] pub struct RPCOperationStatusQ { - pub node_status: NodeStatus, + pub node_status: Option, } impl RPCOperationStatusQ { pub fn decode( reader: &veilid_capnp::operation_status_q::Reader, ) -> Result { - let ns_reader = reader.get_node_status().map_err(RPCError::protocol)?; - let node_status = decode_node_status(&ns_reader)?; + let node_status = if reader.has_node_status() { + let ns_reader = reader.get_node_status().map_err(RPCError::protocol)?; + let node_status = decode_node_status(&ns_reader)?; + Some(node_status) + } else { + None + }; Ok(RPCOperationStatusQ { node_status }) } pub fn encode( &self, builder: &mut veilid_capnp::operation_status_q::Builder, ) -> Result<(), RPCError> { - let mut ns_builder = builder.reborrow().init_node_status(); - encode_node_status(&self.node_status, &mut ns_builder)?; + if let Some(ns) = &self.node_status { + let mut ns_builder = builder.reborrow().init_node_status(); + encode_node_status(&ns, &mut ns_builder)?; + } Ok(()) } } #[derive(Debug, Clone)] pub struct RPCOperationStatusA { - pub node_status: NodeStatus, - pub sender_info: SenderInfo, + pub node_status: Option, + pub sender_info: Option, } impl RPCOperationStatusA { pub fn decode( reader: &veilid_capnp::operation_status_a::Reader, ) -> Result { - let ns_reader = reader.get_node_status().map_err(RPCError::protocol)?; - let node_status = decode_node_status(&ns_reader)?; + let node_status = if reader.has_node_status() { + let ns_reader = reader.get_node_status().map_err(RPCError::protocol)?; + let node_status = decode_node_status(&ns_reader)?; + Some(node_status) + } else { + None + }; - let si_reader = reader.get_sender_info().map_err(RPCError::protocol)?; - let sender_info = decode_sender_info(&si_reader)?; + let sender_info = if reader.has_sender_info() { + let si_reader = reader.get_sender_info().map_err(RPCError::protocol)?; + let sender_info = decode_sender_info(&si_reader)?; + Some(sender_info) + } else { + None + }; Ok(RPCOperationStatusA { node_status, @@ -49,10 +66,14 @@ impl RPCOperationStatusA { &self, builder: &mut veilid_capnp::operation_status_a::Builder, ) -> Result<(), RPCError> { - let mut ns_builder = builder.reborrow().init_node_status(); - encode_node_status(&self.node_status, &mut ns_builder)?; - let mut si_builder = builder.reborrow().init_sender_info(); - encode_sender_info(&self.sender_info, &mut si_builder)?; + if let Some(ns) = &self.node_status { + let mut ns_builder = builder.reborrow().init_node_status(); + encode_node_status(&ns, &mut ns_builder)?; + } + if let Some(si) = &self.sender_info { + let mut si_builder = builder.reborrow().init_sender_info(); + encode_sender_info(&si, &mut si_builder)?; + } Ok(()) } } diff --git a/veilid-core/src/rpc_processor/coders/sender_info.rs b/veilid-core/src/rpc_processor/coders/sender_info.rs index ffea5be8..5fbea344 100644 --- a/veilid-core/src/rpc_processor/coders/sender_info.rs +++ b/veilid-core/src/rpc_processor/coders/sender_info.rs @@ -5,30 +5,21 @@ pub fn encode_sender_info( sender_info: &SenderInfo, builder: &mut veilid_capnp::sender_info::Builder, ) -> Result<(), RPCError> { - if let Some(socket_address) = &sender_info.socket_address { - let mut sab = builder.reborrow().init_socket_address(); - encode_socket_address(socket_address, &mut sab)?; - } + let mut sab = builder.reborrow().init_socket_address(); + encode_socket_address(&sender_info.socket_address, &mut sab)?; Ok(()) } pub fn decode_sender_info( reader: &veilid_capnp::sender_info::Reader, ) -> Result { - if !reader.has_socket_address() { - return Err(RPCError::internal("invalid socket address type")); - } - let socket_address = if reader.has_socket_address() { - Some(decode_socket_address( - &reader - .reborrow() - .get_socket_address() - .map_err(RPCError::map_internal( - "invalid socket address in sender_info", - ))?, - )?) - } else { - None - }; + let sa_reader = reader + .reborrow() + .get_socket_address() + .map_err(RPCError::map_internal( + "invalid socket address in sender_info", + ))?; + let socket_address = decode_socket_address(&sa_reader)?; + Ok(SenderInfo { socket_address }) } diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index ac21e054..4d43c1cc 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -25,6 +25,7 @@ pub use coders::*; pub use destination::*; pub use operation_waiter::*; pub use rpc_error::*; +pub use rpc_status::*; use super::*; use crate::crypto::*; @@ -64,8 +65,8 @@ struct RPCMessageHeaderDetailSafetyRouted { struct RPCMessageHeaderDetailPrivateRouted { /// The private route we received the rpc over private_route: DHTKey, - // The safety selection for replying to this private routed rpc - safety_selection: SafetySelection, + // The safety spec for replying to this private routed rpc + safety_spec: SafetySpec, } #[derive(Debug, Clone)] @@ -807,7 +808,7 @@ impl RPCProcessor { ); } RPCMessageHeaderDetail::SafetyRouted(detail) => { - // If this was sent via a safety route, but no received over our private route, don't respond with a safety route, + // If this was sent via a safety route, but not received over our private route, don't respond with a safety route, // it would give away which safety routes belong to this node NetworkResult::value(Destination::private_route( pr.clone(), @@ -818,7 +819,7 @@ impl RPCProcessor { // If this was received over our private route, it's okay to respond to a private route via our safety route NetworkResult::value(Destination::private_route( pr.clone(), - detail.safety_selection.clone(), + SafetySelection::Safe(detail.safety_spec.clone()), )) } } @@ -1067,19 +1068,15 @@ impl RPCProcessor { #[instrument(level = "trace", skip(self, body), err)] pub fn enqueue_safety_routed_message( - &self, xxx keep pushing this through - private_route: DHTKey, - safety_selection: SafetySelection, + &self, + sequencing: Sequencing, body: Vec, ) -> EyreResult<()> { let msg = RPCMessageEncoded { header: RPCMessageHeader { - detail: RPCMessageHeaderDetail::PrivateRouted( - RPCMessageHeaderDetailPrivateRouted { - private_route, - safety_selection, - }, - ), + detail: RPCMessageHeaderDetail::SafetyRouted(RPCMessageHeaderDetailSafetyRouted { + sequencing, + }), timestamp: intf::get_timestamp(), body_len: body.len() as u64, }, @@ -1100,7 +1097,7 @@ impl RPCProcessor { pub fn enqueue_private_routed_message( &self, private_route: DHTKey, - safety_selection: SafetySelection, + safety_spec: SafetySpec, body: Vec, ) -> EyreResult<()> { let msg = RPCMessageEncoded { @@ -1108,7 +1105,7 @@ impl RPCProcessor { detail: RPCMessageHeaderDetail::PrivateRouted( RPCMessageHeaderDetailPrivateRouted { private_route, - safety_selection, + safety_spec, }, ), timestamp: intf::get_timestamp(), diff --git a/veilid-core/src/rpc_processor/rpc_find_node.rs b/veilid-core/src/rpc_processor/rpc_find_node.rs index 493b4d5c..42d7c4d1 100644 --- a/veilid-core/src/rpc_processor/rpc_find_node.rs +++ b/veilid-core/src/rpc_processor/rpc_find_node.rs @@ -68,21 +68,14 @@ impl RPCProcessor { #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), err)] pub(crate) async fn process_find_node_q(&self, msg: RPCMessage) -> Result<(), RPCError> { - // Ensure this never came over a private route - match msg.header.detail { - RPCMessageHeaderDetail::Direct(_) => todo!(), - RPCMessageHeaderDetail::PrivateRouted(_) => todo!(), - } - if matches!( - dest, - Destination::PrivateRoute { - private_route: _, - safety_selection: _ + // Ensure this never came over a private route, safety route is okay though + match &msg.header.detail { + RPCMessageHeaderDetail::Direct(_) | RPCMessageHeaderDetail::SafetyRouted(_) => {} + RPCMessageHeaderDetail::PrivateRouted(_) => { + return Err(RPCError::protocol( + "not processing find node request over private route", + )) } - ) { - return Err(RPCError::internal( - "Never send find node requests over private routes", - )); } // Get the question diff --git a/veilid-core/src/rpc_processor/rpc_node_info_update.rs b/veilid-core/src/rpc_processor/rpc_node_info_update.rs index d14877b1..c5ec9f86 100644 --- a/veilid-core/src/rpc_processor/rpc_node_info_update.rs +++ b/veilid-core/src/rpc_processor/rpc_node_info_update.rs @@ -33,7 +33,7 @@ impl RPCProcessor { pub(crate) async fn process_node_info_update(&self, msg: RPCMessage) -> Result<(), RPCError> { let detail = match msg.header.detail { RPCMessageHeaderDetail::Direct(detail) => detail, - RPCMessageHeaderDetail::PrivateRouted(_) => { + RPCMessageHeaderDetail::SafetyRouted(_) | RPCMessageHeaderDetail::PrivateRouted(_) => { return Err(RPCError::protocol("node_info_update must be direct")); } }; diff --git a/veilid-core/src/rpc_processor/rpc_return_receipt.rs b/veilid-core/src/rpc_processor/rpc_return_receipt.rs index 09d8f309..c4ae3b4c 100644 --- a/veilid-core/src/rpc_processor/rpc_return_receipt.rs +++ b/veilid-core/src/rpc_processor/rpc_return_receipt.rs @@ -42,6 +42,13 @@ impl RPCProcessor { .await => {} ); } + RPCMessageHeaderDetail::SafetyRouted(_) => { + network_result_value_or_log!(debug + network_manager + .handle_safety_receipt(receipt) + .await => {} + ); + } RPCMessageHeaderDetail::PrivateRouted(detail) => { network_result_value_or_log!(debug network_manager diff --git a/veilid-core/src/rpc_processor/rpc_route.rs b/veilid-core/src/rpc_processor/rpc_route.rs index 63a56482..257a2b71 100644 --- a/veilid-core/src/rpc_processor/rpc_route.rs +++ b/veilid-core/src/rpc_processor/rpc_route.rs @@ -148,8 +148,102 @@ impl RPCProcessor { Ok(()) } + /// Process a routed operation that came in over a safety route but no private route + /// + /// Note: it is important that we never respond with a safety route to questions that come + /// in without a private route. Giving away a safety route when the node id is known is + /// a privacy violation! #[instrument(level = "trace", skip_all, err)] - async fn process_routed_operation( + fn process_safety_routed_operation( + &self, + detail: RPCMessageHeaderDetailDirect, + routed_operation: RoutedOperation, + safety_route: &SafetyRoute, + ) -> Result<(), RPCError> { + // Get sequencing preference + let sequencing = if detail + .connection_descriptor + .protocol_type() + .is_connection_oriented() + { + Sequencing::EnsureOrdered + } else { + Sequencing::NoPreference + }; + + // Now that things are valid, decrypt the routed operation with DEC(nonce, DH(the SR's public key, the PR's (or node's) secret) + // xxx: punish nodes that send messages that fail to decrypt eventually? How to do this for safety routes? + let node_id_secret = self.routing_table.node_id_secret(); + let dh_secret = self + .crypto + .cached_dh(&safety_route.public_key, &node_id_secret) + .map_err(RPCError::protocol)?; + let body = Crypto::decrypt_aead( + &routed_operation.data, + &routed_operation.nonce, + &dh_secret, + None, + ) + .map_err(RPCError::map_internal( + "decryption of routed operation failed", + ))?; + + // Pass message to RPC system + self.enqueue_safety_routed_message(sequencing, body) + .map_err(RPCError::internal)?; + + Ok(()) + } + + /// Process a routed operation that came in over both a safety route and a private route + #[instrument(level = "trace", skip_all, err)] + fn process_private_routed_operation( + &self, + detail: RPCMessageHeaderDetailDirect, + routed_operation: RoutedOperation, + safety_route: &SafetyRoute, + private_route: &PrivateRoute, + ) -> Result<(), RPCError> { + // Get sender id + let sender_id = detail.envelope.get_sender_id(); + + // Look up the private route and ensure it's one in our spec store + let rss = self.routing_table.route_spec_store(); + let (secret_key, safety_spec) = rss + .validate_signatures( + &private_route.public_key, + &routed_operation.signatures, + &routed_operation.data, + sender_id, + ) + .map_err(RPCError::protocol)? + .ok_or_else(|| RPCError::protocol("signatures did not validate for private route"))?; + + // Now that things are valid, decrypt the routed operation with DEC(nonce, DH(the SR's public key, the PR's (or node's) secret) + // xxx: punish nodes that send messages that fail to decrypt eventually. How to do this for private routes? + let dh_secret = self + .crypto + .cached_dh(&safety_route.public_key, &secret_key) + .map_err(RPCError::protocol)?; + let body = Crypto::decrypt_aead( + &routed_operation.data, + &routed_operation.nonce, + &dh_secret, + None, + ) + .map_err(RPCError::map_internal( + "decryption of routed operation failed", + ))?; + + // Pass message to RPC system + self.enqueue_private_routed_message(private_route.public_key, safety_spec, body) + .map_err(RPCError::internal)?; + + Ok(()) + } + + #[instrument(level = "trace", skip_all, err)] + fn process_routed_operation( &self, detail: RPCMessageHeaderDetailDirect, routed_operation: RoutedOperation, @@ -170,67 +264,18 @@ impl RPCProcessor { // If the private route public key is our node id, then this was sent via safety route to our node directly // so there will be no signatures to validate - let (secret_key, safety_selection) = if private_route.public_key - == self.routing_table.node_id() - { + if private_route.public_key == self.routing_table.node_id() { // The private route was a stub - // Return our secret key and an appropriate safety selection - // - // Note: it is important that we never respond with a safety route to questions that come - // in without a private route. Giving away a safety route when the node id is known is - // a privacy violation! - - // Get sequencing preference - let sequencing = if detail - .connection_descriptor - .protocol_type() - .is_connection_oriented() - { - Sequencing::EnsureOrdered - } else { - Sequencing::NoPreference - }; - ( - self.routing_table.node_id_secret(), - SafetySelection::Unsafe(sequencing), - ) + self.process_safety_routed_operation(detail, routed_operation, safety_route) } else { - // Get sender id - let sender_id = detail.envelope.get_sender_id(); - - // Look up the private route and ensure it's one in our spec store - let rss = self.routing_table.route_spec_store(); - rss.validate_signatures( - &private_route.public_key, - &routed_operation.signatures, - &routed_operation.data, - sender_id, + // Both safety and private routes used, should reply with a safety route + self.process_private_routed_operation( + detail, + routed_operation, + safety_route, + private_route, ) - .map_err(RPCError::protocol)? - .ok_or_else(|| RPCError::protocol("signatures did not validate for private route"))? - }; - - // Now that things are valid, decrypt the routed operation with DEC(nonce, DH(the SR's public key, the PR's (or node's) secret) - // xxx: punish nodes that send messages that fail to decrypt eventually - let dh_secret = self - .crypto - .cached_dh(&safety_route.public_key, &secret_key) - .map_err(RPCError::protocol)?; - let body = Crypto::decrypt_aead( - &routed_operation.data, - &routed_operation.nonce, - &dh_secret, - None, - ) - .map_err(RPCError::map_internal( - "decryption of routed operation failed", - ))?; - - // Pass message to RPC system - self.enqueue_private_routed_message(private_route.public_key, safety_selection, body) - .map_err(RPCError::internal)?; - - Ok(()) + } } #[instrument(level = "trace", skip(self, msg), err)] @@ -312,8 +357,7 @@ impl RPCProcessor { route.operation, &route.safety_route, &private_route, - ) - .await?; + )?; } } else if blob_tag == 0 { // RouteHop @@ -383,8 +427,7 @@ impl RPCProcessor { route.operation, &route.safety_route, private_route, - ) - .await?; + )?; } } } diff --git a/veilid-core/src/rpc_processor/rpc_signal.rs b/veilid-core/src/rpc_processor/rpc_signal.rs index 63fbcb79..0180fb6c 100644 --- a/veilid-core/src/rpc_processor/rpc_signal.rs +++ b/veilid-core/src/rpc_processor/rpc_signal.rs @@ -2,13 +2,26 @@ use super::*; impl RPCProcessor { // Sends a unidirectional signal to a node - // Can be sent via all methods including relays and routes + // Can be sent via relays but not routes. For routed 'signal' like capabilities, use AppMessage. #[instrument(level = "trace", skip(self), ret, err)] pub async fn rpc_call_signal( self, dest: Destination, signal_info: SignalInfo, ) -> Result, RPCError> { + // Ensure destination never has a private route + if matches!( + dest, + Destination::PrivateRoute { + private_route: _, + safety_selection: _ + } + ) { + return Err(RPCError::internal( + "Never send signal requests over private routes", + )); + } + let signal = RPCOperationSignal { signal_info }; let statement = RPCStatement::new(RPCStatementDetail::Signal(signal)); @@ -20,6 +33,15 @@ impl RPCProcessor { #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)] pub(crate) async fn process_signal(&self, msg: RPCMessage) -> Result<(), RPCError> { + // Can't allow anything other than direct packets here, as handling reverse connections + // or anything like via signals over private routes would deanonymize the route + match &msg.header.detail { + RPCMessageHeaderDetail::Direct(_) => {} + RPCMessageHeaderDetail::SafetyRouted(_) | RPCMessageHeaderDetail::PrivateRouted(_) => { + return Err(RPCError::protocol("signal must be direct")); + } + }; + // Get the statement let signal = match msg.operation.into_kind() { RPCOperationKind::Statement(s) => match s.into_detail() { diff --git a/veilid-core/src/rpc_processor/rpc_status.rs b/veilid-core/src/rpc_processor/rpc_status.rs index af02a6c3..ab8b83d8 100644 --- a/veilid-core/src/rpc_processor/rpc_status.rs +++ b/veilid-core/src/rpc_processor/rpc_status.rs @@ -1,30 +1,79 @@ use super::*; +#[derive(Clone, Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Default)] +pub struct SenderInfo { + pub socket_address: SocketAddress, +} + impl RPCProcessor { // Send StatusQ RPC request, receive StatusA answer - // Can be sent via relays, but not via routes + // Can be sent via relays or routes, but will have less information via routes + // sender: + // unsafe -> node status + // safe -> nothing + // receiver: + // direct -> node status + sender info + // safety -> node status + // private -> nothing #[instrument(level = "trace", skip(self), ret, err)] pub async fn rpc_call_status( self, - peer: NodeRef, - ) -> Result>, RPCError> { - let routing_domain = match peer.best_routing_domain() { - Some(rd) => rd, - None => { - return Ok(NetworkResult::no_connection_other( - "no routing domain for peer", - )) + dest: Destination, + ) -> Result>>, RPCError> { + let (opt_target_nr, routing_domain, node_status) = match dest.get_safety_selection() { + SafetySelection::Unsafe(_) => { + let (opt_target_nr, routing_domain) = match &dest { + Destination::Direct { + target, + safety_selection: _, + } => { + let routing_domain = match target.best_routing_domain() { + Some(rd) => rd, + None => { + return Ok(NetworkResult::no_connection_other( + "no routing domain for target", + )) + } + }; + (Some(target.clone()), routing_domain) + } + Destination::Relay { + relay, + target, + safety_selection: _, + } => { + let opt_target_nr = self.routing_table.lookup_node_ref(*target); + let routing_domain = match relay.best_routing_domain() { + Some(rd) => rd, + None => { + return Ok(NetworkResult::no_connection_other( + "no routing domain for peer", + )) + } + }; + (opt_target_nr, routing_domain) + } + Destination::PrivateRoute { + private_route: _, + safety_selection: _, + } => (None, RoutingDomain::PublicInternet), + }; + + let node_status = Some(self.network_manager().generate_node_status(routing_domain)); + (opt_target_nr, routing_domain, node_status) + } + SafetySelection::Safe(_) => { + let routing_domain = RoutingDomain::PublicInternet; + let node_status = None; + (None, routing_domain, node_status) } }; - let node_status = self.network_manager().generate_node_status(routing_domain); + let status_q = RPCOperationStatusQ { node_status }; let question = RPCQuestion::new(RespondTo::Sender, RPCQuestionDetail::StatusQ(status_q)); // Send the info request - let waitable_reply = network_result_try!( - self.question(Destination::direct(peer.clone()), question) - .await? - ); + let waitable_reply = network_result_try!(self.question(dest.clone(), question).await?); // Note what kind of ping this was and to what peer scope let send_data_kind = waitable_reply.send_data_kind; @@ -45,74 +94,90 @@ impl RPCProcessor { }; // Ensure the returned node status is the kind for the routing domain we asked for - match routing_domain { - RoutingDomain::PublicInternet => { - if !matches!(status_a.node_status, NodeStatus::PublicInternet(_)) { - return Ok(NetworkResult::invalid_message( - "node status doesn't match PublicInternet routing domain", - )); - } - } - RoutingDomain::LocalNetwork => { - if !matches!(status_a.node_status, NodeStatus::LocalNetwork(_)) { - return Ok(NetworkResult::invalid_message( - "node status doesn't match LocalNetwork routing domain", - )); + if let Some(target_nr) = opt_target_nr { + if let Some(node_status) = status_a.node_status { + match routing_domain { + RoutingDomain::PublicInternet => { + if !matches!(node_status, NodeStatus::PublicInternet(_)) { + return Ok(NetworkResult::invalid_message( + "node status doesn't match PublicInternet routing domain", + )); + } + } + RoutingDomain::LocalNetwork => { + if !matches!(node_status, NodeStatus::LocalNetwork(_)) { + return Ok(NetworkResult::invalid_message( + "node status doesn't match LocalNetwork routing domain", + )); + } + } } + + // Update latest node status in routing table + target_nr.update_node_status(node_status); } } - // Update latest node status in routing table - peer.update_node_status(status_a.node_status); - // Report sender_info IP addresses to network manager // Don't need to validate these addresses for the current routing domain // the address itself is irrelevant, and the remote node can lie anyway - if let Some(socket_address) = status_a.sender_info.socket_address { - match send_data_kind { - SendDataKind::Direct(connection_descriptor) => match routing_domain { - RoutingDomain::PublicInternet => self - .network_manager() - .report_public_internet_socket_address( - socket_address, - connection_descriptor, - peer, - ), - RoutingDomain::LocalNetwork => { - self.network_manager().report_local_network_socket_address( - socket_address, - connection_descriptor, - peer, - ) + let mut opt_sender_info = None; + match dest { + Destination::Direct { + target, + safety_selection, + } => { + if matches!(safety_selection, SafetySelection::Unsafe(_)) { + if let Some(sender_info) = status_a.sender_info { + match send_data_kind { + SendDataKind::Direct(connection_descriptor) => { + // Directly requested status that actually gets sent directly and not over a relay will tell us what our IP address appears as + // If this changes, we'd want to know about that to reset the networking stack + match routing_domain { + RoutingDomain::PublicInternet => self + .network_manager() + .report_public_internet_socket_address( + sender_info.socket_address, + connection_descriptor, + target, + ), + RoutingDomain::LocalNetwork => { + self.network_manager().report_local_network_socket_address( + sender_info.socket_address, + connection_descriptor, + target, + ) + } + } + opt_sender_info = Some(sender_info.clone()); + } + SendDataKind::Indirect => { + // Do nothing in this case, as the socket address returned here would be for any node other than ours + } + SendDataKind::Existing(_) => { + // Do nothing in this case, as an existing connection could not have a different public address or it would have been reset + } + }; } - }, - SendDataKind::Indirect => { - // Do nothing in this case, as the socket address returned here would be for any node other than ours - } - SendDataKind::Existing(_) => { - // Do nothing in this case, as an existing connection could not have a different public address or it would have been reset } } - } - - Ok(NetworkResult::value(Answer::new( - latency, - status_a.sender_info, - ))) + Destination::Relay { + relay: _, + target: _, + safety_selection: _, + } + | Destination::PrivateRoute { + private_route: _, + safety_selection: _, + } => { + // sender info is irrelevant over relays and routes + } + }; + Ok(NetworkResult::value(Answer::new(latency, opt_sender_info))) } #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), err)] pub(crate) async fn process_status_q(&self, msg: RPCMessage) -> Result<(), RPCError> { - let detail = match &msg.header.detail { - RPCMessageHeaderDetail::Direct(detail) => detail, - RPCMessageHeaderDetail::PrivateRouted(_) => { - return Err(RPCError::protocol("status_q must be direct")); - } - }; - - let connection_descriptor = detail.connection_descriptor; - let routing_domain = detail.routing_domain; - // Get the question let status_q = match msg.operation.kind() { RPCOperationKind::Question(q) => match q.detail() { @@ -122,36 +187,55 @@ impl RPCProcessor { _ => panic!("not a question"), }; - // Ensure the node status from the question is the kind for the routing domain we received the request in - match routing_domain { - RoutingDomain::PublicInternet => { - if !matches!(status_q.node_status, NodeStatus::PublicInternet(_)) { - log_rpc!(debug "node status doesn't match PublicInternet routing domain"); - return Ok(()); + let (node_status, sender_info) = match &msg.header.detail { + RPCMessageHeaderDetail::Direct(detail) => { + let connection_descriptor = detail.connection_descriptor; + let routing_domain = detail.routing_domain; + + // Ensure the node status from the question is the kind for the routing domain we received the request in + if let Some(node_status) = &status_q.node_status { + match routing_domain { + RoutingDomain::PublicInternet => { + if !matches!(node_status, NodeStatus::PublicInternet(_)) { + log_rpc!(debug "node status doesn't match PublicInternet routing domain"); + return Ok(()); + } + } + RoutingDomain::LocalNetwork => { + if !matches!(node_status, NodeStatus::LocalNetwork(_)) { + log_rpc!(debug "node status doesn't match LocalNetwork routing domain"); + return Ok(()); + } + } + } + + // update node status for the requesting node to our routing table + if let Some(sender_nr) = msg.opt_sender_nr.clone() { + // Update latest node status in routing table for the statusq sender + sender_nr.update_node_status(node_status.clone()); + } } + + // Get the peer address in the returned sender info + let sender_info = SenderInfo { + socket_address: *connection_descriptor.remote_address(), + }; + + // Make status answer + let node_status = self.network_manager().generate_node_status(routing_domain); + (Some(node_status), Some(sender_info)) } - RoutingDomain::LocalNetwork => { - if !matches!(status_q.node_status, NodeStatus::LocalNetwork(_)) { - log_rpc!(debug "node status doesn't match LocalNetwork routing domain"); - return Ok(()); - } + RPCMessageHeaderDetail::SafetyRouted(_) => { + // Make status answer + let node_status = self + .network_manager() + .generate_node_status(RoutingDomain::PublicInternet); + (Some(node_status), None) } - } - - // update node status for the requesting node to our routing table - if let Some(sender_nr) = msg.opt_sender_nr.clone() { - // Update latest node status in routing table for the statusq sender - sender_nr.update_node_status(status_q.node_status.clone()); - } - - // Make status answer - let node_status = self.network_manager().generate_node_status(routing_domain); - - // Get the peer address in the returned sender info - let sender_info = SenderInfo { - socket_address: Some(*connection_descriptor.remote_address()), + RPCMessageHeaderDetail::PrivateRouted(_) => (None, None), }; + // Make status answer let status_a = RPCOperationStatusA { node_status, sender_info, diff --git a/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs b/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs index 11de3e23..14e3c905 100644 --- a/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs +++ b/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs @@ -35,7 +35,8 @@ impl RPCProcessor { // Wait for receipt match eventual_value.await.take_value().unwrap() { ReceiptEvent::ReturnedPrivate { private_route: _ } - | ReceiptEvent::ReturnedInBand { inbound_noderef: _ } => { + | ReceiptEvent::ReturnedInBand { inbound_noderef: _ } + | ReceiptEvent::ReturnedSafety => { log_net!(debug "validate_dial_info receipt should be returned out-of-band".green()); Ok(false) } diff --git a/veilid-core/src/veilid_api/debug.rs b/veilid-core/src/veilid_api/debug.rs index e990ebfc..5fe6173a 100644 --- a/veilid-core/src/veilid_api/debug.rs +++ b/veilid-core/src/veilid_api/debug.rs @@ -3,6 +3,7 @@ use super::*; use routing_table::*; +use rpc_processor::*; fn get_bucket_entry_state(text: &str) -> Option { if text == "dead" { @@ -397,7 +398,7 @@ impl VeilidAPI { // Dump routing table entry let out = match rpc - .rpc_call_status(nr) + .rpc_call_status(Destination::direct(nr)) .await .map_err(VeilidAPIError::internal)? { diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index 16a7ef4f..e30ef957 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -366,11 +366,6 @@ impl BlockId { ///////////////////////////////////////////////////////////////////////////////////////////////////// -#[derive(Clone, Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Default, Serialize, Deserialize)] -pub struct SenderInfo { - pub socket_address: Option, -} - // Keep member order appropriate for sorting < preference #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)] pub enum DialInfoClass { @@ -420,7 +415,7 @@ pub enum Stability { Reliable, } -/// The choice of safety route including in compiled routes +/// The choice of safety route to include in compiled routes #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] pub enum SafetySelection { /// Don't use a safety route, only specify the sequencing preference