From 37979277b5dbb6268fa1139770924342c385adf9 Mon Sep 17 00:00:00 2001 From: John Smith Date: Thu, 14 Dec 2023 17:23:43 -0500 Subject: [PATCH] IPC to server --- Cargo.lock | 1 + veilid-cli/src/client_api_connection.rs | 112 ++++++--- veilid-cli/src/command_processor.rs | 124 +++++++--- veilid-cli/src/main.rs | 72 ++++-- veilid-cli/src/settings.rs | 48 +++- veilid-cli/src/ui.rs | 220 ++++++++++++++---- veilid-server/Cargo.toml | 6 +- veilid-server/src/client_api.rs | 190 ++++++++++----- veilid-server/src/server.rs | 28 ++- veilid-server/src/settings.rs | 75 +++--- veilid-tools/Cargo.toml | 2 + veilid-tools/src/ipc/ipc_async_std/mod.rs | 11 + veilid-tools/src/ipc/ipc_async_std/unix.rs | 0 veilid-tools/src/ipc/ipc_async_std/windows.rs | 0 veilid-tools/src/ipc/ipc_tokio/mod.rs | 11 + veilid-tools/src/ipc/ipc_tokio/unix.rs | 114 +++++++++ veilid-tools/src/ipc/ipc_tokio/windows.rs | 0 veilid-tools/src/ipc/mod.rs | 11 + veilid-tools/src/lib.rs | 3 + veilid-tools/src/tools.rs | 28 ++- 20 files changed, 817 insertions(+), 239 deletions(-) create mode 100644 veilid-tools/src/ipc/ipc_async_std/mod.rs create mode 100644 veilid-tools/src/ipc/ipc_async_std/unix.rs create mode 100644 veilid-tools/src/ipc/ipc_async_std/windows.rs create mode 100644 veilid-tools/src/ipc/ipc_tokio/mod.rs create mode 100644 veilid-tools/src/ipc/ipc_tokio/unix.rs create mode 100644 veilid-tools/src/ipc/ipc_tokio/windows.rs create mode 100644 veilid-tools/src/ipc/mod.rs diff --git a/Cargo.lock b/Cargo.lock index f97f6045..637488ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5769,6 +5769,7 @@ dependencies = [ "stop-token", "thiserror", "tokio", + "tokio-stream", "tokio-util", "tracing", "tracing-oslog", diff --git a/veilid-cli/src/client_api_connection.rs b/veilid-cli/src/client_api_connection.rs index cb39e5de..6abf0cab 100644 --- a/veilid-cli/src/client_api_connection.rs +++ b/veilid-cli/src/client_api_connection.rs @@ -3,6 +3,7 @@ use crate::tools::*; use futures::stream::FuturesUnordered; use futures::StreamExt; use std::net::SocketAddr; +use std::path::PathBuf; use std::time::SystemTime; use stop_token::{future::FutureExt as _, StopSource}; @@ -20,7 +21,6 @@ cfg_if! { struct ClientApiConnectionInner { comproc: CommandProcessor, - connect_addr: Option, request_sender: Option>, disconnector: Option, disconnect_requested: bool, @@ -38,7 +38,6 @@ impl ClientApiConnection { Self { inner: Arc::new(Mutex::new(ClientApiConnectionInner { comproc, - connect_addr: None, request_sender: None, disconnector: None, disconnect_requested: false, @@ -117,33 +116,15 @@ impl ClientApiConnection { } } - async fn handle_connection(&self, connect_addr: SocketAddr) -> Result<(), String> { - trace!("ClientApiConnection::handle_connection"); - - // Connect the TCP socket - let stream = TcpStream::connect(connect_addr) - .await - .map_err(map_to_string)?; - - // If it succeed, disable nagle algorithm - stream.set_nodelay(true).map_err(map_to_string)?; - - // State we connected - let comproc = self.inner.lock().comproc.clone(); - comproc.set_connection_state(ConnectionState::Connected(connect_addr, SystemTime::now())); - - // Split the stream - cfg_if! { - if #[cfg(feature="rt-async-std")] { - use futures::AsyncReadExt; - let (reader, mut writer) = stream.split(); - let mut reader = BufReader::new(reader); - } else if #[cfg(feature="rt-tokio")] { - let (reader, mut writer) = stream.into_split(); - let mut reader = BufReader::new(reader); - } - } - + pub async fn run_json_api_processor( + self, + mut reader: R, + mut writer: W, + ) -> Result<(), String> + where + R: AsyncBufReadExt + Unpin + Send, + W: AsyncWriteExt + Unpin + Send, + { // Requests to send let (requests_tx, requests_rx) = flume::unbounded(); @@ -152,7 +133,6 @@ impl ClientApiConnection { let stop_source = StopSource::new(); let token = stop_source.token(); let mut inner = self.inner.lock(); - inner.connect_addr = Some(connect_addr); inner.disconnector = Some(stop_source); inner.request_sender = Some(requests_tx); token @@ -231,7 +211,6 @@ impl ClientApiConnection { inner.request_sender = None; inner.disconnector = None; inner.disconnect_requested = false; - inner.connect_addr = None; // Connection finished if disconnect_requested { @@ -241,6 +220,66 @@ impl ClientApiConnection { } } + async fn handle_tcp_connection(&self, connect_addr: SocketAddr) -> Result<(), String> { + trace!("ClientApiConnection::handle_tcp_connection"); + + // Connect the TCP socket + let stream = TcpStream::connect(connect_addr) + .await + .map_err(map_to_string)?; + + // If it succeed, disable nagle algorithm + stream.set_nodelay(true).map_err(map_to_string)?; + + // State we connected + let comproc = self.inner.lock().comproc.clone(); + comproc.set_connection_state(ConnectionState::ConnectedTCP( + connect_addr, + SystemTime::now(), + )); + + // Split into reader and writer halves + // with line buffering on the reader + cfg_if! { + if #[cfg(feature="rt-async-std")] { + use futures::AsyncReadExt; + let (reader, mut writer) = stream.split(); + let reader = BufReader::new(reader); + } else { + let (reader, writer) = stream.into_split(); + let reader = BufReader::new(reader); + } + } + + self.clone().run_json_api_processor(reader, writer).await + } + + async fn handle_ipc_connection(&self, ipc_path: PathBuf) -> Result<(), String> { + trace!("ClientApiConnection::handle_ipc_connection"); + + // Connect the IPC socket + let stream = IpcStream::connect(&ipc_path).await.map_err(map_to_string)?; + + // State we connected + let comproc = self.inner.lock().comproc.clone(); + comproc.set_connection_state(ConnectionState::ConnectedIPC(ipc_path, SystemTime::now())); + + // Split into reader and writer halves + // with line buffering on the reader + use futures::AsyncReadExt; + let (reader, writer) = stream.split(); + cfg_if! { + if #[cfg(feature = "rt-tokio")] { + use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt}; + let reader = reader.compat(); + let writer = writer.compat_write(); + } + } + let reader = BufReader::new(reader); + + self.clone().run_json_api_processor(reader, writer).await + } + async fn perform_request(&self, mut req: json::JsonValue) -> Option { let (sender, reply_rx) = { let mut inner = self.inner.lock(); @@ -358,10 +397,15 @@ impl ClientApiConnection { } // Start Client API connection - pub async fn connect(&self, connect_addr: SocketAddr) -> Result<(), String> { - trace!("ClientApiConnection::connect"); + pub async fn ipc_connect(&self, ipc_path: PathBuf) -> Result<(), String> { + trace!("ClientApiConnection::ipc_connect"); + // Save the pathto connect to + self.handle_ipc_connection(ipc_path).await + } + pub async fn tcp_connect(&self, connect_addr: SocketAddr) -> Result<(), String> { + trace!("ClientApiConnection::tcp_connect"); // Save the address to connect to - self.handle_connection(connect_addr).await + self.handle_tcp_connection(connect_addr).await } // End Client API connection diff --git a/veilid-cli/src/command_processor.rs b/veilid-cli/src/command_processor.rs index cc5d13e1..0438367d 100644 --- a/veilid-cli/src/command_processor.rs +++ b/veilid-cli/src/command_processor.rs @@ -4,6 +4,7 @@ use crate::tools::*; use crate::ui::*; use indent::indent_all_by; use std::net::SocketAddr; +use std::path::PathBuf; use std::time::SystemTime; use veilid_tools::*; @@ -22,18 +23,20 @@ pub fn convert_loglevel(s: &str) -> Result { #[derive(PartialEq, Clone)] pub enum ConnectionState { Disconnected, - Connected(SocketAddr, SystemTime), - Retrying(SocketAddr, SystemTime), + ConnectedTCP(SocketAddr, SystemTime), + RetryingTCP(SocketAddr, SystemTime), + ConnectedIPC(PathBuf, SystemTime), + RetryingIPC(PathBuf, SystemTime), } impl ConnectionState { pub fn is_disconnected(&self) -> bool { matches!(*self, Self::Disconnected) } pub fn is_connected(&self) -> bool { - matches!(*self, Self::Connected(_, _)) + matches!(*self, Self::ConnectedTCP(_, _) | Self::ConnectedIPC(_, _)) } pub fn is_retrying(&self) -> bool { - matches!(*self, Self::Retrying(_, _)) + matches!(*self, Self::RetryingTCP(_, _) | Self::RetryingIPC(_, _)) } } @@ -44,7 +47,8 @@ struct CommandProcessorInner { finished: bool, autoconnect: bool, autoreconnect: bool, - server_addr: Option, + ipc_path: Option, + network_addr: Option, connection_waker: Eventual, last_call_id: Option, enable_app_messages: bool, @@ -65,7 +69,8 @@ impl CommandProcessor { finished: false, autoconnect: settings.autoconnect, autoreconnect: settings.autoreconnect, - server_addr: None, + ipc_path: None, + network_addr: None, connection_waker: Eventual::new(), last_call_id: None, enable_app_messages: false, @@ -306,38 +311,75 @@ Server Debug Commands: // Loop while we want to keep the connection let mut first = true; while self.inner().reconnect { - let server_addr_opt = self.inner_mut().server_addr; - let server_addr = match server_addr_opt { - None => break, - Some(addr) => addr, - }; - if first { - info!("Connecting to server at {}", server_addr); - self.set_connection_state(ConnectionState::Retrying( - server_addr, + // IPC + let ipc_path_opt = self.inner_mut().ipc_path.clone(); + if let Some(ipc_path) = ipc_path_opt { + if first { + info!( + "Connecting to server at {}", + ipc_path.to_string_lossy().to_string() + ); + self.set_connection_state(ConnectionState::RetryingIPC( + ipc_path.clone(), + SystemTime::now(), + )); + } else { + debug!( + "Retrying connection to {}", + ipc_path.to_string_lossy().to_string() + ); + } + let capi = self.capi(); + let res = capi.ipc_connect(ipc_path.clone()).await; + if res.is_ok() { + info!( + "Connection to server at {} terminated normally", + ipc_path.to_string_lossy().to_string() + ); + break; + } + if !self.inner().autoreconnect { + info!("Connection to server lost."); + break; + } + + self.set_connection_state(ConnectionState::RetryingIPC( + ipc_path, SystemTime::now(), )); - } else { - debug!("Retrying connection to {}", server_addr); - } - let capi = self.capi(); - let res = capi.connect(server_addr).await; - if res.is_ok() { - info!( - "Connection to server at {} terminated normally", - server_addr - ); - break; - } - if !self.inner().autoreconnect { - info!("Connection to server lost."); - break; } - self.set_connection_state(ConnectionState::Retrying( - server_addr, - SystemTime::now(), - )); + // TCP + let network_addr_opt = self.inner_mut().network_addr; + if let Some(network_addr) = network_addr_opt { + if first { + info!("Connecting to server at {}", network_addr); + self.set_connection_state(ConnectionState::RetryingTCP( + network_addr, + SystemTime::now(), + )); + } else { + debug!("Retrying connection to {}", network_addr); + } + let capi = self.capi(); + let res = capi.tcp_connect(network_addr).await; + if res.is_ok() { + info!( + "Connection to server at {} terminated normally", + network_addr + ); + break; + } + if !self.inner().autoreconnect { + info!("Connection to server lost."); + break; + } + + self.set_connection_state(ConnectionState::RetryingTCP( + network_addr, + SystemTime::now(), + )); + } debug!("Connection lost, retrying in 2 seconds"); { @@ -355,11 +397,17 @@ Server Debug Commands: // called by ui //////////////////////////////////////////// - pub fn set_server_address(&self, server_addr: Option) { - self.inner_mut().server_addr = server_addr; + pub fn set_ipc_path(&self, ipc_path: Option) { + self.inner_mut().ipc_path = ipc_path; } - pub fn get_server_address(&self) -> Option { - self.inner().server_addr + pub fn get_ipc_path(&self) -> Option { + self.inner().ipc_path.clone() + } + pub fn set_network_address(&self, network_addr: Option) { + self.inner_mut().network_addr = network_addr; + } + pub fn get_network_address(&self) -> Option { + self.inner().network_addr } // called by client_api_connection // calls into ui diff --git a/veilid-cli/src/main.rs b/veilid-cli/src/main.rs index 35e00e88..39a3577a 100644 --- a/veilid-cli/src/main.rs +++ b/veilid-cli/src/main.rs @@ -3,11 +3,11 @@ #![deny(unused_must_use)] #![recursion_limit = "256"] -use crate::tools::*; +use crate::{settings::NamedSocketAddrs, tools::*}; use clap::{Parser, ValueEnum}; use flexi_logger::*; -use std::{net::ToSocketAddrs, path::PathBuf}; +use std::path::PathBuf; mod cached_text_view; mod client_api_connection; @@ -28,14 +28,20 @@ enum LogLevel { #[derive(Parser, Debug)] #[command(author, version, about = "Veilid Console Client")] struct CmdlineArgs { + /// IPC socket to connect to + #[arg(long, short = 'p')] + ipc_path: Option, + /// IPC socket to connect to + #[arg(long, short = 'i', default_value = "0")] + subnode_index: usize, /// Address to connect to - #[arg(long)] + #[arg(long, short = 'a')] address: Option, /// Wait for debugger to attach #[arg(long)] wait_for_debug: bool, /// Specify a configuration file to use - #[arg(short, long, value_name = "FILE")] + #[arg(short = 'c', long, value_name = "FILE")] config_file: Option, /// log level #[arg(value_enum)] @@ -123,16 +129,48 @@ fn main() -> Result<(), String> { .expect("failed to initialize logger!"); } } + // Get client address - let server_addrs = if let Some(address_arg) = args.address { - address_arg - .to_socket_addrs() - .map_err(|e| format!("Invalid server address '{}'", e))? - .collect() - } else { - settings.address.addrs.clone() - }; - let server_addr = server_addrs.first().cloned(); + let enable_ipc = settings.enable_ipc; + let mut enable_network = settings.enable_network; + + // Determine IPC path to try + let mut client_api_ipc_path = None; + if enable_ipc { + if let Some(ipc_path) = args.ipc_path.or(settings.ipc_path.clone()) { + if ipc_path.exists() && !ipc_path.is_dir() { + // try direct path + enable_network = false; + client_api_ipc_path = Some(ipc_path); + } else if ipc_path.exists() && ipc_path.is_dir() { + // try subnode index inside path + let ipc_path = ipc_path.join(args.subnode_index.to_string()); + if ipc_path.exists() && !ipc_path.is_dir() { + // subnode indexed path exists + enable_network = false; + client_api_ipc_path = Some(ipc_path); + } + } + } + } + let mut client_api_network_addresses = None; + if enable_network { + let args_address = if let Some(args_address) = args.address { + match NamedSocketAddrs::try_from(args_address) { + Ok(v) => Some(v), + Err(e) => { + return Err(format!("Invalid server address: {}", e)); + } + } + } else { + None + }; + if let Some(address_arg) = args_address.or(settings.address.clone()) { + client_api_network_addresses = Some(address_arg.addrs); + } else if let Some(address) = settings.address.clone() { + client_api_network_addresses = Some(address.addrs.clone()); + } + } // Create command processor debug!("Creating Command Processor "); @@ -147,7 +185,13 @@ fn main() -> Result<(), String> { comproc.set_client_api_connection(capi.clone()); // Keep a connection to the server - comproc.set_server_address(server_addr); + if let Some(client_api_ipc_path) = client_api_ipc_path { + comproc.set_ipc_path(Some(client_api_ipc_path)); + } else if let Some(client_api_network_address) = client_api_network_addresses { + let network_addr = client_api_network_address.first().cloned(); + comproc.set_network_address(network_addr); + } + let comproc2 = comproc.clone(); let connection_future = comproc.connection_manager(); diff --git a/veilid-cli/src/settings.rs b/veilid-cli/src/settings.rs index 2014ab37..5aeea7e9 100644 --- a/veilid-cli/src/settings.rs +++ b/veilid-cli/src/settings.rs @@ -7,6 +7,9 @@ use std::path::{Path, PathBuf}; pub fn load_default_config() -> Result { let default_config = r#"--- +enable_ipc: true +local_socket_path: '%LOCAL_SOCKET_DIRECTORY%' +enable_network: true address: "localhost:5959" autoconnect: true autoreconnect: true @@ -45,6 +48,10 @@ interface: warn : "light yellow" error : "light red" "# + .replace( + "%LOCAL_SOCKET_DIRECTORY%", + &Settings::get_default_local_socket_path().to_string_lossy(), + ) .replace( "%LOGGING_FILE_DIRECTORY%", &Settings::get_default_log_directory().to_string_lossy(), @@ -111,11 +118,22 @@ pub fn convert_loglevel(log_level: LogLevel) -> log::LevelFilter { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct NamedSocketAddrs { pub name: String, pub addrs: Vec, } + +impl TryFrom for NamedSocketAddrs { + type Error = std::io::Error; + + fn try_from(value: String) -> Result { + let addrs = value.to_socket_addrs()?.collect(); + let name = value; + Ok(NamedSocketAddrs { name, addrs }) + } +} + impl<'de> serde::Deserialize<'de> for NamedSocketAddrs { fn deserialize(deserializer: D) -> Result where @@ -200,7 +218,10 @@ pub struct Interface { #[derive(Debug, Deserialize)] pub struct Settings { - pub address: NamedSocketAddrs, + pub enable_ipc: bool, + pub ipc_path: Option, + pub enable_network: bool, + pub address: Option, pub autoconnect: bool, pub autoreconnect: bool, pub logging: Logging, @@ -208,6 +229,29 @@ pub struct Settings { } impl Settings { + fn get_server_default_directory(subpath: &str) -> PathBuf { + #[cfg(unix)] + { + let globalpath = PathBuf::from("/var/db/veilid-server").join(subpath); + if globalpath.is_dir() { + return globalpath; + } + } + + let mut ts_path = if let Some(my_proj_dirs) = ProjectDirs::from("org", "Veilid", "Veilid") { + PathBuf::from(my_proj_dirs.data_local_dir()) + } else { + PathBuf::from("./") + }; + ts_path.push(subpath); + + ts_path + } + + pub fn get_default_local_socket_path() -> PathBuf { + Self::get_server_default_directory("local_sockets") + } + pub fn get_default_config_path() -> PathBuf { // Get default configuration file location let mut default_config_path = diff --git a/veilid-cli/src/ui.rs b/veilid-cli/src/ui.rs index c42be9ce..c723269d 100644 --- a/veilid-cli/src/ui.rs +++ b/veilid-cli/src/ui.rs @@ -20,6 +20,7 @@ use crate::cached_text_view::*; use chrono::{Datelike, Timelike}; use std::collections::{HashMap, VecDeque}; use std::io::Write; +use std::path::PathBuf; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::{SystemTime, UNIX_EPOCH}; use thiserror::Error; @@ -259,8 +260,17 @@ impl UI { fn peers(s: &mut Cursive) -> ViewRef { s.find_name("peers").unwrap() } - fn connection_address(s: &mut Cursive) -> ViewRef { - s.find_name("connection-address").unwrap() + fn ipc_path(s: &mut Cursive) -> ViewRef { + s.find_name("ipc-path").unwrap() + } + fn ipc_path_radio(s: &mut Cursive) -> ViewRef> { + s.find_name("ipc-path-radio").unwrap() + } + fn network_address(s: &mut Cursive) -> ViewRef { + s.find_name("network-address").unwrap() + } + fn network_address_radio(s: &mut Cursive) -> ViewRef> { + s.find_name("network-address-radio").unwrap() } fn connection_dialog(s: &mut Cursive) -> ViewRef { s.find_name("connection-dialog").unwrap() @@ -321,7 +331,7 @@ impl UI { } } fn render_button_attach<'a>(inner: &mut UIInner) -> (&'a str, bool) { - if let ConnectionState::Connected(_, _) = inner.ui_state.connection_state.get() { + if let ConnectionState::ConnectedTCP(_, _) = inner.ui_state.connection_state.get() { match inner.ui_state.attachment_state.get().as_str() { "Detached" => ("Attach", true), "Attaching" => ("Detach", true), @@ -496,19 +506,39 @@ impl UI { button_attach.set_enabled(button_enable); } - fn submit_connection_address(s: &mut Cursive) { - let edit = Self::connection_address(s); + fn submit_ipc_path(s: &mut Cursive) { + let edit = Self::ipc_path(s); let addr = (*edit.get_content()).clone(); - let sa = match addr.parse::() { - Ok(sa) => Some(sa), + let ipc_path = match addr.parse::() { + Ok(sa) => sa, Err(_) => { - s.add_layer(Dialog::text("Invalid address").button("Close", |s| { + s.add_layer(Dialog::text("Invalid IPC path").button("Close", |s| { s.pop_layer(); })); return; } }; - Self::command_processor(s).set_server_address(sa); + Self::command_processor(s).set_ipc_path(Some(ipc_path)); + Self::command_processor(s).set_network_address(None); + Self::command_processor(s).start_connection(); + } + + fn submit_network_address(s: &mut Cursive) { + let edit = Self::network_address(s); + let addr = (*edit.get_content()).clone(); + let sa = match addr.parse::() { + Ok(sa) => sa, + Err(_) => { + s.add_layer( + Dialog::text("Invalid network address").button("Close", |s| { + s.pop_layer(); + }), + ); + return; + } + }; + Self::command_processor(s).set_ipc_path(None); + Self::command_processor(s).set_network_address(Some(sa)); Self::command_processor(s).start_connection(); } @@ -589,8 +619,19 @@ impl UI { } fn show_connection_dialog(s: &mut Cursive, state: ConnectionState) -> bool { + let is_ipc = Self::command_processor(s).get_ipc_path().is_some(); let mut inner = Self::inner_mut(s); + let mut connection_type_group: RadioGroup = RadioGroup::new().on_change(|s, v| { + if *v == 0 { + Self::ipc_path(s).enable(); + Self::network_address(s).disable(); + } else if *v == 1 { + Self::ipc_path(s).disable(); + Self::network_address(s).enable(); + } + }); + let mut show: bool = false; let mut hide: bool = false; let mut reset: bool = false; @@ -613,7 +654,7 @@ impl UI { reset = true; } } - ConnectionState::Connected(_, _) => { + ConnectionState::ConnectedTCP(_, _) | ConnectionState::ConnectedIPC(_, _) => { if inner.connection_dialog_state.is_some() && !inner .connection_dialog_state @@ -624,7 +665,7 @@ impl UI { hide = true; } } - ConnectionState::Retrying(_, _) => { + ConnectionState::RetryingTCP(_, _) | ConnectionState::RetryingIPC(_, _) => { if inner.connection_dialog_state.is_none() || inner .connection_dialog_state @@ -655,15 +696,42 @@ impl UI { ResizedView::with_full_screen(DummyView {}), ColorStyle::new(PaletteColor::Background, PaletteColor::Background), )); + s.add_layer( Dialog::around( LinearLayout::vertical().child( LinearLayout::horizontal() - .child(TextView::new("Address:")) + .child( + if is_ipc { + connection_type_group.button(0, "IPC Path").selected() + } else { + connection_type_group.button(0, "IPC Path") + } + .with_name("ipc-path-radio"), + ) .child( EditView::new() - .on_submit(|s, _| Self::submit_connection_address(s)) - .with_name("connection-address") + .with_enabled(is_ipc) + .on_submit(|s, _| Self::submit_ipc_path(s)) + .with_name("ipc-path") + .fixed_height(1) + .min_width(40), + ) + .child( + if is_ipc { + connection_type_group.button(1, "Network Address") + } else { + connection_type_group + .button(1, "Network Address") + .selected() + } + .with_name("network-address-radio"), + ) + .child( + EditView::new() + .with_enabled(!is_ipc) + .on_submit(|s, _| Self::submit_network_address(s)) + .with_name("network-address") .fixed_height(1) .min_width(40), ), @@ -693,24 +761,57 @@ impl UI { match new_state { ConnectionState::Disconnected => { - let addr = match Self::command_processor(s).get_server_address() { - None => "".to_owned(), - Some(addr) => addr.to_string(), + Self::ipc_path_radio(s).set_enabled(true); + Self::network_address_radio(s).set_enabled(true); + + let (network_address, network_address_enabled) = + match Self::command_processor(s).get_network_address() { + None => ("".to_owned(), false), + Some(addr) => (addr.to_string(), true), + }; + let mut edit = Self::network_address(s); + edit.set_content(network_address); + edit.set_enabled(network_address_enabled); + + let (ipc_path, ipc_path_enabled) = match Self::command_processor(s).get_ipc_path() { + None => ("".to_owned(), false), + Some(ipc_path) => (ipc_path.to_string_lossy().to_string(), true), }; - debug!("address is {}", addr); - let mut edit = Self::connection_address(s); - edit.set_content(addr); - edit.set_enabled(true); + let mut edit = Self::ipc_path(s); + edit.set_content(ipc_path); + edit.set_enabled(ipc_path_enabled); + let mut dlg = Self::connection_dialog(s); - dlg.add_button("Connect", Self::submit_connection_address); + dlg.add_button("Connect", Self::submit_network_address); } - ConnectionState::Connected(_, _) => {} - ConnectionState::Retrying(addr, _) => { + ConnectionState::ConnectedTCP(_, _) | ConnectionState::ConnectedIPC(_, _) => {} + ConnectionState::RetryingTCP(addr, _) => { + Self::ipc_path_radio(s).set_enabled(false); + Self::network_address_radio(s).set_enabled(false); + // - let mut edit = Self::connection_address(s); - debug!("address is {}", addr); + let mut edit = Self::network_address(s); edit.set_content(addr.to_string()); edit.set_enabled(false); + + Self::ipc_path(s).set_enabled(false); + + let mut dlg = Self::connection_dialog(s); + dlg.add_button("Cancel", |s| { + Self::command_processor(s).cancel_reconnect(); + }); + } + ConnectionState::RetryingIPC(ipc_path, _) => { + Self::ipc_path_radio(s).set_enabled(false); + Self::network_address_radio(s).set_enabled(false); + + // + let mut edit = Self::ipc_path(s); + edit.set_content(ipc_path.to_string_lossy().to_string()); + edit.set_enabled(false); + + Self::network_address(s).set_enabled(false); + let mut dlg = Self::connection_dialog(s); dlg.add_button("Cancel", |s| { Self::command_processor(s).cancel_reconnect(); @@ -732,6 +833,8 @@ impl UI { let mut status = StyledString::new(); + let mut enable_status_fields = false; + match inner.ui_state.connection_state.get() { ConnectionState::Disconnected => { status.append_styled( @@ -740,35 +843,64 @@ impl UI { ); status.append_styled("|", ColorStyle::highlight_inactive()); } - ConnectionState::Retrying(addr, _) => { + ConnectionState::RetryingTCP(addr, _) => { status.append_styled( format!("Reconnecting to {} ", addr), ColorStyle::highlight_inactive(), ); status.append_styled("|", ColorStyle::highlight_inactive()); } - ConnectionState::Connected(addr, _) => { + ConnectionState::RetryingIPC(path, _) => { + status.append_styled( + format!( + "Reconnecting to IPC#{} ", + path.file_name() + .unwrap_or_default() + .to_string_lossy() + .into_owned() + ), + ColorStyle::highlight_inactive(), + ); + status.append_styled("|", ColorStyle::highlight_inactive()); + } + ConnectionState::ConnectedTCP(addr, _) => { status.append_styled( format!("Connected to {} ", addr), ColorStyle::highlight_inactive(), ); - status.append_styled("|", ColorStyle::highlight_inactive()); - // Add attachment state - status.append_styled( - format!(" {} ", UI::render_attachment_state(&mut inner)), - ColorStyle::highlight_inactive(), - ); - status.append_styled("|", ColorStyle::highlight_inactive()); - // Add bandwidth status - status.append_styled( - format!(" {} ", UI::render_network_status(&mut inner)), - ColorStyle::highlight_inactive(), - ); - status.append_styled("|", ColorStyle::highlight_inactive()); - // Add tunnel status - status.append_styled(" No Tunnels ", ColorStyle::highlight_inactive()); - status.append_styled("|", ColorStyle::highlight_inactive()); + enable_status_fields = true; } + ConnectionState::ConnectedIPC(path, _) => { + status.append_styled( + format!( + "Connected to IPC#{} ", + path.file_name() + .unwrap_or_default() + .to_string_lossy() + .into_owned() + ), + ColorStyle::highlight_inactive(), + ); + enable_status_fields = true; + } + } + if enable_status_fields { + status.append_styled("|", ColorStyle::highlight_inactive()); + // Add attachment state + status.append_styled( + format!(" {} ", UI::render_attachment_state(&mut inner)), + ColorStyle::highlight_inactive(), + ); + status.append_styled("|", ColorStyle::highlight_inactive()); + // Add bandwidth status + status.append_styled( + format!(" {} ", UI::render_network_status(&mut inner)), + ColorStyle::highlight_inactive(), + ); + status.append_styled("|", ColorStyle::highlight_inactive()); + // Add tunnel status + status.append_styled(" No Tunnels ", ColorStyle::highlight_inactive()); + status.append_styled("|", ColorStyle::highlight_inactive()); }; statusbar.set_content(status); diff --git a/veilid-server/Cargo.toml b/veilid-server/Cargo.toml index b983835f..08cadf60 100644 --- a/veilid-server/Cargo.toml +++ b/veilid-server/Cargo.toml @@ -48,10 +48,10 @@ opentelemetry = { version = "0.20" } opentelemetry-otlp = { version = "0.13" } opentelemetry-semantic-conventions = "0.12" async-std = { version = "^1", features = ["unstable"], optional = true } -tokio = { version = "^1", features = ["full", "tracing"], optional = true } +tokio = { version = "1.32.0", features = ["full", "tracing"], optional = true } +tokio-stream = { version = "0.1.14", features = ["net"], optional = true } +tokio-util = { version = "0.7.8", features = ["compat"], optional = true } console-subscriber = { version = "^0", optional = true } -tokio-stream = { version = "^0", features = ["net"], optional = true } -tokio-util = { version = "^0", features = ["compat"], optional = true } async-tungstenite = { package = "veilid-async-tungstenite", version = "^0", features = [ "async-tls", ] } diff --git a/veilid-server/src/client_api.rs b/veilid-server/src/client_api.rs index 4eb9c39e..3f137b74 100644 --- a/veilid-server/src/client_api.rs +++ b/veilid-server/src/client_api.rs @@ -6,6 +6,7 @@ use futures_util::{future::join_all, stream::FuturesUnordered, StreamExt}; use parking_lot::Mutex; use std::collections::HashMap; use std::net::SocketAddr; +use std::path::PathBuf; use std::sync::Arc; use stop_token::future::FutureExt as _; use stop_token::*; @@ -46,7 +47,7 @@ struct ClientApiInner { settings: Settings, stop: Option, join_handle: Option, - update_channels: HashMap<(SocketAddr, SocketAddr), flume::Sender>, + update_channels: HashMap>, } #[derive(Clone)] @@ -108,9 +109,40 @@ impl ClientApi { trace!("ClientApi::stop: stopped"); } - async fn handle_incoming(self, bind_addr: SocketAddr) -> std::io::Result<()> { + async fn handle_ipc_incoming(self, ipc_path: PathBuf) -> std::io::Result<()> { + let listener = IpcListener::bind(ipc_path.clone()).await?; + debug!("IPC Client API listening on: {:?}", ipc_path); + + // Process the incoming accept stream + let mut incoming_stream = listener.incoming(); + + // Make wait group for all incoming connections + let awg = AsyncWaitGroup::new(); + + let stop_token = self.inner.lock().stop.as_ref().unwrap().token(); + while let Ok(Some(stream_result)) = + incoming_stream.next().timeout_at(stop_token.clone()).await + { + // Get the stream to process + let stream = stream_result?; + + // Increment wait group + awg.add(1); + let t_awg = awg.clone(); + + // Process the connection + spawn(self.clone().handle_ipc_connection(stream, t_awg)).detach(); + } + + // Wait for all connections to terminate + awg.wait().await; + + Ok(()) + } + + async fn handle_tcp_incoming(self, bind_addr: SocketAddr) -> std::io::Result<()> { let listener = TcpListener::bind(bind_addr).await?; - debug!("Client API listening on: {:?}", bind_addr); + debug!("TCPClient API listening on: {:?}", bind_addr); // Process the incoming accept stream cfg_if! { @@ -137,7 +169,7 @@ impl ClientApi { let t_awg = awg.clone(); // Process the connection - spawn(self.clone().handle_connection(stream, t_awg)).detach(); + spawn(self.clone().handle_tcp_connection(stream, t_awg)).detach(); } // Wait for all connections to terminate @@ -300,47 +332,11 @@ impl ClientApi { VeilidAPIResult::Ok(None) } - pub async fn handle_connection(self, stream: TcpStream, awg: AsyncWaitGroup) { - // Get address of peer - let peer_addr = match stream.peer_addr() { - Ok(v) => v, - Err(e) => { - eprintln!("can't get peer address: {}", e); - return; - } - }; - // Get local address - let local_addr = match stream.local_addr() { - Ok(v) => v, - Err(e) => { - eprintln!("can't get local address: {}", e); - return; - } - }; - // Get connection tuple - let conn_tuple = (local_addr, peer_addr); - - debug!( - "Accepted Client API Connection: {:?} -> {:?}", - peer_addr, local_addr - ); - - // Make stop token to quit when stop() is requested externally - let stop_token = self.inner.lock().stop.as_ref().unwrap().token(); - - // Split into reader and writer halves - // with line buffering on the reader - cfg_if! { - if #[cfg(feature="rt-async-std")] { - use futures_util::AsyncReadExt; - let (reader, mut writer) = stream.split(); - let reader = BufReader::new(reader); - } else { - let (reader, writer) = stream.into_split(); - let reader = BufReader::new(reader); - } - } - + pub async fn run_json_request_processor(self, reader: R, writer: W, stop_token: StopToken) + where + R: AsyncBufReadExt + Unpin + Send, + W: AsyncWriteExt + Unpin + Send, + { // Make request processor for this connection let api = self.inner.lock().veilid_api.clone(); let jrp = json_api::JsonRequestProcessor::new(api); @@ -354,10 +350,11 @@ impl ClientApi { let (responses_tx, responses_rx) = flume::unbounded(); // Start sending updates + let id = get_timestamp(); self.inner .lock() .update_channels - .insert(conn_tuple, responses_tx.clone()); + .insert(id, responses_tx.clone()); // Request receive processor future // Receives from socket and enqueues RequestLines @@ -407,7 +404,50 @@ impl ClientApi { } // Stop sending updates - self.inner.lock().update_channels.remove(&conn_tuple); + self.inner.lock().update_channels.remove(&id); + } + + pub async fn handle_tcp_connection(self, stream: TcpStream, awg: AsyncWaitGroup) { + // Get address of peer + let peer_addr = match stream.peer_addr() { + Ok(v) => v, + Err(e) => { + eprintln!("can't get peer address: {}", e); + return; + } + }; + // Get local address + let local_addr = match stream.local_addr() { + Ok(v) => v, + Err(e) => { + eprintln!("can't get local address: {}", e); + return; + } + }; + // Get connection tuple + debug!( + "Accepted TCP Client API Connection: {:?} -> {:?}", + peer_addr, local_addr + ); + + // Make stop token to quit when stop() is requested externally + let stop_token = self.inner.lock().stop.as_ref().unwrap().token(); + + // Split into reader and writer halves + // with line buffering on the reader + cfg_if! { + if #[cfg(feature="rt-async-std")] { + use futures_util::AsyncReadExt; + let (reader, mut writer) = stream.split(); + let reader = BufReader::new(reader); + } else { + let (reader, writer) = stream.into_split(); + let reader = BufReader::new(reader); + } + } + + self.run_json_request_processor(reader, writer, stop_token) + .await; debug!( "Closed Client API Connection: {:?} -> {:?}", @@ -417,6 +457,34 @@ impl ClientApi { awg.done(); } + pub async fn handle_ipc_connection(self, stream: IpcStream, awg: AsyncWaitGroup) { + // Get connection tuple + debug!("Accepted IPC Client API Connection"); + + // Make stop token to quit when stop() is requested externally + let stop_token = self.inner.lock().stop.as_ref().unwrap().token(); + + // Split into reader and writer halves + // with line buffering on the reader + use futures_util::AsyncReadExt; + let (reader, writer) = stream.split(); + cfg_if! { + if #[cfg(feature = "rt-tokio")] { + use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt}; + let reader = reader.compat(); + let writer = writer.compat_write(); + } + } + let reader = BufReader::new(reader); + + self.run_json_request_processor(reader, writer, stop_token) + .await; + + debug!("Closed Client API Connection",); + + awg.done(); + } + pub fn handle_update(&self, veilid_update: veilid_core::VeilidUpdate) { // serialize update to NDJSON let veilid_update = serialize_json(json_api::RecvMessage::Update(veilid_update)) + "\n"; @@ -431,15 +499,29 @@ impl ClientApi { } #[instrument(level = "trace", skip(self))] - pub fn run(&self, bind_addrs: Vec) { - let bind_futures = bind_addrs.iter().copied().map(|addr| { + pub fn run(&self, ipc_path: Option, tcp_bind_addrs: Vec) { + let mut bind_futures: Vec> = Vec::new(); + + // Local IPC + if let Some(ipc_path) = ipc_path { let this = self.clone(); - async move { - if let Err(e) = this.handle_incoming(addr).await { - warn!("Not binding client API to {}: {}", addr, e); + bind_futures.push(Box::pin(async move { + if let Err(e) = this.handle_ipc_incoming(ipc_path.clone()).await { + warn!("Not binding IPC client API to {:?}: {}", ipc_path, e); } - } - }); + })); + } + + // Network sockets + for addr in tcp_bind_addrs.iter().copied() { + let this = self.clone(); + bind_futures.push(Box::pin(async move { + if let Err(e) = this.handle_tcp_incoming(addr).await { + warn!("Not binding TCP client API to {}: {}", addr, e); + } + })); + } + let bind_futures_join = join_all(bind_futures); self.inner.lock().join_handle = Some(spawn(bind_futures_join)); } diff --git a/veilid-server/src/server.rs b/veilid-server/src/server.rs index 8cfbe032..c84e0723 100644 --- a/veilid-server/src/server.rs +++ b/veilid-server/src/server.rs @@ -50,15 +50,21 @@ pub async fn run_veilid_server_internal( let ( settings_auto_attach, - settings_client_api_enabled, + settings_client_api_ipc_enabled, + settings_client_api_network_enabled, + settings_client_api_ipc_directory, settings_client_api_listen_address_addrs, + subnode_index, ) = { let settingsr = settings.read(); ( settingsr.auto_attach, - settingsr.client_api.enabled, + settingsr.client_api.ipc_enabled, + settingsr.client_api.network_enabled, + settingsr.client_api.ipc_directory.clone(), settingsr.client_api.listen_address.addrs.clone(), + settingsr.testing.subnode_index, ) }; @@ -84,12 +90,22 @@ pub async fn run_veilid_server_internal( .wrap_err("VeilidCore startup failed")?; // Start client api if one is requested - let mut capi = if settings_client_api_enabled && matches!(server_mode, ServerMode::Normal) { + let capi_enabled = settings_client_api_ipc_enabled || settings_client_api_network_enabled; + let mut capi = if capi_enabled && matches!(server_mode, ServerMode::Normal) { let some_capi = client_api::ClientApi::new(veilid_api.clone(), veilid_logs.clone(), settings.clone()); - some_capi - .clone() - .run(settings_client_api_listen_address_addrs); + some_capi.clone().run( + if settings_client_api_ipc_enabled { + Some(settings_client_api_ipc_directory.join(subnode_index.to_string())) + } else { + None + }, + if settings_client_api_network_enabled { + settings_client_api_listen_address_addrs + } else { + vec![] + }, + ); Some(some_capi) } else { None diff --git a/veilid-server/src/settings.rs b/veilid-server/src/settings.rs index 575a9c9f..95b3bc6f 100644 --- a/veilid-server/src/settings.rs +++ b/veilid-server/src/settings.rs @@ -18,7 +18,9 @@ pub fn load_default_config() -> EyreResult { daemon: enabled: false client_api: - enabled: true + ipc_enabled: false + ipc_directory: '%IPC_DIRECTORY%' + network_enabled: false listen_address: 'localhost:5959' auto_attach: true logging: @@ -158,6 +160,10 @@ core: # url: '' "#, ) + .replace( + "%IPC_DIRECTORY%", + &Settings::get_default_ipc_directory().to_string_lossy(), + ) .replace( "%TABLE_STORE_DIRECTORY%", &VeilidConfigTableStore::default().directory, @@ -172,11 +178,11 @@ core: ) .replace( "%CERTIFICATE_PATH%", - &VeilidConfigTLS::default().certificate_path + &VeilidConfigTLS::default().certificate_path, ) .replace( "%PRIVATE_KEY_PATH%", - &VeilidConfigTLS::default().private_key_path + &VeilidConfigTLS::default().private_key_path, ) .replace( "%REMOTE_MAX_SUBKEY_CACHE_MEMORY_MB%", @@ -445,7 +451,9 @@ pub struct Otlp { #[derive(Debug, Deserialize, Serialize)] pub struct ClientApi { - pub enabled: bool, + pub ipc_enabled: bool, + pub ipc_directory: PathBuf, + pub network_enabled: bool, pub listen_address: NamedSocketAddrs, } @@ -798,6 +806,10 @@ impl Settings { .unwrap_or_else(|| PathBuf::from("./veilid-server.conf")) } + pub fn get_default_ipc_directory() -> PathBuf { + Self::get_or_create_default_directory("ipc") + } + pub fn get_default_remote_max_subkey_cache_memory_mb() -> u32 { let sys = sysinfo::System::new_with_specifics(sysinfo::RefreshKind::new().with_memory()); ((sys.free_memory() / (1024u64 * 1024u64)) / 16) as u32 @@ -854,7 +866,9 @@ impl Settings { } set_config_value!(inner.daemon.enabled, value); - set_config_value!(inner.client_api.enabled, value); + set_config_value!(inner.client_api.ipc_enabled, value); + set_config_value!(inner.client_api.ipc_directory, value); + set_config_value!(inner.client_api.network_enabled, value); set_config_value!(inner.client_api.listen_address, value); set_config_value!(inner.auto_attach, value); set_config_value!(inner.logging.system.enabled, value); @@ -1021,13 +1035,9 @@ impl Settings { "protected_store.always_use_insecure_storage" => Ok(Box::new( inner.core.protected_store.always_use_insecure_storage, )), - "protected_store.directory" => Ok(Box::new( - inner - .core - .protected_store - .directory - .clone(), - )), + "protected_store.directory" => { + Ok(Box::new(inner.core.protected_store.directory.clone())) + } "protected_store.delete" => Ok(Box::new(inner.core.protected_store.delete)), "protected_store.device_encryption_key_password" => Ok(Box::new( inner @@ -1044,22 +1054,10 @@ impl Settings { .clone(), )), - "table_store.directory" => Ok(Box::new( - inner - .core - .table_store - .directory - .clone(), - )), + "table_store.directory" => Ok(Box::new(inner.core.table_store.directory.clone())), "table_store.delete" => Ok(Box::new(inner.core.table_store.delete)), - "block_store.directory" => Ok(Box::new( - inner - .core - .block_store - .directory - .clone(), - )), + "block_store.directory" => Ok(Box::new(inner.core.block_store.directory.clone())), "block_store.delete" => Ok(Box::new(inner.core.block_store.delete)), "network.connection_initial_timeout_ms" => { @@ -1214,22 +1212,12 @@ impl Settings { "network.restricted_nat_retries" => { Ok(Box::new(inner.core.network.restricted_nat_retries)) } - "network.tls.certificate_path" => Ok(Box::new( - inner - .core - .network - .tls - .certificate_path - .clone(), - )), - "network.tls.private_key_path" => Ok(Box::new( - inner - .core - .network - .tls - .private_key_path - .clone(), - )), + "network.tls.certificate_path" => { + Ok(Box::new(inner.core.network.tls.certificate_path.clone())) + } + "network.tls.private_key_path" => { + Ok(Box::new(inner.core.network.tls.private_key_path.clone())) + } "network.tls.connection_initial_timeout_ms" => Ok(Box::new( inner.core.network.tls.connection_initial_timeout_ms, )), @@ -1439,7 +1427,8 @@ mod tests { assert_eq!(s.daemon.group, None); assert_eq!(s.daemon.stdout_file, None); assert_eq!(s.daemon.stderr_file, None); - assert!(s.client_api.enabled); + assert!(s.client_api.ipc_enabled); + assert!(!s.client_api.network_enabled); assert_eq!(s.client_api.listen_address.name, "localhost:5959"); assert_eq!( s.client_api.listen_address.addrs, diff --git a/veilid-tools/Cargo.toml b/veilid-tools/Cargo.toml index a54dc939..46dbda0f 100644 --- a/veilid-tools/Cargo.toml +++ b/veilid-tools/Cargo.toml @@ -23,6 +23,7 @@ rt-async-std = [ rt-tokio = [ "tokio", "tokio-util", + "tokio-stream", "rtnetlink/tokio_socket", "async_executors/tokio_tp", "async_executors/tokio_io", @@ -66,6 +67,7 @@ flume = { version = "0.11.0", features = ["async"] } async-std = { version = "1.12.0", features = ["unstable"], optional = true } tokio = { version = "1.32.0", features = ["full"], optional = true } tokio-util = { version = "0.7.8", features = ["compat"], optional = true } +tokio-stream = { version = "0.1.14", features = ["net"], optional = true } futures-util = { version = "0.3.28", default-features = false, features = [ "async-await", "sink", diff --git a/veilid-tools/src/ipc/ipc_async_std/mod.rs b/veilid-tools/src/ipc/ipc_async_std/mod.rs new file mode 100644 index 00000000..a07114e9 --- /dev/null +++ b/veilid-tools/src/ipc/ipc_async_std/mod.rs @@ -0,0 +1,11 @@ +use cfg_if::*; + +cfg_if! { + if #[cfg(unix)] { + mod unix; + pub use unix::*; + } else if #[cfg(windows)] { + mod windows; + pub use windows::*; + } +} diff --git a/veilid-tools/src/ipc/ipc_async_std/unix.rs b/veilid-tools/src/ipc/ipc_async_std/unix.rs new file mode 100644 index 00000000..e69de29b diff --git a/veilid-tools/src/ipc/ipc_async_std/windows.rs b/veilid-tools/src/ipc/ipc_async_std/windows.rs new file mode 100644 index 00000000..e69de29b diff --git a/veilid-tools/src/ipc/ipc_tokio/mod.rs b/veilid-tools/src/ipc/ipc_tokio/mod.rs new file mode 100644 index 00000000..a07114e9 --- /dev/null +++ b/veilid-tools/src/ipc/ipc_tokio/mod.rs @@ -0,0 +1,11 @@ +use cfg_if::*; + +cfg_if! { + if #[cfg(unix)] { + mod unix; + pub use unix::*; + } else if #[cfg(windows)] { + mod windows; + pub use windows::*; + } +} diff --git a/veilid-tools/src/ipc/ipc_tokio/unix.rs b/veilid-tools/src/ipc/ipc_tokio/unix.rs new file mode 100644 index 00000000..39246332 --- /dev/null +++ b/veilid-tools/src/ipc/ipc_tokio/unix.rs @@ -0,0 +1,114 @@ +use futures_util::AsyncRead as FuturesAsyncRead; +use futures_util::AsyncWrite as FuturesAsyncWrite; +use futures_util::Stream; +use std::{io, path::Path}; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use tokio::net::{UnixListener, UnixStream}; +use tokio_stream::wrappers::UnixListenerStream; +///////////////////////////////////////////////////////////// + +pub struct IpcStream { + internal: UnixStream, +} + +impl IpcStream { + pub async fn connect>(path: P) -> io::Result { + Ok(IpcStream { + internal: UnixStream::connect(path).await?, + }) + } +} + +impl FuturesAsyncRead for IpcStream { + fn poll_read( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut [u8], + ) -> std::task::Poll> { + let mut rb = ReadBuf::new(buf); + match ::poll_read( + std::pin::Pin::new(&mut self.internal), + cx, + &mut rb, + ) { + std::task::Poll::Ready(r) => std::task::Poll::Ready(r.map(|_| rb.filled().len())), + std::task::Poll::Pending => std::task::Poll::Pending, + } + } +} + +impl FuturesAsyncWrite for IpcStream { + fn poll_write( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + ::poll_write(std::pin::Pin::new(&mut self.internal), cx, buf) + } + + fn poll_flush( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + ::poll_flush(std::pin::Pin::new(&mut self.internal), cx) + } + + fn poll_close( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + ::poll_shutdown(std::pin::Pin::new(&mut self.internal), cx) + } +} + +///////////////////////////////////////////////////////////// + +pub struct IpcIncoming { + internal: UnixListenerStream, +} + +impl Stream for IpcIncoming { + type Item = io::Result; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match ::poll_next(std::pin::Pin::new(&mut self.internal), cx) + { + std::task::Poll::Ready(ro) => { + std::task::Poll::Ready(ro.map(|rr| rr.map(|s| IpcStream { internal: s }))) + } + std::task::Poll::Pending => std::task::Poll::Pending, + } + } +} + +///////////////////////////////////////////////////////////// + +pub struct IpcListener { + internal: UnixListener, +} + +impl IpcListener { + /// Creates a new `IpcListener` bound to the specified path. + pub async fn bind>(path: P) -> io::Result { + Ok(Self { + internal: UnixListener::bind(path)?, + }) + } + + /// Accepts a new incoming connection to this listener. + pub async fn accept(&self) -> io::Result { + Ok(IpcStream { + internal: self.internal.accept().await?.0, + }) + } + + /// Returns a stream of incoming connections. + pub fn incoming(self) -> IpcIncoming { + IpcIncoming { + internal: UnixListenerStream::new(self.internal), + } + } +} diff --git a/veilid-tools/src/ipc/ipc_tokio/windows.rs b/veilid-tools/src/ipc/ipc_tokio/windows.rs new file mode 100644 index 00000000..e69de29b diff --git a/veilid-tools/src/ipc/mod.rs b/veilid-tools/src/ipc/mod.rs new file mode 100644 index 00000000..ec24eb5d --- /dev/null +++ b/veilid-tools/src/ipc/mod.rs @@ -0,0 +1,11 @@ +use cfg_if::*; + +cfg_if! { + if #[cfg(feature="rt-tokio")] { + mod ipc_tokio; + pub use ipc_tokio::*; + } else if #[cfg(feature="rt-async-std")] { + mod ipc_async_std; + pub use ipc_async_std::*; + } +} diff --git a/veilid-tools/src/lib.rs b/veilid-tools/src/lib.rs index 9f6836ac..c0a5b2a6 100644 --- a/veilid-tools/src/lib.rs +++ b/veilid-tools/src/lib.rs @@ -36,6 +36,7 @@ pub mod eventual_value_clone; pub mod interval; pub mod ip_addr_port; pub mod ip_extra; +pub mod ipc; pub mod log_thru; pub mod must_join_handle; pub mod must_join_single_future; @@ -176,6 +177,8 @@ pub use ip_addr_port::*; #[doc(inline)] pub use ip_extra::*; #[doc(inline)] +pub use ipc::*; +#[doc(inline)] pub use log_thru::*; #[doc(inline)] pub use must_join_handle::*; diff --git a/veilid-tools/src/tools.rs b/veilid-tools/src/tools.rs index 1d8017c7..e86ca492 100644 --- a/veilid-tools/src/tools.rs +++ b/veilid-tools/src/tools.rs @@ -314,7 +314,7 @@ cfg_if::cfg_if! { pub fn ensure_file_private_owner>(path: P) -> Result<(), String> { let path = path.as_ref(); - if !path.exists() { + if !path.is_file() { return Ok(()); } @@ -330,6 +330,32 @@ cfg_if::cfg_if! { } Ok(()) } + + pub fn ensure_directory_private_owner>(path: P, group_read: bool) -> Result<(), String> + { + let path = path.as_ref(); + if !path.is_dir() { + return Ok(()); + } + + let uid = Uid::effective(); + let gid = Gid::effective(); + let meta = std::fs::metadata(path).map_err(|e| format!("unable to get metadata for path: {}", e))?; + + let perm = if group_read { + 0o750 + } else { + 0o700 + }; + + if meta.mode() != perm { + std::fs::set_permissions(path,std::fs::Permissions::from_mode(perm)).map_err(|e| format!("unable to set correct permissions on path: {}", e))?; + } + if meta.uid() != uid.as_raw() || meta.gid() != gid.as_raw() { + return Err("path has incorrect owner/group".to_owned()); + } + Ok(()) + } } else if #[cfg(windows)] { //use std::os::windows::fs::MetadataExt; //use windows_permissions::*;