diff --git a/Cargo.lock b/Cargo.lock index 0e23de54..b3ffdc12 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6524,6 +6524,7 @@ dependencies = [ "cfg-if 1.0.0", "chrono", "console_error_panic_hook", + "ctrlc", "eyre", "flume", "fn_name", diff --git a/veilid-core/src/network_manager/native/discovery_context.rs b/veilid-core/src/network_manager/native/discovery_context.rs index bbb0dcf6..993d87a7 100644 --- a/veilid-core/src/network_manager/native/discovery_context.rs +++ b/veilid-core/src/network_manager/native/discovery_context.rs @@ -2,6 +2,7 @@ /// Also performs UPNP/IGD mapping if enabled and possible use super::*; use futures_util::stream::FuturesUnordered; +use igd_manager::{IGDAddressType, IGDProtocolType}; const PORT_MAP_VALIDATE_TRY_COUNT: usize = 3; const PORT_MAP_VALIDATE_DELAY_MS: u32 = 500; @@ -318,7 +319,15 @@ impl DiscoveryContext { let address_type = self.unlocked_inner.config.address_type; let local_port = self.unlocked_inner.config.port; - let low_level_protocol_type = protocol_type.low_level_protocol_type(); + let igd_protocol_type = match protocol_type.low_level_protocol_type() { + LowLevelProtocolType::UDP => IGDProtocolType::UDP, + LowLevelProtocolType::TCP => IGDProtocolType::TCP, + }; + let igd_address_type = match address_type { + AddressType::IPV6 => IGDAddressType::IPV6, + AddressType::IPV4 => IGDAddressType::IPV4, + }; + let external_1 = self.inner.lock().external_info.first().unwrap().clone(); let igd_manager = self.unlocked_inner.net.unlocked_inner.igd_manager.clone(); @@ -329,8 +338,8 @@ impl DiscoveryContext { // Attempt a port mapping. If this doesn't succeed, it's not going to let mapped_external_address = igd_manager .map_any_port( - low_level_protocol_type, - address_type, + igd_protocol_type, + igd_address_type, local_port, Some(external_1.address.ip_addr()), ) @@ -361,10 +370,7 @@ impl DiscoveryContext { if validate_tries != PORT_MAP_VALIDATE_TRY_COUNT { log_net!(debug "UPNP port mapping succeeded but port {}/{} is still unreachable.\nretrying\n", - local_port, match low_level_protocol_type { - LowLevelProtocolType::UDP => "udp", - LowLevelProtocolType::TCP => "tcp", - }); + local_port, igd_protocol_type); sleep(PORT_MAP_VALIDATE_DELAY_MS).await } else { break; @@ -374,18 +380,15 @@ impl DiscoveryContext { // Release the mapping if we're still unreachable let _ = igd_manager .unmap_port( - low_level_protocol_type, - address_type, + igd_protocol_type, + igd_address_type, external_1.address.port(), ) .await; if tries == PORT_MAP_TRY_COUNT { warn!("UPNP port mapping succeeded but port {}/{} is still unreachable.\nYou may need to add a local firewall allowed port on this machine.\n", - local_port, match low_level_protocol_type { - LowLevelProtocolType::UDP => "udp", - LowLevelProtocolType::TCP => "tcp", - } + local_port, igd_protocol_type ); break; } diff --git a/veilid-core/src/network_manager/native/igd_manager.rs b/veilid-core/src/network_manager/native/igd_manager.rs index e40c47a4..3aa288b3 100644 --- a/veilid-core/src/network_manager/native/igd_manager.rs +++ b/veilid-core/src/network_manager/native/igd_manager.rs @@ -5,13 +5,12 @@ use std::net::UdpSocket; const UPNP_GATEWAY_DETECT_TIMEOUT_MS: u32 = 5_000; const UPNP_MAPPING_LIFETIME_MS: u32 = 120_000; const UPNP_MAPPING_ATTEMPTS: u32 = 3; -const UPNP_MAPPING_LIFETIME_US: TimestampDuration = - TimestampDuration::new(UPNP_MAPPING_LIFETIME_MS as u64 * 1000u64); +const UPNP_MAPPING_LIFETIME_US: u64 = UPNP_MAPPING_LIFETIME_MS as u64 * 1000u64; #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] struct PortMapKey { - llpt: LowLevelProtocolType, - at: AddressType, + protocol_type: IGDProtocolType, + address_type: IGDAddressType, local_port: u16, } @@ -19,36 +18,67 @@ struct PortMapKey { struct PortMapValue { ext_ip: IpAddr, mapped_port: u16, - timestamp: Timestamp, - renewal_lifetime: TimestampDuration, + timestamp: u64, + renewal_lifetime: u64, renewal_attempts: u32, } struct IGDManagerInner { - local_ip_addrs: BTreeMap, + local_ip_addrs: BTreeMap, gateways: BTreeMap>, port_maps: BTreeMap, } #[derive(Clone)] pub struct IGDManager { - config: VeilidConfig, + program_name: String, inner: Arc>, } -fn convert_llpt(llpt: LowLevelProtocolType) -> PortMappingProtocol { - match llpt { - LowLevelProtocolType::UDP => PortMappingProtocol::UDP, - LowLevelProtocolType::TCP => PortMappingProtocol::TCP, +fn convert_protocol_type(igdpt: IGDProtocolType) -> PortMappingProtocol { + match igdpt { + IGDProtocolType::UDP => PortMappingProtocol::UDP, + IGDProtocolType::TCP => PortMappingProtocol::TCP, + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum IGDAddressType { + IPV6, + IPV4, +} + +impl fmt::Display for IGDAddressType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + IGDAddressType::IPV6 => write!(f, "IPV6"), + IGDAddressType::IPV4 => write!(f, "IPV4"), + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum IGDProtocolType { + UDP, + TCP, +} + +impl fmt::Display for IGDProtocolType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + IGDProtocolType::UDP => write!(f, "UDP"), + IGDProtocolType::TCP => write!(f, "TCP"), + } } } impl IGDManager { - // + ///////////////////////////////////////////////////////////////////// + // Public Interface - pub fn new(config: VeilidConfig) -> Self { + pub fn new(program_name: String) -> Self { Self { - config, + program_name, inner: Arc::new(Mutex::new(IGDManagerInner { local_ip_addrs: BTreeMap::new(), gateways: BTreeMap::new(), @@ -58,10 +88,306 @@ impl IGDManager { } #[instrument(level = "trace", target = "net", skip_all)] - fn get_routed_local_ip_address(address_type: AddressType) -> Option { + pub async fn unmap_port( + &self, + protocol_type: IGDProtocolType, + address_type: IGDAddressType, + mapped_port: u16, + ) -> Option<()> { + let this = self.clone(); + blocking_wrapper( + "igd unmap_port", + move || { + let mut inner = this.inner.lock(); + + // If we already have this port mapped, just return the existing portmap + let mut found = None; + for (pmk, pmv) in &inner.port_maps { + if pmk.protocol_type == protocol_type + && pmk.address_type == address_type + && pmv.mapped_port == mapped_port + { + found = Some(*pmk); + break; + } + } + let pmk = found?; + let _pmv = inner + .port_maps + .remove(&pmk) + .expect("key found but remove failed"); + + // Get local ip address + let local_ip = Self::find_local_ip(&mut inner, address_type)?; + + // Find gateway + let gw = Self::find_gateway(&mut inner, local_ip)?; + + // Unmap port + match gw.remove_port(convert_protocol_type(protocol_type), mapped_port) { + Ok(()) => (), + Err(e) => { + // Failed to map external port + log_net!(debug "upnp failed to remove external port: {}", e); + return None; + } + }; + Some(()) + }, + None, + ) + .await + } + + #[instrument(level = "trace", target = "net", skip_all)] + pub async fn map_any_port( + &self, + protocol_type: IGDProtocolType, + address_type: IGDAddressType, + local_port: u16, + expected_external_address: Option, + ) -> Option { + let this = self.clone(); + blocking_wrapper("igd map_any_port", move || { + let mut inner = this.inner.lock(); + + // If we already have this port mapped, just return the existing portmap + let pmkey = PortMapKey { + protocol_type, + address_type, + local_port, + }; + if let Some(pmval) = inner.port_maps.get(&pmkey) { + return Some(SocketAddr::new(pmval.ext_ip, pmval.mapped_port)); + } + + // Get local ip address + let local_ip = Self::find_local_ip(&mut inner, address_type)?; + + // Find gateway + let gw = Self::find_gateway(&mut inner, local_ip)?; + + // Get external address + let ext_ip = match gw.get_external_ip() { + Ok(ip) => ip, + Err(e) => { + log_net!(debug "couldn't get external ip from igd: {}", e); + return None; + } + }; + + // Ensure external IP matches address type + if ext_ip.is_ipv4() && address_type != IGDAddressType::IPV4 { + log_net!(debug "mismatched ip address type from igd, wanted v4, got v6"); + return None; + } else if ext_ip.is_ipv6() && address_type != IGDAddressType::IPV6 { + log_net!(debug "mismatched ip address type from igd, wanted v6, got v4"); + return None; + } + + if let Some(expected_external_address) = expected_external_address { + if ext_ip != expected_external_address { + log_net!(debug "gateway external address does not match calculated external address: expected={} vs gateway={}", expected_external_address, ext_ip); + return None; + } + } + + // Map any port + let desc = this.get_description(protocol_type, local_port); + let mapped_port = match gw.add_any_port(convert_protocol_type(protocol_type), SocketAddr::new(local_ip, local_port), (UPNP_MAPPING_LIFETIME_MS + 999) / 1000, &desc) { + Ok(mapped_port) => mapped_port, + Err(e) => { + // Failed to map external port + log_net!(debug "upnp failed to map external port: {}", e); + return None; + } + }; + + // Add to mapping list to keep alive + let timestamp = get_timestamp(); + inner.port_maps.insert(PortMapKey { + protocol_type, + address_type, + local_port, + }, PortMapValue { + ext_ip, + mapped_port, + timestamp, + renewal_lifetime: (UPNP_MAPPING_LIFETIME_MS / 2) as u64 * 1000u64, + renewal_attempts: 0, + }); + + // Succeeded, return the externally mapped port + Some(SocketAddr::new(ext_ip, mapped_port)) + }, None) + .await + } + + #[instrument( + level = "trace", + target = "net", + name = "IGDManager::tick", + skip_all, + err + )] + pub async fn tick(&self) -> EyreResult { + // Refresh mappings if we have them + // If an error is received, then return false to restart the local network + let mut full_renews: Vec<(PortMapKey, PortMapValue)> = Vec::new(); + let mut renews: Vec<(PortMapKey, PortMapValue)> = Vec::new(); + { + let inner = self.inner.lock(); + let now = get_timestamp(); + + for (k, v) in &inner.port_maps { + let mapping_lifetime = now.saturating_sub(v.timestamp); + if mapping_lifetime >= UPNP_MAPPING_LIFETIME_US + || v.renewal_attempts >= UPNP_MAPPING_ATTEMPTS + { + // Past expiration time or tried N times, do a full renew and fail out if we can't + full_renews.push((*k, *v)); + } else if mapping_lifetime >= v.renewal_lifetime { + // Attempt a normal renewal + renews.push((*k, *v)); + } + } + + // See if we need to do some blocking operations + if full_renews.is_empty() && renews.is_empty() { + // Just return now since there's nothing to renew + return Ok(true); + } + } + + let this = self.clone(); + blocking_wrapper( + "igd tick", + move || { + let mut inner = this.inner.lock(); + + // Process full renewals + for (k, v) in full_renews { + // Get local ip for address type + let local_ip = match Self::get_local_ip(&mut inner, k.address_type) { + Some(ip) => ip, + None => { + return Err(eyre!("local ip missing for address type")); + } + }; + + // Get gateway for interface + let gw = match Self::get_gateway(&mut inner, local_ip) { + Some(gw) => gw, + None => { + return Err(eyre!("gateway missing for interface")); + } + }; + + // Delete the mapping if it exists, ignore any errors here + let _ = gw.remove_port(convert_protocol_type(k.protocol_type), v.mapped_port); + inner.port_maps.remove(&k); + + let desc = this.get_description(k.protocol_type, k.local_port); + match gw.add_any_port( + convert_protocol_type(k.protocol_type), + SocketAddr::new(local_ip, k.local_port), + (UPNP_MAPPING_LIFETIME_MS + 999) / 1000, + &desc, + ) { + Ok(mapped_port) => { + log_net!(debug "full-renewed mapped port {:?} -> {:?}", v, k); + inner.port_maps.insert( + k, + PortMapValue { + ext_ip: v.ext_ip, + mapped_port, + timestamp: get_timestamp(), + renewal_lifetime: (UPNP_MAPPING_LIFETIME_MS / 2) as u64 + * 1000u64, + renewal_attempts: 0, + }, + ); + } + Err(e) => { + info!("failed to full-renew mapped port {:?} -> {:?}: {}", v, k, e); + + // Must restart network now :( + return Ok(false); + } + }; + } + // Process normal renewals + for (k, mut v) in renews { + // Get local ip for address type + let local_ip = match Self::get_local_ip(&mut inner, k.address_type) { + Some(ip) => ip, + None => { + return Err(eyre!("local ip missing for address type")); + } + }; + + // Get gateway for interface + let gw = match Self::get_gateway(&mut inner, local_ip) { + Some(gw) => gw, + None => { + return Err(eyre!("gateway missing for address type")); + } + }; + + let desc = this.get_description(k.protocol_type, k.local_port); + match gw.add_port( + convert_protocol_type(k.protocol_type), + v.mapped_port, + SocketAddr::new(local_ip, k.local_port), + (UPNP_MAPPING_LIFETIME_MS + 999) / 1000, + &desc, + ) { + Ok(()) => { + log_net!("renewed mapped port {:?} -> {:?}", v, k); + + inner.port_maps.insert( + k, + PortMapValue { + ext_ip: v.ext_ip, + mapped_port: v.mapped_port, + timestamp: get_timestamp(), + renewal_lifetime: (UPNP_MAPPING_LIFETIME_MS / 2) as u64 + * 1000u64, + renewal_attempts: 0, + }, + ); + } + Err(e) => { + log_net!(debug "failed to renew mapped port {:?} -> {:?}: {}", v, k, e); + + // Get closer to the maximum renewal timeline by a factor of two each time + v.renewal_lifetime = + (v.renewal_lifetime + UPNP_MAPPING_LIFETIME_US) / 2u64; + v.renewal_attempts += 1; + + // Store new value to try again + inner.port_maps.insert(k, v); + } + }; + } + + // Normal exit, no restart + Ok(true) + }, + Err(eyre!("failed to process blocking task")), + ) + .instrument(tracing::trace_span!("igd tick fut")) + .await + } + + ///////////////////////////////////////////////////////////////////// + // Private Implementation + + #[instrument(level = "trace", target = "net", skip_all)] + fn get_routed_local_ip_address(address_type: IGDAddressType) -> Option { let socket = match UdpSocket::bind(match address_type { - AddressType::IPV4 => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0), - AddressType::IPV6 => SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0), + IGDAddressType::IPV4 => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0), + IGDAddressType::IPV6 => SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0), }) { Ok(s) => s, Err(e) => { @@ -75,8 +401,8 @@ impl IGDManager { // using google's dns, but it wont actually send any packets to it socket .connect(match address_type { - AddressType::IPV4 => SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 80), - AddressType::IPV6 => SocketAddr::new( + IGDAddressType::IPV4 => SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 80), + IGDAddressType::IPV6 => SocketAddr::new( IpAddr::V6(Ipv6Addr::new(0x2001, 0x4860, 0x4860, 0, 0, 0, 0, 0x8888)), 80, ), @@ -91,7 +417,7 @@ impl IGDManager { } #[instrument(level = "trace", target = "net", skip_all)] - fn find_local_ip(inner: &mut IGDManagerInner, address_type: AddressType) -> Option { + fn find_local_ip(inner: &mut IGDManagerInner, address_type: IGDAddressType) -> Option { if let Some(ip) = inner.local_ip_addrs.get(&address_type) { return Some(*ip); } @@ -109,7 +435,7 @@ impl IGDManager { } #[instrument(level = "trace", target = "net", skip_all)] - fn get_local_ip(inner: &mut IGDManagerInner, address_type: AddressType) -> Option { + fn get_local_ip(inner: &mut IGDManagerInner, address_type: IGDAddressType) -> Option { if let Some(ip) = inner.local_ip_addrs.get(&address_type) { return Some(*ip); } @@ -164,304 +490,10 @@ impl IGDManager { None } - fn get_description(&self, llpt: LowLevelProtocolType, local_port: u16) -> String { + fn get_description(&self, protocol_type: IGDProtocolType, local_port: u16) -> String { format!( "{} map {} for port {}", - self.config.get().program_name, - convert_llpt(llpt), - local_port + self.program_name, protocol_type, local_port ) } - - #[instrument(level = "trace", target = "net", skip_all)] - pub async fn unmap_port( - &self, - llpt: LowLevelProtocolType, - at: AddressType, - mapped_port: u16, - ) -> Option<()> { - let this = self.clone(); - blocking_wrapper( - "igd unmap_port", - move || { - let mut inner = this.inner.lock(); - - // If we already have this port mapped, just return the existing portmap - let mut found = None; - for (pmk, pmv) in &inner.port_maps { - if pmk.llpt == llpt && pmk.at == at && pmv.mapped_port == mapped_port { - found = Some(*pmk); - break; - } - } - let pmk = found?; - let _pmv = inner - .port_maps - .remove(&pmk) - .expect("key found but remove failed"); - - // Get local ip address - let local_ip = Self::find_local_ip(&mut inner, at)?; - - // Find gateway - let gw = Self::find_gateway(&mut inner, local_ip)?; - - // Unmap port - match gw.remove_port(convert_llpt(llpt), mapped_port) { - Ok(()) => (), - Err(e) => { - // Failed to map external port - log_net!(debug "upnp failed to remove external port: {}", e); - return None; - } - }; - Some(()) - }, - None, - ) - .await - } - - #[instrument(level = "trace", target = "net", skip_all)] - pub async fn map_any_port( - &self, - llpt: LowLevelProtocolType, - at: AddressType, - local_port: u16, - expected_external_address: Option, - ) -> Option { - let this = self.clone(); - blocking_wrapper("igd map_any_port", move || { - let mut inner = this.inner.lock(); - - // If we already have this port mapped, just return the existing portmap - let pmkey = PortMapKey { - llpt, - at, - local_port, - }; - if let Some(pmval) = inner.port_maps.get(&pmkey) { - return Some(SocketAddr::new(pmval.ext_ip, pmval.mapped_port)); - } - - // Get local ip address - let local_ip = Self::find_local_ip(&mut inner, at)?; - - // Find gateway - let gw = Self::find_gateway(&mut inner, local_ip)?; - - // Get external address - let ext_ip = match gw.get_external_ip() { - Ok(ip) => ip, - Err(e) => { - log_net!(debug "couldn't get external ip from igd: {}", e); - return None; - } - }; - - // Ensure external IP matches address type - if ext_ip.is_ipv4() && at != AddressType::IPV4 { - log_net!(debug "mismatched ip address type from igd, wanted v4, got v6"); - return None; - } else if ext_ip.is_ipv6() && at != AddressType::IPV6 { - log_net!(debug "mismatched ip address type from igd, wanted v6, got v4"); - return None; - } - - if let Some(expected_external_address) = expected_external_address { - if ext_ip != expected_external_address { - log_net!(debug "gateway external address does not match calculated external address: expected={} vs gateway={}", expected_external_address, ext_ip); - return None; - } - } - - // Map any port - let desc = this.get_description(llpt, local_port); - let mapped_port = match gw.add_any_port(convert_llpt(llpt), SocketAddr::new(local_ip, local_port), (UPNP_MAPPING_LIFETIME_MS + 999) / 1000, &desc) { - Ok(mapped_port) => mapped_port, - Err(e) => { - // Failed to map external port - log_net!(debug "upnp failed to map external port: {}", e); - return None; - } - }; - - // Add to mapping list to keep alive - let timestamp = Timestamp::now(); - inner.port_maps.insert(PortMapKey { - llpt, - at, - local_port, - }, PortMapValue { - ext_ip, - mapped_port, - timestamp, - renewal_lifetime: ((UPNP_MAPPING_LIFETIME_MS / 2) as u64 * 1000u64).into(), - renewal_attempts: 0, - }); - - // Succeeded, return the externally mapped port - Some(SocketAddr::new(ext_ip, mapped_port)) - }, None) - .await - } - - #[instrument( - level = "trace", - target = "net", - name = "IGDManager::tick", - skip_all, - err - )] - pub async fn tick(&self) -> EyreResult { - // Refresh mappings if we have them - // If an error is received, then return false to restart the local network - let mut full_renews: Vec<(PortMapKey, PortMapValue)> = Vec::new(); - let mut renews: Vec<(PortMapKey, PortMapValue)> = Vec::new(); - { - let inner = self.inner.lock(); - let now = Timestamp::now(); - - for (k, v) in &inner.port_maps { - let mapping_lifetime = now.saturating_sub(v.timestamp); - if mapping_lifetime >= UPNP_MAPPING_LIFETIME_US - || v.renewal_attempts >= UPNP_MAPPING_ATTEMPTS - { - // Past expiration time or tried N times, do a full renew and fail out if we can't - full_renews.push((*k, *v)); - } else if mapping_lifetime >= v.renewal_lifetime { - // Attempt a normal renewal - renews.push((*k, *v)); - } - } - - // See if we need to do some blocking operations - if full_renews.is_empty() && renews.is_empty() { - // Just return now since there's nothing to renew - return Ok(true); - } - } - - let this = self.clone(); - blocking_wrapper( - "igd tick", - move || { - let mut inner = this.inner.lock(); - - // Process full renewals - for (k, v) in full_renews { - // Get local ip for address type - let local_ip = match Self::get_local_ip(&mut inner, k.at) { - Some(ip) => ip, - None => { - return Err(eyre!("local ip missing for address type")); - } - }; - - // Get gateway for interface - let gw = match Self::get_gateway(&mut inner, local_ip) { - Some(gw) => gw, - None => { - return Err(eyre!("gateway missing for interface")); - } - }; - - // Delete the mapping if it exists, ignore any errors here - let _ = gw.remove_port(convert_llpt(k.llpt), v.mapped_port); - inner.port_maps.remove(&k); - - let desc = this.get_description(k.llpt, k.local_port); - match gw.add_any_port( - convert_llpt(k.llpt), - SocketAddr::new(local_ip, k.local_port), - (UPNP_MAPPING_LIFETIME_MS + 999) / 1000, - &desc, - ) { - Ok(mapped_port) => { - log_net!(debug "full-renewed mapped port {:?} -> {:?}", v, k); - inner.port_maps.insert( - k, - PortMapValue { - ext_ip: v.ext_ip, - mapped_port, - timestamp: Timestamp::now(), - renewal_lifetime: TimestampDuration::new( - (UPNP_MAPPING_LIFETIME_MS / 2) as u64 * 1000u64, - ), - renewal_attempts: 0, - }, - ); - } - Err(e) => { - info!("failed to full-renew mapped port {:?} -> {:?}: {}", v, k, e); - - // Must restart network now :( - return Ok(false); - } - }; - } - // Process normal renewals - for (k, mut v) in renews { - // Get local ip for address type - let local_ip = match Self::get_local_ip(&mut inner, k.at) { - Some(ip) => ip, - None => { - return Err(eyre!("local ip missing for address type")); - } - }; - - // Get gateway for interface - let gw = match Self::get_gateway(&mut inner, local_ip) { - Some(gw) => gw, - None => { - return Err(eyre!("gateway missing for address type")); - } - }; - - let desc = this.get_description(k.llpt, k.local_port); - match gw.add_port( - convert_llpt(k.llpt), - v.mapped_port, - SocketAddr::new(local_ip, k.local_port), - (UPNP_MAPPING_LIFETIME_MS + 999) / 1000, - &desc, - ) { - Ok(()) => { - log_net!("renewed mapped port {:?} -> {:?}", v, k); - - inner.port_maps.insert( - k, - PortMapValue { - ext_ip: v.ext_ip, - mapped_port: v.mapped_port, - timestamp: Timestamp::now(), - renewal_lifetime: ((UPNP_MAPPING_LIFETIME_MS / 2) as u64 - * 1000u64) - .into(), - renewal_attempts: 0, - }, - ); - } - Err(e) => { - log_net!(debug "failed to renew mapped port {:?} -> {:?}: {}", v, k, e); - - // Get closer to the maximum renewal timeline by a factor of two each time - v.renewal_lifetime = - (v.renewal_lifetime + UPNP_MAPPING_LIFETIME_US) / 2u64; - v.renewal_attempts += 1; - - // Store new value to try again - inner.port_maps.insert(k, v); - } - }; - } - - // Normal exit, no restart - Ok(true) - }, - Err(eyre!("failed to process blocking task")), - ) - .instrument(tracing::trace_span!("igd tick fut")) - .await - } } diff --git a/veilid-core/src/network_manager/native/mod.rs b/veilid-core/src/network_manager/native/mod.rs index 5308cbb8..60b26d0a 100644 --- a/veilid-core/src/network_manager/native/mod.rs +++ b/veilid-core/src/network_manager/native/mod.rs @@ -167,6 +167,7 @@ impl Network { connection_manager: ConnectionManager, ) -> NetworkUnlockedInner { let config = network_manager.config(); + let program_name = config.get().program_name.clone(); NetworkUnlockedInner { startup_lock: StartupLock::new(), network_manager, @@ -183,7 +184,7 @@ impl Network { ), upnp_task: TickTask::new("upnp_task", UPNP_TASK_TICK_PERIOD_SECS), network_task_lock: AsyncMutex::new(()), - igd_manager: igd_manager::IGDManager::new(config.clone()), + igd_manager: igd_manager::IGDManager::new(program_name), } } diff --git a/veilid-tools/Cargo.toml b/veilid-tools/Cargo.toml index b43a3fa3..2fe0d7cd 100644 --- a/veilid-tools/Cargo.toml +++ b/veilid-tools/Cargo.toml @@ -79,6 +79,7 @@ flume = { version = "0.11.0", features = ["async"] } async-io = { version = "1.13.0" } async-std = { version = "1.12.0", features = ["unstable"], optional = true } chrono = "0.4.38" +ctrlc = "^3" futures-util = { version = "0.3.30", default-features = false, features = [ "async-await", "sink", diff --git a/veilid-tools/src/bin/virtual_router.rs b/veilid-tools/src/bin/virtual_router.rs new file mode 100644 index 00000000..0d4ccf73 --- /dev/null +++ b/veilid-tools/src/bin/virtual_router.rs @@ -0,0 +1,45 @@ +#![cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] + +use cfg_if::*; +use parking_lot::*; +use stop_token::StopSource; +use veilid_tools::*; + +const VERSION: &str = env!("CARGO_PKG_VERSION"); + +cfg_if! { + if #[cfg(feature="rt-async-std")] { + pub fn block_on, T>(f: F) -> T { + async_std::task::block_on(f) + } + } else if #[cfg(feature="rt-tokio")] { + pub fn block_on, T>(f: F) -> T { + let rt = tokio::runtime::Runtime::new().unwrap(); + let local = tokio::task::LocalSet::new(); + local.block_on(&rt, f) + } + } else { + compile_error!("needs executor implementation"); + } +} + +fn main() -> Result<(), String> { + let stop_source = StopSource::new(); + let stop_token = stop_source.token(); + let stop_mutex = Mutex::new(Some(stop_source)); + + ctrlc::set_handler(move || { + *(stop_mutex.lock()) = None; + }) + .expect("Error setting Ctrl-C handler"); + + block_on(async { + println!("Veilid VirtualRouter v{}", VERSION); + + let router_server = virtual_network::RouterServer::new(); + router_server + .run(stop_token) + .await + .map_err(|e| e.to_string()) + }) +} diff --git a/veilid-tools/src/virtual_network/mod.rs b/veilid-tools/src/virtual_network/mod.rs index 069caa9c..04ee90d8 100644 --- a/veilid-tools/src/virtual_network/mod.rs +++ b/veilid-tools/src/virtual_network/mod.rs @@ -27,6 +27,7 @@ //! [VirtualUdpSocket] //! [VirtualTcpListener] //! [VirtualTcpListenerStream] +//! [VirtualGateway] //! [VirtualWsMeta] //! [VirtualWsStream] //! @@ -42,8 +43,10 @@ mod machine; mod router_client; mod router_op_table; +#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] mod router_server; mod serde_io_error; +mod virtual_gateway; mod virtual_network_error; mod virtual_tcp_listener; mod virtual_tcp_listener_stream; @@ -54,7 +57,9 @@ use super::*; pub use machine::*; pub use router_client::*; +#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] pub use router_server::*; +pub use virtual_gateway::*; pub use virtual_network_error::*; pub use virtual_tcp_listener::*; pub use virtual_tcp_listener_stream::*; diff --git a/veilid-tools/src/virtual_network/router_client.rs b/veilid-tools/src/virtual_network/router_client.rs index d6691639..6b6f2b09 100644 --- a/veilid-tools/src/virtual_network/router_client.rs +++ b/veilid-tools/src/virtual_network/router_client.rs @@ -6,7 +6,6 @@ use futures_util::{ }; use postcard::{from_bytes, to_stdvec}; use router_op_table::*; -use serde::*; use std::io; use stop_token::future::FutureExt as _; @@ -25,7 +24,7 @@ impl fmt::Debug for RouterClientInner { } struct RouterClientUnlockedInner { - sender: flume::Sender, + sender: flume::Sender, next_message_id: AtomicU64, router_op_waiter: RouterOpWaiter, } @@ -40,145 +39,6 @@ impl fmt::Debug for RouterClientUnlockedInner { } } -pub type MessageId = u64; -pub type SocketId = u64; - -#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] -enum ServerProcessorRequest { - AllocateMachine, - ReleaseMachine { - machine_id: MachineId, - }, - GetInterfaces { - machine_id: MachineId, - }, - TcpConnect { - machine_id: MachineId, - local_address: Option, - remote_address: SocketAddr, - timeout_ms: u32, - options: VirtualTcpOptions, - }, - TcpBind { - machine_id: MachineId, - local_address: Option, - options: VirtualTcpOptions, - }, - TcpAccept { - machine_id: MachineId, - listen_socket_id: SocketId, - }, - TcpShutdown { - machine_id: MachineId, - socket_id: SocketId, - }, - UdpBind { - machine_id: MachineId, - local_address: Option, - options: VirtualUdpOptions, - }, - Send { - machine_id: MachineId, - socket_id: SocketId, - data: Vec, - }, - SendTo { - machine_id: MachineId, - socket_id: SocketId, - remote_address: SocketAddr, - data: Vec, - }, - Recv { - machine_id: MachineId, - socket_id: u64, - len: u32, - }, - RecvFrom { - machine_id: MachineId, - socket_id: u64, - len: u32, - }, -} - -#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] -struct ServerProcessorMessage { - message_id: MessageId, - request: ServerProcessorRequest, -} - -#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] -enum ServerProcessorCommand { - Message(ServerProcessorMessage), - CloseSocket { - machine_id: MachineId, - socket_id: SocketId, - }, -} - -#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] -enum ServerProcessorReplyValue { - AllocateMachine { - machine_id: MachineId, - }, - ReleaseMachine, - GetInterfaces { - interfaces: BTreeMap, - }, - TcpConnect { - socket_id: SocketId, - local_address: SocketAddr, - }, - TcpBind { - socket_id: SocketId, - local_address: SocketAddr, - }, - TcpAccept { - socket_id: SocketId, - address: SocketAddr, - }, - TcpShutdown, - UdpBind { - socket_id: SocketId, - local_address: SocketAddr, - }, - Send { - len: u32, - }, - SendTo { - len: u32, - }, - Recv { - data: Vec, - }, - RecvFrom { - remote_address: SocketAddr, - data: Vec, - }, -} - -#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] -enum ServerProcessorReplyResult { - Value(ServerProcessorReplyValue), - InvalidMachineId, - InvalidSocketId, - IoError(#[serde(with = "serde_io_error::SerdeIoErrorKindDef")] io::ErrorKind), -} - -#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] -struct ServerProcessorReply { - message_id: MessageId, - status: ServerProcessorReplyResult, -} - -#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] -enum ServerProcessorEvent { - Reply(ServerProcessorReply), - // DeadSocket { - // machine_id: MachineId, - // socket_id: SocketId, - // }, -} - #[derive(Debug, Clone)] pub struct RouterClient { unlocked_inner: Arc, @@ -213,7 +73,7 @@ impl RouterClient { } // Create channels - let (client_sender, server_receiver) = flume::unbounded::(); + let (client_sender, server_receiver) = flume::unbounded::(); // Create stopper let stop_source = StopSource::new(); @@ -289,6 +149,29 @@ impl RouterClient { )) } + pub(super) fn local_router_client( + client_sender: flume::Sender, + server_receiver: flume::Receiver, + ) -> RouterClient { + // Create stopper + let stop_source = StopSource::new(); + + // Create router operation waiter + let router_op_waiter = RouterOpWaiter::new(); + + // Spawn a client connection handler + let jh_handler = spawn( + "RouterClient local processor", + Self::run_local_processor( + server_receiver, + router_op_waiter.clone(), + stop_source.token(), + ), + ); + + Self::new(client_sender, router_op_waiter, jh_handler, stop_source) + } + pub async fn disconnect(self) { drop(self.inner.lock().stop_source.take()); let jh_handler = self.inner.lock().jh_handler.take(); @@ -466,7 +349,7 @@ impl RouterClient { pub async fn recv( self, machine_id: MachineId, - socket_id: u64, + socket_id: SocketId, len: usize, ) -> VirtualNetworkResult> { let request = ServerProcessorRequest::Recv { @@ -483,7 +366,7 @@ impl RouterClient { pub async fn recv_from( self, machine_id: MachineId, - socket_id: u64, + socket_id: SocketId, len: usize, ) -> VirtualNetworkResult<(Vec, SocketAddr)> { let request = ServerProcessorRequest::RecvFrom { @@ -501,11 +384,93 @@ impl RouterClient { Ok((data, remote_address)) } + pub async fn get_routed_local_address( + self, + machine_id: MachineId, + address_type: VirtualAddressType, + ) -> VirtualNetworkResult { + let request = ServerProcessorRequest::GetRoutedLocalAddress { + machine_id, + address_type, + }; + let ServerProcessorReplyValue::GetRoutedLocalAddress { address } = + self.perform_request(request).await? + else { + return Err(VirtualNetworkError::ResponseMismatch); + }; + Ok(address) + } + + pub async fn find_gateway( + self, + machine_id: MachineId, + ) -> VirtualNetworkResult> { + let request = ServerProcessorRequest::FindGateway { machine_id }; + let ServerProcessorReplyValue::FindGateway { opt_gateway_id } = + self.perform_request(request).await? + else { + return Err(VirtualNetworkError::ResponseMismatch); + }; + Ok(opt_gateway_id) + } + + pub async fn get_external_address(self, gateway_id: GatewayId) -> VirtualNetworkResult { + let request = ServerProcessorRequest::GetExternalAddress { gateway_id }; + let ServerProcessorReplyValue::GetExternalAddress { address } = + self.perform_request(request).await? + else { + return Err(VirtualNetworkError::ResponseMismatch); + }; + Ok(address) + } + + pub async fn add_port( + self, + gateway_id: GatewayId, + protocol: VirtualProtocolType, + external_port: Option, + local_address: SocketAddr, + lease_duration_ms: u32, + description: String, + ) -> VirtualNetworkResult { + let request = ServerProcessorRequest::AddPort { + gateway_id, + protocol, + external_port, + local_address, + lease_duration_ms, + description, + }; + let ServerProcessorReplyValue::AddPort { external_port } = + self.perform_request(request).await? + else { + return Err(VirtualNetworkError::ResponseMismatch); + }; + Ok(external_port) + } + + pub async fn remove_port( + self, + gateway_id: GatewayId, + protocol: VirtualProtocolType, + external_port: u16, + ) -> VirtualNetworkResult<()> { + let request = ServerProcessorRequest::RemovePort { + gateway_id, + protocol, + external_port, + }; + let ServerProcessorReplyValue::RemovePort = self.perform_request(request).await? else { + return Err(VirtualNetworkError::ResponseMismatch); + }; + Ok(()) + } + ////////////////////////////////////////////////////////////////////////// // Private implementation fn new( - sender: flume::Sender, + sender: flume::Sender, router_op_waiter: RouterOpWaiter, jh_handler: MustJoinHandle<()>, stop_source: StopSource, @@ -528,19 +493,11 @@ impl RouterClient { machine_id, socket_id, }; - let command_vec = match to_stdvec(&command).map_err(VirtualNetworkError::SerializationError) - { - Ok(v) => Bytes::from(v), - Err(e) => { - error!("{}", e); - return; - } - }; if let Err(e) = self .unlocked_inner .sender - .send(command_vec) + .send(command) .map_err(|_| VirtualNetworkError::IoError(io::ErrorKind::BrokenPipe)) { error!("{}", e); @@ -563,26 +520,25 @@ impl RouterClient { &self, request: ServerProcessorRequest, ) -> VirtualNetworkResult { - let message_id = self - .unlocked_inner - .next_message_id - .fetch_add(1, Ordering::AcqRel); + let message_id = MessageId( + self.unlocked_inner + .next_message_id + .fetch_add(1, Ordering::AcqRel), + ); let command = ServerProcessorCommand::Message(ServerProcessorMessage { message_id, request, }); - let command_vec = - Bytes::from(to_stdvec(&command).map_err(VirtualNetworkError::SerializationError)?); self.unlocked_inner .sender - .send_async(command_vec) + .send_async(command) .await .map_err(|_| VirtualNetworkError::IoError(io::ErrorKind::BrokenPipe))?; let handle = self .unlocked_inner .router_op_waiter - .add_op_waiter(message_id, ()); + .add_op_waiter(message_id.0, ()); let status = self .unlocked_inner @@ -608,7 +564,7 @@ impl RouterClient { async fn run_server_processor( reader: R, writer: W, - receiver: flume::Receiver, + receiver: flume::Receiver, router_op_waiter: RouterOpWaiter, stop_token: StopToken, ) where @@ -621,7 +577,16 @@ impl RouterClient { let framed_writer = FramedWrite::new(writer, BytesCodec); let framed_writer_fut = system_boxed(async move { - if let Err(e) = receiver.into_stream().map(Ok).forward(framed_writer).await { + if let Err(e) = receiver + .into_stream() + .map(|command| { + to_stdvec(&command) + .map_err(io::Error::other) + .map(Bytes::from) + }) + .forward(framed_writer) + .await + { error!("{}", e); } }); @@ -631,20 +596,7 @@ impl RouterClient { let evt = from_bytes::(&x) .map_err(VirtualNetworkError::SerializationError)?; - match evt { - ServerProcessorEvent::Reply(reply) => { - router_op_waiter - .complete_op_waiter(reply.message_id, reply.status) - .map_err(io::Error::other)?; - } // ServerProcessorEvent::DeadSocket { - // machine_id, - // socket_id, - // } => { - // // - // } - } - - Ok(()) + Self::process_event(evt, router_op_waiter.clone()).await }); if let Err(e) = fut.await { error!("{}", e); @@ -655,4 +607,45 @@ impl RouterClient { unord.push(framed_reader_fut); while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {} } + + async fn run_local_processor( + receiver: flume::Receiver, + router_op_waiter: RouterOpWaiter, + stop_token: StopToken, + ) { + let mut unord = FuturesUnordered::new(); + let receiver = receiver + .into_stream() + .map(io::Result::::Ok); + + let framed_reader_fut = system_boxed(async move { + let fut = + receiver.try_for_each(|evt| Self::process_event(evt, router_op_waiter.clone())); + if let Err(e) = fut.await { + error!("{}", e); + } + }); + unord.push(framed_reader_fut); + while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {} + } + + async fn process_event( + evt: ServerProcessorEvent, + router_op_waiter: RouterOpWaiter, + ) -> io::Result<()> { + match evt { + ServerProcessorEvent::Reply(reply) => { + router_op_waiter + .complete_op_waiter(reply.message_id.0, reply.status) + .map_err(io::Error::other)?; + } // ServerProcessorEvent::DeadSocket { + // machine_id, + // socket_id, + // } => { + // // + // } + } + + Ok(()) + } } diff --git a/veilid-tools/src/virtual_network/router_server/commands.rs b/veilid-tools/src/virtual_network/router_server/commands.rs new file mode 100644 index 00000000..4563eaa6 --- /dev/null +++ b/veilid-tools/src/virtual_network/router_server/commands.rs @@ -0,0 +1,217 @@ +use super::*; +use serde::*; +use std::io; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] +#[repr(transparent)] +pub struct MessageId(pub u64); + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] +#[repr(transparent)] +pub struct SocketId(pub u64); + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] +#[repr(transparent)] +pub struct GatewayId(pub u64); + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] +pub enum VirtualAddressType { + IPV6, + IPV4, +} + +impl fmt::Display for VirtualAddressType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + VirtualAddressType::IPV6 => write!(f, "IPV6"), + VirtualAddressType::IPV4 => write!(f, "IPV4"), + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] +pub enum VirtualProtocolType { + UDP, + TCP, +} + +impl fmt::Display for VirtualProtocolType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + VirtualProtocolType::UDP => write!(f, "UDP"), + VirtualProtocolType::TCP => write!(f, "TCP"), + } + } +} + +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] +pub enum ServerProcessorRequest { + AllocateMachine, + ReleaseMachine { + machine_id: MachineId, + }, + GetInterfaces { + machine_id: MachineId, + }, + TcpConnect { + machine_id: MachineId, + local_address: Option, + remote_address: SocketAddr, + timeout_ms: u32, + options: VirtualTcpOptions, + }, + TcpBind { + machine_id: MachineId, + local_address: Option, + options: VirtualTcpOptions, + }, + TcpAccept { + machine_id: MachineId, + listen_socket_id: SocketId, + }, + TcpShutdown { + machine_id: MachineId, + socket_id: SocketId, + }, + UdpBind { + machine_id: MachineId, + local_address: Option, + options: VirtualUdpOptions, + }, + Send { + machine_id: MachineId, + socket_id: SocketId, + data: Vec, + }, + SendTo { + machine_id: MachineId, + socket_id: SocketId, + remote_address: SocketAddr, + data: Vec, + }, + Recv { + machine_id: MachineId, + socket_id: SocketId, + len: u32, + }, + RecvFrom { + machine_id: MachineId, + socket_id: SocketId, + len: u32, + }, + GetRoutedLocalAddress { + machine_id: MachineId, + address_type: VirtualAddressType, + }, + FindGateway { + machine_id: MachineId, + }, + GetExternalAddress { + gateway_id: GatewayId, + }, + AddPort { + gateway_id: GatewayId, + protocol: VirtualProtocolType, + external_port: Option, + local_address: SocketAddr, + lease_duration_ms: u32, + description: String, + }, + RemovePort { + gateway_id: GatewayId, + protocol: VirtualProtocolType, + external_port: u16, + }, +} + +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] +pub struct ServerProcessorMessage { + pub message_id: MessageId, + pub request: ServerProcessorRequest, +} + +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] +pub enum ServerProcessorCommand { + Message(ServerProcessorMessage), + CloseSocket { + machine_id: MachineId, + socket_id: SocketId, + }, +} + +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] +pub enum ServerProcessorReplyValue { + AllocateMachine { + machine_id: MachineId, + }, + ReleaseMachine, + GetInterfaces { + interfaces: BTreeMap, + }, + TcpConnect { + socket_id: SocketId, + local_address: SocketAddr, + }, + TcpBind { + socket_id: SocketId, + local_address: SocketAddr, + }, + TcpAccept { + socket_id: SocketId, + address: SocketAddr, + }, + TcpShutdown, + UdpBind { + socket_id: SocketId, + local_address: SocketAddr, + }, + Send { + len: u32, + }, + SendTo { + len: u32, + }, + Recv { + data: Vec, + }, + RecvFrom { + remote_address: SocketAddr, + data: Vec, + }, + GetRoutedLocalAddress { + address: IpAddr, + }, + FindGateway { + opt_gateway_id: Option, + }, + GetExternalAddress { + address: IpAddr, + }, + AddPort { + external_port: u16, + }, + RemovePort, +} + +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] +pub enum ServerProcessorReplyResult { + Value(ServerProcessorReplyValue), + InvalidMachineId, + InvalidSocketId, + IoError(#[serde(with = "serde_io_error::SerdeIoErrorKindDef")] io::ErrorKind), +} + +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] +pub struct ServerProcessorReply { + pub message_id: MessageId, + pub status: ServerProcessorReplyResult, +} + +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] +pub enum ServerProcessorEvent { + Reply(ServerProcessorReply), + // DeadSocket { + // machine_id: MachineId, + // socket_id: SocketId, + // }, +} diff --git a/veilid-tools/src/virtual_network/router_server/mod.rs b/veilid-tools/src/virtual_network/router_server/mod.rs index e69de29b..e63f2410 100644 --- a/veilid-tools/src/virtual_network/router_server/mod.rs +++ b/veilid-tools/src/virtual_network/router_server/mod.rs @@ -0,0 +1,70 @@ +mod commands; + +pub(super) use commands::*; + +use super::*; +use futures_codec::{Bytes, BytesCodec, FramedRead, FramedWrite}; + +#[derive(ThisError, Debug, Clone, PartialEq, Eq)] +pub enum RouterServerError { + #[error("Serialization Error: {0}")] + SerializationError(postcard::Error), +} + +pub type RouterServerResult = Result; + +/// Router server for virtual networking +/// +/// Connect to this with a `RouterClient`. Simulates machines, allocates sockets +/// and gateways, manages a virtual simulated Internet and routes packets +/// virtually between `Machines` associated with `RouterClient`s. + +#[derive(Debug)] +pub struct RouterServer { + //tcp_connections: HashMap< + client_inbound_sender: flume::Sender, + client_inbound_receiver: flume::Receiver, + local_inbound_sender: flume::Sender, + local_inbound_receiver: flume::Receiver, +} + +impl RouterServer { + /// Create a router server for virtual networking + pub fn new() -> Self { + Self {} + } + + /// Accept RouterClient connections on a TCP socket + pub fn listen_tcp(&self, addr: Option) -> RouterServerResult { + Ok(()) + } + + /// Accept RouterClient connections on a WebSocket + pub fn listen_ws(&self, addr: Option) -> RouterServerResult { + Ok(()) + } + + /// Return a local RouterClient + pub fn router_client(&self) -> RouterClient { + // Create the outbound channel + xxxx get these channels right + let (local_outbound_sender, local_outbound_receiver) = flume::unbounded(); + + // Create a RouterClient directly connected to this RouterServer + RouterClient::local_router_client( + self.local_inbound_sender.clone(), + local_outbound_receiver, + ) + } + + /// Run the router server until a stop is requested + pub async fn run(&self, stop_token: StopToken) -> RouterServerResult<()> { + Ok(()) + } +} + +impl Default for RouterServer { + fn default() -> Self { + Self::new() + } +} diff --git a/veilid-tools/src/virtual_network/virtual_gateway.rs b/veilid-tools/src/virtual_network/virtual_gateway.rs new file mode 100644 index 00000000..00698502 --- /dev/null +++ b/veilid-tools/src/virtual_network/virtual_gateway.rs @@ -0,0 +1,89 @@ +use super::*; + +#[derive(Debug)] +pub struct VirtualGateway { + machine: Machine, + gateway_id: GatewayId, +} + +impl VirtualGateway { + ///////////////////////////////////////////////////////////// + // Public Interface + + pub async fn find() -> VirtualNetworkResult> { + let machine = default_machine().unwrap(); + Self::find_with_machine(machine).await + } + + pub async fn find_with_machine(machine: Machine) -> VirtualNetworkResult> { + machine + .router_client + .clone() + .find_gateway(machine.id) + .await + .map(|opt_gateway_id| opt_gateway_id.map(|gateway_id| Self::new(machine, gateway_id))) + } + + pub async fn get_routed_local_address( + &self, + address_type: VirtualAddressType, + ) -> VirtualNetworkResult { + self.machine + .router_client + .clone() + .get_routed_local_address(self.machine.id, address_type) + .await + } + + pub async fn get_external_address(&self) -> VirtualNetworkResult { + self.machine + .router_client + .clone() + .get_external_address(self.gateway_id) + .await + } + + pub async fn add_port( + &self, + protocol: VirtualProtocolType, + external_port: Option, + local_address: SocketAddr, + lease_duration_ms: u32, + description: String, + ) -> VirtualNetworkResult { + self.machine + .router_client + .clone() + .add_port( + self.gateway_id, + protocol, + external_port, + local_address, + lease_duration_ms, + description, + ) + .await + } + + pub async fn remove_port( + &self, + protocol: VirtualProtocolType, + external_port: u16, + ) -> VirtualNetworkResult<()> { + self.machine + .router_client + .clone() + .remove_port(self.gateway_id, protocol, external_port) + .await + } + + ///////////////////////////////////////////////////////////// + // Private Implementation + + fn new(machine: Machine, gateway_id: GatewayId) -> Self { + Self { + machine, + gateway_id, + } + } +}