From d1aa4888831753b0d63e491db9d245b5aaf49d91 Mon Sep 17 00:00:00 2001 From: John Smith Date: Fri, 15 Dec 2023 18:24:53 -0500 Subject: [PATCH] windows specific ipc logic --- veilid-cli/src/main.rs | 45 +++-- veilid-cli/src/settings.rs | 9 +- veilid-cli/src/ui.rs | 8 +- veilid-flutter/example/pubspec.lock | 26 +-- veilid-server/src/settings.rs | 8 +- veilid-tools/src/ipc/ipc_tokio/windows.rs | 192 ++++++++++++++++++++++ veilid-tools/src/ipc/mod.rs | 21 +++ veilid-tools/src/tools.rs | 28 +++- 8 files changed, 307 insertions(+), 30 deletions(-) diff --git a/veilid-cli/src/main.rs b/veilid-cli/src/main.rs index c50f6d32..d0156a89 100644 --- a/veilid-cli/src/main.rs +++ b/veilid-cli/src/main.rs @@ -8,7 +8,6 @@ use crate::{settings::NamedSocketAddrs, tools::*}; use clap::{Parser, ValueEnum}; use flexi_logger::*; use std::path::PathBuf; - mod cached_text_view; mod client_api_connection; mod command_processor; @@ -137,18 +136,38 @@ fn main() -> Result<(), String> { // Determine IPC path to try let mut client_api_ipc_path = None; if enable_ipc { - if let Some(ipc_path) = args.ipc_path.or(settings.ipc_path.clone()) { - if ipc_path.exists() && !ipc_path.is_dir() { - // try direct path - enable_network = false; - client_api_ipc_path = Some(ipc_path); - } else if ipc_path.exists() && ipc_path.is_dir() { - // try subnode index inside path - let ipc_path = ipc_path.join(args.subnode_index.to_string()); - if ipc_path.exists() && !ipc_path.is_dir() { - // subnode indexed path exists - enable_network = false; - client_api_ipc_path = Some(ipc_path); + cfg_if::cfg_if! { + if #[cfg(windows)] { + if let Some(ipc_path) = args.ipc_path.or(settings.ipc_path.clone()) { + if is_ipc_socket_path(&ipc_path) { + // try direct path + enable_network = false; + client_api_ipc_path = Some(ipc_path); + } else { + // try subnode index inside path + let ipc_path = ipc_path.join(args.subnode_index.to_string()); + if is_ipc_socket_path(&ipc_path) { + // subnode indexed path exists + enable_network = false; + client_api_ipc_path = Some(ipc_path); + } + } + } + } else { + if let Some(ipc_path) = args.ipc_path.or(settings.ipc_path.clone()) { + if is_ipc_socket_path(&ipc_path) { + // try direct path + enable_network = false; + client_api_ipc_path = Some(ipc_path); + } else if ipc_path.exists() && ipc_path.is_dir() { + // try subnode index inside path + let ipc_path = ipc_path.join(args.subnode_index.to_string()); + if is_ipc_socket_path(&ipc_path) { + // subnode indexed path exists + enable_network = false; + client_api_ipc_path = Some(ipc_path); + } + } } } } diff --git a/veilid-cli/src/settings.rs b/veilid-cli/src/settings.rs index 992f325a..c1c55b79 100644 --- a/veilid-cli/src/settings.rs +++ b/veilid-cli/src/settings.rs @@ -229,6 +229,7 @@ pub struct Settings { } impl Settings { + #[allow(dead_code)] fn get_server_default_directory(subpath: &str) -> PathBuf { #[cfg(unix)] { @@ -249,7 +250,13 @@ impl Settings { } pub fn get_default_ipc_directory() -> PathBuf { - Self::get_server_default_directory("ipc") + cfg_if::cfg_if! { + if #[cfg(windows)] { + PathBuf::from(r"\\.\PIPE\veilid-server") + } else { + Self::get_server_default_directory("ipc") + } + } } pub fn get_default_config_path() -> PathBuf { diff --git a/veilid-cli/src/ui.rs b/veilid-cli/src/ui.rs index 85bf5c85..95b6fb53 100644 --- a/veilid-cli/src/ui.rs +++ b/veilid-cli/src/ui.rs @@ -805,7 +805,13 @@ impl UI { edit.set_enabled(ipc_path_enabled); let mut dlg = Self::connection_dialog(s); - dlg.add_button("Connect", Self::submit_network_address); + dlg.add_button("Connect", |s| { + if Self::ipc_path_radio(s).is_selected() { + Self::submit_ipc_path(s); + } else { + Self::submit_network_address(s); + } + }); } ConnectionState::ConnectedTCP(_, _) | ConnectionState::ConnectedIPC(_, _) => {} ConnectionState::RetryingTCP(addr, _) => { diff --git a/veilid-flutter/example/pubspec.lock b/veilid-flutter/example/pubspec.lock index 28671d7b..73fc4fc6 100644 --- a/veilid-flutter/example/pubspec.lock +++ b/veilid-flutter/example/pubspec.lock @@ -61,10 +61,10 @@ packages: dependency: transitive description: name: collection - sha256: ee67cb0715911d28db6bf4af1026078bd6f0128b07a5f66fb2ed94ec6783c09a + sha256: f092b211a4319e98e5ff58223576de6c2803db36221657b46c82574721240687 url: "https://pub.dev" source: hosted - version: "1.18.0" + version: "1.17.2" convert: dependency: transitive description: @@ -220,10 +220,10 @@ packages: dependency: transitive description: name: meta - sha256: a6e590c838b18133bb482a2745ad77c5bb7715fb0451209e1a7567d416678b8e + sha256: "3c74dbf8763d36539f114c799d8a2d87343b5067e9d796ca22b5eb8437090ee3" url: "https://pub.dev" source: hosted - version: "1.10.0" + version: "1.9.1" path: dependency: "direct main" description: @@ -329,18 +329,18 @@ packages: dependency: transitive description: name: stack_trace - sha256: "73713990125a6d93122541237550ee3352a2d84baad52d375a4cad2eb9b7ce0b" + sha256: c3c7d8edb15bee7f0f74debd4b9c5f3c2ea86766fe4178eb2a18eb30a0bdaed5 url: "https://pub.dev" source: hosted - version: "1.11.1" + version: "1.11.0" stream_channel: dependency: transitive description: name: stream_channel - sha256: ba2aa5d8cc609d96bbb2899c28934f9e1af5cddbd60a827822ea467161eb54e7 + sha256: "83615bee9045c1d322bbbd1ba209b7a749c2cbcdcb3fdd1df8eb488b3279c1c8" url: "https://pub.dev" source: hosted - version: "2.1.2" + version: "2.1.1" string_scanner: dependency: transitive description: @@ -377,10 +377,10 @@ packages: dependency: transitive description: name: test_api - sha256: "5c2f730018264d276c20e4f1503fd1308dfbbae39ec8ee63c5236311ac06954b" + sha256: "75760ffd7786fffdfb9597c35c5b27eaeec82be8edfb6d71d32651128ed7aab8" url: "https://pub.dev" source: hosted - version: "0.6.1" + version: "0.6.0" typed_data: dependency: transitive description: @@ -408,10 +408,10 @@ packages: dependency: transitive description: name: web - sha256: afe077240a270dcfd2aafe77602b4113645af95d0ad31128cc02bce5ac5d5152 + sha256: dc8ccd225a2005c1be616fe02951e2e342092edf968cf0844220383757ef8f10 url: "https://pub.dev" source: hosted - version: "0.3.0" + version: "0.1.4-beta" win32: dependency: transitive description: @@ -437,5 +437,5 @@ packages: source: hosted version: "3.5.0" sdks: - dart: ">=3.2.0-194.0.dev <4.0.0" + dart: ">=3.1.0-185.0.dev <4.0.0" flutter: ">=3.10.6" diff --git a/veilid-server/src/settings.rs b/veilid-server/src/settings.rs index f2f69434..f2f2c3bc 100644 --- a/veilid-server/src/settings.rs +++ b/veilid-server/src/settings.rs @@ -807,7 +807,13 @@ impl Settings { } pub fn get_default_ipc_directory() -> PathBuf { - Self::get_or_create_default_directory("ipc") + cfg_if! { + if #[cfg(windows)] { + PathBuf::from(r"\\.\PIPE\veilid-server") + } else { + Self::get_or_create_default_directory("ipc") + } + } } pub fn get_default_remote_max_subkey_cache_memory_mb() -> u32 { diff --git a/veilid-tools/src/ipc/ipc_tokio/windows.rs b/veilid-tools/src/ipc/ipc_tokio/windows.rs index e69de29b..5e4ecdbe 100644 --- a/veilid-tools/src/ipc/ipc_tokio/windows.rs +++ b/veilid-tools/src/ipc/ipc_tokio/windows.rs @@ -0,0 +1,192 @@ +use crate::*; +use futures_util::stream::FuturesUnordered; +use futures_util::AsyncRead as FuturesAsyncRead; +use futures_util::AsyncWrite as FuturesAsyncWrite; +use futures_util::Stream; +use std::path::PathBuf; +use std::{io, path::Path}; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use tokio::net::windows::named_pipe::{ + ClientOptions, NamedPipeClient, NamedPipeServer, ServerOptions, +}; +///////////////////////////////////////////////////////////// + +enum IpcStreamInternal { + Client(NamedPipeClient), + Server(NamedPipeServer), +} + +pub struct IpcStream { + internal: IpcStreamInternal, +} + +impl IpcStream { + pub async fn connect>(path: P) -> io::Result { + Ok(IpcStream { + internal: IpcStreamInternal::Client( + ClientOptions::new().open(path.as_ref().to_path_buf().as_os_str())?, + ), + }) + } +} + +impl FuturesAsyncRead for IpcStream { + fn poll_read( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut [u8], + ) -> std::task::Poll> { + match &mut self.internal { + IpcStreamInternal::Client(client) => { + let mut rb = ReadBuf::new(buf); + match ::poll_read( + std::pin::Pin::new(client), + cx, + &mut rb, + ) { + std::task::Poll::Ready(r) => { + std::task::Poll::Ready(r.map(|_| rb.filled().len())) + } + std::task::Poll::Pending => std::task::Poll::Pending, + } + } + IpcStreamInternal::Server(server) => { + let mut rb = ReadBuf::new(buf); + match ::poll_read( + std::pin::Pin::new(server), + cx, + &mut rb, + ) { + std::task::Poll::Ready(r) => { + std::task::Poll::Ready(r.map(|_| rb.filled().len())) + } + std::task::Poll::Pending => std::task::Poll::Pending, + } + } + } + } +} + +impl FuturesAsyncWrite for IpcStream { + fn poll_write( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + match &mut self.internal { + IpcStreamInternal::Client(client) => { + ::poll_write(std::pin::Pin::new(client), cx, buf) + } + IpcStreamInternal::Server(server) => { + ::poll_write(std::pin::Pin::new(server), cx, buf) + } + } + } + + fn poll_flush( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match &mut self.internal { + IpcStreamInternal::Client(client) => { + ::poll_flush(std::pin::Pin::new(client), cx) + } + IpcStreamInternal::Server(server) => { + ::poll_flush(std::pin::Pin::new(server), cx) + } + } + } + + fn poll_close( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match &mut self.internal { + IpcStreamInternal::Client(client) => { + ::poll_shutdown(std::pin::Pin::new(client), cx) + } + IpcStreamInternal::Server(server) => { + ::poll_shutdown(std::pin::Pin::new(server), cx) + } + } + } +} + +///////////////////////////////////////////////////////////// + +pub struct IpcIncoming { + listener: Arc, + unord: FuturesUnordered>>, +} + +impl Stream for IpcIncoming { + type Item = io::Result; + + fn poll_next<'a>( + mut self: std::pin::Pin<&'a mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + if self.unord.is_empty() { + self.unord.push(Box::pin(self.listener.accept())); + } + match Pin::new(&mut self.unord).poll_next(cx) { + task::Poll::Ready(ro) => { + self.unord.push(Box::pin(self.listener.accept())); + std::task::Poll::Ready(ro) + } + task::Poll::Pending => std::task::Poll::Pending, + } + } +} + +///////////////////////////////////////////////////////////// + +pub struct IpcListener { + path: Option, + internal: Mutex>, +} + +impl IpcListener { + /// Creates a new `IpcListener` bound to the specified path. + pub async fn bind>(path: P) -> io::Result { + let path = path.as_ref().to_path_buf(); + let server = ServerOptions::new() + .first_pipe_instance(true) + .create(&path)?; + Ok(Self { + path: Some(path), + internal: Mutex::new(Some(server)), + }) + } + + /// Accepts a new incoming connection to this listener. + pub fn accept(&self) -> SendPinBoxFuture> { + let mut opt_server = self.internal.lock(); + let Some(server) = opt_server.take() else { + return Box::pin(std::future::ready(Err(io::Error::from( + io::ErrorKind::BrokenPipe, + )))); + }; + let path = self.path.clone().unwrap(); + *opt_server = match ServerOptions::new().create(path) { + Ok(v) => Some(v), + Err(e) => return Box::pin(std::future::ready(Err(e))), + }; + + Box::pin(async move { + server.connect().await?; + + Ok(IpcStream { + internal: IpcStreamInternal::Server(server), + }) + }) + } + + /// Returns a stream of incoming connections. + pub fn incoming(self) -> IpcIncoming { + IpcIncoming { + listener: Arc::new(self), + unord: FuturesUnordered::new(), + } + } +} diff --git a/veilid-tools/src/ipc/mod.rs b/veilid-tools/src/ipc/mod.rs index ec24eb5d..27f81639 100644 --- a/veilid-tools/src/ipc/mod.rs +++ b/veilid-tools/src/ipc/mod.rs @@ -1,4 +1,5 @@ use cfg_if::*; +use std::path::Path; cfg_if! { if #[cfg(feature="rt-tokio")] { @@ -9,3 +10,23 @@ cfg_if! { pub use ipc_async_std::*; } } + +pub fn is_ipc_socket_path>(path: P) -> bool { + cfg_if! { + if #[cfg(windows)] { + let p = path.as_ref().to_path_buf().to_string_lossy().to_string().to_lowercase(); + p.starts_with(r"\\.\pipe") && path.as_ref().exists() + } else if #[cfg(unix)] { + use std::os::unix::fs::FileTypeExt; + let meta = match std::fs::metadata(path) { + Ok(v) => v, + Err(_) => { + return false; + } + }; + meta.file_type().is_socket() + } else { + false + } + } +} diff --git a/veilid-tools/src/tools.rs b/veilid-tools/src/tools.rs index e86ca492..ffb552ba 100644 --- a/veilid-tools/src/tools.rs +++ b/veilid-tools/src/tools.rs @@ -363,15 +363,41 @@ cfg_if::cfg_if! { pub fn ensure_file_private_owner>(path: P) -> Result<(), String> { let path = path.as_ref(); - if !path.exists() { + if !path.is_file() { return Ok(()); } Ok(()) } + + pub fn ensure_directory_private_owner>(path: P, _group_read: bool) -> Result<(), String> + { + let path = path.as_ref(); + if !path.is_dir() { + return Ok(()); + } + + Ok(()) + } + } else { pub fn ensure_file_private_owner>(_path: P) -> Result<(), String> { + let path = path.as_ref(); + if !path.is_file() { + return Ok(()); + } + + Ok(()) + } + + pub fn ensure_directory_private_owner>(path: P, _group_read: bool) -> Result<(), String> + { + let path = path.as_ref(); + if !path.is_dir() { + return Ok(()); + } + Ok(()) } }