virtual gateway, router server work

This commit is contained in:
Christien Rioux 2024-11-07 23:00:31 -05:00
parent 91e1acb1d6
commit 51b78de530
11 changed files with 967 additions and 510 deletions

1
Cargo.lock generated
View File

@ -6524,6 +6524,7 @@ dependencies = [
"cfg-if 1.0.0", "cfg-if 1.0.0",
"chrono", "chrono",
"console_error_panic_hook", "console_error_panic_hook",
"ctrlc",
"eyre", "eyre",
"flume", "flume",
"fn_name", "fn_name",

View File

@ -2,6 +2,7 @@
/// Also performs UPNP/IGD mapping if enabled and possible /// Also performs UPNP/IGD mapping if enabled and possible
use super::*; use super::*;
use futures_util::stream::FuturesUnordered; use futures_util::stream::FuturesUnordered;
use igd_manager::{IGDAddressType, IGDProtocolType};
const PORT_MAP_VALIDATE_TRY_COUNT: usize = 3; const PORT_MAP_VALIDATE_TRY_COUNT: usize = 3;
const PORT_MAP_VALIDATE_DELAY_MS: u32 = 500; const PORT_MAP_VALIDATE_DELAY_MS: u32 = 500;
@ -318,7 +319,15 @@ impl DiscoveryContext {
let address_type = self.unlocked_inner.config.address_type; let address_type = self.unlocked_inner.config.address_type;
let local_port = self.unlocked_inner.config.port; let local_port = self.unlocked_inner.config.port;
let low_level_protocol_type = protocol_type.low_level_protocol_type(); let igd_protocol_type = match protocol_type.low_level_protocol_type() {
LowLevelProtocolType::UDP => IGDProtocolType::UDP,
LowLevelProtocolType::TCP => IGDProtocolType::TCP,
};
let igd_address_type = match address_type {
AddressType::IPV6 => IGDAddressType::IPV6,
AddressType::IPV4 => IGDAddressType::IPV4,
};
let external_1 = self.inner.lock().external_info.first().unwrap().clone(); let external_1 = self.inner.lock().external_info.first().unwrap().clone();
let igd_manager = self.unlocked_inner.net.unlocked_inner.igd_manager.clone(); let igd_manager = self.unlocked_inner.net.unlocked_inner.igd_manager.clone();
@ -329,8 +338,8 @@ impl DiscoveryContext {
// Attempt a port mapping. If this doesn't succeed, it's not going to // Attempt a port mapping. If this doesn't succeed, it's not going to
let mapped_external_address = igd_manager let mapped_external_address = igd_manager
.map_any_port( .map_any_port(
low_level_protocol_type, igd_protocol_type,
address_type, igd_address_type,
local_port, local_port,
Some(external_1.address.ip_addr()), Some(external_1.address.ip_addr()),
) )
@ -361,10 +370,7 @@ impl DiscoveryContext {
if validate_tries != PORT_MAP_VALIDATE_TRY_COUNT { if validate_tries != PORT_MAP_VALIDATE_TRY_COUNT {
log_net!(debug "UPNP port mapping succeeded but port {}/{} is still unreachable.\nretrying\n", log_net!(debug "UPNP port mapping succeeded but port {}/{} is still unreachable.\nretrying\n",
local_port, match low_level_protocol_type { local_port, igd_protocol_type);
LowLevelProtocolType::UDP => "udp",
LowLevelProtocolType::TCP => "tcp",
});
sleep(PORT_MAP_VALIDATE_DELAY_MS).await sleep(PORT_MAP_VALIDATE_DELAY_MS).await
} else { } else {
break; break;
@ -374,18 +380,15 @@ impl DiscoveryContext {
// Release the mapping if we're still unreachable // Release the mapping if we're still unreachable
let _ = igd_manager let _ = igd_manager
.unmap_port( .unmap_port(
low_level_protocol_type, igd_protocol_type,
address_type, igd_address_type,
external_1.address.port(), external_1.address.port(),
) )
.await; .await;
if tries == PORT_MAP_TRY_COUNT { if tries == PORT_MAP_TRY_COUNT {
warn!("UPNP port mapping succeeded but port {}/{} is still unreachable.\nYou may need to add a local firewall allowed port on this machine.\n", warn!("UPNP port mapping succeeded but port {}/{} is still unreachable.\nYou may need to add a local firewall allowed port on this machine.\n",
local_port, match low_level_protocol_type { local_port, igd_protocol_type
LowLevelProtocolType::UDP => "udp",
LowLevelProtocolType::TCP => "tcp",
}
); );
break; break;
} }

View File

@ -5,13 +5,12 @@ use std::net::UdpSocket;
const UPNP_GATEWAY_DETECT_TIMEOUT_MS: u32 = 5_000; const UPNP_GATEWAY_DETECT_TIMEOUT_MS: u32 = 5_000;
const UPNP_MAPPING_LIFETIME_MS: u32 = 120_000; const UPNP_MAPPING_LIFETIME_MS: u32 = 120_000;
const UPNP_MAPPING_ATTEMPTS: u32 = 3; const UPNP_MAPPING_ATTEMPTS: u32 = 3;
const UPNP_MAPPING_LIFETIME_US: TimestampDuration = const UPNP_MAPPING_LIFETIME_US: u64 = UPNP_MAPPING_LIFETIME_MS as u64 * 1000u64;
TimestampDuration::new(UPNP_MAPPING_LIFETIME_MS as u64 * 1000u64);
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct PortMapKey { struct PortMapKey {
llpt: LowLevelProtocolType, protocol_type: IGDProtocolType,
at: AddressType, address_type: IGDAddressType,
local_port: u16, local_port: u16,
} }
@ -19,36 +18,67 @@ struct PortMapKey {
struct PortMapValue { struct PortMapValue {
ext_ip: IpAddr, ext_ip: IpAddr,
mapped_port: u16, mapped_port: u16,
timestamp: Timestamp, timestamp: u64,
renewal_lifetime: TimestampDuration, renewal_lifetime: u64,
renewal_attempts: u32, renewal_attempts: u32,
} }
struct IGDManagerInner { struct IGDManagerInner {
local_ip_addrs: BTreeMap<AddressType, IpAddr>, local_ip_addrs: BTreeMap<IGDAddressType, IpAddr>,
gateways: BTreeMap<IpAddr, Arc<Gateway>>, gateways: BTreeMap<IpAddr, Arc<Gateway>>,
port_maps: BTreeMap<PortMapKey, PortMapValue>, port_maps: BTreeMap<PortMapKey, PortMapValue>,
} }
#[derive(Clone)] #[derive(Clone)]
pub struct IGDManager { pub struct IGDManager {
config: VeilidConfig, program_name: String,
inner: Arc<Mutex<IGDManagerInner>>, inner: Arc<Mutex<IGDManagerInner>>,
} }
fn convert_llpt(llpt: LowLevelProtocolType) -> PortMappingProtocol { fn convert_protocol_type(igdpt: IGDProtocolType) -> PortMappingProtocol {
match llpt { match igdpt {
LowLevelProtocolType::UDP => PortMappingProtocol::UDP, IGDProtocolType::UDP => PortMappingProtocol::UDP,
LowLevelProtocolType::TCP => PortMappingProtocol::TCP, IGDProtocolType::TCP => PortMappingProtocol::TCP,
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum IGDAddressType {
IPV6,
IPV4,
}
impl fmt::Display for IGDAddressType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
IGDAddressType::IPV6 => write!(f, "IPV6"),
IGDAddressType::IPV4 => write!(f, "IPV4"),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum IGDProtocolType {
UDP,
TCP,
}
impl fmt::Display for IGDProtocolType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
IGDProtocolType::UDP => write!(f, "UDP"),
IGDProtocolType::TCP => write!(f, "TCP"),
}
} }
} }
impl IGDManager { impl IGDManager {
// /////////////////////////////////////////////////////////////////////
// Public Interface
pub fn new(config: VeilidConfig) -> Self { pub fn new(program_name: String) -> Self {
Self { Self {
config, program_name,
inner: Arc::new(Mutex::new(IGDManagerInner { inner: Arc::new(Mutex::new(IGDManagerInner {
local_ip_addrs: BTreeMap::new(), local_ip_addrs: BTreeMap::new(),
gateways: BTreeMap::new(), gateways: BTreeMap::new(),
@ -58,10 +88,306 @@ impl IGDManager {
} }
#[instrument(level = "trace", target = "net", skip_all)] #[instrument(level = "trace", target = "net", skip_all)]
fn get_routed_local_ip_address(address_type: AddressType) -> Option<IpAddr> { pub async fn unmap_port(
&self,
protocol_type: IGDProtocolType,
address_type: IGDAddressType,
mapped_port: u16,
) -> Option<()> {
let this = self.clone();
blocking_wrapper(
"igd unmap_port",
move || {
let mut inner = this.inner.lock();
// If we already have this port mapped, just return the existing portmap
let mut found = None;
for (pmk, pmv) in &inner.port_maps {
if pmk.protocol_type == protocol_type
&& pmk.address_type == address_type
&& pmv.mapped_port == mapped_port
{
found = Some(*pmk);
break;
}
}
let pmk = found?;
let _pmv = inner
.port_maps
.remove(&pmk)
.expect("key found but remove failed");
// Get local ip address
let local_ip = Self::find_local_ip(&mut inner, address_type)?;
// Find gateway
let gw = Self::find_gateway(&mut inner, local_ip)?;
// Unmap port
match gw.remove_port(convert_protocol_type(protocol_type), mapped_port) {
Ok(()) => (),
Err(e) => {
// Failed to map external port
log_net!(debug "upnp failed to remove external port: {}", e);
return None;
}
};
Some(())
},
None,
)
.await
}
#[instrument(level = "trace", target = "net", skip_all)]
pub async fn map_any_port(
&self,
protocol_type: IGDProtocolType,
address_type: IGDAddressType,
local_port: u16,
expected_external_address: Option<IpAddr>,
) -> Option<SocketAddr> {
let this = self.clone();
blocking_wrapper("igd map_any_port", move || {
let mut inner = this.inner.lock();
// If we already have this port mapped, just return the existing portmap
let pmkey = PortMapKey {
protocol_type,
address_type,
local_port,
};
if let Some(pmval) = inner.port_maps.get(&pmkey) {
return Some(SocketAddr::new(pmval.ext_ip, pmval.mapped_port));
}
// Get local ip address
let local_ip = Self::find_local_ip(&mut inner, address_type)?;
// Find gateway
let gw = Self::find_gateway(&mut inner, local_ip)?;
// Get external address
let ext_ip = match gw.get_external_ip() {
Ok(ip) => ip,
Err(e) => {
log_net!(debug "couldn't get external ip from igd: {}", e);
return None;
}
};
// Ensure external IP matches address type
if ext_ip.is_ipv4() && address_type != IGDAddressType::IPV4 {
log_net!(debug "mismatched ip address type from igd, wanted v4, got v6");
return None;
} else if ext_ip.is_ipv6() && address_type != IGDAddressType::IPV6 {
log_net!(debug "mismatched ip address type from igd, wanted v6, got v4");
return None;
}
if let Some(expected_external_address) = expected_external_address {
if ext_ip != expected_external_address {
log_net!(debug "gateway external address does not match calculated external address: expected={} vs gateway={}", expected_external_address, ext_ip);
return None;
}
}
// Map any port
let desc = this.get_description(protocol_type, local_port);
let mapped_port = match gw.add_any_port(convert_protocol_type(protocol_type), SocketAddr::new(local_ip, local_port), (UPNP_MAPPING_LIFETIME_MS + 999) / 1000, &desc) {
Ok(mapped_port) => mapped_port,
Err(e) => {
// Failed to map external port
log_net!(debug "upnp failed to map external port: {}", e);
return None;
}
};
// Add to mapping list to keep alive
let timestamp = get_timestamp();
inner.port_maps.insert(PortMapKey {
protocol_type,
address_type,
local_port,
}, PortMapValue {
ext_ip,
mapped_port,
timestamp,
renewal_lifetime: (UPNP_MAPPING_LIFETIME_MS / 2) as u64 * 1000u64,
renewal_attempts: 0,
});
// Succeeded, return the externally mapped port
Some(SocketAddr::new(ext_ip, mapped_port))
}, None)
.await
}
#[instrument(
level = "trace",
target = "net",
name = "IGDManager::tick",
skip_all,
err
)]
pub async fn tick(&self) -> EyreResult<bool> {
// Refresh mappings if we have them
// If an error is received, then return false to restart the local network
let mut full_renews: Vec<(PortMapKey, PortMapValue)> = Vec::new();
let mut renews: Vec<(PortMapKey, PortMapValue)> = Vec::new();
{
let inner = self.inner.lock();
let now = get_timestamp();
for (k, v) in &inner.port_maps {
let mapping_lifetime = now.saturating_sub(v.timestamp);
if mapping_lifetime >= UPNP_MAPPING_LIFETIME_US
|| v.renewal_attempts >= UPNP_MAPPING_ATTEMPTS
{
// Past expiration time or tried N times, do a full renew and fail out if we can't
full_renews.push((*k, *v));
} else if mapping_lifetime >= v.renewal_lifetime {
// Attempt a normal renewal
renews.push((*k, *v));
}
}
// See if we need to do some blocking operations
if full_renews.is_empty() && renews.is_empty() {
// Just return now since there's nothing to renew
return Ok(true);
}
}
let this = self.clone();
blocking_wrapper(
"igd tick",
move || {
let mut inner = this.inner.lock();
// Process full renewals
for (k, v) in full_renews {
// Get local ip for address type
let local_ip = match Self::get_local_ip(&mut inner, k.address_type) {
Some(ip) => ip,
None => {
return Err(eyre!("local ip missing for address type"));
}
};
// Get gateway for interface
let gw = match Self::get_gateway(&mut inner, local_ip) {
Some(gw) => gw,
None => {
return Err(eyre!("gateway missing for interface"));
}
};
// Delete the mapping if it exists, ignore any errors here
let _ = gw.remove_port(convert_protocol_type(k.protocol_type), v.mapped_port);
inner.port_maps.remove(&k);
let desc = this.get_description(k.protocol_type, k.local_port);
match gw.add_any_port(
convert_protocol_type(k.protocol_type),
SocketAddr::new(local_ip, k.local_port),
(UPNP_MAPPING_LIFETIME_MS + 999) / 1000,
&desc,
) {
Ok(mapped_port) => {
log_net!(debug "full-renewed mapped port {:?} -> {:?}", v, k);
inner.port_maps.insert(
k,
PortMapValue {
ext_ip: v.ext_ip,
mapped_port,
timestamp: get_timestamp(),
renewal_lifetime: (UPNP_MAPPING_LIFETIME_MS / 2) as u64
* 1000u64,
renewal_attempts: 0,
},
);
}
Err(e) => {
info!("failed to full-renew mapped port {:?} -> {:?}: {}", v, k, e);
// Must restart network now :(
return Ok(false);
}
};
}
// Process normal renewals
for (k, mut v) in renews {
// Get local ip for address type
let local_ip = match Self::get_local_ip(&mut inner, k.address_type) {
Some(ip) => ip,
None => {
return Err(eyre!("local ip missing for address type"));
}
};
// Get gateway for interface
let gw = match Self::get_gateway(&mut inner, local_ip) {
Some(gw) => gw,
None => {
return Err(eyre!("gateway missing for address type"));
}
};
let desc = this.get_description(k.protocol_type, k.local_port);
match gw.add_port(
convert_protocol_type(k.protocol_type),
v.mapped_port,
SocketAddr::new(local_ip, k.local_port),
(UPNP_MAPPING_LIFETIME_MS + 999) / 1000,
&desc,
) {
Ok(()) => {
log_net!("renewed mapped port {:?} -> {:?}", v, k);
inner.port_maps.insert(
k,
PortMapValue {
ext_ip: v.ext_ip,
mapped_port: v.mapped_port,
timestamp: get_timestamp(),
renewal_lifetime: (UPNP_MAPPING_LIFETIME_MS / 2) as u64
* 1000u64,
renewal_attempts: 0,
},
);
}
Err(e) => {
log_net!(debug "failed to renew mapped port {:?} -> {:?}: {}", v, k, e);
// Get closer to the maximum renewal timeline by a factor of two each time
v.renewal_lifetime =
(v.renewal_lifetime + UPNP_MAPPING_LIFETIME_US) / 2u64;
v.renewal_attempts += 1;
// Store new value to try again
inner.port_maps.insert(k, v);
}
};
}
// Normal exit, no restart
Ok(true)
},
Err(eyre!("failed to process blocking task")),
)
.instrument(tracing::trace_span!("igd tick fut"))
.await
}
/////////////////////////////////////////////////////////////////////
// Private Implementation
#[instrument(level = "trace", target = "net", skip_all)]
fn get_routed_local_ip_address(address_type: IGDAddressType) -> Option<IpAddr> {
let socket = match UdpSocket::bind(match address_type { let socket = match UdpSocket::bind(match address_type {
AddressType::IPV4 => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0), IGDAddressType::IPV4 => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0),
AddressType::IPV6 => SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0), IGDAddressType::IPV6 => SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0),
}) { }) {
Ok(s) => s, Ok(s) => s,
Err(e) => { Err(e) => {
@ -75,8 +401,8 @@ impl IGDManager {
// using google's dns, but it wont actually send any packets to it // using google's dns, but it wont actually send any packets to it
socket socket
.connect(match address_type { .connect(match address_type {
AddressType::IPV4 => SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 80), IGDAddressType::IPV4 => SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 80),
AddressType::IPV6 => SocketAddr::new( IGDAddressType::IPV6 => SocketAddr::new(
IpAddr::V6(Ipv6Addr::new(0x2001, 0x4860, 0x4860, 0, 0, 0, 0, 0x8888)), IpAddr::V6(Ipv6Addr::new(0x2001, 0x4860, 0x4860, 0, 0, 0, 0, 0x8888)),
80, 80,
), ),
@ -91,7 +417,7 @@ impl IGDManager {
} }
#[instrument(level = "trace", target = "net", skip_all)] #[instrument(level = "trace", target = "net", skip_all)]
fn find_local_ip(inner: &mut IGDManagerInner, address_type: AddressType) -> Option<IpAddr> { fn find_local_ip(inner: &mut IGDManagerInner, address_type: IGDAddressType) -> Option<IpAddr> {
if let Some(ip) = inner.local_ip_addrs.get(&address_type) { if let Some(ip) = inner.local_ip_addrs.get(&address_type) {
return Some(*ip); return Some(*ip);
} }
@ -109,7 +435,7 @@ impl IGDManager {
} }
#[instrument(level = "trace", target = "net", skip_all)] #[instrument(level = "trace", target = "net", skip_all)]
fn get_local_ip(inner: &mut IGDManagerInner, address_type: AddressType) -> Option<IpAddr> { fn get_local_ip(inner: &mut IGDManagerInner, address_type: IGDAddressType) -> Option<IpAddr> {
if let Some(ip) = inner.local_ip_addrs.get(&address_type) { if let Some(ip) = inner.local_ip_addrs.get(&address_type) {
return Some(*ip); return Some(*ip);
} }
@ -164,304 +490,10 @@ impl IGDManager {
None None
} }
fn get_description(&self, llpt: LowLevelProtocolType, local_port: u16) -> String { fn get_description(&self, protocol_type: IGDProtocolType, local_port: u16) -> String {
format!( format!(
"{} map {} for port {}", "{} map {} for port {}",
self.config.get().program_name, self.program_name, protocol_type, local_port
convert_llpt(llpt),
local_port
) )
} }
#[instrument(level = "trace", target = "net", skip_all)]
pub async fn unmap_port(
&self,
llpt: LowLevelProtocolType,
at: AddressType,
mapped_port: u16,
) -> Option<()> {
let this = self.clone();
blocking_wrapper(
"igd unmap_port",
move || {
let mut inner = this.inner.lock();
// If we already have this port mapped, just return the existing portmap
let mut found = None;
for (pmk, pmv) in &inner.port_maps {
if pmk.llpt == llpt && pmk.at == at && pmv.mapped_port == mapped_port {
found = Some(*pmk);
break;
}
}
let pmk = found?;
let _pmv = inner
.port_maps
.remove(&pmk)
.expect("key found but remove failed");
// Get local ip address
let local_ip = Self::find_local_ip(&mut inner, at)?;
// Find gateway
let gw = Self::find_gateway(&mut inner, local_ip)?;
// Unmap port
match gw.remove_port(convert_llpt(llpt), mapped_port) {
Ok(()) => (),
Err(e) => {
// Failed to map external port
log_net!(debug "upnp failed to remove external port: {}", e);
return None;
}
};
Some(())
},
None,
)
.await
}
#[instrument(level = "trace", target = "net", skip_all)]
pub async fn map_any_port(
&self,
llpt: LowLevelProtocolType,
at: AddressType,
local_port: u16,
expected_external_address: Option<IpAddr>,
) -> Option<SocketAddr> {
let this = self.clone();
blocking_wrapper("igd map_any_port", move || {
let mut inner = this.inner.lock();
// If we already have this port mapped, just return the existing portmap
let pmkey = PortMapKey {
llpt,
at,
local_port,
};
if let Some(pmval) = inner.port_maps.get(&pmkey) {
return Some(SocketAddr::new(pmval.ext_ip, pmval.mapped_port));
}
// Get local ip address
let local_ip = Self::find_local_ip(&mut inner, at)?;
// Find gateway
let gw = Self::find_gateway(&mut inner, local_ip)?;
// Get external address
let ext_ip = match gw.get_external_ip() {
Ok(ip) => ip,
Err(e) => {
log_net!(debug "couldn't get external ip from igd: {}", e);
return None;
}
};
// Ensure external IP matches address type
if ext_ip.is_ipv4() && at != AddressType::IPV4 {
log_net!(debug "mismatched ip address type from igd, wanted v4, got v6");
return None;
} else if ext_ip.is_ipv6() && at != AddressType::IPV6 {
log_net!(debug "mismatched ip address type from igd, wanted v6, got v4");
return None;
}
if let Some(expected_external_address) = expected_external_address {
if ext_ip != expected_external_address {
log_net!(debug "gateway external address does not match calculated external address: expected={} vs gateway={}", expected_external_address, ext_ip);
return None;
}
}
// Map any port
let desc = this.get_description(llpt, local_port);
let mapped_port = match gw.add_any_port(convert_llpt(llpt), SocketAddr::new(local_ip, local_port), (UPNP_MAPPING_LIFETIME_MS + 999) / 1000, &desc) {
Ok(mapped_port) => mapped_port,
Err(e) => {
// Failed to map external port
log_net!(debug "upnp failed to map external port: {}", e);
return None;
}
};
// Add to mapping list to keep alive
let timestamp = Timestamp::now();
inner.port_maps.insert(PortMapKey {
llpt,
at,
local_port,
}, PortMapValue {
ext_ip,
mapped_port,
timestamp,
renewal_lifetime: ((UPNP_MAPPING_LIFETIME_MS / 2) as u64 * 1000u64).into(),
renewal_attempts: 0,
});
// Succeeded, return the externally mapped port
Some(SocketAddr::new(ext_ip, mapped_port))
}, None)
.await
}
#[instrument(
level = "trace",
target = "net",
name = "IGDManager::tick",
skip_all,
err
)]
pub async fn tick(&self) -> EyreResult<bool> {
// Refresh mappings if we have them
// If an error is received, then return false to restart the local network
let mut full_renews: Vec<(PortMapKey, PortMapValue)> = Vec::new();
let mut renews: Vec<(PortMapKey, PortMapValue)> = Vec::new();
{
let inner = self.inner.lock();
let now = Timestamp::now();
for (k, v) in &inner.port_maps {
let mapping_lifetime = now.saturating_sub(v.timestamp);
if mapping_lifetime >= UPNP_MAPPING_LIFETIME_US
|| v.renewal_attempts >= UPNP_MAPPING_ATTEMPTS
{
// Past expiration time or tried N times, do a full renew and fail out if we can't
full_renews.push((*k, *v));
} else if mapping_lifetime >= v.renewal_lifetime {
// Attempt a normal renewal
renews.push((*k, *v));
}
}
// See if we need to do some blocking operations
if full_renews.is_empty() && renews.is_empty() {
// Just return now since there's nothing to renew
return Ok(true);
}
}
let this = self.clone();
blocking_wrapper(
"igd tick",
move || {
let mut inner = this.inner.lock();
// Process full renewals
for (k, v) in full_renews {
// Get local ip for address type
let local_ip = match Self::get_local_ip(&mut inner, k.at) {
Some(ip) => ip,
None => {
return Err(eyre!("local ip missing for address type"));
}
};
// Get gateway for interface
let gw = match Self::get_gateway(&mut inner, local_ip) {
Some(gw) => gw,
None => {
return Err(eyre!("gateway missing for interface"));
}
};
// Delete the mapping if it exists, ignore any errors here
let _ = gw.remove_port(convert_llpt(k.llpt), v.mapped_port);
inner.port_maps.remove(&k);
let desc = this.get_description(k.llpt, k.local_port);
match gw.add_any_port(
convert_llpt(k.llpt),
SocketAddr::new(local_ip, k.local_port),
(UPNP_MAPPING_LIFETIME_MS + 999) / 1000,
&desc,
) {
Ok(mapped_port) => {
log_net!(debug "full-renewed mapped port {:?} -> {:?}", v, k);
inner.port_maps.insert(
k,
PortMapValue {
ext_ip: v.ext_ip,
mapped_port,
timestamp: Timestamp::now(),
renewal_lifetime: TimestampDuration::new(
(UPNP_MAPPING_LIFETIME_MS / 2) as u64 * 1000u64,
),
renewal_attempts: 0,
},
);
}
Err(e) => {
info!("failed to full-renew mapped port {:?} -> {:?}: {}", v, k, e);
// Must restart network now :(
return Ok(false);
}
};
}
// Process normal renewals
for (k, mut v) in renews {
// Get local ip for address type
let local_ip = match Self::get_local_ip(&mut inner, k.at) {
Some(ip) => ip,
None => {
return Err(eyre!("local ip missing for address type"));
}
};
// Get gateway for interface
let gw = match Self::get_gateway(&mut inner, local_ip) {
Some(gw) => gw,
None => {
return Err(eyre!("gateway missing for address type"));
}
};
let desc = this.get_description(k.llpt, k.local_port);
match gw.add_port(
convert_llpt(k.llpt),
v.mapped_port,
SocketAddr::new(local_ip, k.local_port),
(UPNP_MAPPING_LIFETIME_MS + 999) / 1000,
&desc,
) {
Ok(()) => {
log_net!("renewed mapped port {:?} -> {:?}", v, k);
inner.port_maps.insert(
k,
PortMapValue {
ext_ip: v.ext_ip,
mapped_port: v.mapped_port,
timestamp: Timestamp::now(),
renewal_lifetime: ((UPNP_MAPPING_LIFETIME_MS / 2) as u64
* 1000u64)
.into(),
renewal_attempts: 0,
},
);
}
Err(e) => {
log_net!(debug "failed to renew mapped port {:?} -> {:?}: {}", v, k, e);
// Get closer to the maximum renewal timeline by a factor of two each time
v.renewal_lifetime =
(v.renewal_lifetime + UPNP_MAPPING_LIFETIME_US) / 2u64;
v.renewal_attempts += 1;
// Store new value to try again
inner.port_maps.insert(k, v);
}
};
}
// Normal exit, no restart
Ok(true)
},
Err(eyre!("failed to process blocking task")),
)
.instrument(tracing::trace_span!("igd tick fut"))
.await
}
} }

View File

@ -167,6 +167,7 @@ impl Network {
connection_manager: ConnectionManager, connection_manager: ConnectionManager,
) -> NetworkUnlockedInner { ) -> NetworkUnlockedInner {
let config = network_manager.config(); let config = network_manager.config();
let program_name = config.get().program_name.clone();
NetworkUnlockedInner { NetworkUnlockedInner {
startup_lock: StartupLock::new(), startup_lock: StartupLock::new(),
network_manager, network_manager,
@ -183,7 +184,7 @@ impl Network {
), ),
upnp_task: TickTask::new("upnp_task", UPNP_TASK_TICK_PERIOD_SECS), upnp_task: TickTask::new("upnp_task", UPNP_TASK_TICK_PERIOD_SECS),
network_task_lock: AsyncMutex::new(()), network_task_lock: AsyncMutex::new(()),
igd_manager: igd_manager::IGDManager::new(config.clone()), igd_manager: igd_manager::IGDManager::new(program_name),
} }
} }

View File

@ -79,6 +79,7 @@ flume = { version = "0.11.0", features = ["async"] }
async-io = { version = "1.13.0" } async-io = { version = "1.13.0" }
async-std = { version = "1.12.0", features = ["unstable"], optional = true } async-std = { version = "1.12.0", features = ["unstable"], optional = true }
chrono = "0.4.38" chrono = "0.4.38"
ctrlc = "^3"
futures-util = { version = "0.3.30", default-features = false, features = [ futures-util = { version = "0.3.30", default-features = false, features = [
"async-await", "async-await",
"sink", "sink",

View File

@ -0,0 +1,45 @@
#![cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
use cfg_if::*;
use parking_lot::*;
use stop_token::StopSource;
use veilid_tools::*;
const VERSION: &str = env!("CARGO_PKG_VERSION");
cfg_if! {
if #[cfg(feature="rt-async-std")] {
pub fn block_on<F: Future<Output = T>, T>(f: F) -> T {
async_std::task::block_on(f)
}
} else if #[cfg(feature="rt-tokio")] {
pub fn block_on<F: Future<Output = T>, T>(f: F) -> T {
let rt = tokio::runtime::Runtime::new().unwrap();
let local = tokio::task::LocalSet::new();
local.block_on(&rt, f)
}
} else {
compile_error!("needs executor implementation");
}
}
fn main() -> Result<(), String> {
let stop_source = StopSource::new();
let stop_token = stop_source.token();
let stop_mutex = Mutex::new(Some(stop_source));
ctrlc::set_handler(move || {
*(stop_mutex.lock()) = None;
})
.expect("Error setting Ctrl-C handler");
block_on(async {
println!("Veilid VirtualRouter v{}", VERSION);
let router_server = virtual_network::RouterServer::new();
router_server
.run(stop_token)
.await
.map_err(|e| e.to_string())
})
}

View File

@ -27,6 +27,7 @@
//! [VirtualUdpSocket] //! [VirtualUdpSocket]
//! [VirtualTcpListener] //! [VirtualTcpListener]
//! [VirtualTcpListenerStream] //! [VirtualTcpListenerStream]
//! [VirtualGateway]
//! [VirtualWsMeta] //! [VirtualWsMeta]
//! [VirtualWsStream] //! [VirtualWsStream]
//! //!
@ -42,8 +43,10 @@
mod machine; mod machine;
mod router_client; mod router_client;
mod router_op_table; mod router_op_table;
#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
mod router_server; mod router_server;
mod serde_io_error; mod serde_io_error;
mod virtual_gateway;
mod virtual_network_error; mod virtual_network_error;
mod virtual_tcp_listener; mod virtual_tcp_listener;
mod virtual_tcp_listener_stream; mod virtual_tcp_listener_stream;
@ -54,7 +57,9 @@ use super::*;
pub use machine::*; pub use machine::*;
pub use router_client::*; pub use router_client::*;
#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
pub use router_server::*; pub use router_server::*;
pub use virtual_gateway::*;
pub use virtual_network_error::*; pub use virtual_network_error::*;
pub use virtual_tcp_listener::*; pub use virtual_tcp_listener::*;
pub use virtual_tcp_listener_stream::*; pub use virtual_tcp_listener_stream::*;

View File

@ -6,7 +6,6 @@ use futures_util::{
}; };
use postcard::{from_bytes, to_stdvec}; use postcard::{from_bytes, to_stdvec};
use router_op_table::*; use router_op_table::*;
use serde::*;
use std::io; use std::io;
use stop_token::future::FutureExt as _; use stop_token::future::FutureExt as _;
@ -25,7 +24,7 @@ impl fmt::Debug for RouterClientInner {
} }
struct RouterClientUnlockedInner { struct RouterClientUnlockedInner {
sender: flume::Sender<Bytes>, sender: flume::Sender<ServerProcessorCommand>,
next_message_id: AtomicU64, next_message_id: AtomicU64,
router_op_waiter: RouterOpWaiter<ServerProcessorReplyResult, ()>, router_op_waiter: RouterOpWaiter<ServerProcessorReplyResult, ()>,
} }
@ -40,145 +39,6 @@ impl fmt::Debug for RouterClientUnlockedInner {
} }
} }
pub type MessageId = u64;
pub type SocketId = u64;
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
enum ServerProcessorRequest {
AllocateMachine,
ReleaseMachine {
machine_id: MachineId,
},
GetInterfaces {
machine_id: MachineId,
},
TcpConnect {
machine_id: MachineId,
local_address: Option<SocketAddr>,
remote_address: SocketAddr,
timeout_ms: u32,
options: VirtualTcpOptions,
},
TcpBind {
machine_id: MachineId,
local_address: Option<SocketAddr>,
options: VirtualTcpOptions,
},
TcpAccept {
machine_id: MachineId,
listen_socket_id: SocketId,
},
TcpShutdown {
machine_id: MachineId,
socket_id: SocketId,
},
UdpBind {
machine_id: MachineId,
local_address: Option<SocketAddr>,
options: VirtualUdpOptions,
},
Send {
machine_id: MachineId,
socket_id: SocketId,
data: Vec<u8>,
},
SendTo {
machine_id: MachineId,
socket_id: SocketId,
remote_address: SocketAddr,
data: Vec<u8>,
},
Recv {
machine_id: MachineId,
socket_id: u64,
len: u32,
},
RecvFrom {
machine_id: MachineId,
socket_id: u64,
len: u32,
},
}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
struct ServerProcessorMessage {
message_id: MessageId,
request: ServerProcessorRequest,
}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
enum ServerProcessorCommand {
Message(ServerProcessorMessage),
CloseSocket {
machine_id: MachineId,
socket_id: SocketId,
},
}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
enum ServerProcessorReplyValue {
AllocateMachine {
machine_id: MachineId,
},
ReleaseMachine,
GetInterfaces {
interfaces: BTreeMap<String, NetworkInterface>,
},
TcpConnect {
socket_id: SocketId,
local_address: SocketAddr,
},
TcpBind {
socket_id: SocketId,
local_address: SocketAddr,
},
TcpAccept {
socket_id: SocketId,
address: SocketAddr,
},
TcpShutdown,
UdpBind {
socket_id: SocketId,
local_address: SocketAddr,
},
Send {
len: u32,
},
SendTo {
len: u32,
},
Recv {
data: Vec<u8>,
},
RecvFrom {
remote_address: SocketAddr,
data: Vec<u8>,
},
}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
enum ServerProcessorReplyResult {
Value(ServerProcessorReplyValue),
InvalidMachineId,
InvalidSocketId,
IoError(#[serde(with = "serde_io_error::SerdeIoErrorKindDef")] io::ErrorKind),
}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
struct ServerProcessorReply {
message_id: MessageId,
status: ServerProcessorReplyResult,
}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
enum ServerProcessorEvent {
Reply(ServerProcessorReply),
// DeadSocket {
// machine_id: MachineId,
// socket_id: SocketId,
// },
}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RouterClient { pub struct RouterClient {
unlocked_inner: Arc<RouterClientUnlockedInner>, unlocked_inner: Arc<RouterClientUnlockedInner>,
@ -213,7 +73,7 @@ impl RouterClient {
} }
// Create channels // Create channels
let (client_sender, server_receiver) = flume::unbounded::<Bytes>(); let (client_sender, server_receiver) = flume::unbounded::<ServerProcessorCommand>();
// Create stopper // Create stopper
let stop_source = StopSource::new(); let stop_source = StopSource::new();
@ -289,6 +149,29 @@ impl RouterClient {
)) ))
} }
pub(super) fn local_router_client(
client_sender: flume::Sender<ServerProcessorCommand>,
server_receiver: flume::Receiver<ServerProcessorEvent>,
) -> RouterClient {
// Create stopper
let stop_source = StopSource::new();
// Create router operation waiter
let router_op_waiter = RouterOpWaiter::new();
// Spawn a client connection handler
let jh_handler = spawn(
"RouterClient local processor",
Self::run_local_processor(
server_receiver,
router_op_waiter.clone(),
stop_source.token(),
),
);
Self::new(client_sender, router_op_waiter, jh_handler, stop_source)
}
pub async fn disconnect(self) { pub async fn disconnect(self) {
drop(self.inner.lock().stop_source.take()); drop(self.inner.lock().stop_source.take());
let jh_handler = self.inner.lock().jh_handler.take(); let jh_handler = self.inner.lock().jh_handler.take();
@ -466,7 +349,7 @@ impl RouterClient {
pub async fn recv( pub async fn recv(
self, self,
machine_id: MachineId, machine_id: MachineId,
socket_id: u64, socket_id: SocketId,
len: usize, len: usize,
) -> VirtualNetworkResult<Vec<u8>> { ) -> VirtualNetworkResult<Vec<u8>> {
let request = ServerProcessorRequest::Recv { let request = ServerProcessorRequest::Recv {
@ -483,7 +366,7 @@ impl RouterClient {
pub async fn recv_from( pub async fn recv_from(
self, self,
machine_id: MachineId, machine_id: MachineId,
socket_id: u64, socket_id: SocketId,
len: usize, len: usize,
) -> VirtualNetworkResult<(Vec<u8>, SocketAddr)> { ) -> VirtualNetworkResult<(Vec<u8>, SocketAddr)> {
let request = ServerProcessorRequest::RecvFrom { let request = ServerProcessorRequest::RecvFrom {
@ -501,11 +384,93 @@ impl RouterClient {
Ok((data, remote_address)) Ok((data, remote_address))
} }
pub async fn get_routed_local_address(
self,
machine_id: MachineId,
address_type: VirtualAddressType,
) -> VirtualNetworkResult<IpAddr> {
let request = ServerProcessorRequest::GetRoutedLocalAddress {
machine_id,
address_type,
};
let ServerProcessorReplyValue::GetRoutedLocalAddress { address } =
self.perform_request(request).await?
else {
return Err(VirtualNetworkError::ResponseMismatch);
};
Ok(address)
}
pub async fn find_gateway(
self,
machine_id: MachineId,
) -> VirtualNetworkResult<Option<GatewayId>> {
let request = ServerProcessorRequest::FindGateway { machine_id };
let ServerProcessorReplyValue::FindGateway { opt_gateway_id } =
self.perform_request(request).await?
else {
return Err(VirtualNetworkError::ResponseMismatch);
};
Ok(opt_gateway_id)
}
pub async fn get_external_address(self, gateway_id: GatewayId) -> VirtualNetworkResult<IpAddr> {
let request = ServerProcessorRequest::GetExternalAddress { gateway_id };
let ServerProcessorReplyValue::GetExternalAddress { address } =
self.perform_request(request).await?
else {
return Err(VirtualNetworkError::ResponseMismatch);
};
Ok(address)
}
pub async fn add_port(
self,
gateway_id: GatewayId,
protocol: VirtualProtocolType,
external_port: Option<u16>,
local_address: SocketAddr,
lease_duration_ms: u32,
description: String,
) -> VirtualNetworkResult<u16> {
let request = ServerProcessorRequest::AddPort {
gateway_id,
protocol,
external_port,
local_address,
lease_duration_ms,
description,
};
let ServerProcessorReplyValue::AddPort { external_port } =
self.perform_request(request).await?
else {
return Err(VirtualNetworkError::ResponseMismatch);
};
Ok(external_port)
}
pub async fn remove_port(
self,
gateway_id: GatewayId,
protocol: VirtualProtocolType,
external_port: u16,
) -> VirtualNetworkResult<()> {
let request = ServerProcessorRequest::RemovePort {
gateway_id,
protocol,
external_port,
};
let ServerProcessorReplyValue::RemovePort = self.perform_request(request).await? else {
return Err(VirtualNetworkError::ResponseMismatch);
};
Ok(())
}
////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////
// Private implementation // Private implementation
fn new( fn new(
sender: flume::Sender<Bytes>, sender: flume::Sender<ServerProcessorCommand>,
router_op_waiter: RouterOpWaiter<ServerProcessorReplyResult, ()>, router_op_waiter: RouterOpWaiter<ServerProcessorReplyResult, ()>,
jh_handler: MustJoinHandle<()>, jh_handler: MustJoinHandle<()>,
stop_source: StopSource, stop_source: StopSource,
@ -528,19 +493,11 @@ impl RouterClient {
machine_id, machine_id,
socket_id, socket_id,
}; };
let command_vec = match to_stdvec(&command).map_err(VirtualNetworkError::SerializationError)
{
Ok(v) => Bytes::from(v),
Err(e) => {
error!("{}", e);
return;
}
};
if let Err(e) = self if let Err(e) = self
.unlocked_inner .unlocked_inner
.sender .sender
.send(command_vec) .send(command)
.map_err(|_| VirtualNetworkError::IoError(io::ErrorKind::BrokenPipe)) .map_err(|_| VirtualNetworkError::IoError(io::ErrorKind::BrokenPipe))
{ {
error!("{}", e); error!("{}", e);
@ -563,26 +520,25 @@ impl RouterClient {
&self, &self,
request: ServerProcessorRequest, request: ServerProcessorRequest,
) -> VirtualNetworkResult<ServerProcessorReplyValue> { ) -> VirtualNetworkResult<ServerProcessorReplyValue> {
let message_id = self let message_id = MessageId(
.unlocked_inner self.unlocked_inner
.next_message_id .next_message_id
.fetch_add(1, Ordering::AcqRel); .fetch_add(1, Ordering::AcqRel),
);
let command = ServerProcessorCommand::Message(ServerProcessorMessage { let command = ServerProcessorCommand::Message(ServerProcessorMessage {
message_id, message_id,
request, request,
}); });
let command_vec =
Bytes::from(to_stdvec(&command).map_err(VirtualNetworkError::SerializationError)?);
self.unlocked_inner self.unlocked_inner
.sender .sender
.send_async(command_vec) .send_async(command)
.await .await
.map_err(|_| VirtualNetworkError::IoError(io::ErrorKind::BrokenPipe))?; .map_err(|_| VirtualNetworkError::IoError(io::ErrorKind::BrokenPipe))?;
let handle = self let handle = self
.unlocked_inner .unlocked_inner
.router_op_waiter .router_op_waiter
.add_op_waiter(message_id, ()); .add_op_waiter(message_id.0, ());
let status = self let status = self
.unlocked_inner .unlocked_inner
@ -608,7 +564,7 @@ impl RouterClient {
async fn run_server_processor<R, W>( async fn run_server_processor<R, W>(
reader: R, reader: R,
writer: W, writer: W,
receiver: flume::Receiver<Bytes>, receiver: flume::Receiver<ServerProcessorCommand>,
router_op_waiter: RouterOpWaiter<ServerProcessorReplyResult, ()>, router_op_waiter: RouterOpWaiter<ServerProcessorReplyResult, ()>,
stop_token: StopToken, stop_token: StopToken,
) where ) where
@ -621,7 +577,16 @@ impl RouterClient {
let framed_writer = FramedWrite::new(writer, BytesCodec); let framed_writer = FramedWrite::new(writer, BytesCodec);
let framed_writer_fut = system_boxed(async move { let framed_writer_fut = system_boxed(async move {
if let Err(e) = receiver.into_stream().map(Ok).forward(framed_writer).await { if let Err(e) = receiver
.into_stream()
.map(|command| {
to_stdvec(&command)
.map_err(io::Error::other)
.map(Bytes::from)
})
.forward(framed_writer)
.await
{
error!("{}", e); error!("{}", e);
} }
}); });
@ -631,20 +596,7 @@ impl RouterClient {
let evt = from_bytes::<ServerProcessorEvent>(&x) let evt = from_bytes::<ServerProcessorEvent>(&x)
.map_err(VirtualNetworkError::SerializationError)?; .map_err(VirtualNetworkError::SerializationError)?;
match evt { Self::process_event(evt, router_op_waiter.clone()).await
ServerProcessorEvent::Reply(reply) => {
router_op_waiter
.complete_op_waiter(reply.message_id, reply.status)
.map_err(io::Error::other)?;
} // ServerProcessorEvent::DeadSocket {
// machine_id,
// socket_id,
// } => {
// //
// }
}
Ok(())
}); });
if let Err(e) = fut.await { if let Err(e) = fut.await {
error!("{}", e); error!("{}", e);
@ -655,4 +607,45 @@ impl RouterClient {
unord.push(framed_reader_fut); unord.push(framed_reader_fut);
while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {} while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {}
} }
async fn run_local_processor(
receiver: flume::Receiver<ServerProcessorEvent>,
router_op_waiter: RouterOpWaiter<ServerProcessorReplyResult, ()>,
stop_token: StopToken,
) {
let mut unord = FuturesUnordered::new();
let receiver = receiver
.into_stream()
.map(io::Result::<ServerProcessorEvent>::Ok);
let framed_reader_fut = system_boxed(async move {
let fut =
receiver.try_for_each(|evt| Self::process_event(evt, router_op_waiter.clone()));
if let Err(e) = fut.await {
error!("{}", e);
}
});
unord.push(framed_reader_fut);
while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {}
}
async fn process_event(
evt: ServerProcessorEvent,
router_op_waiter: RouterOpWaiter<ServerProcessorReplyResult, ()>,
) -> io::Result<()> {
match evt {
ServerProcessorEvent::Reply(reply) => {
router_op_waiter
.complete_op_waiter(reply.message_id.0, reply.status)
.map_err(io::Error::other)?;
} // ServerProcessorEvent::DeadSocket {
// machine_id,
// socket_id,
// } => {
// //
// }
}
Ok(())
}
} }

View File

@ -0,0 +1,217 @@
use super::*;
use serde::*;
use std::io;
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[repr(transparent)]
pub struct MessageId(pub u64);
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[repr(transparent)]
pub struct SocketId(pub u64);
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[repr(transparent)]
pub struct GatewayId(pub u64);
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum VirtualAddressType {
IPV6,
IPV4,
}
impl fmt::Display for VirtualAddressType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
VirtualAddressType::IPV6 => write!(f, "IPV6"),
VirtualAddressType::IPV4 => write!(f, "IPV4"),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum VirtualProtocolType {
UDP,
TCP,
}
impl fmt::Display for VirtualProtocolType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
VirtualProtocolType::UDP => write!(f, "UDP"),
VirtualProtocolType::TCP => write!(f, "TCP"),
}
}
}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub enum ServerProcessorRequest {
AllocateMachine,
ReleaseMachine {
machine_id: MachineId,
},
GetInterfaces {
machine_id: MachineId,
},
TcpConnect {
machine_id: MachineId,
local_address: Option<SocketAddr>,
remote_address: SocketAddr,
timeout_ms: u32,
options: VirtualTcpOptions,
},
TcpBind {
machine_id: MachineId,
local_address: Option<SocketAddr>,
options: VirtualTcpOptions,
},
TcpAccept {
machine_id: MachineId,
listen_socket_id: SocketId,
},
TcpShutdown {
machine_id: MachineId,
socket_id: SocketId,
},
UdpBind {
machine_id: MachineId,
local_address: Option<SocketAddr>,
options: VirtualUdpOptions,
},
Send {
machine_id: MachineId,
socket_id: SocketId,
data: Vec<u8>,
},
SendTo {
machine_id: MachineId,
socket_id: SocketId,
remote_address: SocketAddr,
data: Vec<u8>,
},
Recv {
machine_id: MachineId,
socket_id: SocketId,
len: u32,
},
RecvFrom {
machine_id: MachineId,
socket_id: SocketId,
len: u32,
},
GetRoutedLocalAddress {
machine_id: MachineId,
address_type: VirtualAddressType,
},
FindGateway {
machine_id: MachineId,
},
GetExternalAddress {
gateway_id: GatewayId,
},
AddPort {
gateway_id: GatewayId,
protocol: VirtualProtocolType,
external_port: Option<u16>,
local_address: SocketAddr,
lease_duration_ms: u32,
description: String,
},
RemovePort {
gateway_id: GatewayId,
protocol: VirtualProtocolType,
external_port: u16,
},
}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct ServerProcessorMessage {
pub message_id: MessageId,
pub request: ServerProcessorRequest,
}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub enum ServerProcessorCommand {
Message(ServerProcessorMessage),
CloseSocket {
machine_id: MachineId,
socket_id: SocketId,
},
}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub enum ServerProcessorReplyValue {
AllocateMachine {
machine_id: MachineId,
},
ReleaseMachine,
GetInterfaces {
interfaces: BTreeMap<String, NetworkInterface>,
},
TcpConnect {
socket_id: SocketId,
local_address: SocketAddr,
},
TcpBind {
socket_id: SocketId,
local_address: SocketAddr,
},
TcpAccept {
socket_id: SocketId,
address: SocketAddr,
},
TcpShutdown,
UdpBind {
socket_id: SocketId,
local_address: SocketAddr,
},
Send {
len: u32,
},
SendTo {
len: u32,
},
Recv {
data: Vec<u8>,
},
RecvFrom {
remote_address: SocketAddr,
data: Vec<u8>,
},
GetRoutedLocalAddress {
address: IpAddr,
},
FindGateway {
opt_gateway_id: Option<GatewayId>,
},
GetExternalAddress {
address: IpAddr,
},
AddPort {
external_port: u16,
},
RemovePort,
}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub enum ServerProcessorReplyResult {
Value(ServerProcessorReplyValue),
InvalidMachineId,
InvalidSocketId,
IoError(#[serde(with = "serde_io_error::SerdeIoErrorKindDef")] io::ErrorKind),
}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct ServerProcessorReply {
pub message_id: MessageId,
pub status: ServerProcessorReplyResult,
}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub enum ServerProcessorEvent {
Reply(ServerProcessorReply),
// DeadSocket {
// machine_id: MachineId,
// socket_id: SocketId,
// },
}

View File

@ -0,0 +1,70 @@
mod commands;
pub(super) use commands::*;
use super::*;
use futures_codec::{Bytes, BytesCodec, FramedRead, FramedWrite};
#[derive(ThisError, Debug, Clone, PartialEq, Eq)]
pub enum RouterServerError {
#[error("Serialization Error: {0}")]
SerializationError(postcard::Error),
}
pub type RouterServerResult<T> = Result<T, RouterServerError>;
/// Router server for virtual networking
///
/// Connect to this with a `RouterClient`. Simulates machines, allocates sockets
/// and gateways, manages a virtual simulated Internet and routes packets
/// virtually between `Machines` associated with `RouterClient`s.
#[derive(Debug)]
pub struct RouterServer {
//tcp_connections: HashMap<
client_inbound_sender: flume::Sender<Bytes>,
client_inbound_receiver: flume::Receiver<Bytes>,
local_inbound_sender: flume::Sender<ServerProcessorCommand>,
local_inbound_receiver: flume::Receiver<ServerProcessorCommand>,
}
impl RouterServer {
/// Create a router server for virtual networking
pub fn new() -> Self {
Self {}
}
/// Accept RouterClient connections on a TCP socket
pub fn listen_tcp(&self, addr: Option<SocketAddr>) -> RouterServerResult<StopSource> {
Ok(())
}
/// Accept RouterClient connections on a WebSocket
pub fn listen_ws(&self, addr: Option<SocketAddr>) -> RouterServerResult<StopSource> {
Ok(())
}
/// Return a local RouterClient
pub fn router_client(&self) -> RouterClient {
// Create the outbound channel
xxxx get these channels right
let (local_outbound_sender, local_outbound_receiver) = flume::unbounded();
// Create a RouterClient directly connected to this RouterServer
RouterClient::local_router_client(
self.local_inbound_sender.clone(),
local_outbound_receiver,
)
}
/// Run the router server until a stop is requested
pub async fn run(&self, stop_token: StopToken) -> RouterServerResult<()> {
Ok(())
}
}
impl Default for RouterServer {
fn default() -> Self {
Self::new()
}
}

View File

@ -0,0 +1,89 @@
use super::*;
#[derive(Debug)]
pub struct VirtualGateway {
machine: Machine,
gateway_id: GatewayId,
}
impl VirtualGateway {
/////////////////////////////////////////////////////////////
// Public Interface
pub async fn find() -> VirtualNetworkResult<Option<Self>> {
let machine = default_machine().unwrap();
Self::find_with_machine(machine).await
}
pub async fn find_with_machine(machine: Machine) -> VirtualNetworkResult<Option<Self>> {
machine
.router_client
.clone()
.find_gateway(machine.id)
.await
.map(|opt_gateway_id| opt_gateway_id.map(|gateway_id| Self::new(machine, gateway_id)))
}
pub async fn get_routed_local_address(
&self,
address_type: VirtualAddressType,
) -> VirtualNetworkResult<IpAddr> {
self.machine
.router_client
.clone()
.get_routed_local_address(self.machine.id, address_type)
.await
}
pub async fn get_external_address(&self) -> VirtualNetworkResult<IpAddr> {
self.machine
.router_client
.clone()
.get_external_address(self.gateway_id)
.await
}
pub async fn add_port(
&self,
protocol: VirtualProtocolType,
external_port: Option<u16>,
local_address: SocketAddr,
lease_duration_ms: u32,
description: String,
) -> VirtualNetworkResult<u16> {
self.machine
.router_client
.clone()
.add_port(
self.gateway_id,
protocol,
external_port,
local_address,
lease_duration_ms,
description,
)
.await
}
pub async fn remove_port(
&self,
protocol: VirtualProtocolType,
external_port: u16,
) -> VirtualNetworkResult<()> {
self.machine
.router_client
.clone()
.remove_port(self.gateway_id, protocol, external_port)
.await
}
/////////////////////////////////////////////////////////////
// Private Implementation
fn new(machine: Machine, gateway_id: GatewayId) -> Self {
Self {
machine,
gateway_id,
}
}
}