[ci skip] fix TTL race condition in hole punch. check relays with both unordered and ordered protocols in ping validator. fix 'make_not_dead' to only run when nodes are actually dead

Christien Rioux 2025-03-01 16:26:06 -05:00
parent f79198c545
commit 5ce238d4fd
6 changed files with 166 additions and 84 deletions
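For context on the first fix: the hole punch lowers the socket TTL, sends an empty packet, and restores the TTL, but another task could send a data packet through the same shared UDP socket while the TTL was still lowered, and that packet would expire in transit. The diff below closes the window by tracking the intended TTL behind an async mutex that every send path must acquire. A minimal sketch of that guard pattern, using illustrative names (TtlGuardedSocket, send_data, hole_punch) rather than Veilid's actual types:

use std::{net::SocketAddr, sync::Arc};
use tokio::{net::UdpSocket, sync::Mutex};

// Illustrative sketch: serialize every send behind an async mutex that
// tracks the TTL the socket is supposed to have, so a hole punch can
// never interleave with a data send while the TTL is lowered.
struct TtlGuardedSocket {
    socket: Arc<UdpSocket>,
    default_ttl: u32,
    current_ttl: Arc<Mutex<u32>>,
}

impl TtlGuardedSocket {
    async fn send_data(&self, buf: &[u8], addr: SocketAddr) -> std::io::Result<()> {
        // Hold the lock across the send; if the TTL is not the default,
        // something left the socket in a lowered state.
        let current = self.current_ttl.lock().await;
        debug_assert_eq!(*current, self.default_ttl, "TTL left lowered by hole punch");
        self.socket.send_to(buf, addr).await?;
        Ok(())
    }

    async fn hole_punch(&self, punch_ttl: u32, addr: SocketAddr) -> std::io::Result<()> {
        // Take the same lock, so data sends wait until the TTL is restored
        let mut current = self.current_ttl.lock().await;
        self.socket.set_ttl(punch_ttl)?;
        *current = punch_ttl;
        let res = self.socket.send_to(&[], addr).await; // zero-length punch packet
        self.socket.set_ttl(self.default_ttl)?; // restore before releasing the lock
        *current = self.default_ttl;
        res.map(|_| ())
    }
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let socket = Arc::new(UdpSocket::bind("127.0.0.1:0").await?);
    let default_ttl = socket.ttl()?;
    let guarded = TtlGuardedSocket {
        socket,
        default_ttl,
        current_ttl: Arc::new(Mutex::new(default_ttl)),
    };
    let peer: SocketAddr = "127.0.0.1:4242".parse().unwrap();
    guarded.hole_punch(2, peer).await?;
    guarded.send_data(b"hello", peer).await?; // always sees the default TTL
    Ok(())
}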

View File

@@ -118,7 +118,8 @@ impl Network {
         let socket_arc = Arc::new(udp_socket);
 
         // Create protocol handler
-        let protocol_handler = RawUdpProtocolHandler::new(self.registry(), socket_arc);
+        let protocol_handler =
+            RawUdpProtocolHandler::new(self.registry(), socket_arc, addr.is_ipv6());
 
         // Record protocol handler
         let mut inner = self.inner.lock();

View File

@@ -7,16 +7,30 @@ pub struct RawUdpProtocolHandler {
     registry: VeilidComponentRegistry,
     socket: Arc<UdpSocket>,
     assembly_buffer: AssemblyBuffer,
+    is_ipv6: bool,
+    default_ttl: u32,
+    current_ttl: Arc<AsyncMutex<u32>>,
 }
 
 impl_veilid_component_registry_accessor!(RawUdpProtocolHandler);
 
 impl RawUdpProtocolHandler {
-    pub fn new(registry: VeilidComponentRegistry, socket: Arc<UdpSocket>) -> Self {
+    pub fn new(registry: VeilidComponentRegistry, socket: Arc<UdpSocket>, is_ipv6: bool) -> Self {
+        // Get original TTL
+        let default_ttl = if is_ipv6 {
+            socket2_operation(socket.as_ref(), |s| s.unicast_hops_v6())
+                .expect("getting IPV6_UNICAST_HOPS should not fail")
+        } else {
+            socket2_operation(socket.as_ref(), |s| s.ttl()).expect("getting IP_TTL should not fail")
+        };
+
         Self {
             registry,
             socket,
             assembly_buffer: AssemblyBuffer::new(),
+            is_ipv6,
+            default_ttl,
+            current_ttl: Arc::new(AsyncMutex::new(default_ttl)),
         }
     }
@@ -104,24 +118,35 @@ impl RawUdpProtocolHandler {
             return Ok(NetworkResult::no_connection_other("punished"));
         }
 
-        // Fragment and send
-        let sender = |framed_chunk: Vec<u8>, remote_addr: SocketAddr| async move {
-            let len = network_result_try!(self
-                .socket
-                .send_to(&framed_chunk, remote_addr)
-                .await
-                .into_network_result()?);
-            if len != framed_chunk.len() {
-                bail_io_error_other!("UDP partial send")
-            }
-            Ok(NetworkResult::value(()))
-        };
-        network_result_try!(
-            self.assembly_buffer
-                .split_message(data, remote_addr, sender)
-                .await?
-        );
+        // Ensure the TTL for sent packets is the default,
+        // then fragment and send the packets
+        {
+            let current_ttl = self.current_ttl.lock().await;
+            if *current_ttl != self.default_ttl {
+                veilid_log!(self error "Incorrect TTL on sent UDP packet ({} != {}): len={}, remote_addr={:?}", *current_ttl, self.default_ttl, data.len(), remote_addr);
+            }
+
+            // Fragment and send
+            let sender = |framed_chunk: Vec<u8>, remote_addr: SocketAddr| async move {
+                let len = network_result_try!(self
+                    .socket
+                    .send_to(&framed_chunk, remote_addr)
+                    .await
+                    .into_network_result()?);
+                if len != framed_chunk.len() {
+                    bail_io_error_other!("UDP partial send")
+                }
+                veilid_log!(self trace "udp::send_message:chunk(len={}) {:?}", len, remote_addr);
+                Ok(NetworkResult::value(()))
+            };
+            network_result_try!(
+                self.assembly_buffer
+                    .split_message(data, remote_addr, sender)
+                    .await?
+            );
+        }
 
         // Return a flow for the sent message
         let peer_addr = PeerAddress::new(
@@ -157,22 +182,44 @@ impl RawUdpProtocolHandler {
             return Ok(NetworkResult::no_connection_other("punished"));
         }
 
-        // Get synchronous socket
-        let res = socket2_operation(self.socket.as_ref(), |s| {
-            // Get original TTL
-            let original_ttl = s.ttl()?;
-
-            // Set TTL
-            s.set_ttl(ttl)?;
-
-            // Send zero length packet
-            let res = s.send_to(&[], &remote_addr.into());
-
-            // Restore TTL immediately
-            s.set_ttl(original_ttl)?;
-
-            res
-        });
+        // Ensure the TTL for sent packets is the default,
+        // then fragment and send the packets
+        let res = {
+            let mut current_ttl = self.current_ttl.lock().await;
+            if *current_ttl != self.default_ttl {
+                veilid_log!(self error "Incorrect TTL before sending holepunch UDP packet ({} != {}): remote_addr={:?}", *current_ttl, self.default_ttl, remote_addr);
+            }
+
+            // Get synchronous socket
+            socket2_operation(self.socket.as_ref(), |s| {
+                // Set TTL
+                let ttl_res = if self.is_ipv6 {
+                    s.set_unicast_hops_v6(ttl)
+                } else {
+                    s.set_ttl(ttl)
+                };
+                ttl_res.inspect_err(|e| {
+                    veilid_log!(self error "Failed to set TTL on holepunch UDP socket: {} remote_addr={:?}", e, remote_addr);
+                })?;
+                *current_ttl = ttl;
+
+                // Send zero length packet
+                let res = s.send_to(&[], &remote_addr.into());
+
+                // Restore TTL immediately
+                let ttl_res = if self.is_ipv6 {
+                    s.set_unicast_hops_v6(self.default_ttl)
+                } else {
+                    s.set_ttl(self.default_ttl)
+                };
+                ttl_res.inspect_err(|e| {
+                    veilid_log!(self error "Failed to reset TTL on holepunch UDP socket: {} remote_addr={:?}", e, remote_addr);
+                })?;
+                *current_ttl = self.default_ttl;
+
+                res
+            })
+        };
 
         // Check for errors
         let len = network_result_try!(res.into_network_result()?);
@@ -208,6 +255,10 @@ impl RawUdpProtocolHandler {
         let local_socket_addr = compatible_unspecified_socket_addr(socket_addr);
         let socket = bind_async_udp_socket(local_socket_addr)?
            .ok_or(io::Error::from(io::ErrorKind::AddrInUse))?;
-        Ok(RawUdpProtocolHandler::new(registry, Arc::new(socket)))
+        Ok(RawUdpProtocolHandler::new(
+            registry,
+            Arc::new(socket),
+            local_socket_addr.is_ipv6(),
+        ))
     }
 }
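A related detail in this diff: IPv4 and IPv6 expose the hop limit through different socket options, so calling set_ttl() (IP_TTL) on an IPv6 socket adjusts the wrong option; the handler therefore branches on is_ipv6 and uses IPV6_UNICAST_HOPS instead. A small standalone sketch of the same branching with the socket2 crate (the set_hop_limit/get_hop_limit helpers are illustrative, not Veilid's):

use socket2::{Domain, Protocol, Socket, Type};

// Pick the right socket option for the address family:
// IP_TTL for IPv4, IPV6_UNICAST_HOPS for IPv6.
fn set_hop_limit(sock: &Socket, is_ipv6: bool, hops: u32) -> std::io::Result<()> {
    if is_ipv6 {
        sock.set_unicast_hops_v6(hops)
    } else {
        sock.set_ttl(hops)
    }
}

fn get_hop_limit(sock: &Socket, is_ipv6: bool) -> std::io::Result<u32> {
    if is_ipv6 {
        sock.unicast_hops_v6()
    } else {
        sock.ttl()
    }
}

fn main() -> std::io::Result<()> {
    let sock = Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))?;
    set_hop_limit(&sock, true, 5)?;
    assert_eq!(get_hop_limit(&sock, true)?, 5);
    println!("hop limit set and read back via IPV6_UNICAST_HOPS");
    Ok(())
}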

View File

@@ -23,7 +23,7 @@ const UNRELIABLE_PING_SPAN_SECS: u32 = 60;
 const UNRELIABLE_PING_INTERVAL_SECS: u32 = 5;
 /// - Number of consecutive lost answers on an unordered protocol we will
 /// tolerate before we call something unreliable
-const UNRELIABLE_LOST_ANSWERS_UNORDERED: u32 = 1;
+const UNRELIABLE_LOST_ANSWERS_UNORDERED: u32 = 2;
 /// - Number of consecutive lost answers on an ordered protocol we will
 /// tolerate before we call something unreliable
 const UNRELIABLE_LOST_ANSWERS_ORDERED: u32 = 0;
@@ -1068,11 +1068,14 @@ impl BucketEntryInner {
     }
 
     pub(super) fn make_not_dead(&mut self, cur_ts: Timestamp) {
-        self.peer_stats.rpc_stats.last_seen_ts = None;
-        self.peer_stats.rpc_stats.failed_to_send = 0;
-        self.peer_stats.rpc_stats.recent_lost_answers_unordered = 0;
-        self.peer_stats.rpc_stats.recent_lost_answers_ordered = 0;
-        assert!(self.check_dead(cur_ts).is_none());
+        if self.check_dead(cur_ts).is_some() {
+            self.peer_stats.rpc_stats.last_seen_ts = None;
+            self.peer_stats.rpc_stats.first_consecutive_seen_ts = None;
+            self.peer_stats.rpc_stats.failed_to_send = 0;
+            self.peer_stats.rpc_stats.recent_lost_answers_unordered = 0;
+            self.peer_stats.rpc_stats.recent_lost_answers_ordered = 0;
+            assert!(self.check_dead(cur_ts).is_none());
+        }
     }
 
     pub(super) fn _state_debug_info(&self, cur_ts: Timestamp) -> String {

View File

@@ -95,39 +95,17 @@ impl RoutingTable {
     ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
-    // Ping the relay to keep it alive, over every protocol it is relaying for us
-    #[instrument(level = "trace", skip(self, futurequeue), err)]
-    async fn relay_keepalive_public_internet(
+    // Get protocol-specific noderefs for a relay to determine its liveness
+    // Relays get pinged over more protocols than non-relay nodes because we need to ensure
+    // that they can reliably forward packets with 'all' sequencing, not just over 'any' sequencing
+    fn get_relay_specific_noderefs(
         &self,
-        cur_ts: Timestamp,
-        futurequeue: &mut VecDeque<PingValidatorFuture>,
-    ) -> EyreResult<()> {
-        // Get the PublicInternet relay if we are using one
-        let Some(relay_nr) = self.relay_node(RoutingDomain::PublicInternet) else {
-            return Ok(());
-        };
-
+        relay_nr: FilteredNodeRef,
+        routing_domain: RoutingDomain,
+    ) -> Vec<FilteredNodeRef> {
         // Get our publicinternet dial info
-        let dids = self.all_filtered_dial_info_details(
-            RoutingDomain::PublicInternet.into(),
-            &DialInfoFilter::all(),
-        );
-
-        let opt_relay_keepalive_ts = self.relay_node_last_keepalive(RoutingDomain::PublicInternet);
-        let relay_needs_keepalive = opt_relay_keepalive_ts
-            .map(|kts| {
-                cur_ts.saturating_sub(kts).as_u64()
-                    >= (RELAY_KEEPALIVE_PING_INTERVAL_SECS as u64 * 1_000_000u64)
-            })
-            .unwrap_or(true);
-
-        if !relay_needs_keepalive {
-            return Ok(());
-        }
-
-        // Say we're doing this keepalive now
-        self.inner
-            .write()
-            .set_relay_node_last_keepalive(RoutingDomain::PublicInternet, cur_ts);
+        let dids =
+            self.all_filtered_dial_info_details(routing_domain.into(), &DialInfoFilter::all());
 
         // We need to keep-alive at one connection per ordering for relays
         // but also one per NAT mapping that we need to keep open for our inbound dial info
@@ -180,6 +158,41 @@ impl RoutingTable {
             relay_noderefs.push(relay_nr);
         }
 
+        relay_noderefs
+    }
+
+    // Ping the relay to keep it alive, over every protocol it is relaying for us
+    #[instrument(level = "trace", skip(self, futurequeue), err)]
+    async fn relay_keepalive_public_internet(
+        &self,
+        cur_ts: Timestamp,
+        futurequeue: &mut VecDeque<PingValidatorFuture>,
+    ) -> EyreResult<()> {
+        // Get the PublicInternet relay if we are using one
+        let Some(relay_nr) = self.relay_node(RoutingDomain::PublicInternet) else {
+            return Ok(());
+        };
+
+        let opt_relay_keepalive_ts = self.relay_node_last_keepalive(RoutingDomain::PublicInternet);
+        let relay_needs_keepalive = opt_relay_keepalive_ts
+            .map(|kts| {
+                cur_ts.saturating_sub(kts).as_u64()
+                    >= (RELAY_KEEPALIVE_PING_INTERVAL_SECS as u64 * 1_000_000u64)
+            })
+            .unwrap_or(true);
+
+        if !relay_needs_keepalive {
+            return Ok(());
+        }
+
+        // Say we're doing this keepalive now
+        self.inner
+            .write()
+            .set_relay_node_last_keepalive(RoutingDomain::PublicInternet, cur_ts);
+
+        // Get the sequencing-specific relay noderefs for this relay
+        let relay_noderefs =
+            self.get_relay_specific_noderefs(relay_nr, RoutingDomain::PublicInternet);
+
         for relay_nr_filtered in relay_noderefs {
             futurequeue.push_back(
                 async move {
@@ -249,24 +262,36 @@ impl RoutingTable {
         futurequeue: &mut VecDeque<PingValidatorFuture>,
     ) -> EyreResult<()> {
         // Get all nodes needing pings in the PublicInternet routing domain
+        let relay_node_filter = self.make_public_internet_relay_node_filter();
         let node_refs = self.get_nodes_needing_ping(RoutingDomain::PublicInternet, cur_ts);
 
         // Just do a single ping with the best protocol for all the other nodes to check for liveness
         for nr in node_refs {
-            let nr = nr.sequencing_clone(Sequencing::PreferOrdered);
+            // If the node is relay-capable, we should ping it over ALL sequencing types
+            // instead of just a simple liveness check on ANY best contact method
+            let all_noderefs = if nr.operate(|_rti, e| !relay_node_filter(e)) {
+                // If this is a relay capable node, get all the sequencing specific noderefs
+                self.get_relay_specific_noderefs(nr, RoutingDomain::PublicInternet)
+            } else {
+                // If a non-relay node, ping with the normal ping type
+                vec![nr.sequencing_clone(Sequencing::PreferOrdered)]
+            };
 
-            futurequeue.push_back(
-                async move {
-                    #[cfg(feature = "verbose-tracing")]
-                    veilid_log!(nr debug "--> PublicInternet Validator ping to {:?}", nr);
-                    let rpc_processor = nr.rpc_processor();
-                    let _ = rpc_processor
-                        .rpc_call_status(Destination::direct(nr))
-                        .await?;
-                    Ok(())
-                }
-                .boxed(),
-            );
+            for nr in all_noderefs {
+                futurequeue.push_back(
+                    async move {
+                        #[cfg(feature = "verbose-tracing")]
+                        veilid_log!(nr debug "--> PublicInternet Validator ping to {:?}", nr);
+                        let rpc_processor = nr.rpc_processor();
+                        let _ = rpc_processor
+                            .rpc_call_status(Destination::direct(nr))
+                            .await?;
+                        Ok(())
+                    }
+                    .boxed(),
+                );
+            }
         }
 
         Ok(())
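The net effect of the ping validator change, sketched with stand-in types below (Node, relay_capable, and pings_for are illustrative, not Veilid's API): relay-capable nodes get one liveness check per sequencing mode so both ordered and unordered paths through them are validated, while ordinary nodes keep the single best-protocol ping.

// Illustrative stand-ins for the fan-out above; not Veilid's actual types.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Sequencing {
    PreferOrdered, // single liveness check over the best contact method
    EnsureOrdered, // TCP/WebSocket-style ordered path through the relay
    NoPreference,  // UDP-style unordered path through the relay
}

#[derive(Clone, Debug)]
struct Node {
    id: u64,
    relay_capable: bool,
}

// One ping per sequencing mode for relay-capable nodes, one ping otherwise
fn pings_for(node: &Node) -> Vec<(u64, Sequencing)> {
    if node.relay_capable {
        vec![
            (node.id, Sequencing::EnsureOrdered),
            (node.id, Sequencing::NoPreference),
        ]
    } else {
        vec![(node.id, Sequencing::PreferOrdered)]
    }
}

fn main() {
    let relay = Node { id: 1, relay_capable: true };
    let plain = Node { id: 2, relay_capable: false };
    assert_eq!(pings_for(&relay).len(), 2);
    assert_eq!(pings_for(&plain).len(), 1);
    println!("{:?} / {:?}", pings_for(&relay), pings_for(&plain));
}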

View File

@@ -206,7 +206,7 @@ impl Destination {
         }
         if opt_routing_domain.is_none() {
             // In the case of an unexpected relay, log it and don't pass any sender peer info into an unexpected relay
-            veilid_log!(node warn "No routing domain for relay: relay={}, node={}", relay, node);
+            veilid_log!(node debug "Unexpected relay: relay={}, node={}", relay, node);
         };
 
         (

View File

@@ -817,8 +817,10 @@ impl RPCProcessor {
             return SenderPeerInfo::default();
         };
         let Some(routing_domain) = opt_routing_domain else {
-            // No routing domain for target, no node info
-            // Only a stale connection or no connection exists
+            // No routing domain for target, no node info is safe to send here
+            // Only a stale connection or no connection exists, or an unexpected
+            // relay was used, possibly due to the destination switching relays
+            // in a race condition with our send
             return SenderPeerInfo::default();
         };