diff --git a/Cargo.lock b/Cargo.lock index 94268f26..60de7f23 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3874,6 +3874,7 @@ dependencies = [ "webpki 0.22.0", "webpki-roots 0.22.2", "wee_alloc", + "winapi", "ws_stream_wasm", "x25519-dalek-ng", ] diff --git a/veilid-core/Cargo.toml b/veilid-core/Cargo.toml index b0834723..7582745c 100644 --- a/veilid-core/Cargo.toml +++ b/veilid-core/Cargo.toml @@ -122,6 +122,10 @@ ifstructs = "^0" [target.'cfg(any(target_os = "android",target_os = "linux"))'.dependencies] rtnetlink = { version = "^0", default-features = false, features = [ "smol_socket" ] } +# Dependencies for Windows +[target.'cfg(target_os = "windows")'.dependencies] +winapi = { version = "^0", features = [ "iptypes", "iphlpapi" ] } + # Dependencies for MacOS and iOS #[target.'cfg(any(target_os = "ios", target_os = "macos"))'.dependencies] # XXX diff --git a/veilid-core/src/intf/native/utils/network_interfaces/apple.rs b/veilid-core/src/intf/native/utils/network_interfaces/apple.rs index 9faaa9c0..ce635f8d 100644 --- a/veilid-core/src/intf/native/utils/network_interfaces/apple.rs +++ b/veilid-core/src/intf/native/utils/network_interfaces/apple.rs @@ -207,7 +207,6 @@ pub struct IfAddrs { } impl IfAddrs { - #[allow(unsafe_code, clippy::new_ret_no_self)] pub fn new() -> io::Result { let mut ifaddrs = mem::MaybeUninit::uninit(); @@ -393,7 +392,7 @@ impl PlatformSupportApple { Ok(AddressFlags { is_temporary: (flags & IN6_IFF_TEMPORARY) != 0, is_dynamic: (flags & IN6_IFF_DYNAMIC) != 0, - is_deprecated: (flags & IN6_IFF_DEPRECATED) != 0, + is_preferred: (flags & (IN6_IFF_TENTATIVE | IN6_IFF_DUPLICATED | IN6_IFF_DETACHED | IN6_IFF_DEPRECATED) ) == 0, }) } diff --git a/veilid-core/src/intf/native/utils/network_interfaces/mod.rs b/veilid-core/src/intf/native/utils/network_interfaces/mod.rs index 5b02c738..9f606f56 100644 --- a/veilid-core/src/intf/native/utils/network_interfaces/mod.rs +++ b/veilid-core/src/intf/native/utils/network_interfaces/mod.rs @@ -9,6 +9,7 @@ cfg_if::cfg_if! { use netlink::PlatformSupportNetlink as PlatformSupport; } else if #[cfg(target_os = "windows")] { mod windows; + mod sockaddr_tools; use windows::PlatformSupportWindows as PlatformSupport; } else if #[cfg(any(target_os = "macos", target_os = "ios"))] { mod apple; @@ -84,7 +85,7 @@ pub struct AddressFlags { pub is_dynamic: bool, // ipv6 flags pub is_temporary: bool, - pub is_deprecated: bool, + pub is_preferred: bool, } #[derive(PartialEq, Eq, Clone, Debug)] @@ -116,8 +117,8 @@ impl Ord for InterfaceAddress { } } (IfAddr::V6(a), IfAddr::V6(b)) => { - // non-deprecated addresses are better - let ret = (!self.flags.is_deprecated).cmp(&!other.flags.is_deprecated); + // preferred addresses are better + let ret = self.flags.is_preferred.cmp(&other.flags.is_preferred); if ret != Ordering::Equal { return ret; } @@ -187,8 +188,8 @@ impl InterfaceAddress { pub fn is_dynamic(&self) -> bool { self.flags.is_dynamic } - pub fn is_deprecated(&self) -> bool { - self.flags.is_deprecated + pub fn is_preferred(&self) -> bool { + self.flags.is_preferred } } diff --git a/veilid-core/src/intf/native/utils/network_interfaces/netlink.rs b/veilid-core/src/intf/native/utils/network_interfaces/netlink.rs index 7f698934..8233c81c 100644 --- a/veilid-core/src/intf/native/utils/network_interfaces/netlink.rs +++ b/veilid-core/src/intf/native/utils/network_interfaces/netlink.rs @@ -47,7 +47,7 @@ fn flags_to_address_flags(flags: u32) -> AddressFlags { AddressFlags { is_temporary: (flags & IFA_F_TEMPORARY) != 0, is_dynamic: (flags & IFA_F_PERMANENT) == 0, - is_deprecated: (flags & IFA_F_DEPRECATED) != 0, + is_preferred: (flags & (IFA_F_TENTATIVE | IFA_F_DADFAILED | IFA_F_DEPRECATED | IFA_F_OPTIMISTIC) ) == 0, } } diff --git a/veilid-core/src/intf/native/utils/network_interfaces/windows.rs b/veilid-core/src/intf/native/utils/network_interfaces/windows.rs index e66ac463..0f3a4484 100644 --- a/veilid-core/src/intf/native/utils/network_interfaces/windows.rs +++ b/veilid-core/src/intf/native/utils/network_interfaces/windows.rs @@ -1,19 +1,358 @@ +// Copyright 2018 MaidSafe.net limited. +// +// This SAFE Network Software is licensed to you under the MIT license or the Modified BSD license , at your option. This file may not be copied, +// modified, or distributed except according to those terms. Please review the Licences for the +// specific language governing permissions and limitations relating to use of the SAFE Network +// Software. + use super::*; -pub struct PlatformSupportWindows { - // -} +use libc::{self, c_ulong, c_void, size_t}; +use std::ffi::CStr; +use std::{io, ptr}; +use winapi::shared::ifdef::IfOperStatusUp; +use winapi::shared::ipifcons::IF_TYPE_SOFTWARE_LOOPBACK; +use winapi::shared::nldef::{ + IpDadStatePreferred, IpPrefixOriginDhcp, IpSuffixOriginDhcp, IpSuffixOriginRandom, +}; +use winapi::shared::winerror::{ERROR_BUFFER_OVERFLOW, ERROR_SUCCESS}; +use winapi::um::iphlpapi::GetAdaptersAddresses; +use winapi::um::iptypes::{ + GAA_FLAG_INCLUDE_GATEWAYS, GAA_FLAG_INCLUDE_PREFIX, GAA_FLAG_SKIP_ANYCAST, + GAA_FLAG_SKIP_DNS_SERVER, GAA_FLAG_SKIP_FRIENDLY_NAME, GAA_FLAG_SKIP_MULTICAST, + IP_ADAPTER_ADDRESSES, IP_ADAPTER_PREFIX, IP_ADAPTER_UNICAST_ADDRESS, +}; + +pub struct PlatformSupportWindows {} impl PlatformSupportWindows { pub fn new() -> Result { Ok(PlatformSupportWindows {}) } + fn get_interface_flags(intf: &IpAdapterAddresses) -> InterfaceFlags { + InterfaceFlags { + is_loopback: intf.get_flag_loopback(), + is_running: intf.get_flag_running(), + has_default_route: intf.get_has_default_route(), + } + } + + fn get_address_flags(addr: *const IP_ADAPTER_UNICAST_ADDRESS) -> AddressFlags { + let ds = unsafe { (*addr).DadState }; + let po = unsafe { (*addr).PrefixOrigin }; + let so = unsafe { (*addr).SuffixOrigin }; + AddressFlags { + is_temporary: so == IpSuffixOriginRandom, + is_dynamic: po == IpPrefixOriginDhcp || so == IpSuffixOriginDhcp, + is_preferred: ds == IpDadStatePreferred, + } + } + pub async fn get_interfaces( &mut self, interfaces: &mut BTreeMap, ) -> Result<(), String> { - // + //self.refresh_default_route_interfaces().await?; + + // If we have no routes, this isn't going to work + // if self.default_route_interfaces.is_empty() { + // return Err("no routes available for NetworkInterfaces".to_owned()); + // } + + // Iterate all the interfaces + let windows_interfaces = WindowsInterfaces::new().map_err(map_to_string)?; + for windows_interface in windows_interfaces.iter() { + // Get name + let intf_name = windows_interface.name(); + + // Get flags + let flags = Self::get_interface_flags(&windows_interface); + + let mut network_interface = NetworkInterface::new(intf_name.clone(), flags); + + // Go through all addresses and add them if appropriate + for addr in windows_interface.unicast_addresses() { + let intf_addr = match sockaddr_tools::to_ipaddr(addr.Address.lpSockaddr) { + None => continue, + Some(IpAddr::V4(ipv4_addr)) => { + let mut item_netmask = Ipv4Addr::new(0, 0, 0, 0); + let mut item_broadcast = None; + + // Search prefixes for a prefix matching addr + 'prefixloopv4: for prefix in windows_interface.prefixes() { + let ipprefix = sockaddr_tools::to_ipaddr(prefix.Address.lpSockaddr); + match ipprefix { + Some(IpAddr::V4(ref a)) => { + let mut netmask: [u8; 4] = [0; 4]; + for (n, netmask_elt) in netmask + .iter_mut() + .enumerate() + .take((prefix.PrefixLength as usize + 7) / 8) + { + let x_byte = ipv4_addr.octets()[n]; + let y_byte = a.octets()[n]; + for m in 0..8 { + if (n * 8) + m > prefix.PrefixLength as usize { + break; + } + let bit = 1 << m; + if (x_byte & bit) == (y_byte & bit) { + *netmask_elt |= bit; + } else { + continue 'prefixloopv4; + } + } + } + item_netmask = Ipv4Addr::new( + netmask[0], netmask[1], netmask[2], netmask[3], + ); + let mut broadcast: [u8; 4] = ipv4_addr.octets(); + for n in 0..4 { + broadcast[n] |= !netmask[n]; + } + item_broadcast = Some(Ipv4Addr::new( + broadcast[0], + broadcast[1], + broadcast[2], + broadcast[3], + )); + break 'prefixloopv4; + } + _ => continue, + }; + } + IfAddr::V4(Ifv4Addr { + ip: ipv4_addr, + netmask: item_netmask, + broadcast: item_broadcast, + }) + } + Some(IpAddr::V6(ipv6_addr)) => { + let mut item_netmask = Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0); + // Search prefixes for a prefix matching addr + 'prefixloopv6: for prefix in windows_interface.prefixes() { + let ipprefix = sockaddr_tools::to_ipaddr(prefix.Address.lpSockaddr); + match ipprefix { + Some(IpAddr::V6(ref a)) => { + // Iterate the bits in the prefix, if they all match this prefix + // is the right one, else try the next prefix + let mut netmask: [u16; 8] = [0; 8]; + for (n, netmask_elt) in netmask + .iter_mut() + .enumerate() + .take((prefix.PrefixLength as usize + 15) / 16) + { + let x_word = ipv6_addr.segments()[n]; + let y_word = a.segments()[n]; + for m in 0..16 { + if (n * 16) + m > prefix.PrefixLength as usize { + break; + } + let bit = 1 << m; + if (x_word & bit) == (y_word & bit) { + *netmask_elt |= bit; + } else { + continue 'prefixloopv6; + } + } + } + item_netmask = Ipv6Addr::new( + netmask[0], netmask[1], netmask[2], netmask[3], netmask[4], + netmask[5], netmask[6], netmask[7], + ); + break 'prefixloopv6; + } + _ => continue, + }; + } + IfAddr::V6(Ifv6Addr { + ip: ipv6_addr, + netmask: item_netmask, + broadcast: None, + }) + } + }; + + let address_flags = Self::get_address_flags(addr); + + network_interface + .addrs + .push(InterfaceAddress::new(intf_addr, address_flags)) + } + + interfaces.insert(intf_name, network_interface); + } + Ok(()) } } + +#[repr(C)] +pub struct IpAdapterAddresses { + data: *const IP_ADAPTER_ADDRESSES, +} + +impl IpAdapterAddresses { + #[allow(unsafe_code)] + pub fn name(&self) -> String { + unsafe { CStr::from_ptr((*self.data).AdapterName) } + .to_string_lossy() + .into_owned() + } + + pub fn prefixes(&self) -> PrefixesIterator { + PrefixesIterator { + _phantom: std::marker::PhantomData {}, + next: unsafe { (*self.data).FirstPrefix }, + } + } + + pub fn unicast_addresses(&self) -> UnicastAddressesIterator { + UnicastAddressesIterator { + _phantom: std::marker::PhantomData {}, + next: unsafe { (*self.data).FirstUnicastAddress }, + } + } + + pub fn get_flag_loopback(&self) -> bool { + unsafe { (*self.data).IfType == IF_TYPE_SOFTWARE_LOOPBACK } + } + pub fn get_flag_running(&self) -> bool { + unsafe { (*self.data).OperStatus == IfOperStatusUp } + } + pub fn get_has_default_route(&self) -> bool { + unsafe { !(*self.data).FirstGatewayAddress.is_null() } + } +} + +struct WindowsInterfaces { + data: *const IP_ADAPTER_ADDRESSES, +} + +impl WindowsInterfaces { + pub fn new() -> io::Result { + let mut buffersize: c_ulong = 16384; + let mut ifaddrs: *mut IP_ADAPTER_ADDRESSES; + + loop { + unsafe { + ifaddrs = libc::malloc(buffersize as size_t) as *mut IP_ADAPTER_ADDRESSES; + if ifaddrs.is_null() { + panic!("Failed to allocate buffer in get_if_addrs()"); + } + + let retcode = GetAdaptersAddresses( + 0, + GAA_FLAG_SKIP_ANYCAST + | GAA_FLAG_SKIP_MULTICAST + | GAA_FLAG_SKIP_DNS_SERVER + | GAA_FLAG_INCLUDE_PREFIX + | GAA_FLAG_SKIP_FRIENDLY_NAME + | GAA_FLAG_INCLUDE_GATEWAYS, + ptr::null_mut(), + ifaddrs, + &mut buffersize, + ); + + match retcode { + ERROR_SUCCESS => break, + ERROR_BUFFER_OVERFLOW => { + libc::free(ifaddrs as *mut c_void); + buffersize *= 2; + continue; + } + _ => return Err(io::Error::last_os_error()), + } + } + } + + Ok(Self { data: ifaddrs }) + } + + pub fn iter(&self) -> WindowsInterfacesIterator<'_> { + WindowsInterfacesIterator { + next: self.data, + _phantom: std::marker::PhantomData {}, + } + } +} + +impl Drop for WindowsInterfaces { + fn drop(&mut self) { + unsafe { + libc::free(self.data as *mut c_void); + } + } +} + +pub struct WindowsInterfacesIterator<'a> { + next: *const IP_ADAPTER_ADDRESSES, + _phantom: std::marker::PhantomData<&'a u8>, +} + +impl<'a> Iterator for WindowsInterfacesIterator<'a> { + type Item = IpAdapterAddresses; + + #[allow(unsafe_code)] + fn next(&mut self) -> Option { + if self.next.is_null() { + return None; + }; + + Some(unsafe { + let result = &*self.next; + self.next = (*self.next).Next; + + IpAdapterAddresses { data: result } + }) + } +} + +pub struct PrefixesIterator<'a> { + _phantom: std::marker::PhantomData<&'a u8>, + next: *const IP_ADAPTER_PREFIX, +} + +impl<'a> Iterator for PrefixesIterator<'a> { + type Item = &'a IP_ADAPTER_PREFIX; + + #[allow(unsafe_code)] + fn next(&mut self) -> Option { + if self.next.is_null() { + return None; + }; + + Some(unsafe { + let result = &*self.next; + self.next = (*self.next).Next; + + result + }) + } +} + +pub struct UnicastAddressesIterator<'a> { + _phantom: std::marker::PhantomData<&'a u8>, + next: *const IP_ADAPTER_UNICAST_ADDRESS, +} + +impl<'a> Iterator for UnicastAddressesIterator<'a> { + type Item = &'a IP_ADAPTER_UNICAST_ADDRESS; + + #[allow(unsafe_code)] + fn next(&mut self) -> Option { + if self.next.is_null() { + return None; + }; + + Some(unsafe { + let result = &*self.next; + self.next = (*self.next).Next; + + result + }) + } +} diff --git a/veilid-server/src/client_log_channel.rs b/veilid-server/src/client_log_channel.rs index 3e60645f..e41d57e2 100644 --- a/veilid-server/src/client_log_channel.rs +++ b/veilid-server/src/client_log_channel.rs @@ -12,7 +12,7 @@ use std::sync::mpsc::SyncSender as StdSender; use std::sync::mpsc::TrySendError as StdTrySendError; ////////////////////////////////////////// - +#[derive(Clone)] pub struct ClientLogChannelCloser { sender: Arc>>>, } @@ -59,6 +59,7 @@ pub type ClientLogChannelWriter = std::io::LineWriter, } diff --git a/veilid-server/src/cmdline.rs b/veilid-server/src/cmdline.rs new file mode 100644 index 00000000..a562e221 --- /dev/null +++ b/veilid-server/src/cmdline.rs @@ -0,0 +1,173 @@ +use crate::settings::*; +use std::ffi::OsStr; +use clap::{App, Arg, ArgMatches}; +use std::str::FromStr; + +fn do_clap_matches(default_config_path: &OsStr) -> Result { + + let matches = App::new("veilid-server") + .version("0.1") + .about("Veilid Server") + .color(clap::ColorChoice::Auto) + .arg( + Arg::new("daemon") + .long("daemon") + .short('d') + .help("Run in daemon mode in the background"), + ) + .arg( + Arg::new("config-file") + .short('c') + .long("config-file") + .takes_value(true) + .value_name("FILE") + .default_value_os(default_config_path) + .help("Specify a configuration file to use"), + ).arg( + Arg::new("attach") + .long("attach") + .takes_value(true) + .value_name("BOOL") + .possible_values(&["false", "true"]) + .help("Automatically attach the server to the Veilid network"), + ) + // Dev options + .arg( + Arg::new("debug") + .long("debug") + .help("Turn on debug logging on the terminal"), + ) + .arg( + Arg::new("trace") + .long("trace") + .conflicts_with("debug") + .help("Turn on trace logging on the terminal"), + ) + .arg( + Arg::new("subnode_index") + .long("subnode_index") + .takes_value(true) + .help("Run as an extra daemon on the same machine for testing purposes, specify a number greater than zero to offset the listening ports"), + ) + .arg( + Arg::new("generate-dht-key") + .long("generate-dht-key") + .help("Only generate a new dht key and print it"), + ) + + .arg( + Arg::new("dump-config") + .long("dump-config") + .help("Instead of running the server, print the configuration it would use to the console"), + ) + .arg( + Arg::new("bootstrap") + .long("bootstrap") + .takes_value(true) + .value_name("BOOTSTRAP_LIST") + .help("Specify a list of bootstrap servers to use"), + ) + .arg( + Arg::new("local") + .long("local") + .help("Enable local peer scope") + ); + + #[cfg(debug_assertions)] + let matches = matches.arg( + Arg::new("wait-for-debug") + .long("wait-for-debug") + .help("Wait for debugger to attach"), + ); + + Ok(matches.get_matches()) +} + +pub fn process_command_line() -> Result<(Settings, ArgMatches), String> { + + // Get command line options + let default_config_path = Settings::get_default_config_path(); + let matches = do_clap_matches(default_config_path.as_os_str()) + .map_err(|e| format!("failed to parse command line: {}", e))?; + + // Check for one-off commands + #[cfg(debug_assertions)] + if matches.occurrences_of("wait-for-debug") != 0 { + use bugsalot::debugger; + debugger::wait_until_attached(None).expect("state() not implemented on this platform"); + } + + // Attempt to load configuration + let settings = Settings::new( + matches.occurrences_of("config-file") == 0, + matches.value_of_os("config-file").unwrap(), + ) + .map_err(|e| format!("configuration is invalid: {}", e))?; + + // write lock the settings + let mut settingsrw = settings.write(); + + // Set config from command line + if matches.occurrences_of("daemon") != 0 { + settingsrw.daemon = true; + settingsrw.logging.terminal.enabled = false; + } + if matches.occurrences_of("subnode_index") != 0 { + let subnode_index = match matches.value_of("subnode_index") { + Some(x) => x + .parse() + .map_err(|e| format!("couldn't parse subnode index: {}", e))?, + None => { + return Err("value not specified for subnode_index".to_owned()); + } + }; + if subnode_index == 0 { + return Err("value of subnode_index should be between 1 and 65535".to_owned()); + } + settingsrw.testing.subnode_index = subnode_index; + } + if matches.occurrences_of("debug") != 0 { + settingsrw.logging.terminal.enabled = true; + settingsrw.logging.terminal.level = LogLevel::Debug; + } + if matches.occurrences_of("trace") != 0 { + settingsrw.logging.terminal.enabled = true; + settingsrw.logging.terminal.level = LogLevel::Trace; + } + if matches.is_present("attach") { + settingsrw.auto_attach = !matches!(matches.value_of("attach"), Some("true")); + } + if matches.is_present("local") { + settingsrw.core.network.enable_local_peer_scope = true; + } + if matches.occurrences_of("bootstrap") != 0 { + let bootstrap = match matches.value_of("bootstrap") { + Some(x) => { + println!("Overriding bootstrap with: "); + let mut out: Vec = Vec::new(); + for x in x.split(',') { + println!(" {}", x); + out.push(ParsedNodeDialInfo::from_str(x).map_err(|e| { + format!( + "unable to parse dial info in bootstrap list: {} for {}", + e, x + ) + })?); + } + out + } + None => { + return Err("value not specified for bootstrap".to_owned()); + } + }; + settingsrw.core.network.bootstrap = bootstrap; + } + + // Apply subnode index if we're testing + drop(settingsrw); + settings + .apply_subnode_index() + .map_err(|_| "failed to apply subnode index".to_owned())?; + + Ok((settings, matches)) +} \ No newline at end of file diff --git a/veilid-server/src/main.rs b/veilid-server/src/main.rs index 2605afea..7bc3e060 100644 --- a/veilid-server/src/main.rs +++ b/veilid-server/src/main.rs @@ -4,26 +4,62 @@ mod client_api; mod client_log_channel; +mod cmdline; +mod server; mod settings; +#[cfg(unix)] +mod unix; +mod veilid_logs; +#[cfg(windows)] +mod windows; + +use async_std::task; +use cfg_if::*; +use server::*; +use veilid_logs::*; #[allow(clippy::all)] pub mod veilid_client_capnp { include!(concat!(env!("OUT_DIR"), "/proto/veilid_client_capnp.rs")); } -cfg_if::cfg_if! { - if #[cfg(windows)] { - mod windows; +fn main() -> Result<(), String> { + let (settings, matches) = cmdline::process_command_line()?; - fn main() -> windows_service::Result<(), String> { - windows::main() + // --- Dump Config --- + if matches.occurrences_of("dump-config") != 0 { + //let cfg = config::Config::try_from(&*settingsr); + return serde_yaml::to_writer(std::io::stdout(), &*settings.read()) + .map_err(|e| e.to_string()); + } + + // --- Generate Id --- + if matches.occurrences_of("generate-id") != 0 { + let (key, secret) = veilid_core::generate_secret(); + println!("Public: {}\nSecret: {}", key.encode(), secret.encode()); + return Ok(()); + } + + // --- Daemon Mode ---- + if settings.read().daemon { + cfg_if! { + if #[cfg(windows)] { + return windows::run_service(settings, matches).map_err(|e| format!("{}", e)); + } else if #[cfg(unix)] { + return unix::run_daemon(settings, matches); + } } } - else { - mod unix; - fn main() -> Result<(), String> { - async_std::task::block_on(unix::main()) - } - } + // Init combined console/file logger + let logs = VeilidLogs::setup_normal_logs(settings.clone())?; + + // --- Normal Startup --- + ctrlc::set_handler(move || { + shutdown(); + }) + .expect("Error setting Ctrl-C handler"); + + // Run the server loop + task::block_on(async { run_veilid_server(settings, logs).await }) } diff --git a/veilid-server/src/server.rs b/veilid-server/src/server.rs new file mode 100644 index 00000000..e260f5fd --- /dev/null +++ b/veilid-server/src/server.rs @@ -0,0 +1,161 @@ +use crate::client_api; +use crate::settings::*; +use crate::veilid_logs::*; +use async_std::channel::{bounded, Receiver, Sender}; +use lazy_static::*; +use log::*; +use parking_lot::Mutex; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use veilid_core::xx::SingleShotEventual; + +lazy_static! { + static ref SHUTDOWN_SWITCH: Mutex>> = + Mutex::new(Some(SingleShotEventual::new(()))); +} + +pub fn shutdown() { + let shutdown_switch = SHUTDOWN_SWITCH.lock().take(); + if let Some(shutdown_switch) = shutdown_switch { + shutdown_switch.resolve(()); + } +} + +pub async fn run_veilid_server(settings: Settings, logs: VeilidLogs) -> Result<(), String> { + let settingsr = settings.read(); + + // Create Veilid Core + let veilid_core = veilid_core::VeilidCore::new(); + + // Create client api state change pipe + let (sender, receiver): ( + Sender, + Receiver, + ) = bounded(1); + + // Create VeilidCore setup + let vcs = veilid_core::VeilidCoreSetup { + state_change_callback: Arc::new( + move |change: veilid_core::VeilidStateChange| -> veilid_core::SystemPinBoxFuture<()> { + let sender = sender.clone(); + Box::pin(async move { + if sender.send(change).await.is_err() { + error!("error sending state change callback"); + } + }) + }, + ), + config_callback: settings.get_core_config_callback(), + }; + + // Start Veilid Core and get API + let veilid_api = veilid_core + .startup(vcs) + .await + .map_err(|e| format!("VeilidCore startup failed: {}", e))?; + + // Start client api if one is requested + let mut capi = if settingsr.client_api.enabled { + let some_capi = client_api::ClientApi::new(veilid_api.clone()); + some_capi + .clone() + .run(settingsr.client_api.listen_address.addrs.clone()); + Some(some_capi) + } else { + None + }; + + // Drop rwlock on settings + let auto_attach = settingsr.auto_attach; + drop(settingsr); + + // Handle state changes on main thread for capnproto rpc + let state_change_receiver_jh = capi.clone().map(|capi| { + async_std::task::spawn_local(async move { + while let Ok(change) = receiver.recv().await { + capi.clone().handle_state_change(change); + } + }) + }); + // Handle log messages on main thread for capnproto rpc + let client_log_receiver_jh = capi + .clone() + .map(|capi| { + logs.client_log_channel + .clone() + .map(|mut client_log_channel| { + async_std::task::spawn_local(async move { + // Batch messages to either 16384 chars at once or every second to minimize packets + let rate = Duration::from_secs(1); + let mut start = Instant::now(); + let mut messages = String::new(); + loop { + let timeout_dur = + rate.checked_sub(start.elapsed()).unwrap_or(Duration::ZERO); + match async_std::future::timeout(timeout_dur, client_log_channel.recv()) + .await + { + Ok(Ok(message)) => { + messages += &message; + if messages.len() > 16384 { + capi.clone() + .handle_client_log(core::mem::take(&mut messages)); + start = Instant::now(); + } + } + Ok(Err(_)) => break, + Err(_) => { + capi.clone() + .handle_client_log(core::mem::take(&mut messages)); + start = Instant::now(); + } + } + } + }) + }) + }) + .flatten(); + + // Auto-attach if desired + if auto_attach { + info!("Auto-attach to the Veilid network"); + if let Err(e) = veilid_api.attach().await { + error!("Auto-attaching to the Veilid network failed: {:?}", e); + shutdown(); + } + } + + // Idle while waiting to exit + let shutdown_switch = { + let shutdown_switch_locked = SHUTDOWN_SWITCH.lock(); + (*shutdown_switch_locked).as_ref().map(|ss| ss.instance()) + }; + if let Some(shutdown_switch) = shutdown_switch { + shutdown_switch.await; + } + + // Stop the client api if we have one + if let Some(c) = capi.as_mut().cloned() { + c.stop().await; + } + + // Shut down Veilid API to release state change sender + veilid_api.shutdown().await; + + // Close the client api log channel if it is open to release client log sender + if let Some(client_log_channel_closer) = logs.client_log_channel_closer { + client_log_channel_closer.close(); + } + + // Wait for state change receiver to exit + if let Some(state_change_receiver_jh) = state_change_receiver_jh { + state_change_receiver_jh.await; + } + + // Wait for client api log receiver to exit + if let Some(client_log_receiver_jh) = client_log_receiver_jh { + client_log_receiver_jh.await; + } + + Ok(()) +} diff --git a/veilid-server/src/settings.rs b/veilid-server/src/settings.rs index 191bd217..fc28e55c 100644 --- a/veilid-server/src/settings.rs +++ b/veilid-server/src/settings.rs @@ -18,34 +18,34 @@ pub fn load_default_config(cfg: &mut config::Config) -> Result<(), config::Confi daemon: false client_api: enabled: true - listen_address: "localhost:5959" + listen_address: 'localhost:5959' auto_attach: true logging: terminal: enabled: true - level: "info" + level: 'info' file: enabled: false - path: "" + path: '' append: true - level: "info" + level: 'info' client: enabled: true - level: "info" + level: 'info' testing: subnode_index: 0 core: protected_store: allow_insecure_fallback: true always_use_insecure_storage: false - insecure_fallback_directory: "%INSECURE_FALLBACK_DIRECTORY%" + insecure_fallback_directory: '%INSECURE_FALLBACK_DIRECTORY%' table_store: - directory: "%TABLE_STORE_DIRECTORY%" + directory: '%TABLE_STORE_DIRECTORY%' network: max_connections: 16 connection_initial_timeout: 2000000 - node_id: "" - node_id_secret: "" + node_id: '' + node_id_secret: '' bootstrap: [] rpc: concurrency: 0 @@ -73,46 +73,46 @@ core: enable_local_peer_scope: false restricted_nat_retries: 3 tls: - certificate_path: "/etc/veilid/server.crt" - private_key_path: "/etc/veilid/private/server.key" + certificate_path: '/etc/veilid/server.crt' + private_key_path: '/etc/veilid/private/server.key' connection_initial_timeout: 2000000 application: https: enabled: false - listen_address: "[::]:5150" - path: "app" - # url: "https://localhost:5150" + listen_address: '[::]:5150' + path: 'app' + # url: 'https://localhost:5150' http: enabled: false - listen_address: "[::]:5150" - path: "app" - # url: "http://localhost:5150" + listen_address: '[::]:5150' + path: 'app' + # url: 'http://localhost:5150' protocol: udp: enabled: true socket_pool_size: 0 - listen_address: "[::]:5150" - # public_address: "" + listen_address: '[::]:5150' + # public_address: '' tcp: connect: true listen: true max_connections: 32 - listen_address: "[::]:5150" - # "public_address": "" + listen_address: '[::]:5150' + #'public_address: '' ws: connect: true listen: true max_connections: 16 - listen_address: "[::]:5150" - path: "ws" - # url: "ws://localhost:5150/ws" + listen_address: '[::]:5150' + path: 'ws' + # url: 'ws://localhost:5150/ws' wss: connect: true listen: false max_connections: 16 - listen_address: "[::]:5150" - path: "ws" - # url: "" + listen_address: '[::]:5150' + path: 'ws' + # url: '' leases: max_server_signal_leases: 256 max_server_relay_leases: 8 @@ -558,10 +558,9 @@ pub struct SettingsInner { pub core: Core, } -type Handle = Arc>; - +#[derive(Clone, Debug)] pub struct Settings { - inner: Handle, + inner: Arc>, } impl Settings { diff --git a/veilid-server/src/unix.rs b/veilid-server/src/unix.rs index 85f59193..a356c017 100644 --- a/veilid-server/src/unix.rs +++ b/veilid-server/src/unix.rs @@ -1,4 +1,3 @@ -#[cfg(unix)] use crate::client_api; use crate::client_log_channel::*; use crate::settings; @@ -16,392 +15,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use veilid_core::xx::SingleShotEventual; -fn parse_command_line(default_config_path: &OsStr) -> Result { - - let matches = App::new("veilid-server") - .version("0.1") - .about("Veilid Server") - .color(clap::ColorChoice::Auto) - .arg( - Arg::new("daemon") - .long("daemon") - .short('d') - .help("Run in daemon mode in the background"), - ) - .arg( - Arg::new("config-file") - .short('c') - .long("config-file") - .takes_value(true) - .value_name("FILE") - .default_value_os(default_config_path) - .help("Specify a configuration file to use"), - ).arg( - Arg::new("attach") - .long("attach") - .takes_value(true) - .value_name("BOOL") - .possible_values(&["false", "true"]) - .help("Automatically attach the server to the Veilid network"), - ) - // Dev options - .arg( - Arg::new("debug") - .long("debug") - .help("Turn on debug logging on the terminal"), - ) - .arg( - Arg::new("trace") - .long("trace") - .conflicts_with("debug") - .help("Turn on trace logging on the terminal"), - ) - .arg( - Arg::new("subnode_index") - .long("subnode_index") - .takes_value(true) - .help("Run as an extra daemon on the same machine for testing purposes, specify a number greater than zero to offset the listening ports"), - ) - .arg( - Arg::new("generate-dht-key") - .long("generate-dht-key") - .help("Only generate a new dht key and print it"), - ) - - .arg( - Arg::new("dump-config") - .long("dump-config") - .help("Instead of running the server, print the configuration it would use to the console"), - ) - .arg( - Arg::new("bootstrap") - .long("bootstrap") - .takes_value(true) - .value_name("BOOTSTRAP_LIST") - .help("Specify a list of bootstrap servers to use"), - ) - .arg( - Arg::new("local") - .long("local") - .help("Enable local peer scope") - ); - - #[cfg(debug_assertions)] - let matches = matches.arg( - Arg::new("wait-for-debug") - .long("wait-for-debug") - .help("Wait for debugger to attach"), - ); - - Ok(matches.get_matches()) -} - -lazy_static! { - static ref SHUTDOWN_SWITCH: Mutex>> = - Mutex::new(Some(SingleShotEventual::new(()))); -} - -pub fn shutdown() { - let shutdown_switch = SHUTDOWN_SWITCH.lock().take(); - if let Some(shutdown_switch) = shutdown_switch { - shutdown_switch.resolve(()); - } -} - -pub async fn main() -> Result<(), String> { - // Wait until signal - ctrlc::set_handler(move || { - shutdown(); - }) - .expect("Error setting Ctrl-C handler"); - - // Get command line options - let default_config_path = settings::Settings::get_default_config_path(); - let matches = parse_command_line(default_config_path.as_os_str()) - .map_err(|e| format!("failed to parse command line: {}", e))?; - - // Check for one-off commands - #[cfg(debug_assertions)] - if matches.occurrences_of("wait-for-debug") != 0 { - use bugsalot::debugger; - debugger::wait_until_attached(None).expect("state() not implemented on this platform"); - } - if matches.occurrences_of("generate-id") != 0 { - let (key, secret) = veilid_core::generate_secret(); - println!("Public: {}\nSecret: {}", key.encode(), secret.encode()); - return Ok(()); - } - - // Attempt to load configuration - let settings = settings::Settings::new( - matches.occurrences_of("config-file") == 0, - matches.value_of_os("config-file").unwrap(), - ) - .map_err(|e| format!("configuration is invalid: {}", e))?; - - // write lock the settings - let mut settingsrw = settings.write(); - - // Set config from command line - if matches.occurrences_of("daemon") != 0 { - settingsrw.daemon = true; - settingsrw.logging.terminal.enabled = false; - } - if matches.occurrences_of("subnode_index") != 0 { - let subnode_index = match matches.value_of("subnode_index") { - Some(x) => x - .parse() - .map_err(|e| format!("couldn't parse subnode index: {}", e))?, - None => { - return Err("value not specified for subnode_index".to_owned()); - } - }; - if subnode_index == 0 { - return Err("value of subnode_index should be between 1 and 65535".to_owned()); - } - settingsrw.testing.subnode_index = subnode_index; - } - if matches.occurrences_of("debug") != 0 { - settingsrw.logging.terminal.enabled = true; - settingsrw.logging.terminal.level = settings::LogLevel::Debug; - } - if matches.occurrences_of("trace") != 0 { - settingsrw.logging.terminal.enabled = true; - settingsrw.logging.terminal.level = settings::LogLevel::Trace; - } - if matches.is_present("attach") { - settingsrw.auto_attach = !matches!(matches.value_of("attach"), Some("true")); - } - if matches.is_present("local") { - settingsrw.core.network.enable_local_peer_scope = true; - } - if matches.occurrences_of("bootstrap") != 0 { - let bootstrap = match matches.value_of("bootstrap") { - Some(x) => { - println!("Overriding bootstrap with: "); - let mut out: Vec = Vec::new(); - for x in x.split(',') { - println!(" {}", x); - out.push(settings::ParsedNodeDialInfo::from_str(x).map_err(|e| { - format!( - "unable to parse dial info in bootstrap list: {} for {}", - e, x - ) - })?); - } - out - } - None => { - return Err("value not specified for bootstrap".to_owned()); - } - }; - settingsrw.core.network.bootstrap = bootstrap; - } - - // Apply subnode index if we're testing - drop(settingsrw); - settings - .apply_subnode_index() - .map_err(|_| "failed to apply subnode index".to_owned())?; - let settingsr = settings.read(); - - // Set up loggers - let mut logs: Vec> = Vec::new(); - let mut client_log_channel: Option = None; - let mut client_log_channel_closer: Option = None; - let mut cb = ConfigBuilder::new(); - cb.add_filter_ignore_str("async_std"); - cb.add_filter_ignore_str("async_io"); - cb.add_filter_ignore_str("polling"); - cb.add_filter_ignore_str("rustls"); - cb.add_filter_ignore_str("async_tungstenite"); - cb.add_filter_ignore_str("tungstenite"); - cb.add_filter_ignore_str("netlink_proto"); - cb.add_filter_ignore_str("netlink_sys"); - - if settingsr.logging.terminal.enabled { - logs.push(TermLogger::new( - settings::convert_loglevel(settingsr.logging.terminal.level), - cb.build(), - TerminalMode::Mixed, - ColorChoice::Auto, - )) - } - if settingsr.logging.file.enabled { - let log_path = Path::new(&settingsr.logging.file.path); - - let logfile; - if settingsr.logging.file.append { - logfile = OpenOptions::new() - .create(true) - .append(true) - .open(log_path) - .map_err(|e| format!("failed to open log file: {}", e))? - } else { - logfile = OpenOptions::new() - .create(true) - .truncate(true) - .write(true) - .open(log_path) - .map_err(|e| format!("failed to open log file: {}", e))? - } - logs.push(WriteLogger::new( - settings::convert_loglevel(settingsr.logging.file.level), - cb.build(), - logfile, - )) - } - if settingsr.logging.client.enabled { - let (clog, clogwriter, clogcloser) = ClientLogChannel::new(); - client_log_channel = Some(clog); - client_log_channel_closer = Some(clogcloser); - logs.push(WriteLogger::new( - settings::convert_loglevel(settingsr.logging.client.level), - cb.build(), - clogwriter, - )) - } - - // --- Dump Config --- - if matches.occurrences_of("dump-config") != 0 { - //let cfg = config::Config::try_from(&*settingsr); - return serde_yaml::to_writer(std::io::stdout(), &*settingsr).map_err(|e| e.to_string()); - } - - // --- Normal Startup --- - - CombinedLogger::init(logs).map_err(|e| format!("failed to init logs: {}", e))?; - - // Create Veilid Core - let veilid_core = veilid_core::VeilidCore::new(); - - // Create client api state change pipe - let (sender, receiver): ( - Sender, - Receiver, - ) = bounded(1); - - // Create VeilidCore setup - let vcs = veilid_core::VeilidCoreSetup { - state_change_callback: Arc::new( - move |change: veilid_core::VeilidStateChange| -> veilid_core::SystemPinBoxFuture<()> { - let sender = sender.clone(); - Box::pin(async move { - if sender.send(change).await.is_err() { - error!("error sending state change callback"); - } - }) - }, - ), - config_callback: settings.get_core_config_callback(), - }; - - // Start Veilid Core and get API - let veilid_api = veilid_core - .startup(vcs) - .await - .map_err(|e| format!("VeilidCore startup failed: {}", e))?; - - // Start client api if one is requested - let mut capi = if settingsr.client_api.enabled { - let some_capi = client_api::ClientApi::new(veilid_api.clone()); - some_capi - .clone() - .run(settingsr.client_api.listen_address.addrs.clone()); - Some(some_capi) - } else { - None - }; - - // Drop rwlock on settings - let auto_attach = settingsr.auto_attach; - drop(settingsr); - - // Handle state changes on main thread for capnproto rpc - let state_change_receiver_jh = capi.clone().map(|capi| { - async_std::task::spawn_local(async move { - while let Ok(change) = receiver.recv().await { - capi.clone().handle_state_change(change); - } - }) - }); - // Handle log messages on main thread for capnproto rpc - let client_log_receiver_jh = capi - .clone() - .map(|capi| { - client_log_channel.take().map(|mut client_log_channel| { - async_std::task::spawn_local(async move { - // Batch messages to either 16384 chars at once or every second to minimize packets - let rate = Duration::from_secs(1); - let mut start = Instant::now(); - let mut messages = String::new(); - loop { - let timeout_dur = - rate.checked_sub(start.elapsed()).unwrap_or(Duration::ZERO); - match async_std::future::timeout(timeout_dur, client_log_channel.recv()) - .await - { - Ok(Ok(message)) => { - messages += &message; - if messages.len() > 16384 { - capi.clone() - .handle_client_log(core::mem::take(&mut messages)); - start = Instant::now(); - } - } - Ok(Err(_)) => break, - Err(_) => { - capi.clone() - .handle_client_log(core::mem::take(&mut messages)); - start = Instant::now(); - } - } - } - }) - }) - }) - .flatten(); - - // Auto-attach if desired - if auto_attach { - info!("Auto-attach to the Veilid network"); - if let Err(e) = veilid_api.attach().await { - error!("Auto-attaching to the Veilid network failed: {:?}", e); - shutdown(); - } - } - - // Idle while waiting to exit - let shutdown_switch = { - let shutdown_switch_locked = SHUTDOWN_SWITCH.lock(); - (*shutdown_switch_locked).as_ref().map(|ss| ss.instance()) - }; - if let Some(shutdown_switch) = shutdown_switch { - shutdown_switch.await; - } - - // Stop the client api if we have one - if let Some(c) = capi.as_mut().cloned() { - c.stop().await; - } - - // Shut down Veilid API to release state change sender - veilid_api.shutdown().await; - - // Close the client api log channel if it is open to release client log sender - if let Some(client_log_channel_closer) = client_log_channel_closer { - client_log_channel_closer.close(); - } - - // Wait for state change receiver to exit - if let Some(state_change_receiver_jh) = state_change_receiver_jh { - state_change_receiver_jh.await; - } - - // Wait for client api log receiver to exit - if let Some(client_log_receiver_jh) = client_log_receiver_jh { - client_log_receiver_jh.await; - } - +pub fn run_daemon(settings: Settings, matches: ArgMatches) -> Result<(), String> { + eprintln!("Windows Service mode not implemented yet."); Ok(()) } diff --git a/veilid-server/src/veilid_logs.rs b/veilid-server/src/veilid_logs.rs new file mode 100644 index 00000000..09503850 --- /dev/null +++ b/veilid-server/src/veilid_logs.rs @@ -0,0 +1,80 @@ +use crate::client_log_channel::*; +use crate::settings::*; +use simplelog::*; +use std::fs::OpenOptions; +use std::path::Path; + +pub struct VeilidLogs { + pub client_log_channel: Option, + pub client_log_channel_closer: Option, +} + +impl VeilidLogs { + pub fn setup_normal_logs(settings: Settings) -> Result { + let settingsr = settings.read(); + + // Set up loggers + let mut logs: Vec> = Vec::new(); + let mut client_log_channel: Option = None; + let mut client_log_channel_closer: Option = None; + let mut cb = ConfigBuilder::new(); + cb.add_filter_ignore_str("async_std"); + cb.add_filter_ignore_str("async_io"); + cb.add_filter_ignore_str("polling"); + cb.add_filter_ignore_str("rustls"); + cb.add_filter_ignore_str("async_tungstenite"); + cb.add_filter_ignore_str("tungstenite"); + cb.add_filter_ignore_str("netlink_proto"); + cb.add_filter_ignore_str("netlink_sys"); + + if settingsr.logging.terminal.enabled { + logs.push(TermLogger::new( + convert_loglevel(settingsr.logging.terminal.level), + cb.build(), + TerminalMode::Mixed, + ColorChoice::Auto, + )) + } + if settingsr.logging.file.enabled { + let log_path = Path::new(&settingsr.logging.file.path); + + let logfile; + if settingsr.logging.file.append { + logfile = OpenOptions::new() + .create(true) + .append(true) + .open(log_path) + .map_err(|e| format!("failed to open log file: {}", e))? + } else { + logfile = OpenOptions::new() + .create(true) + .truncate(true) + .write(true) + .open(log_path) + .map_err(|e| format!("failed to open log file: {}", e))? + } + logs.push(WriteLogger::new( + convert_loglevel(settingsr.logging.file.level), + cb.build(), + logfile, + )) + } + if settingsr.logging.client.enabled { + let (clog, clogwriter, clogcloser) = ClientLogChannel::new(); + client_log_channel = Some(clog); + client_log_channel_closer = Some(clogcloser); + logs.push(WriteLogger::new( + convert_loglevel(settingsr.logging.client.level), + cb.build(), + clogwriter, + )) + } + + CombinedLogger::init(logs).map_err(|e| format!("failed to init logs: {}", e))?; + + Ok(VeilidLogs { + client_log_channel, + client_log_channel_closer, + }) + } +} diff --git a/veilid-server/src/windows.rs b/veilid-server/src/windows.rs index 8b137891..79070a34 100644 --- a/veilid-server/src/windows.rs +++ b/veilid-server/src/windows.rs @@ -1 +1,71 @@ +use crate::settings::*; +use clap::ArgMatches; +use log::*; +use std::ffi::OsString; +use std::time::Duration; +use windows_service::service::{ + ServiceControl, ServiceControlAccept, ServiceExitCode, ServiceState, ServiceStatus, ServiceType, +}; +use windows_service::service_control_handler::ServiceControlHandlerResult; +use windows_service::*; +// Register generated `ffi_service_main` with the system and start the service, blocking +// this thread until the service is stopped. +pub fn run_service(settings: Settings, matches: ArgMatches) -> windows_service::Result<()> { + eprintln!("Windows Service mode not implemented yet."); + + //service_dispatcher::start("veilid-server", ffi_veilid_service_main)?; + // + Ok(()) +} + +/////////////// +define_windows_service!(ffi_veilid_service_main, veilid_service_main); +fn veilid_service_main(arguments: Vec) { + if let Err(e) = register_service_handler(arguments) { + error!("{}", e); + } +} + +/////////////// + +fn register_service_handler(arguments: Vec) -> windows_service::Result<()> { + let event_handler = move |control_event| -> ServiceControlHandlerResult { + match control_event { + ServiceControl::Stop => { + // Handle stop event and return control back to the system. + ServiceControlHandlerResult::NoError + } + // All services must accept Interrogate even if it's a no-op. + ServiceControl::Interrogate => ServiceControlHandlerResult::NoError, + _ => ServiceControlHandlerResult::NotImplemented, + } + }; + + // Register system service event handler + let status_handle = service_control_handler::register("veilid-server", event_handler)?; + + let next_status = ServiceStatus { + // Should match the one from system service registry + service_type: ServiceType::OWN_PROCESS, + // The new state + current_state: ServiceState::Running, + // Accept stop events when running + controls_accepted: ServiceControlAccept::STOP, + // Used to report an error when starting or stopping only, otherwise must be zero + exit_code: ServiceExitCode::Win32(0), + // Only used for pending states, otherwise must be zero + checkpoint: 0, + // Only used for pending states, otherwise must be zero + wait_hint: Duration::default(), + // Unused for setting status + process_id: None, + }; + + // Tell the system that the service is running now + status_handle.set_service_status(next_status)?; + + // Do some work + + Ok(()) +}