mirror of
https://gitlab.com/veilid/veilid.git
synced 2024-10-01 01:26:08 -04:00
refactor net bind, wip
This commit is contained in:
parent
82d107f446
commit
03e872c128
@ -15,6 +15,7 @@ use protocol::tcp::RawTcpProtocolHandler;
|
|||||||
use protocol::udp::RawUdpProtocolHandler;
|
use protocol::udp::RawUdpProtocolHandler;
|
||||||
use protocol::ws::WebsocketProtocolHandler;
|
use protocol::ws::WebsocketProtocolHandler;
|
||||||
pub(in crate::network_manager) use protocol::*;
|
pub(in crate::network_manager) use protocol::*;
|
||||||
|
use start_protocols::*;
|
||||||
|
|
||||||
use async_tls::TlsAcceptor;
|
use async_tls::TlsAcceptor;
|
||||||
use futures_util::StreamExt;
|
use futures_util::StreamExt;
|
||||||
@ -104,18 +105,12 @@ struct NetworkInner {
|
|||||||
network_already_cleared: bool,
|
network_already_cleared: bool,
|
||||||
/// the punishment closure to enax
|
/// the punishment closure to enax
|
||||||
public_dial_info_check_punishment: Option<Box<dyn FnOnce() + Send + 'static>>,
|
public_dial_info_check_punishment: Option<Box<dyn FnOnce() + Send + 'static>>,
|
||||||
/// udp socket record for bound-first sockets, which are used to guarantee a port is available before
|
|
||||||
/// creating a 'reuseport' socket there. we don't want to pick ports that other programs are using
|
|
||||||
bound_first_udp: BTreeMap<u16, (Option<socket2::Socket>, Option<socket2::Socket>)>,
|
|
||||||
/// mapping of protocol handlers to accept messages from a set of bound socket addresses
|
/// mapping of protocol handlers to accept messages from a set of bound socket addresses
|
||||||
inbound_udp_protocol_handlers: BTreeMap<SocketAddr, RawUdpProtocolHandler>,
|
udp_protocol_handlers: BTreeMap<SocketAddr, RawUdpProtocolHandler>,
|
||||||
/// outbound udp protocol handler for udpv4
|
/// outbound udp protocol handler for udpv4
|
||||||
outbound_udpv4_protocol_handler: Option<RawUdpProtocolHandler>,
|
default_udpv4_protocol_handler: Option<RawUdpProtocolHandler>,
|
||||||
/// outbound udp protocol handler for udpv6
|
/// outbound udp protocol handler for udpv6
|
||||||
outbound_udpv6_protocol_handler: Option<RawUdpProtocolHandler>,
|
default_udpv6_protocol_handler: Option<RawUdpProtocolHandler>,
|
||||||
/// tcp socket record for bound-first sockets, which are used to guarantee a port is available before
|
|
||||||
/// creating a 'reuseport' socket there. we don't want to pick ports that other programs are using
|
|
||||||
bound_first_tcp: BTreeMap<u16, (Option<socket2::Socket>, Option<socket2::Socket>)>,
|
|
||||||
/// TLS handling socket controller
|
/// TLS handling socket controller
|
||||||
tls_acceptor: Option<TlsAcceptor>,
|
tls_acceptor: Option<TlsAcceptor>,
|
||||||
/// Multiplexer record for protocols on low level TCP sockets
|
/// Multiplexer record for protocols on low level TCP sockets
|
||||||
@ -164,11 +159,9 @@ impl Network {
|
|||||||
enable_ipv4: false,
|
enable_ipv4: false,
|
||||||
enable_ipv6_global: false,
|
enable_ipv6_global: false,
|
||||||
enable_ipv6_local: false,
|
enable_ipv6_local: false,
|
||||||
bound_first_udp: BTreeMap::new(),
|
udp_protocol_handlers: BTreeMap::new(),
|
||||||
inbound_udp_protocol_handlers: BTreeMap::new(),
|
default_udpv4_protocol_handler: None,
|
||||||
outbound_udpv4_protocol_handler: None,
|
default_udpv6_protocol_handler: None,
|
||||||
outbound_udpv6_protocol_handler: None,
|
|
||||||
bound_first_tcp: BTreeMap::new(),
|
|
||||||
tls_acceptor: None,
|
tls_acceptor: None,
|
||||||
listener_states: BTreeMap::new(),
|
listener_states: BTreeMap::new(),
|
||||||
}
|
}
|
||||||
@ -332,17 +325,6 @@ impl Network {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_local_port(&self, protocol_type: ProtocolType) -> Option<u16> {
|
|
||||||
let inner = self.inner.lock();
|
|
||||||
let local_port = match protocol_type {
|
|
||||||
ProtocolType::UDP => inner.udp_port,
|
|
||||||
ProtocolType::TCP => inner.tcp_port,
|
|
||||||
ProtocolType::WS => inner.ws_port,
|
|
||||||
ProtocolType::WSS => inner.wss_port,
|
|
||||||
};
|
|
||||||
Some(local_port)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get_preferred_local_address(&self, dial_info: &DialInfo) -> Option<SocketAddr> {
|
pub fn get_preferred_local_address(&self, dial_info: &DialInfo) -> Option<SocketAddr> {
|
||||||
let inner = self.inner.lock();
|
let inner = self.inner.lock();
|
||||||
|
|
||||||
@ -846,7 +828,7 @@ impl Network {
|
|||||||
|
|
||||||
// start listeners
|
// start listeners
|
||||||
if protocol_config.inbound.contains(ProtocolType::UDP) {
|
if protocol_config.inbound.contains(ProtocolType::UDP) {
|
||||||
self.start_udp_listeners(&mut editor_public_internet, &mut editor_local_network)
|
self.bind_udp_protocol_handlers(&mut editor_public_internet, &mut editor_local_network)
|
||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
if protocol_config.inbound.contains(ProtocolType::WS) {
|
if protocol_config.inbound.contains(ProtocolType::WS) {
|
||||||
@ -862,11 +844,6 @@ impl Network {
|
|||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
// release caches of available listener ports
|
|
||||||
// this releases the 'first bound' ports we use to guarantee
|
|
||||||
// that we have ports available to us
|
|
||||||
self.free_bound_first_ports();
|
|
||||||
|
|
||||||
editor_public_internet.setup_network(
|
editor_public_internet.setup_network(
|
||||||
protocol_config.outbound,
|
protocol_config.outbound,
|
||||||
protocol_config.inbound,
|
protocol_config.inbound,
|
||||||
|
@ -237,7 +237,7 @@ impl Network {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn spawn_socket_listener(&self, addr: SocketAddr) -> EyreResult<()> {
|
async fn spawn_socket_listener(&self, addr: SocketAddr) -> EyreResult<bool> {
|
||||||
// Get config
|
// Get config
|
||||||
let (connection_initial_timeout_ms, tls_connection_initial_timeout_ms) = {
|
let (connection_initial_timeout_ms, tls_connection_initial_timeout_ms) = {
|
||||||
let c = self.config.get();
|
let c = self.config.get();
|
||||||
@ -248,12 +248,15 @@ impl Network {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// Create a reusable socket with no linger time, and no delay
|
// Create a reusable socket with no linger time, and no delay
|
||||||
let socket = new_bound_shared_tcp_socket(addr)
|
let Some(socket) = new_bound_shared_tcp_socket(addr)
|
||||||
.wrap_err("failed to create bound shared tcp socket")?;
|
.wrap_err("failed to create bound shared tcp socket")?
|
||||||
|
else {
|
||||||
|
return Ok(false);
|
||||||
|
};
|
||||||
// Listen on the socket
|
// Listen on the socket
|
||||||
socket
|
if socket.listen(128).is_err() {
|
||||||
.listen(128)
|
return Ok(false);
|
||||||
.wrap_err("Couldn't listen on TCP socket")?;
|
}
|
||||||
|
|
||||||
// Make an async tcplistener from the socket2 socket
|
// Make an async tcplistener from the socket2 socket
|
||||||
let std_listener: std::net::TcpListener = socket.into();
|
let std_listener: std::net::TcpListener = socket.into();
|
||||||
@ -324,7 +327,7 @@ impl Network {
|
|||||||
// Add to join handles
|
// Add to join handles
|
||||||
self.add_to_join_handles(jh);
|
self.add_to_join_handles(jh);
|
||||||
|
|
||||||
Ok(())
|
Ok(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
/////////////////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////////////////
|
||||||
@ -332,51 +335,76 @@ impl Network {
|
|||||||
// TCP listener that multiplexes ports so multiple protocols can exist on a single port
|
// TCP listener that multiplexes ports so multiple protocols can exist on a single port
|
||||||
pub(super) async fn start_tcp_listener(
|
pub(super) async fn start_tcp_listener(
|
||||||
&self,
|
&self,
|
||||||
ip_addrs: Vec<IpAddr>,
|
bind_set: NetworkBindSet,
|
||||||
port: u16,
|
|
||||||
is_tls: bool,
|
is_tls: bool,
|
||||||
new_protocol_accept_handler: Box<NewProtocolAcceptHandler>,
|
new_protocol_accept_handler: Box<NewProtocolAcceptHandler>,
|
||||||
) -> EyreResult<Vec<SocketAddress>> {
|
) -> EyreResult<Vec<SocketAddress>> {
|
||||||
let mut out = Vec::<SocketAddress>::new();
|
let mut out = Vec::<SocketAddress>::new();
|
||||||
|
|
||||||
for ip_addr in ip_addrs {
|
for ip_addr in bind_set.addrs {
|
||||||
let addr = SocketAddr::new(ip_addr, port);
|
let mut port = bind_set.port;
|
||||||
let idi_addrs = self.translate_unspecified_address(&addr);
|
loop {
|
||||||
|
let addr = SocketAddr::new(ip_addr, port);
|
||||||
|
|
||||||
// see if we've already bound to this already
|
// see if we've already bound to this already
|
||||||
// if not, spawn a listener
|
// if not, spawn a listener
|
||||||
if !self.inner.lock().listener_states.contains_key(&addr) {
|
let mut got_listener = false;
|
||||||
self.clone().spawn_socket_listener(addr).await?;
|
if !self.inner.lock().listener_states.contains_key(&addr) {
|
||||||
}
|
if self.clone().spawn_socket_listener(addr).await? {
|
||||||
|
got_listener = true;
|
||||||
let ls = if let Some(ls) = self.inner.lock().listener_states.get_mut(&addr) {
|
}
|
||||||
ls.clone()
|
} else {
|
||||||
} else {
|
got_listener = true;
|
||||||
panic!("this shouldn't happen");
|
|
||||||
};
|
|
||||||
|
|
||||||
if is_tls {
|
|
||||||
if ls.read().tls_acceptor.is_none() {
|
|
||||||
ls.write().tls_acceptor = Some(self.clone().get_or_create_tls_acceptor()?);
|
|
||||||
}
|
}
|
||||||
ls.write()
|
|
||||||
.tls_protocol_handlers
|
|
||||||
.push(new_protocol_accept_handler(
|
|
||||||
self.network_manager().config(),
|
|
||||||
true,
|
|
||||||
));
|
|
||||||
} else {
|
|
||||||
ls.write()
|
|
||||||
.protocol_accept_handlers
|
|
||||||
.push(new_protocol_accept_handler(
|
|
||||||
self.network_manager().config(),
|
|
||||||
false,
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Return interface dial infos we listen on
|
if got_listener {
|
||||||
for idi_addr in idi_addrs {
|
let ls = if let Some(ls) = self.inner.lock().listener_states.get_mut(&addr) {
|
||||||
out.push(SocketAddress::from_socket_addr(idi_addr));
|
ls.clone()
|
||||||
|
} else {
|
||||||
|
panic!("this shouldn't happen");
|
||||||
|
};
|
||||||
|
|
||||||
|
if is_tls {
|
||||||
|
if ls.read().tls_acceptor.is_none() {
|
||||||
|
ls.write().tls_acceptor =
|
||||||
|
Some(self.clone().get_or_create_tls_acceptor()?);
|
||||||
|
}
|
||||||
|
ls.write()
|
||||||
|
.tls_protocol_handlers
|
||||||
|
.push(new_protocol_accept_handler(
|
||||||
|
self.network_manager().config(),
|
||||||
|
true,
|
||||||
|
));
|
||||||
|
} else {
|
||||||
|
ls.write()
|
||||||
|
.protocol_accept_handlers
|
||||||
|
.push(new_protocol_accept_handler(
|
||||||
|
self.network_manager().config(),
|
||||||
|
false,
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return interface dial infos we listen on
|
||||||
|
let idi_addrs = self.translate_unspecified_address(&addr);
|
||||||
|
for idi_addr in idi_addrs {
|
||||||
|
out.push(SocketAddress::from_socket_addr(idi_addr));
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if !bind_set.search {
|
||||||
|
bail!("unable to bind to tcp {}", addr);
|
||||||
|
}
|
||||||
|
|
||||||
|
if port == 65535u16 {
|
||||||
|
port = 1024;
|
||||||
|
} else {
|
||||||
|
port += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if port == bind_set.port {
|
||||||
|
bail!("unable to find a free port for tcp {}", ip_addr);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -27,19 +27,13 @@ impl Network {
|
|||||||
log_net!("UDP listener task spawned");
|
log_net!("UDP listener task spawned");
|
||||||
|
|
||||||
// Collect all our protocol handlers into a vector
|
// Collect all our protocol handlers into a vector
|
||||||
let mut protocol_handlers: Vec<RawUdpProtocolHandler> = this
|
let protocol_handlers: Vec<RawUdpProtocolHandler> = this
|
||||||
.inner
|
.inner
|
||||||
.lock()
|
.lock()
|
||||||
.inbound_udp_protocol_handlers
|
.udp_protocol_handlers
|
||||||
.values()
|
.values()
|
||||||
.cloned()
|
.cloned()
|
||||||
.collect();
|
.collect();
|
||||||
if let Some(ph) = this.inner.lock().outbound_udpv4_protocol_handler.clone() {
|
|
||||||
protocol_handlers.push(ph);
|
|
||||||
}
|
|
||||||
if let Some(ph) = this.inner.lock().outbound_udpv6_protocol_handler.clone() {
|
|
||||||
protocol_handlers.push(ph);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Spawn a local async task for each socket
|
// Spawn a local async task for each socket
|
||||||
let mut protocol_handlers_unordered = FuturesUnordered::new();
|
let mut protocol_handlers_unordered = FuturesUnordered::new();
|
||||||
@ -114,77 +108,13 @@ impl Network {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) async fn create_udp_outbound_sockets(&self) -> EyreResult<()> {
|
async fn create_udp_protocol_handler(&self, addr: SocketAddr) -> EyreResult<bool> {
|
||||||
let mut inner = self.inner.lock();
|
log_net!("create_udp_protocol_handler on {:?}", &addr);
|
||||||
let mut port = inner.udp_port;
|
|
||||||
// v4
|
|
||||||
let socket_addr_v4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port);
|
|
||||||
if let Ok(socket) = new_bound_shared_udp_socket(socket_addr_v4) {
|
|
||||||
// Pull the port if we randomly bound, so v6 can be on the same port
|
|
||||||
port = socket
|
|
||||||
.local_addr()
|
|
||||||
.wrap_err("failed to get local address")?
|
|
||||||
.as_socket_ipv4()
|
|
||||||
.ok_or_else(|| eyre!("expected ipv4 address type"))?
|
|
||||||
.port();
|
|
||||||
|
|
||||||
// Make an async UdpSocket from the socket2 socket
|
|
||||||
let std_udp_socket: std::net::UdpSocket = socket.into();
|
|
||||||
cfg_if! {
|
|
||||||
if #[cfg(feature="rt-async-std")] {
|
|
||||||
let udp_socket = UdpSocket::from(std_udp_socket);
|
|
||||||
} else if #[cfg(feature="rt-tokio")] {
|
|
||||||
std_udp_socket.set_nonblocking(true).expect("failed to set nonblocking");
|
|
||||||
let udp_socket = UdpSocket::from_std(std_udp_socket).wrap_err("failed to make outbound v4 tokio udpsocket")?;
|
|
||||||
} else {
|
|
||||||
compile_error!("needs executor implementation")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
let socket_arc = Arc::new(udp_socket);
|
|
||||||
|
|
||||||
// Create protocol handler
|
|
||||||
let udpv4_handler = RawUdpProtocolHandler::new(
|
|
||||||
socket_arc,
|
|
||||||
Some(self.network_manager().address_filter()),
|
|
||||||
);
|
|
||||||
|
|
||||||
inner.outbound_udpv4_protocol_handler = Some(udpv4_handler);
|
|
||||||
}
|
|
||||||
//v6
|
|
||||||
let socket_addr_v6 =
|
|
||||||
SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)), port);
|
|
||||||
if let Ok(socket) = new_bound_shared_udp_socket(socket_addr_v6) {
|
|
||||||
// Make an async UdpSocket from the socket2 socket
|
|
||||||
let std_udp_socket: std::net::UdpSocket = socket.into();
|
|
||||||
cfg_if! {
|
|
||||||
if #[cfg(feature="rt-async-std")] {
|
|
||||||
let udp_socket = UdpSocket::from(std_udp_socket);
|
|
||||||
} else if #[cfg(feature="rt-tokio")] {
|
|
||||||
std_udp_socket.set_nonblocking(true).expect("failed to set nonblocking");
|
|
||||||
let udp_socket = UdpSocket::from_std(std_udp_socket).wrap_err("failed to make outbound v6 tokio udpsocket")?;
|
|
||||||
} else {
|
|
||||||
compile_error!("needs executor implementation")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
let socket_arc = Arc::new(udp_socket);
|
|
||||||
|
|
||||||
// Create protocol handler
|
|
||||||
let udpv6_handler = RawUdpProtocolHandler::new(
|
|
||||||
socket_arc,
|
|
||||||
Some(self.network_manager().address_filter()),
|
|
||||||
);
|
|
||||||
|
|
||||||
inner.outbound_udpv6_protocol_handler = Some(udpv6_handler);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn create_udp_inbound_socket(&self, addr: SocketAddr) -> EyreResult<()> {
|
|
||||||
log_net!("create_udp_inbound_socket on {:?}", &addr);
|
|
||||||
|
|
||||||
// Create a reusable socket
|
// Create a reusable socket
|
||||||
let socket = new_bound_shared_udp_socket(addr)?;
|
let Some(socket) = new_bound_default_udp_socket(addr)? else {
|
||||||
|
return Ok(false);
|
||||||
|
};
|
||||||
|
|
||||||
// Make an async UdpSocket from the socket2 socket
|
// Make an async UdpSocket from the socket2 socket
|
||||||
let std_udp_socket: std::net::UdpSocket = socket.into();
|
let std_udp_socket: std::net::UdpSocket = socket.into();
|
||||||
@ -204,40 +134,58 @@ impl Network {
|
|||||||
let protocol_handler =
|
let protocol_handler =
|
||||||
RawUdpProtocolHandler::new(socket_arc, Some(self.network_manager().address_filter()));
|
RawUdpProtocolHandler::new(socket_arc, Some(self.network_manager().address_filter()));
|
||||||
|
|
||||||
// Create message_handler records
|
// Record protocol handler
|
||||||
self.inner
|
let mut inner = self.inner.lock();
|
||||||
.lock()
|
inner
|
||||||
.inbound_udp_protocol_handlers
|
.udp_protocol_handlers
|
||||||
.insert(addr, protocol_handler);
|
.insert(addr, protocol_handler.clone());
|
||||||
|
if addr.is_ipv4() && inner.default_udpv4_protocol_handler.is_none() {
|
||||||
|
inner.default_udpv4_protocol_handler = Some(protocol_handler);
|
||||||
|
} else if addr.is_ipv6() && inner.default_udpv6_protocol_handler.is_none() {
|
||||||
|
inner.default_udpv6_protocol_handler = Some(protocol_handler);
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) async fn create_udp_inbound_sockets(
|
pub(super) async fn create_udp_protocol_handlers(
|
||||||
&self,
|
&self,
|
||||||
ip_addrs: Vec<IpAddr>,
|
bind_set: NetworkBindSet,
|
||||||
port: u16,
|
|
||||||
) -> EyreResult<Vec<DialInfo>> {
|
) -> EyreResult<Vec<DialInfo>> {
|
||||||
let mut out = Vec::<DialInfo>::new();
|
let mut out = Vec::<DialInfo>::new();
|
||||||
|
|
||||||
for ip_addr in ip_addrs {
|
for ip_addr in bind_set.addrs {
|
||||||
let addr = SocketAddr::new(ip_addr, port);
|
let mut port = bind_set.port;
|
||||||
|
loop {
|
||||||
|
let addr = SocketAddr::new(ip_addr, port);
|
||||||
|
|
||||||
// see if we've already bound to this already
|
// see if we've already bound to this already
|
||||||
// if not, spawn a listener
|
// if not, spawn a listener
|
||||||
if !self
|
if !self.inner.lock().udp_protocol_handlers.contains_key(&addr) {
|
||||||
.inner
|
let bound = self.clone().create_udp_protocol_handler(addr).await?;
|
||||||
.lock()
|
|
||||||
.inbound_udp_protocol_handlers
|
|
||||||
.contains_key(&addr)
|
|
||||||
{
|
|
||||||
let idi_addrs = self.translate_unspecified_address(&addr);
|
|
||||||
|
|
||||||
self.clone().create_udp_inbound_socket(addr).await?;
|
// Return interface dial infos we listen on
|
||||||
|
if bound {
|
||||||
|
let idi_addrs = self.translate_unspecified_address(&addr);
|
||||||
|
for idi_addr in idi_addrs {
|
||||||
|
out.push(DialInfo::udp_from_socketaddr(idi_addr));
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Return interface dial infos we listen on
|
if !bind_set.search {
|
||||||
for idi_addr in idi_addrs {
|
bail!("unable to bind to udp {}", addr);
|
||||||
out.push(DialInfo::udp_from_socketaddr(idi_addr));
|
}
|
||||||
|
|
||||||
|
if port == 65535u16 {
|
||||||
|
port = 1024;
|
||||||
|
} else {
|
||||||
|
port += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if port == bind_set.port {
|
||||||
|
bail!("unable to find a free port for udp {}", ip_addr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -251,18 +199,31 @@ impl Network {
|
|||||||
peer_socket_addr: &SocketAddr,
|
peer_socket_addr: &SocketAddr,
|
||||||
local_socket_addr: &Option<SocketAddr>,
|
local_socket_addr: &Option<SocketAddr>,
|
||||||
) -> Option<RawUdpProtocolHandler> {
|
) -> Option<RawUdpProtocolHandler> {
|
||||||
|
let inner = self.inner.lock();
|
||||||
// if our last communication with this peer came from a particular inbound udp protocol handler, use it
|
// if our last communication with this peer came from a particular inbound udp protocol handler, use it
|
||||||
if let Some(sa) = local_socket_addr {
|
if let Some(sa) = local_socket_addr {
|
||||||
if let Some(ph) = self.inner.lock().inbound_udp_protocol_handlers.get(sa) {
|
if let Some(ph) = inner.udp_protocol_handlers.get(sa) {
|
||||||
return Some(ph.clone());
|
return Some(ph.clone());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// otherwise find the outbound udp protocol handler that matches the ip protocol version of the peer addr
|
// otherwise find the first outbound udp protocol handler that matches the ip protocol version of the peer addr
|
||||||
let inner = self.inner.lock();
|
let inner = self.inner.lock();
|
||||||
match peer_socket_addr {
|
match peer_socket_addr {
|
||||||
SocketAddr::V4(_) => inner.outbound_udpv4_protocol_handler.clone(),
|
SocketAddr::V4(_) => inner.udp_protocol_handlers.iter().find_map(|x| {
|
||||||
SocketAddr::V6(_) => inner.outbound_udpv6_protocol_handler.clone(),
|
if x.0.is_ipv4() {
|
||||||
|
Some(x.1.clone())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
SocketAddr::V6(_) => inner.udp_protocol_handlers.iter().find_map(|x| {
|
||||||
|
if x.0.is_ipv6() {
|
||||||
|
Some(x.1.clone())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -36,7 +36,7 @@ cfg_if! {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[instrument(level = "trace", ret)]
|
#[instrument(level = "trace", ret)]
|
||||||
pub fn new_unbound_shared_udp_socket(domain: Domain) -> io::Result<Socket> {
|
pub fn new_shared_udp_socket(domain: Domain) -> io::Result<Socket> {
|
||||||
let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
|
let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
|
||||||
if domain == Domain::IPV6 {
|
if domain == Domain::IPV6 {
|
||||||
socket.set_only_v6(true)?;
|
socket.set_only_v6(true)?;
|
||||||
@ -52,56 +52,28 @@ pub fn new_unbound_shared_udp_socket(domain: Domain) -> io::Result<Socket> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[instrument(level = "trace", ret)]
|
#[instrument(level = "trace", ret)]
|
||||||
pub fn new_bound_shared_udp_socket(local_address: SocketAddr) -> io::Result<Socket> {
|
pub fn new_default_udp_socket(domain: Domain) -> io::Result<Socket> {
|
||||||
let domain = Domain::for_address(local_address);
|
let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
|
||||||
let socket = new_unbound_shared_udp_socket(domain)?;
|
if domain == Domain::IPV6 {
|
||||||
let socket2_addr = SockAddr::from(local_address);
|
socket.set_only_v6(true)?;
|
||||||
socket.bind(&socket2_addr)?;
|
}
|
||||||
|
|
||||||
log_net!("created bound shared udp socket on {:?}", &local_address);
|
|
||||||
|
|
||||||
Ok(socket)
|
Ok(socket)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[instrument(level = "trace", ret)]
|
#[instrument(level = "trace", ret)]
|
||||||
pub fn new_bound_first_udp_socket(local_address: SocketAddr) -> io::Result<Socket> {
|
pub fn new_bound_default_udp_socket(local_address: SocketAddr) -> io::Result<Option<Socket>> {
|
||||||
let domain = Domain::for_address(local_address);
|
let domain = Domain::for_address(local_address);
|
||||||
let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
|
let socket = new_default_udp_socket(domain)?;
|
||||||
if domain == Domain::IPV6 {
|
|
||||||
socket.set_only_v6(true)?;
|
|
||||||
}
|
|
||||||
// Bind the socket -first- before turning on 'reuse address' this way it will
|
|
||||||
// fail if the port is already taken
|
|
||||||
let socket2_addr = SockAddr::from(local_address);
|
let socket2_addr = SockAddr::from(local_address);
|
||||||
|
|
||||||
// On windows, do SO_EXCLUSIVEADDRUSE before the bind to ensure the port is fully available
|
if socket.bind(&socket2_addr).is_err() {
|
||||||
cfg_if! {
|
return Ok(None);
|
||||||
if #[cfg(windows)] {
|
|
||||||
set_exclusiveaddruse(&socket)?;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Bind the socket -first- without turning on SO_REUSEPORT this way it will
|
log_net!("created bound default udp socket on {:?}", &local_address);
|
||||||
// fail if the port is already taken
|
|
||||||
cfg_if! {
|
|
||||||
if #[cfg(unix)] {
|
|
||||||
socket
|
|
||||||
.set_reuse_address(true)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
socket.bind(&socket2_addr)?;
|
Ok(Some(socket))
|
||||||
|
|
||||||
// Set 'reuse address' so future binds to this port will succeed
|
|
||||||
// This does not work on Windows, where reuse options can not be set after the bind
|
|
||||||
cfg_if! {
|
|
||||||
if #[cfg(unix)] {
|
|
||||||
socket.set_reuse_port(true)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
log_net!("created bound first udp socket on {:?}", &local_address);
|
|
||||||
|
|
||||||
Ok(socket)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[instrument(level = "trace", ret)]
|
#[instrument(level = "trace", ret)]
|
||||||
@ -139,62 +111,17 @@ pub fn new_unbound_shared_tcp_socket(domain: Domain) -> io::Result<Socket> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[instrument(level = "trace", ret)]
|
#[instrument(level = "trace", ret)]
|
||||||
pub fn new_bound_shared_tcp_socket(local_address: SocketAddr) -> io::Result<Socket> {
|
pub fn new_bound_shared_tcp_socket(local_address: SocketAddr) -> io::Result<Option<Socket>> {
|
||||||
let domain = Domain::for_address(local_address);
|
let domain = Domain::for_address(local_address);
|
||||||
let socket = new_unbound_shared_tcp_socket(domain)?;
|
let socket = new_unbound_shared_tcp_socket(domain)?;
|
||||||
let socket2_addr = SockAddr::from(local_address);
|
let socket2_addr = SockAddr::from(local_address);
|
||||||
socket.bind(&socket2_addr)?;
|
if socket.bind(&socket2_addr).is_err() {
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
|
||||||
log_net!("created bound shared tcp socket on {:?}", &local_address);
|
log_net!("created bound shared tcp socket on {:?}", &local_address);
|
||||||
|
|
||||||
Ok(socket)
|
Ok(Some(socket))
|
||||||
}
|
|
||||||
|
|
||||||
#[instrument(level = "trace", ret)]
|
|
||||||
pub fn new_bound_first_tcp_socket(local_address: SocketAddr) -> io::Result<Socket> {
|
|
||||||
let domain = Domain::for_address(local_address);
|
|
||||||
|
|
||||||
let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?;
|
|
||||||
// if let Err(e) = socket.set_linger(Some(core::time::Duration::from_secs(0))) {
|
|
||||||
// log_net!(error "Couldn't set TCP linger: {}", e);
|
|
||||||
// }
|
|
||||||
if let Err(e) = socket.set_nodelay(true) {
|
|
||||||
log_net!(error "Couldn't set TCP nodelay: {}", e);
|
|
||||||
}
|
|
||||||
if domain == Domain::IPV6 {
|
|
||||||
socket.set_only_v6(true)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
// On windows, do SO_EXCLUSIVEADDRUSE before the bind to ensure the port is fully available
|
|
||||||
cfg_if! {
|
|
||||||
if #[cfg(windows)] {
|
|
||||||
set_exclusiveaddruse(&socket)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Bind the socket -first- without turning on SO_REUSEPORT this way it will
|
|
||||||
// fail if the port is already taken
|
|
||||||
let socket2_addr = SockAddr::from(local_address);
|
|
||||||
|
|
||||||
cfg_if! {
|
|
||||||
if #[cfg(unix)] {
|
|
||||||
socket
|
|
||||||
.set_reuse_address(true)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
socket.bind(&socket2_addr)?;
|
|
||||||
|
|
||||||
// Set 'reuse address' so future binds to this port will succeed
|
|
||||||
// This does not work on Windows, where reuse options can not be set after the bind
|
|
||||||
cfg_if! {
|
|
||||||
if #[cfg(unix)] {
|
|
||||||
socket.set_reuse_port(true)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
log_net!("created bound first tcp socket on {:?}", &local_address);
|
|
||||||
|
|
||||||
Ok(socket)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Non-blocking connect is tricky when you want to start with a prepared socket
|
// Non-blocking connect is tricky when you want to start with a prepared socket
|
||||||
|
@ -169,7 +169,9 @@ impl RawTcpProtocolHandler {
|
|||||||
) -> io::Result<NetworkResult<ProtocolNetworkConnection>> {
|
) -> io::Result<NetworkResult<ProtocolNetworkConnection>> {
|
||||||
// Make a shared socket
|
// Make a shared socket
|
||||||
let socket = match local_address {
|
let socket = match local_address {
|
||||||
Some(a) => new_bound_shared_tcp_socket(a)?,
|
Some(a) => {
|
||||||
|
new_bound_shared_tcp_socket(a)?.ok_or(io::Error::from(io::ErrorKind::AddrInUse))?
|
||||||
|
}
|
||||||
None => new_unbound_tcp_socket(socket2::Domain::for_address(socket_addr))?,
|
None => new_unbound_tcp_socket(socket2::Domain::for_address(socket_addr))?,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -128,6 +128,8 @@ impl RawUdpProtocolHandler {
|
|||||||
SocketAddress::from_socket_addr(local_socket_addr),
|
SocketAddress::from_socket_addr(local_socket_addr),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
eprintln!("udp::send_message: {:?}", flow);
|
||||||
|
|
||||||
#[cfg(feature = "verbose-tracing")]
|
#[cfg(feature = "verbose-tracing")]
|
||||||
tracing::Span::current().record("ret.flow", format!("{:?}", flow).as_str());
|
tracing::Span::current().record("ret.flow", format!("{:?}", flow).as_str());
|
||||||
Ok(NetworkResult::value(flow))
|
Ok(NetworkResult::value(flow))
|
||||||
|
@ -319,7 +319,9 @@ impl WebsocketProtocolHandler {
|
|||||||
|
|
||||||
// Make a shared socket
|
// Make a shared socket
|
||||||
let socket = match local_address {
|
let socket = match local_address {
|
||||||
Some(a) => new_bound_shared_tcp_socket(a)?,
|
Some(a) => {
|
||||||
|
new_bound_shared_tcp_socket(a)?.ok_or(io::Error::from(io::ErrorKind::AddrInUse))?
|
||||||
|
}
|
||||||
None => new_unbound_tcp_socket(socket2::Domain::for_address(remote_socket_addr))?,
|
None => new_unbound_tcp_socket(socket2::Domain::for_address(remote_socket_addr))?,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -1,4 +1,3 @@
|
|||||||
use super::sockets::*;
|
|
||||||
use super::*;
|
use super::*;
|
||||||
use lazy_static::*;
|
use lazy_static::*;
|
||||||
|
|
||||||
@ -74,152 +73,29 @@ lazy_static! {
|
|||||||
]);
|
]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(super) struct NetworkBindSet {
|
||||||
|
pub port: u16,
|
||||||
|
pub addrs: Vec<IpAddr>,
|
||||||
|
pub search: bool,
|
||||||
|
}
|
||||||
|
|
||||||
impl Network {
|
impl Network {
|
||||||
/////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////
|
||||||
// Support for binding first on ports to ensure nobody binds ahead of us
|
|
||||||
// or two copies of the app don't accidentally collide. This is tricky
|
|
||||||
// because we use 'reuseaddr/port' and we can accidentally bind in front of ourselves :P
|
|
||||||
|
|
||||||
fn bind_first_udp_port(&self, udp_port: u16) -> bool {
|
// Returns a port, a set of ip addresses to bind to, and a
|
||||||
let mut inner = self.inner.lock();
|
// bool specifying if multiple ports should be tried
|
||||||
if inner.bound_first_udp.contains_key(&udp_port) {
|
async fn convert_listen_address_to_bind_set(
|
||||||
return true;
|
&self,
|
||||||
}
|
listen_address: String,
|
||||||
|
) -> EyreResult<NetworkBindSet> {
|
||||||
// Check for ipv6
|
|
||||||
let has_v6 = is_ipv6_supported();
|
|
||||||
|
|
||||||
// If the address is specified, only use the specified port and fail otherwise
|
|
||||||
let mut bound_first_socket_v4 = None;
|
|
||||||
let mut bound_first_socket_v6 = None;
|
|
||||||
if let Ok(bfs4) =
|
|
||||||
new_bound_first_udp_socket(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), udp_port))
|
|
||||||
{
|
|
||||||
if has_v6 {
|
|
||||||
if let Ok(bfs6) = new_bound_first_udp_socket(SocketAddr::new(
|
|
||||||
IpAddr::V6(Ipv6Addr::UNSPECIFIED),
|
|
||||||
udp_port,
|
|
||||||
)) {
|
|
||||||
bound_first_socket_v4 = Some(bfs4);
|
|
||||||
bound_first_socket_v6 = Some(bfs6);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
bound_first_socket_v4 = Some(bfs4);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if bound_first_socket_v4.is_none() && (has_v6 && bound_first_socket_v6.is_none()) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
cfg_if! {
|
|
||||||
if #[cfg(windows)] {
|
|
||||||
// On windows, drop the socket. This is a race condition, but there's
|
|
||||||
// no way around it. This isn't for security anyway, it's to prevent multiple copies of the
|
|
||||||
// app from binding on the same port.
|
|
||||||
inner.bound_first_udp.insert(udp_port, (None, None));
|
|
||||||
} else {
|
|
||||||
inner.bound_first_udp.insert(udp_port, (bound_first_socket_v4, bound_first_socket_v6));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
true
|
|
||||||
}
|
|
||||||
|
|
||||||
fn bind_first_tcp_port(&self, tcp_port: u16) -> bool {
|
|
||||||
let mut inner = self.inner.lock();
|
|
||||||
if inner.bound_first_tcp.contains_key(&tcp_port) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check for ipv6
|
|
||||||
let has_v6 = is_ipv6_supported();
|
|
||||||
|
|
||||||
// If the address is specified, only use the specified port and fail otherwise
|
|
||||||
let mut bound_first_socket_v4 = None;
|
|
||||||
let mut bound_first_socket_v6 = None;
|
|
||||||
if let Ok(bfs4) =
|
|
||||||
new_bound_first_tcp_socket(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), tcp_port))
|
|
||||||
{
|
|
||||||
if has_v6 {
|
|
||||||
if let Ok(bfs6) = new_bound_first_tcp_socket(SocketAddr::new(
|
|
||||||
IpAddr::V6(Ipv6Addr::UNSPECIFIED),
|
|
||||||
tcp_port,
|
|
||||||
)) {
|
|
||||||
bound_first_socket_v4 = Some(bfs4);
|
|
||||||
bound_first_socket_v6 = Some(bfs6);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
bound_first_socket_v4 = Some(bfs4);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if bound_first_socket_v4.is_none() && (has_v6 && bound_first_socket_v6.is_none()) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
cfg_if! {
|
|
||||||
if #[cfg(windows)] {
|
|
||||||
// On windows, drop the socket. This is a race condition, but there's
|
|
||||||
// no way around it. This isn't for security anyway, it's to prevent multiple copies of the
|
|
||||||
// app from binding on the same port.
|
|
||||||
inner.bound_first_tcp.insert(tcp_port, (None, None));
|
|
||||||
} else {
|
|
||||||
inner.bound_first_tcp.insert(tcp_port, (bound_first_socket_v4, bound_first_socket_v6));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
true
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(super) fn free_bound_first_ports(&self) {
|
|
||||||
let mut inner = self.inner.lock();
|
|
||||||
inner.bound_first_udp.clear();
|
|
||||||
inner.bound_first_tcp.clear();
|
|
||||||
}
|
|
||||||
|
|
||||||
/////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
fn find_available_udp_port(&self, start_port: u16) -> EyreResult<u16> {
|
|
||||||
// If the address is empty, iterate ports until we find one we can use.
|
|
||||||
let mut udp_port = start_port;
|
|
||||||
loop {
|
|
||||||
if BAD_PORTS.contains(&udp_port) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if self.bind_first_udp_port(udp_port) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if udp_port == 65535 {
|
|
||||||
bail!("Could not find free udp port to listen on");
|
|
||||||
}
|
|
||||||
udp_port += 1;
|
|
||||||
}
|
|
||||||
Ok(udp_port)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn find_available_tcp_port(&self, start_port: u16) -> EyreResult<u16> {
|
|
||||||
// If the address is empty, iterate ports until we find one we can use.
|
|
||||||
let mut tcp_port = start_port;
|
|
||||||
loop {
|
|
||||||
if BAD_PORTS.contains(&tcp_port) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if self.bind_first_tcp_port(tcp_port) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if tcp_port == 65535 {
|
|
||||||
bail!("Could not find free tcp port to listen on");
|
|
||||||
}
|
|
||||||
tcp_port += 1;
|
|
||||||
}
|
|
||||||
Ok(tcp_port)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn allocate_udp_port(&self, listen_address: String) -> EyreResult<(u16, Vec<IpAddr>)> {
|
|
||||||
if listen_address.is_empty() {
|
if listen_address.is_empty() {
|
||||||
// If listen address is empty, find us a port iteratively
|
// If listen address is empty, start with port 5150 and iterate
|
||||||
let port = self.find_available_udp_port(5150)?;
|
|
||||||
let ip_addrs = available_unspecified_addresses();
|
let ip_addrs = available_unspecified_addresses();
|
||||||
Ok((port, ip_addrs))
|
Ok(NetworkBindSet {
|
||||||
|
port: 5150,
|
||||||
|
addrs: ip_addrs,
|
||||||
|
search: true,
|
||||||
|
})
|
||||||
} else {
|
} else {
|
||||||
// If no address is specified, but the port is, use ipv4 and ipv6 unspecified
|
// If no address is specified, but the port is, use ipv4 and ipv6 unspecified
|
||||||
// If the address is specified, only use the specified port and fail otherwise
|
// If the address is specified, only use the specified port and fail otherwise
|
||||||
@ -229,58 +105,30 @@ impl Network {
|
|||||||
bail!("No valid listen address: {}", listen_address);
|
bail!("No valid listen address: {}", listen_address);
|
||||||
}
|
}
|
||||||
let port = sockaddrs[0].port();
|
let port = sockaddrs[0].port();
|
||||||
|
if port == 0 {
|
||||||
Ok((port, sockaddrs.iter().map(|s| s.ip()).collect()))
|
Ok(NetworkBindSet {
|
||||||
}
|
port: 5150,
|
||||||
}
|
addrs: sockaddrs.iter().map(|s| s.ip()).collect(),
|
||||||
|
search: true,
|
||||||
async fn allocate_tcp_port(&self, listen_address: String) -> EyreResult<(u16, Vec<IpAddr>)> {
|
})
|
||||||
if listen_address.is_empty() {
|
} else {
|
||||||
// If listen address is empty, find us a port iteratively
|
Ok(NetworkBindSet {
|
||||||
let port = self.find_available_tcp_port(5150)?;
|
port,
|
||||||
let ip_addrs = available_unspecified_addresses();
|
addrs: sockaddrs.iter().map(|s| s.ip()).collect(),
|
||||||
Ok((port, ip_addrs))
|
search: false,
|
||||||
} else {
|
})
|
||||||
// If no address is specified, but the port is, use ipv4 and ipv6 unspecified
|
|
||||||
// If the address is specified, only use the specified port and fail otherwise
|
|
||||||
let sockaddrs =
|
|
||||||
listen_address_to_socket_addrs(&listen_address).map_err(|e| eyre!("{}", e))?;
|
|
||||||
if sockaddrs.is_empty() {
|
|
||||||
bail!("No valid listen address: {}", listen_address);
|
|
||||||
}
|
}
|
||||||
let port = sockaddrs[0].port();
|
|
||||||
|
|
||||||
let mut attempts = 10;
|
|
||||||
let mut success = false;
|
|
||||||
while attempts >= 0 {
|
|
||||||
if self.bind_first_tcp_port(port) {
|
|
||||||
success = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
attempts -= 1;
|
|
||||||
|
|
||||||
// Wait 5 seconds before trying again
|
|
||||||
log_net!(debug
|
|
||||||
"Binding TCP port at {} failed, waiting. Attempts remaining = {}",
|
|
||||||
port, attempts
|
|
||||||
);
|
|
||||||
sleep(5000).await
|
|
||||||
}
|
|
||||||
if !success {
|
|
||||||
bail!("Could not find free tcp port to listen on");
|
|
||||||
}
|
|
||||||
Ok((port, sockaddrs.iter().map(|s| s.ip()).collect()))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////
|
||||||
|
|
||||||
pub(super) async fn start_udp_listeners(
|
pub(super) async fn bind_udp_protocol_handlers(
|
||||||
&self,
|
&self,
|
||||||
editor_public_internet: &mut RoutingDomainEditor,
|
editor_public_internet: &mut RoutingDomainEditor,
|
||||||
editor_local_network: &mut RoutingDomainEditor,
|
editor_local_network: &mut RoutingDomainEditor,
|
||||||
) -> EyreResult<()> {
|
) -> EyreResult<()> {
|
||||||
log_net!("starting udp listeners");
|
log_net!("UDP: binding protocol handlers");
|
||||||
let routing_table = self.routing_table();
|
let routing_table = self.routing_table();
|
||||||
let (listen_address, public_address, detect_address_changes) = {
|
let (listen_address, public_address, detect_address_changes) = {
|
||||||
let c = self.config.get();
|
let c = self.config.get();
|
||||||
@ -291,28 +139,30 @@ impl Network {
|
|||||||
)
|
)
|
||||||
};
|
};
|
||||||
|
|
||||||
// Pick out UDP port we're going to use everywhere
|
// Get the binding parameters from the user-specified listen address
|
||||||
// Keep sockets around until the end of this function
|
let bind_set = self
|
||||||
// to keep anyone else from binding in front of us
|
.convert_listen_address_to_bind_set(listen_address.clone())
|
||||||
let (udp_port, ip_addrs) = self.allocate_udp_port(listen_address.clone()).await?;
|
.await?;
|
||||||
|
|
||||||
// Save the bound udp port for use later on
|
|
||||||
self.inner.lock().udp_port = udp_port;
|
|
||||||
|
|
||||||
// First, create outbound sockets
|
|
||||||
// (unlike tcp where we create sockets for every connection)
|
|
||||||
// and we'll add protocol handlers for them too
|
|
||||||
self.create_udp_outbound_sockets().await?;
|
|
||||||
|
|
||||||
// Now create udp inbound sockets for whatever interfaces we're listening on
|
// Now create udp inbound sockets for whatever interfaces we're listening on
|
||||||
info!(
|
if bind_set.search {
|
||||||
"UDP: starting listeners on port {} at {:?}",
|
info!(
|
||||||
udp_port, ip_addrs
|
"UDP: searching for free port starting with {} on {:?}",
|
||||||
);
|
bind_set.port, bind_set.addrs
|
||||||
let local_dial_info_list = self.create_udp_inbound_sockets(ip_addrs, udp_port).await?;
|
);
|
||||||
|
} else {
|
||||||
|
info!(
|
||||||
|
"UDP: binding protocol handlers at port {} on {:?}",
|
||||||
|
bind_set.port, bind_set.addrs
|
||||||
|
);
|
||||||
|
}
|
||||||
|
let local_dial_info_list = self.create_udp_protocol_handlers(bind_set).await?;
|
||||||
let mut static_public = false;
|
let mut static_public = false;
|
||||||
|
|
||||||
log_net!("UDP: listener started on {:#?}", local_dial_info_list);
|
log_net!(
|
||||||
|
"UDP: protocol handlers bound to {:#?}",
|
||||||
|
local_dial_info_list
|
||||||
|
);
|
||||||
|
|
||||||
// Register local dial info
|
// Register local dial info
|
||||||
for di in &local_dial_info_list {
|
for di in &local_dial_info_list {
|
||||||
@ -374,6 +224,8 @@ impl Network {
|
|||||||
.insert(ProtocolType::UDP);
|
.insert(ProtocolType::UDP);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// xxx compile all dialinfo from editor into map of protocoltype+addresstype -> port for 'best port selection' code
|
||||||
|
|
||||||
// Now create tasks for udp listeners
|
// Now create tasks for udp listeners
|
||||||
self.create_udp_listener_tasks().await
|
self.create_udp_listener_tasks().await
|
||||||
}
|
}
|
||||||
@ -383,7 +235,7 @@ impl Network {
|
|||||||
editor_public_internet: &mut RoutingDomainEditor,
|
editor_public_internet: &mut RoutingDomainEditor,
|
||||||
editor_local_network: &mut RoutingDomainEditor,
|
editor_local_network: &mut RoutingDomainEditor,
|
||||||
) -> EyreResult<()> {
|
) -> EyreResult<()> {
|
||||||
log_net!("starting ws listeners");
|
log_net!("WS: binding protocol handlers");
|
||||||
let routing_table = self.routing_table();
|
let routing_table = self.routing_table();
|
||||||
let (listen_address, url, path, detect_address_changes) = {
|
let (listen_address, url, path, detect_address_changes) = {
|
||||||
let c = self.config.get();
|
let c = self.config.get();
|
||||||
@ -395,27 +247,30 @@ impl Network {
|
|||||||
)
|
)
|
||||||
};
|
};
|
||||||
|
|
||||||
// Pick out TCP port we're going to use everywhere
|
// Get the binding parameters from the user-specified listen address
|
||||||
// Keep sockets around until the end of this function
|
let bind_set = self
|
||||||
// to keep anyone else from binding in front of us
|
.convert_listen_address_to_bind_set(listen_address.clone())
|
||||||
let (ws_port, ip_addrs) = self.allocate_tcp_port(listen_address.clone()).await?;
|
.await?;
|
||||||
|
|
||||||
// Save the bound ws port for use later on
|
if bind_set.search {
|
||||||
self.inner.lock().ws_port = ws_port;
|
info!(
|
||||||
|
"WS: searching for free port starting with {} on {:?}",
|
||||||
info!(
|
bind_set.port, bind_set.addrs
|
||||||
"WS: starting listener on port {} at {:?}",
|
);
|
||||||
ws_port, ip_addrs
|
} else {
|
||||||
);
|
info!(
|
||||||
|
"WS: binding protocol handlers at port {} on {:?}",
|
||||||
|
bind_set.port, bind_set.addrs
|
||||||
|
);
|
||||||
|
}
|
||||||
let socket_addresses = self
|
let socket_addresses = self
|
||||||
.start_tcp_listener(
|
.start_tcp_listener(
|
||||||
ip_addrs,
|
bind_set,
|
||||||
ws_port,
|
|
||||||
false,
|
false,
|
||||||
Box::new(|c, t| Box::new(WebsocketProtocolHandler::new(c, t))),
|
Box::new(|c, t| Box::new(WebsocketProtocolHandler::new(c, t))),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
log_net!("WS: listener started on {:#?}", socket_addresses);
|
log_net!("WS: protocol handlers started on {:#?}", socket_addresses);
|
||||||
|
|
||||||
let mut static_public = false;
|
let mut static_public = false;
|
||||||
let mut registered_addresses: HashSet<IpAddr> = HashSet::new();
|
let mut registered_addresses: HashSet<IpAddr> = HashSet::new();
|
||||||
@ -493,7 +348,7 @@ impl Network {
|
|||||||
editor_public_internet: &mut RoutingDomainEditor,
|
editor_public_internet: &mut RoutingDomainEditor,
|
||||||
editor_local_network: &mut RoutingDomainEditor,
|
editor_local_network: &mut RoutingDomainEditor,
|
||||||
) -> EyreResult<()> {
|
) -> EyreResult<()> {
|
||||||
log_net!("starting wss listeners");
|
log_net!("WSS: binding protocol handlers");
|
||||||
|
|
||||||
let (listen_address, url, detect_address_changes) = {
|
let (listen_address, url, detect_address_changes) = {
|
||||||
let c = self.config.get();
|
let c = self.config.get();
|
||||||
@ -504,27 +359,31 @@ impl Network {
|
|||||||
)
|
)
|
||||||
};
|
};
|
||||||
|
|
||||||
// Pick out TCP port we're going to use everywhere
|
// Get the binding parameters from the user-specified listen address
|
||||||
// Keep sockets around until the end of this function
|
let bind_set = self
|
||||||
// to keep anyone else from binding in front of us
|
.convert_listen_address_to_bind_set(listen_address.clone())
|
||||||
let (wss_port, ip_addrs) = self.allocate_tcp_port(listen_address.clone()).await?;
|
.await?;
|
||||||
|
|
||||||
// Save the bound wss port for use later on
|
if bind_set.search {
|
||||||
self.inner.lock().wss_port = wss_port;
|
info!(
|
||||||
|
"WSS: searching for free port starting with {} on {:?}",
|
||||||
|
bind_set.port, bind_set.addrs
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
info!(
|
||||||
|
"WSS: binding protocol handlers at port {} on {:?}",
|
||||||
|
bind_set.port, bind_set.addrs
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
info!(
|
|
||||||
"WSS: starting listener on port {} at {:?}",
|
|
||||||
wss_port, ip_addrs
|
|
||||||
);
|
|
||||||
let socket_addresses = self
|
let socket_addresses = self
|
||||||
.start_tcp_listener(
|
.start_tcp_listener(
|
||||||
ip_addrs,
|
bind_set,
|
||||||
wss_port,
|
|
||||||
true,
|
true,
|
||||||
Box::new(|c, t| Box::new(WebsocketProtocolHandler::new(c, t))),
|
Box::new(|c, t| Box::new(WebsocketProtocolHandler::new(c, t))),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
log_net!("WSS: listener started on {:#?}", socket_addresses);
|
log_net!("WSS: protocol handlers started on {:#?}", socket_addresses);
|
||||||
|
|
||||||
// NOTE: No interface dial info for WSS, as there is no way to connect to a local dialinfo via TLS
|
// NOTE: No interface dial info for WSS, as there is no way to connect to a local dialinfo via TLS
|
||||||
// If the hostname is specified, it is the public dialinfo via the URL. If no hostname
|
// If the hostname is specified, it is the public dialinfo via the URL. If no hostname
|
||||||
@ -586,7 +445,7 @@ impl Network {
|
|||||||
editor_public_internet: &mut RoutingDomainEditor,
|
editor_public_internet: &mut RoutingDomainEditor,
|
||||||
editor_local_network: &mut RoutingDomainEditor,
|
editor_local_network: &mut RoutingDomainEditor,
|
||||||
) -> EyreResult<()> {
|
) -> EyreResult<()> {
|
||||||
log_net!("starting tcp listeners");
|
log_net!("TCP: binding protocol handlers");
|
||||||
|
|
||||||
let routing_table = self.routing_table();
|
let routing_table = self.routing_table();
|
||||||
let (listen_address, public_address, detect_address_changes) = {
|
let (listen_address, public_address, detect_address_changes) = {
|
||||||
@ -598,27 +457,30 @@ impl Network {
|
|||||||
)
|
)
|
||||||
};
|
};
|
||||||
|
|
||||||
// Pick out TCP port we're going to use everywhere
|
// Get the binding parameters from the user-specified listen address
|
||||||
// Keep sockets around until the end of this function
|
let bind_set = self
|
||||||
// to keep anyone else from binding in front of us
|
.convert_listen_address_to_bind_set(listen_address.clone())
|
||||||
let (tcp_port, ip_addrs) = self.allocate_tcp_port(listen_address.clone()).await?;
|
.await?;
|
||||||
|
|
||||||
// Save the bound tcp port for use later on
|
if bind_set.search {
|
||||||
self.inner.lock().tcp_port = tcp_port;
|
info!(
|
||||||
|
"TCP: searching for free port starting with {} on {:?}",
|
||||||
info!(
|
bind_set.port, bind_set.addrs
|
||||||
"TCP: starting listener on port {} at {:?}",
|
);
|
||||||
tcp_port, ip_addrs
|
} else {
|
||||||
);
|
info!(
|
||||||
|
"TCP: binding protocol handlers at port {} on {:?}",
|
||||||
|
bind_set.port, bind_set.addrs
|
||||||
|
);
|
||||||
|
}
|
||||||
let socket_addresses = self
|
let socket_addresses = self
|
||||||
.start_tcp_listener(
|
.start_tcp_listener(
|
||||||
ip_addrs,
|
bind_set,
|
||||||
tcp_port,
|
|
||||||
false,
|
false,
|
||||||
Box::new(|c, _| Box::new(RawTcpProtocolHandler::new(c))),
|
Box::new(|c, _| Box::new(RawTcpProtocolHandler::new(c))),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
log_net!("TCP: listener started on {:#?}", socket_addresses);
|
log_net!("TCP: protocol handlers started on {:#?}", socket_addresses);
|
||||||
|
|
||||||
let mut static_public = false;
|
let mut static_public = false;
|
||||||
let mut registered_addresses: HashSet<IpAddr> = HashSet::new();
|
let mut registered_addresses: HashSet<IpAddr> = HashSet::new();
|
||||||
|
@ -458,7 +458,7 @@ packages:
|
|||||||
path: ".."
|
path: ".."
|
||||||
relative: true
|
relative: true
|
||||||
source: path
|
source: path
|
||||||
version: "0.3.0"
|
version: "0.3.1"
|
||||||
vm_service:
|
vm_service:
|
||||||
dependency: transitive
|
dependency: transitive
|
||||||
description:
|
description:
|
||||||
|
@ -274,12 +274,17 @@ pub fn listen_address_to_socket_addrs(listen_address: &str) -> Result<Vec<Socket
|
|||||||
} else if let Ok(port) = listen_address.parse::<u16>() {
|
} else if let Ok(port) = listen_address.parse::<u16>() {
|
||||||
ip_addrs.iter().map(|a| SocketAddr::new(*a, port)).collect()
|
ip_addrs.iter().map(|a| SocketAddr::new(*a, port)).collect()
|
||||||
} else {
|
} else {
|
||||||
|
let listen_address_with_port = if listen_address.contains(':') {
|
||||||
|
listen_address.to_string()
|
||||||
|
} else {
|
||||||
|
format!("{}:0", listen_address)
|
||||||
|
};
|
||||||
cfg_if! {
|
cfg_if! {
|
||||||
if #[cfg(target_arch = "wasm32")] {
|
if #[cfg(target_arch = "wasm32")] {
|
||||||
use core::str::FromStr;
|
use core::str::FromStr;
|
||||||
vec![SocketAddr::from_str(listen_address).map_err(|e| format!("Unable to parse address: {}",e))?]
|
vec![SocketAddr::from_str(listen_address_with_port).map_err(|e| format!("Unable to parse address: {}",e))?]
|
||||||
} else {
|
} else {
|
||||||
listen_address
|
listen_address_with_port
|
||||||
.to_socket_addrs()
|
.to_socket_addrs()
|
||||||
.map_err(|e| format!("Unable to resolve address: {}", e))?
|
.map_err(|e| format!("Unable to resolve address: {}", e))?
|
||||||
.collect()
|
.collect()
|
||||||
|
Loading…
Reference in New Issue
Block a user