Improved UDP hole punching with TTL

This commit is contained in:
Christien Rioux 2025-02-14 04:21:00 +00:00
parent 4ce0c2ccaf
commit 7a7bb0ad2e
9 changed files with 154 additions and 19 deletions

View File

@ -50,6 +50,7 @@ build_cache:
extends: .base
stage: prepare
script:
- 'echo "CI_PIPELINE_SOURCE: $CI_PIPELINE_SOURCE"'
- apk update && apk add jq && apk add curl
- if ! docker manifest inspect $CI_REGISTRY_IMAGE/build-cache:latest > /dev/null; then
- CACHE_EPOCH=0

View File

@ -259,7 +259,6 @@ impl Settings {
if let Some(ipc_path) = ipc_path.or(self.ipc_path.clone()) {
if is_ipc_socket_path(&ipc_path) {
// try direct path
enable_network = false;
client_api_ipc_path = Some(ipc_path);
} else {
// try subnode index inside path

View File

@ -766,10 +766,7 @@ impl NetworkManager {
// Both sides will do this and then the receipt will get sent over the punched hole
let unique_flow = network_result_try!(
self.net()
.send_data_to_dial_info(
hole_punch_dial_info_detail.dial_info.clone(),
Vec::new(),
)
.send_hole_punch(hole_punch_dial_info_detail.dial_info.clone(),)
.await?
);

View File

@ -34,7 +34,7 @@ use std::path::{Path, PathBuf};
pub const UPDATE_NETWORK_CLASS_TASK_TICK_PERIOD_SECS: u32 = 1;
pub const NETWORK_INTERFACES_TASK_TICK_PERIOD_SECS: u32 = 1;
pub const UPNP_TASK_TICK_PERIOD_SECS: u32 = 1;
pub const HOLE_PUNCH_TTL: u32 = 3;
pub const PEEK_DETECT_LEN: usize = 64;
cfg_if! {
@ -652,6 +652,55 @@ impl Network {
.await
}
// Send hole punch attempt to a specific dialinfo. May not be appropriate for all protocols.
// Returns a flow for the connection used to send the data
#[instrument(level = "trace", target = "net", err, skip(self))]
pub async fn send_hole_punch(
&self,
dial_info: DialInfo,
) -> EyreResult<NetworkResult<UniqueFlow>> {
let _guard = self.startup_lock.enter()?;
self.record_dial_info_failure(
dial_info.clone(),
async move {
let unique_flow;
if dial_info.protocol_type() == ProtocolType::UDP {
// Handle connectionless protocol
let peer_socket_addr = dial_info.to_socket_addr();
let ph = match self.find_best_udp_protocol_handler(&peer_socket_addr, &None) {
Some(ph) => ph,
None => {
return Ok(NetworkResult::no_connection_other(
"no appropriate UDP protocol handler for dial_info",
));
}
};
let flow = network_result_try!(ph
.send_hole_punch(peer_socket_addr, HOLE_PUNCH_TTL)
.await
.wrap_err("failed to send hole punch to dial info")?);
unique_flow = UniqueFlow {
flow,
connection_id: None,
};
} else {
return Ok(NetworkResult::ServiceUnavailable(
"unimplemented for this protocol".to_owned(),
));
}
// Network accounting
self.network_manager()
.stats_packet_sent(dial_info.ip_addr(), ByteCount::new(0));
Ok(NetworkResult::value(unique_flow))
}
.in_current_span(),
)
.await
}
/////////////////////////////////////////////////////////////////
pub async fn startup_internal(&self) -> EyreResult<StartupDisposition> {

View File

@ -140,6 +140,63 @@ impl RawUdpProtocolHandler {
Ok(NetworkResult::value(flow))
}
#[instrument(level = "trace", target = "protocol", err, skip(self), fields(ret.flow))]
pub async fn send_hole_punch(
&self,
remote_addr: SocketAddr,
ttl: u32,
) -> io::Result<NetworkResult<Flow>> {
// Check to see if it is punished
if self
.network_manager()
.address_filter()
.is_ip_addr_punished(remote_addr.ip())
{
return Ok(NetworkResult::no_connection_other("punished"));
}
// Get synchronous socket
let res = socket2_operation(self.socket.as_ref(), |s| {
// Get original TTL
let original_ttl = s.ttl()?;
// Set TTL
s.set_ttl(ttl)?;
// Send zero length packet
let res = s.send_to(&[], &remote_addr.into());
// Restore TTL immediately
s.set_ttl(original_ttl)?;
res
});
// Check for errors
let len = network_result_try!(res.into_network_result()?);
if len != 0 {
bail_io_error_other!("wrong size send");
}
// Return a flow for the sent message
let peer_addr = PeerAddress::new(
SocketAddress::from_socket_addr(remote_addr),
ProtocolType::UDP,
);
let local_socket_addr = self.socket.local_addr()?;
let flow = Flow::new(
peer_addr,
SocketAddress::from_socket_addr(local_socket_addr),
);
log_net!("udp::send_hole_punch: {:?}", flow);
#[cfg(feature = "verbose-tracing")]
tracing::Span::current().record("ret.flow", format!("{:?}", flow).as_str());
Ok(NetworkResult::value(flow))
}
#[instrument(level = "trace", target = "protocol", err)]
pub async fn new_unspecified_bound_handler(
registry: VeilidComponentRegistry,

View File

@ -806,13 +806,10 @@ impl NetworkManager {
// punch should come through and create a real 'last connection' for us if this succeeds
network_result_try!(
self.net()
.send_data_to_dial_info(hole_punch_did.dial_info.clone(), Vec::new())
.send_hole_punch(hole_punch_did.dial_info.clone())
.await?
);
// Add small delay to encourage packets to be delivered in order
sleep(HOLE_PUNCH_DELAY_MS).await;
// Issue the signal
let rpc = self.rpc_processor();
network_result_try!(rpc
@ -826,13 +823,6 @@ impl NetworkManager {
.await
.wrap_err("failed to send signal")?);
// Another hole punch after the signal for UDP redundancy
network_result_try!(
self.net()
.send_data_to_dial_info(hole_punch_did.dial_info, Vec::new())
.await?
);
// Wait for the return receipt
let inbound_nr = match eventual_value
.timeout_at(stop_token)

View File

@ -5,3 +5,6 @@
# Avoid committing pubspec.lock for library packages; see
# https://dart.dev/guides/libraries/private-files#pubspeclock.
pubspec.lock
.flutter-plugins/
.flutter-plugins-dependencies/

View File

@ -31,6 +31,12 @@ fn io_error_kind_from_error<T>(e: io::Error) -> io::Result<NetworkResult<T>> {
return Ok(NetworkResult::NoConnection(e));
}
}
#[cfg(windows)]
if let Some(os_err) = e.raw_os_error() {
if os_err == winapi::um::winsock2::WSAENETRESET {
return Ok(NetworkResult::NoConnection(e));
}
}
match e.kind() {
io::ErrorKind::TimedOut => Ok(NetworkResult::Timeout),
io::ErrorKind::UnexpectedEof

View File

@ -98,7 +98,7 @@ pub fn set_tcp_stream_linger(
unsafe {
let s = socket2::Socket::from_raw_fd(tcp_stream.as_raw_fd());
let res = s.set_linger(linger);
s.into_raw_fd();
let _ = s.into_raw_fd();
res
}
}
@ -109,7 +109,7 @@ pub fn set_tcp_stream_linger(
unsafe {
let s = socket2::Socket::from_raw_socket(tcp_stream.as_raw_socket());
let res = s.set_linger(linger);
s.into_raw_socket();
let _ = s.into_raw_socket();
res
}
}
@ -282,3 +282,36 @@ async fn nonblocking_connect(
pub fn domain_for_address(address: SocketAddr) -> core::ffi::c_int {
socket2::Domain::for_address(address).into()
}
// Run operations on underlying socket
cfg_if! {
if #[cfg(unix)] {
pub fn socket2_operation<S: std::os::fd::AsRawFd, F: FnOnce(&mut socket2::Socket) -> R, R>(
s: &S,
callback: F,
) -> R {
use std::os::fd::{FromRawFd, IntoRawFd};
let mut s = unsafe { socket2::Socket::from_raw_fd(s.as_raw_fd()) };
let res = callback(&mut s);
let _ = s.into_raw_fd();
res
}
} else if #[cfg(windows)] {
pub fn socket2_operation<
S: std::os::windows::io::AsRawSocket,
F: FnOnce(&mut socket2::Socket) -> R,
R,
>(
s: &S,
callback: F,
) -> R {
use std::os::windows::io::{FromRawSocket, IntoRawSocket};
let mut s = unsafe { socket2::Socket::from_raw_socket(s.as_raw_socket()) };
let res = callback(&mut s);
let _ = s.into_raw_socket();
res
}
} else {
#[compile_error("unimplemented")]
}
}