diff --git a/veilid-core/src/connection_manager.rs b/veilid-core/src/connection_manager.rs index 29234728..10ae1a95 100644 --- a/veilid-core/src/connection_manager.rs +++ b/veilid-core/src/connection_manager.rs @@ -82,6 +82,7 @@ impl ConnectionManager { } // Returns a network connection if one already is established + pub async fn get_connection( &self, descriptor: ConnectionDescriptor, diff --git a/veilid-core/src/network_manager.rs b/veilid-core/src/network_manager.rs index a287827d..c31d4dbc 100644 --- a/veilid-core/src/network_manager.rs +++ b/veilid-core/src/network_manager.rs @@ -432,7 +432,7 @@ impl NetworkManager { } // Process a received out-of-band receipt - pub async fn process_out_of_band_receipt>( + pub async fn handle_out_of_band_receipt>( &self, receipt_data: R, descriptor: ConnectionDescriptor, @@ -455,7 +455,7 @@ impl NetworkManager { } // Process a received in-band receipt - pub async fn process_in_band_receipt>( + pub async fn handle_in_band_receipt>( &self, receipt_data: R, inbound_nr: NodeRef, @@ -468,6 +468,57 @@ impl NetworkManager { receipt_manager.handle_receipt(inbound_nr, receipt).await } + // Process a received signal + pub async fn handle_signal(&self, signal_info: SignalInfo) -> Result<(), String> { + match signal_info { + SignalInfo::ReverseConnect { receipt, peer_info } => { + let routing_table = self.routing_table(); + let rpc = self.rpc_processor(); + + // Add the peer info to our routing table + let peer_nr = routing_table + .register_node_with_node_info(peer_info.node_id.key, peer_info.node_info)?; + + // Make a reverse connection to the peer and send the receipt to it + rpc.rpc_call_return_receipt(Destination::Direct(peer_nr), None, receipt) + .await + .map_err(map_to_string)?; + } + SignalInfo::HolePunch { receipt, peer_info } => { + let routing_table = self.routing_table(); + + // Add the peer info to our routing table + let peer_nr = routing_table + .register_node_with_node_info(peer_info.node_id.key, peer_info.node_info)?; + + // Get the udp direct dialinfo for the hole punch + let hole_punch_dial_info = if let Some(hpdi) = peer_nr + .node_info() + .first_filtered_dial_info(|di| matches!(di.protocol_type(), ProtocolType::UDP)) + { + hpdi + } else { + return Err("No hole punch capable dialinfo found for node".to_owned()); + }; + + // Do our half of the hole punch by sending an empty packet + // Both sides will do this and then the receipt will get sent over the punched hole + self.net() + .send_data_to_dial_info(hole_punch_dial_info.clone(), Vec::new()) + .await?; + + // XXX: do we need a delay here? or another hole punch packet? + + // Return the receipt over the direct channel since we want to use exactly the same dial info + self.send_direct_receipt(hole_punch_dial_info, receipt, false) + .await + .map_err(map_to_string)?; + } + } + + Ok(()) + } + // Builds an envelope for sending over the network fn build_envelope>( &self, @@ -703,7 +754,7 @@ impl NetworkManager { } // And now use the existing connection to send over - if let Some(descriptor) = inbound_nr.last_connection() { + if let Some(descriptor) = inbound_nr.last_connection().await { match self .net() .send_data_to_existing_connection(descriptor, data) @@ -787,7 +838,7 @@ impl NetworkManager { } // And now use the existing connection to send over - if let Some(descriptor) = inbound_nr.last_connection() { + if let Some(descriptor) = inbound_nr.last_connection().await { match self .net() .send_data_to_existing_connection(descriptor, data) @@ -819,7 +870,7 @@ impl NetworkManager { let this = self.clone(); Box::pin(async move { // First try to send data to the last socket we've seen this peer on - let data = if let Some(descriptor) = node_ref.last_connection() { + let data = if let Some(descriptor) = node_ref.last_connection().await { match this .net() .send_data_to_existing_connection(descriptor, data) @@ -874,7 +925,7 @@ impl NetworkManager { // Is this an out-of-band receipt instead of an envelope? if data[0..4] == *RECEIPT_MAGIC { - self.process_out_of_band_receipt(data, descriptor).await?; + self.handle_out_of_band_receipt(data, descriptor).await?; return Ok(true); } diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index ede93f87..783ba043 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -21,7 +21,7 @@ const UNRELIABLE_PING_INTERVAL_SECS: u32 = 5; // Keepalive pings are done occasionally to ensure holepunched public dialinfo // remains valid, as well as to make sure we remain in any relay node's routing table -const KEEPALIVE_PING_INTERVAL_SECS: u32 = 30; +const KEEPALIVE_PING_INTERVAL_SECS: u32 = 20; // Do not change order here, it will mess up other sorts #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] @@ -131,8 +131,8 @@ impl BucketEntry { self.last_connection = Some((last_connection, timestamp)); } - pub fn last_connection(&self) -> Option { - self.last_connection.as_ref().map(|x| x.0) + pub fn last_connection(&self) -> Option<(ConnectionDescriptor, u64)> { + self.last_connection } pub fn set_min_max_version(&mut self, min_max_version: (u8, u8)) { diff --git a/veilid-core/src/routing_table/node_ref.rs b/veilid-core/src/routing_table/node_ref.rs index ef86bd0b..6d8c96c9 100644 --- a/veilid-core/src/routing_table/node_ref.rs +++ b/veilid-core/src/routing_table/node_ref.rs @@ -2,6 +2,10 @@ use super::*; use crate::dht::*; use alloc::fmt; +// Connectionless protocols like UDP are dependent on a NAT translation timeout +// We should ping them with some frequency and 30 seconds is typical timeout +const CONNECTIONLESS_TIMEOUT_SECS: u32 = 29; + pub struct NodeRef { routing_table: RoutingTable, node_id: DHTKey, @@ -42,9 +46,35 @@ impl NodeRef { pub fn set_seen_our_node_info(&self) { self.operate(|e| e.set_seen_our_node_info(true)); } - pub fn last_connection(&self) -> Option { - self.operate(|e| e.last_connection()) + pub async fn last_connection(&self) -> Option { + // Get the last connection and the last time we saw anything with this connection + let (last_connection, last_seen) = self.operate(|e| { + if let Some((last_connection, connection_ts)) = e.last_connection() { + if let Some(last_seen) = e.peer_stats().last_seen { + Some((last_connection, u64::max(last_seen, connection_ts))) + } else { + Some((last_connection, connection_ts)) + } + } else { + None + } + })?; + // Should we check the connection table? + if last_connection.protocol_type().is_connection_oriented() { + // Look the connection up in the connection manager and see if it's still there + let connection_manager = self.routing_table.network_manager().connection_manager(); + connection_manager.get_connection(last_connection).await?; + } else { + // If this is not connection oriented, then we check our last seen time + // to see if this mapping has expired (beyond our timeout) + let cur_ts = intf::get_timestamp(); + if (last_seen + (CONNECTIONLESS_TIMEOUT_SECS as u64 * 1_000_000u64)) < cur_ts { + return None; + } + } + Some(last_connection) } + pub fn has_any_dial_info(&self) -> bool { self.operate(|e| e.node_info().has_any_dial_info() || e.local_node_info().has_dial_info()) } diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 6808c989..3f4cd92d 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -791,16 +791,18 @@ impl RPCProcessor { ////////////////////////////////////////////////////////////////////// - fn generate_sender_info(&self, rpcreader: &RPCMessageReader) -> SenderInfo { - let socket_address = rpcreader - .header - .peer_noderef - .operate(|entry| entry.last_connection().map(|c| c.remote.socket_address)); + async fn generate_sender_info(&self, peer_noderef: NodeRef) -> SenderInfo { + let socket_address = peer_noderef + .last_connection() + .await + .map(|c| c.remote.socket_address); SenderInfo { socket_address } } async fn process_info_q(&self, rpcreader: RPCMessageReader) -> Result<(), RPCError> { - // + let peer_noderef = rpcreader.header.peer_noderef.clone(); + let sender_info = self.generate_sender_info(peer_noderef).await; + let reply_msg = { let operation = rpcreader .reader @@ -849,7 +851,6 @@ impl RPCProcessor { encode_node_status(&node_status, &mut nsb)?; // Add sender info - let sender_info = self.generate_sender_info(&rpcreader); let mut sib = info_a.reborrow().init_sender_info(); encode_sender_info(&sender_info, &mut sib)?; @@ -1078,8 +1079,35 @@ impl RPCProcessor { Err(rpc_error_unimplemented("process_find_block_q")) } - async fn process_signal(&self, _rpcreader: RPCMessageReader) -> Result<(), RPCError> { - Err(rpc_error_unimplemented("process_signal")) + async fn process_signal(&self, rpcreader: RPCMessageReader) -> Result<(), RPCError> { + let signal_info = { + let operation = rpcreader + .reader + .get_root::() + .map_err(map_error_capnp_error!()) + .map_err(logthru_rpc!())?; + + // This should never want an answer + if self.wants_answer(&operation)? { + return Err(RPCError::InvalidFormat); + } + + // get signal reader + let sig_reader = match operation.get_detail().which() { + Ok(veilid_capnp::operation::detail::Which::Signal(Ok(x))) => x, + _ => panic!("invalid operation type in process_signal"), + }; + + // Get signal info + decode_signal_info(&sig_reader)? + }; + + // Handle it + let network_manager = self.network_manager(); + network_manager + .handle_signal(signal_info) + .await + .map_err(map_error_string!()) } async fn process_return_receipt(&self, rpcreader: RPCMessageReader) -> Result<(), RPCError> { @@ -1112,7 +1140,7 @@ impl RPCProcessor { // Handle it let network_manager = self.network_manager(); network_manager - .process_in_band_receipt(rcpt_data, rpcreader.header.peer_noderef) + .handle_in_band_receipt(rcpt_data, rpcreader.header.peer_noderef) .await .map_err(map_error_string!()) } @@ -1612,5 +1640,41 @@ impl RPCProcessor { Ok(()) } + // Sends a unidirectional in-band return receipt + // Can be sent via all methods including relays and routes + pub async fn rpc_call_return_receipt>( + &self, + dest: Destination, + safety_route: Option<&SafetyRouteSpec>, + rcpt_data: B, + ) -> Result<(), RPCError> { + // Validate receipt before we send it, otherwise this may be arbitrary data! + let _ = Receipt::from_signed_data(rcpt_data.as_ref()) + .map_err(|_| "failed to validate direct receipt".to_owned()) + .map_err(map_error_string!())?; + + let rr_msg = { + let mut rr_msg = ::capnp::message::Builder::new_default(); + let mut question = rr_msg.init_root::(); + question.set_op_id(self.get_next_op_id()); + let mut respond_to = question.reborrow().init_respond_to(); + respond_to.set_none(()); + let detail = question.reborrow().init_detail(); + let rr_builder = detail.init_return_receipt(); + + let r_builder = rr_builder.init_receipt(rcpt_data.as_ref().len().try_into().map_err( + map_error_protocol!("invalid receipt length in return receipt"), + )?); + r_builder.copy_from_slice(rcpt_data.as_ref()); + + rr_msg.into_reader() + }; + + // Send the return receipt request + self.request(dest, rr_msg, safety_route).await?; + + Ok(()) + } + // xxx do not process latency for routed messages } diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index fb06fa90..5782e70d 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -421,6 +421,15 @@ pub enum ProtocolType { WSS, } +impl ProtocolType { + pub fn is_connection_oriented(&self) -> bool { + matches!( + self, + ProtocolType::TCP | ProtocolType::WS | ProtocolType::WSS + ) + } +} + #[derive(Copy, Clone, Debug, Default, Serialize, Deserialize)] pub struct ProtocolSet { pub udp: bool,