From 5ce238d4fd2d9e1587271978154118a7e30eebbb Mon Sep 17 00:00:00 2001
From: Christien Rioux
Date: Sat, 1 Mar 2025 16:26:06 -0500
Subject: [PATCH] [ci skip] fix TTL race condition in hole punch. check relays
 with both unordered and ordered protocols in ping validator. fix
 'make_not_dead' to only run when nodes are actually dead

---
 .../src/network_manager/native/network_udp.rs |   3 +-
 .../network_manager/native/protocol/udp.rs    | 111 ++++++++++++-----
 veilid-core/src/routing_table/bucket_entry.rs |  15 ++-
 .../src/routing_table/tasks/ping_validator.rs | 113 +++++++++++-------
 veilid-core/src/rpc_processor/destination.rs  |   2 +-
 veilid-core/src/rpc_processor/mod.rs          |   6 +-
 6 files changed, 166 insertions(+), 84 deletions(-)

diff --git a/veilid-core/src/network_manager/native/network_udp.rs b/veilid-core/src/network_manager/native/network_udp.rs
index a3d7267f..2e370ed9 100644
--- a/veilid-core/src/network_manager/native/network_udp.rs
+++ b/veilid-core/src/network_manager/native/network_udp.rs
@@ -118,7 +118,8 @@ impl Network {
         let socket_arc = Arc::new(udp_socket);
 
         // Create protocol handler
-        let protocol_handler = RawUdpProtocolHandler::new(self.registry(), socket_arc);
+        let protocol_handler =
+            RawUdpProtocolHandler::new(self.registry(), socket_arc, addr.is_ipv6());
 
         // Record protocol handler
         let mut inner = self.inner.lock();
diff --git a/veilid-core/src/network_manager/native/protocol/udp.rs b/veilid-core/src/network_manager/native/protocol/udp.rs
index 81cee3f5..d152c1a9 100644
--- a/veilid-core/src/network_manager/native/protocol/udp.rs
+++ b/veilid-core/src/network_manager/native/protocol/udp.rs
@@ -7,16 +7,30 @@ pub struct RawUdpProtocolHandler {
     registry: VeilidComponentRegistry,
     socket: Arc<UdpSocket>,
     assembly_buffer: AssemblyBuffer,
+    is_ipv6: bool,
+    default_ttl: u32,
+    current_ttl: Arc<AsyncMutex<u32>>,
 }
 
 impl_veilid_component_registry_accessor!(RawUdpProtocolHandler);
 
 impl RawUdpProtocolHandler {
-    pub fn new(registry: VeilidComponentRegistry, socket: Arc<UdpSocket>) -> Self {
+    pub fn new(registry: VeilidComponentRegistry, socket: Arc<UdpSocket>, is_ipv6: bool) -> Self {
+        // Get original TTL
+        let default_ttl = if is_ipv6 {
+            socket2_operation(socket.as_ref(), |s| s.unicast_hops_v6())
+                .expect("getting IPV6_UNICAST_HOPS should not fail")
+        } else {
+            socket2_operation(socket.as_ref(), |s| s.ttl()).expect("getting IP_TTL should not fail")
+        };
+
         Self {
             registry,
             socket,
             assembly_buffer: AssemblyBuffer::new(),
+            is_ipv6,
+            default_ttl,
+            current_ttl: Arc::new(AsyncMutex::new(default_ttl)),
         }
     }
 
@@ -104,24 +118,35 @@ impl RawUdpProtocolHandler {
             return Ok(NetworkResult::no_connection_other("punished"));
         }
 
-        // Fragment and send
-        let sender = |framed_chunk: Vec<u8>, remote_addr: SocketAddr| async move {
-            let len = network_result_try!(self
-                .socket
-                .send_to(&framed_chunk, remote_addr)
-                .await
-                .into_network_result()?);
-            if len != framed_chunk.len() {
-                bail_io_error_other!("UDP partial send")
+        // Ensure the TTL for sent packets is the default,
+        // then fragment and send the packets
+        {
+            let current_ttl = self.current_ttl.lock().await;
+            if *current_ttl != self.default_ttl {
+                veilid_log!(self error "Incorrect TTL on sent UDP packet ({} != {}): len={}, remote_addr={:?}", *current_ttl, self.default_ttl, data.len(), remote_addr);
             }
-            Ok(NetworkResult::value(()))
-        };
 
-        network_result_try!(
-            self.assembly_buffer
-                .split_message(data, remote_addr, sender)
-                .await?
-        );
+            // Fragment and send
+            let sender = |framed_chunk: Vec<u8>, remote_addr: SocketAddr| async move {
+                let len = network_result_try!(self
+                    .socket
+                    .send_to(&framed_chunk, remote_addr)
+                    .await
+                    .into_network_result()?);
+                if len != framed_chunk.len() {
+                    bail_io_error_other!("UDP partial send")
+                }
+
+                veilid_log!(self trace "udp::send_message:chunk(len={}) {:?}", len, remote_addr);
+                Ok(NetworkResult::value(()))
+            };
+
+            network_result_try!(
+                self.assembly_buffer
+                    .split_message(data, remote_addr, sender)
+                    .await?
+            );
+        }
 
         // Return a flow for the sent message
         let peer_addr = PeerAddress::new(
@@ -157,22 +182,44 @@ impl RawUdpProtocolHandler {
             return Ok(NetworkResult::no_connection_other("punished"));
         }
 
-        // Get synchronous socket
-        let res = socket2_operation(self.socket.as_ref(), |s| {
-            // Get original TTL
-            let original_ttl = s.ttl()?;
+        // Ensure the TTL for sent packets is the default,
+        // then fragment and send the packets
+        let res = {
+            let mut current_ttl = self.current_ttl.lock().await;
+            if *current_ttl != self.default_ttl {
+                veilid_log!(self error "Incorrect TTL before sending holepunch UDP packet ({} != {}): remote_addr={:?}", *current_ttl, self.default_ttl, remote_addr);
+            }
 
-            // Set TTL
-            s.set_ttl(ttl)?;
+            // Get synchronous socket
+            socket2_operation(self.socket.as_ref(), |s| {
+                // Set TTL
+                let ttl_res = if self.is_ipv6 {
+                    s.set_unicast_hops_v6(ttl)
+                } else {
+                    s.set_ttl(ttl)
+                };
+                ttl_res.inspect_err(|e| {
+                    veilid_log!(self error "Failed to set TTL on holepunch UDP socket: {} remote_addr={:?}", e, remote_addr);
+                })?;
+                *current_ttl = ttl;
 
-            // Send zero length packet
-            let res = s.send_to(&[], &remote_addr.into());
+                // Send zero length packet
+                let res = s.send_to(&[], &remote_addr.into());
 
-            // Restore TTL immediately
-            s.set_ttl(original_ttl)?;
+                // Restore TTL immediately
+                let ttl_res = if self.is_ipv6 {
+                    s.set_unicast_hops_v6(self.default_ttl)
+                } else {
+                    s.set_ttl(self.default_ttl)
+                };
+                ttl_res.inspect_err(|e| {
+                    veilid_log!(self error "Failed to reset TTL on holepunch UDP socket: {} remote_addr={:?}", e, remote_addr);
+                })?;
+                *current_ttl = self.default_ttl;
 
-            res
-        });
+                res
+            })
+        };
 
         // Check for errors
         let len = network_result_try!(res.into_network_result()?);
@@ -208,6 +255,10 @@ impl RawUdpProtocolHandler {
         let local_socket_addr = compatible_unspecified_socket_addr(socket_addr);
         let socket = bind_async_udp_socket(local_socket_addr)?
             .ok_or(io::Error::from(io::ErrorKind::AddrInUse))?;
-        Ok(RawUdpProtocolHandler::new(registry, Arc::new(socket)))
+        Ok(RawUdpProtocolHandler::new(
+            registry,
+            Arc::new(socket),
+            local_socket_addr.is_ipv6(),
+        ))
     }
 }
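The udp.rs change above closes a race where a hole-punch send could leave the socket's TTL lowered while a concurrent data send was in flight: the default TTL is captured once at construction, every TTL change happens under an async mutex, and the default is restored before the lock is released. A minimal sketch of the same idea using only std (synchronous and IPv4-only; PunchSocket and its methods are illustrative stand-ins, not the Veilid API, which is async and also drives IPV6_UNICAST_HOPS through socket2):

    use std::io;
    use std::net::{SocketAddr, UdpSocket};
    use std::sync::Mutex;

    struct PunchSocket {
        socket: UdpSocket,
        default_ttl: u32,
        // One lock serializes every TTL change with every send, so a normal
        // send can never observe the lowered hole-punch TTL.
        ttl_lock: Mutex<()>,
    }

    impl PunchSocket {
        fn new(socket: UdpSocket) -> io::Result<Self> {
            let default_ttl = socket.ttl()?; // remember the original TTL once
            Ok(Self { socket, default_ttl, ttl_lock: Mutex::new(()) })
        }

        // Normal data send: hold the lock so no hole punch is mid-flight.
        fn send(&self, buf: &[u8], to: SocketAddr) -> io::Result<usize> {
            let _guard = self.ttl_lock.lock().unwrap();
            if self.socket.ttl()? != self.default_ttl {
                eprintln!("unexpected TTL before send"); // mirrors the patch's error log
            }
            self.socket.send_to(buf, to)
        }

        // Hole punch: lower the TTL, send a zero-length probe, then restore
        // the default TTL before releasing the lock.
        fn hole_punch(&self, to: SocketAddr, ttl: u32) -> io::Result<usize> {
            let _guard = self.ttl_lock.lock().unwrap();
            self.socket.set_ttl(ttl)?;
            let res = self.socket.send_to(&[], to);
            self.socket.set_ttl(self.default_ttl)?; // always restore
            res
        }
    }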
diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs
index 7f4bb9b4..c0f81088 100644
--- a/veilid-core/src/routing_table/bucket_entry.rs
+++ b/veilid-core/src/routing_table/bucket_entry.rs
@@ -23,7 +23,7 @@ const UNRELIABLE_PING_SPAN_SECS: u32 = 60;
 const UNRELIABLE_PING_INTERVAL_SECS: u32 = 5;
 /// - Number of consecutive lost answers on an unordered protocol we will
 ///   tolerate before we call something unreliable
-const UNRELIABLE_LOST_ANSWERS_UNORDERED: u32 = 1;
+const UNRELIABLE_LOST_ANSWERS_UNORDERED: u32 = 2;
 /// - Number of consecutive lost answers on an ordered protocol we will
 ///   tolerate before we call something unreliable
 const UNRELIABLE_LOST_ANSWERS_ORDERED: u32 = 0;
@@ -1068,11 +1068,14 @@ impl BucketEntryInner {
     }
 
     pub(super) fn make_not_dead(&mut self, cur_ts: Timestamp) {
-        self.peer_stats.rpc_stats.last_seen_ts = None;
-        self.peer_stats.rpc_stats.failed_to_send = 0;
-        self.peer_stats.rpc_stats.recent_lost_answers_unordered = 0;
-        self.peer_stats.rpc_stats.recent_lost_answers_ordered = 0;
-        assert!(self.check_dead(cur_ts).is_none());
+        if self.check_dead(cur_ts).is_some() {
+            self.peer_stats.rpc_stats.last_seen_ts = None;
+            self.peer_stats.rpc_stats.first_consecutive_seen_ts = None;
+            self.peer_stats.rpc_stats.failed_to_send = 0;
+            self.peer_stats.rpc_stats.recent_lost_answers_unordered = 0;
+            self.peer_stats.rpc_stats.recent_lost_answers_ordered = 0;
+            assert!(self.check_dead(cur_ts).is_none());
+        }
     }
 
     pub(super) fn _state_debug_info(&self, cur_ts: Timestamp) -> String {
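For context on the UNRELIABLE_LOST_ANSWERS_UNORDERED bump above: raising the tolerance from 1 to 2 lets one more consecutive unordered (UDP) answer go missing before a node is downgraded, while ordered protocols still tolerate none. A rough sketch of how such thresholds are typically applied (the struct and function below are illustrative stand-ins, not the actual BucketEntryInner logic):

    /// - Number of consecutive lost answers on an unordered protocol we will
    ///   tolerate before we call something unreliable
    const UNRELIABLE_LOST_ANSWERS_UNORDERED: u32 = 2;
    /// - Number of consecutive lost answers on an ordered protocol we will
    ///   tolerate before we call something unreliable
    const UNRELIABLE_LOST_ANSWERS_ORDERED: u32 = 0;

    struct RpcStats {
        recent_lost_answers_unordered: u32,
        recent_lost_answers_ordered: u32,
    }

    // Unreliable once the consecutive-loss count exceeds what we tolerate
    // on either class of protocol.
    fn exceeds_lost_answer_tolerance(stats: &RpcStats) -> bool {
        stats.recent_lost_answers_unordered > UNRELIABLE_LOST_ANSWERS_UNORDERED
            || stats.recent_lost_answers_ordered > UNRELIABLE_LOST_ANSWERS_ORDERED
    }

The make_not_dead change in the same file is related: resetting the liveness counters is now guarded by check_dead, so calling it on a node that is already alive no longer wipes first_consecutive_seen_ts and the other stats.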
diff --git a/veilid-core/src/routing_table/tasks/ping_validator.rs b/veilid-core/src/routing_table/tasks/ping_validator.rs
index d71be651..6824beff 100644
--- a/veilid-core/src/routing_table/tasks/ping_validator.rs
+++ b/veilid-core/src/routing_table/tasks/ping_validator.rs
@@ -95,39 +95,17 @@ impl RoutingTable {
 
     ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
-    // Ping the relay to keep it alive, over every protocol it is relaying for us
-    #[instrument(level = "trace", skip(self, futurequeue), err)]
-    async fn relay_keepalive_public_internet(
+    // Get protocol-specific noderefs for a relay to determine its liveness
+    // Relays get pinged over more protocols than non-relay nodes because we need to ensure
+    // that they can reliably forward packets with 'all' sequencing, not just over 'any' sequencing
+    fn get_relay_specific_noderefs(
         &self,
-        cur_ts: Timestamp,
-        futurequeue: &mut VecDeque<PingValidatorFuture>,
-    ) -> EyreResult<()> {
-        // Get the PublicInternet relay if we are using one
-        let Some(relay_nr) = self.relay_node(RoutingDomain::PublicInternet) else {
-            return Ok(());
-        };
-
+        relay_nr: FilteredNodeRef,
+        routing_domain: RoutingDomain,
+    ) -> Vec<FilteredNodeRef> {
         // Get our publicinternet dial info
-        let dids = self.all_filtered_dial_info_details(
-            RoutingDomain::PublicInternet.into(),
-            &DialInfoFilter::all(),
-        );
-
-        let opt_relay_keepalive_ts = self.relay_node_last_keepalive(RoutingDomain::PublicInternet);
-        let relay_needs_keepalive = opt_relay_keepalive_ts
-            .map(|kts| {
-                cur_ts.saturating_sub(kts).as_u64()
-                    >= (RELAY_KEEPALIVE_PING_INTERVAL_SECS as u64 * 1_000_000u64)
-            })
-            .unwrap_or(true);
-
-        if !relay_needs_keepalive {
-            return Ok(());
-        }
-        // Say we're doing this keepalive now
-        self.inner
-            .write()
-            .set_relay_node_last_keepalive(RoutingDomain::PublicInternet, cur_ts);
+        let dids =
+            self.all_filtered_dial_info_details(routing_domain.into(), &DialInfoFilter::all());
 
         // We need to keep-alive at one connection per ordering for relays
         // but also one per NAT mapping that we need to keep open for our inbound dial info
@@ -180,6 +158,41 @@ impl RoutingTable {
             relay_noderefs.push(relay_nr);
         }
 
+        relay_noderefs
+    }
+
+    // Ping the relay to keep it alive, over every protocol it is relaying for us
+    #[instrument(level = "trace", skip(self, futurequeue), err)]
+    async fn relay_keepalive_public_internet(
+        &self,
+        cur_ts: Timestamp,
+        futurequeue: &mut VecDeque<PingValidatorFuture>,
+    ) -> EyreResult<()> {
+        // Get the PublicInternet relay if we are using one
+        let Some(relay_nr) = self.relay_node(RoutingDomain::PublicInternet) else {
+            return Ok(());
+        };
+
+        let opt_relay_keepalive_ts = self.relay_node_last_keepalive(RoutingDomain::PublicInternet);
+        let relay_needs_keepalive = opt_relay_keepalive_ts
+            .map(|kts| {
+                cur_ts.saturating_sub(kts).as_u64()
+                    >= (RELAY_KEEPALIVE_PING_INTERVAL_SECS as u64 * 1_000_000u64)
+            })
+            .unwrap_or(true);
+
+        if !relay_needs_keepalive {
+            return Ok(());
+        }
+        // Say we're doing this keepalive now
+        self.inner
+            .write()
+            .set_relay_node_last_keepalive(RoutingDomain::PublicInternet, cur_ts);
+
+        // Get the sequencing-specific relay noderefs for this relay
+        let relay_noderefs =
+            self.get_relay_specific_noderefs(relay_nr, RoutingDomain::PublicInternet);
+
         for relay_nr_filtered in relay_noderefs {
             futurequeue.push_back(
                 async move {
@@ -249,24 +262,36 @@ impl RoutingTable {
         futurequeue: &mut VecDeque<PingValidatorFuture>,
     ) -> EyreResult<()> {
         // Get all nodes needing pings in the PublicInternet routing domain
+        let relay_node_filter = self.make_public_internet_relay_node_filter();
         let node_refs = self.get_nodes_needing_ping(RoutingDomain::PublicInternet, cur_ts);
 
         // Just do a single ping with the best protocol for all the other nodes to check for liveness
         for nr in node_refs {
-            let nr = nr.sequencing_clone(Sequencing::PreferOrdered);
+            // If the node is relay-capable, we should ping it over ALL sequencing types
+            // instead of just a simple liveness check on ANY best contact method
 
-            futurequeue.push_back(
-                async move {
-                    #[cfg(feature = "verbose-tracing")]
-                    veilid_log!(nr debug "--> PublicInternet Validator ping to {:?}", nr);
-                    let rpc_processor = nr.rpc_processor();
-                    let _ = rpc_processor
-                        .rpc_call_status(Destination::direct(nr))
-                        .await?;
-                    Ok(())
-                }
-                .boxed(),
-            );
+            let all_noderefs = if nr.operate(|_rti, e| !relay_node_filter(e)) {
+                // If this is a relay capable node, get all the sequencing specific noderefs
+                self.get_relay_specific_noderefs(nr, RoutingDomain::PublicInternet)
+            } else {
+                // If a non-relay node, ping with the normal ping type
+                vec![nr.sequencing_clone(Sequencing::PreferOrdered)]
+            };
+
+            for nr in all_noderefs {
+                futurequeue.push_back(
+                    async move {
+                        #[cfg(feature = "verbose-tracing")]
+                        veilid_log!(nr debug "--> PublicInternet Validator ping to {:?}", nr);
+                        let rpc_processor = nr.rpc_processor();
+                        let _ = rpc_processor
+                            .rpc_call_status(Destination::direct(nr))
+                            .await?;
+                        Ok(())
+                    }
+                    .boxed(),
+                );
+            }
         }
 
         Ok(())
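The ping_validator.rs changes split relay liveness checking out into get_relay_specific_noderefs and apply it to any relay-capable node: such nodes get one ping per sequencing type instead of a single best-protocol ping, since a relay must be able to forward both ordered and unordered traffic. A simplified sketch of that queueing pattern (Sequencing, ping() and the queue element type below are stand-ins for the Veilid types):

    use std::collections::VecDeque;
    use futures::future::{BoxFuture, FutureExt};

    #[derive(Clone, Copy, Debug)]
    enum Sequencing {
        EnsureOrdered, // e.g. TCP/WebSocket paths
        NoPreference,  // e.g. UDP paths
    }

    async fn ping(node: String, seq: Sequencing) -> Result<(), String> {
        // Placeholder for an rpc_call_status() liveness check of `node`
        // constrained to `seq`.
        println!("ping {node} with {seq:?}");
        Ok(())
    }

    fn queue_relay_pings(
        relay: &str,
        futurequeue: &mut VecDeque<BoxFuture<'static, Result<(), String>>>,
    ) {
        // A relay gets one queued liveness check per sequencing type rather
        // than a single "best protocol" ping.
        for seq in [Sequencing::EnsureOrdered, Sequencing::NoPreference] {
            let node = relay.to_string();
            futurequeue.push_back(async move { ping(node, seq).await }.boxed());
        }
    }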
diff --git a/veilid-core/src/rpc_processor/destination.rs b/veilid-core/src/rpc_processor/destination.rs
index ef063993..6c75db82 100644
--- a/veilid-core/src/rpc_processor/destination.rs
+++ b/veilid-core/src/rpc_processor/destination.rs
@@ -206,7 +206,7 @@ impl Destination {
             }
             if opt_routing_domain.is_none() {
                 // In the case of an unexpected relay, log it and don't pass any sender peer info into an unexpected relay
-                veilid_log!(node warn "No routing domain for relay: relay={}, node={}", relay, node);
+                veilid_log!(node debug "Unexpected relay: relay={}, node={}", relay, node);
             };
 
             (
diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs
index 49260ae3..b463c752 100644
--- a/veilid-core/src/rpc_processor/mod.rs
+++ b/veilid-core/src/rpc_processor/mod.rs
@@ -817,8 +817,10 @@ impl RPCProcessor {
             return SenderPeerInfo::default();
         };
         let Some(routing_domain) = opt_routing_domain else {
-            // No routing domain for target, no node info
-            // Only a stale connection or no connection exists
+            // No routing domain for target, no node info is safe to send here
+            // Only a stale connection or no connection exists, or an unexpected
+            // relay was used, possibly due to the destination switching relays
+            // in a race condition with our send
            return SenderPeerInfo::default();
         };
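The expanded comment in mod.rs pairs with the destination.rs change above: when no routing domain can be determined for the target (a stale connection, or the peer switched relays while our message was in flight), the safe behavior is to reveal no sender peer info at all. A tiny sketch of that guard shape, with placeholder types standing in for the Veilid ones:

    #[derive(Default)]
    struct SenderPeerInfo; // placeholder for the peer info we would reveal about ourselves

    #[derive(Clone, Copy)]
    enum RoutingDomain {
        PublicInternet,
        LocalNetwork,
    }

    fn opt_sender_peer_info(opt_routing_domain: Option<RoutingDomain>) -> SenderPeerInfo {
        let Some(routing_domain) = opt_routing_domain else {
            // No routing domain: stale/no connection, or an unexpected relay
            // (the destination may have switched relays in a race with our
            // send). Reveal nothing rather than the wrong domain's info.
            return SenderPeerInfo::default();
        };
        // ...build the peer info appropriate to `routing_domain` here...
        let _ = routing_domain;
        SenderPeerInfo::default()
    }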