From b11f404d3f7d5d84f62a74b3ce11858b2b5a8434 Mon Sep 17 00:00:00 2001 From: John Smith Date: Sat, 16 Dec 2023 15:49:53 -0500 Subject: [PATCH] async-std support --- Cargo.lock | 1 - veilid-cli/Cargo.toml | 2 +- veilid-cli/src/client_api_connection.rs | 5 +- veilid-core/Cargo.toml | 1 - veilid-server/Cargo.toml | 1 + veilid-server/src/client_api.rs | 9 +- veilid-tools/src/ipc/ipc_async_std/unix.rs | 141 +++++++++++++++++++++ veilid-tools/src/ipc/ipc_tokio/unix.rs | 17 ++- 8 files changed, 160 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index eb42d578..d6d1edbb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5576,7 +5576,6 @@ dependencies = [ "nix 0.27.1", "num-traits", "once_cell", - "owning_ref", "paranoid-android", "parking_lot 0.12.1", "paste", diff --git a/veilid-cli/Cargo.toml b/veilid-cli/Cargo.toml index f96a708a..7cb24732 100644 --- a/veilid-cli/Cargo.toml +++ b/veilid-cli/Cargo.toml @@ -55,7 +55,7 @@ flexi_logger = { version = "^0", features = ["use_chrono_for_offset"] } thiserror = "^1" crossbeam-channel = "^0" hex = "^0" -veilid-tools = { version = "0.2.5", path = "../veilid-tools" } +veilid-tools = { version = "0.2.5", path = "../veilid-tools", default-features = false} json = "^0" stop-token = { version = "^0", default-features = false } diff --git a/veilid-cli/src/client_api_connection.rs b/veilid-cli/src/client_api_connection.rs index 6abf0cab..d507f5ee 100644 --- a/veilid-cli/src/client_api_connection.rs +++ b/veilid-cli/src/client_api_connection.rs @@ -9,8 +9,7 @@ use stop_token::{future::FutureExt as _, StopSource}; cfg_if! { if #[cfg(feature="rt-async-std")] { - use async_std::io::prelude::BufReadExt; - use async_std::io::WriteExt; + use futures::{AsyncBufReadExt, AsyncWriteExt}; use async_std::io::BufReader; } else if #[cfg(feature="rt-tokio")] { use tokio::io::AsyncBufReadExt; @@ -243,7 +242,7 @@ impl ClientApiConnection { cfg_if! { if #[cfg(feature="rt-async-std")] { use futures::AsyncReadExt; - let (reader, mut writer) = stream.split(); + let (reader, writer) = stream.split(); let reader = BufReader::new(reader); } else { let (reader, writer) = stream.into_split(); diff --git a/veilid-core/Cargo.toml b/veilid-core/Cargo.toml index 795e565a..7bab4989 100644 --- a/veilid-core/Cargo.toml +++ b/veilid-core/Cargo.toml @@ -64,7 +64,6 @@ veilid-tools = { version = "0.2.5", path = "../veilid-tools", features = [ ], default-features = false } paste = "1.0.14" once_cell = "1.19.0" -owning_ref = "0.4.1" backtrace = "0.3.69" num-traits = "0.2.17" shell-words = "1.1.0" diff --git a/veilid-server/Cargo.toml b/veilid-server/Cargo.toml index 5e52288f..edb0292b 100644 --- a/veilid-server/Cargo.toml +++ b/veilid-server/Cargo.toml @@ -15,6 +15,7 @@ path = "src/main.rs" [features] default = ["rt-tokio", "veilid-core/default"] +default-async-std = ["rt-async-std", "veilid-core/default-async-std"] crypto-test = ["rt-tokio", "veilid-core/crypto-test"] crypto-test-none = ["rt-tokio", "veilid-core/crypto-test-none"] diff --git a/veilid-server/src/client_api.rs b/veilid-server/src/client_api.rs index 6ddc1ec4..8933444a 100644 --- a/veilid-server/src/client_api.rs +++ b/veilid-server/src/client_api.rs @@ -20,8 +20,7 @@ const MAX_NON_JSON_LOGGING: usize = 50; cfg_if! { if #[cfg(feature="rt-async-std")] { - use async_std::io::prelude::BufReadExt; - use async_std::io::WriteExt; + use futures_util::{AsyncBufReadExt, AsyncWriteExt}; } else if #[cfg(feature="rt-tokio")] { use tokio::io::AsyncBufReadExt; use tokio::io::AsyncWriteExt; @@ -116,11 +115,11 @@ impl ClientApi { return Err(e); } } - let listener = IpcListener::bind(ipc_path.clone()).await?; + let mut listener = IpcListener::bind(ipc_path.clone()).await?; debug!("IPC Client API listening on: {:?}", ipc_path); // Process the incoming accept stream - let mut incoming_stream = listener.incoming(); + let mut incoming_stream = listener.incoming()?; // Make wait group for all incoming connections let awg = AsyncWaitGroup::new(); @@ -444,7 +443,7 @@ impl ClientApi { cfg_if! { if #[cfg(feature="rt-async-std")] { use futures_util::AsyncReadExt; - let (reader, mut writer) = stream.split(); + let (reader, writer) = stream.split(); let reader = BufReader::new(reader); } else { let (reader, writer) = stream.into_split(); diff --git a/veilid-tools/src/ipc/ipc_async_std/unix.rs b/veilid-tools/src/ipc/ipc_async_std/unix.rs index e69de29b..0bbe2565 100644 --- a/veilid-tools/src/ipc/ipc_async_std/unix.rs +++ b/veilid-tools/src/ipc/ipc_async_std/unix.rs @@ -0,0 +1,141 @@ +use crate::*; +use async_std::io::Read as AsyncRead; +use async_std::io::Write as AsyncWrite; +use async_std::os::unix::net::{Incoming, UnixListener, UnixStream}; +use futures_util::AsyncRead as FuturesAsyncRead; +use futures_util::AsyncWrite as FuturesAsyncWrite; +use futures_util::Stream; +use std::path::PathBuf; +use std::{io, path::Path}; + +///////////////////////////////////////////////////////////// + +pub struct IpcStream { + internal: UnixStream, +} + +impl IpcStream { + pub async fn connect>(path: P) -> io::Result { + Ok(IpcStream { + internal: UnixStream::connect(path.as_ref()).await?, + }) + } +} + +impl FuturesAsyncRead for IpcStream { + fn poll_read( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut [u8], + ) -> std::task::Poll> { + ::poll_read(std::pin::Pin::new(&mut self.internal), cx, buf) + } +} + +impl FuturesAsyncWrite for IpcStream { + fn poll_write( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + ::poll_write(std::pin::Pin::new(&mut self.internal), cx, buf) + } + + fn poll_flush( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + ::poll_flush(std::pin::Pin::new(&mut self.internal), cx) + } + + fn poll_close( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + ::poll_close(std::pin::Pin::new(&mut self.internal), cx) + } +} + +///////////////////////////////////////////////////////////// + +pub struct IpcIncoming<'a> { + path: PathBuf, + internal: Incoming<'a>, +} + +impl<'a> Drop for IpcIncoming<'a> { + fn drop(&mut self) { + // Clean up IPC path + if let Err(e) = std::fs::remove_file(&self.path) { + warn!("Unable to remove IPC socket: {}", e); + } + } +} + +impl<'a> Stream for IpcIncoming<'a> { + type Item = io::Result; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match ::poll_next(std::pin::Pin::new(&mut self.internal), cx) { + std::task::Poll::Ready(ro) => { + std::task::Poll::Ready(ro.map(|rr| rr.map(|s| IpcStream { internal: s }))) + } + std::task::Poll::Pending => std::task::Poll::Pending, + } + } +} + +///////////////////////////////////////////////////////////// + +pub struct IpcListener { + path: Option, + internal: Option>, +} + +impl IpcListener { + /// Creates a new `IpcListener` bound to the specified path. + pub async fn bind>(path: P) -> io::Result { + Ok(Self { + path: Some(path.as_ref().to_path_buf()), + internal: Some(Arc::new(UnixListener::bind(path.as_ref()).await?)), + }) + } + + /// Accepts a new incoming connection to this listener. + pub fn accept(&self) -> SendPinBoxFuture> { + let this = IpcListener { + path: self.path.clone(), + internal: self.internal.clone(), + }; + Box::pin(async move { + Ok(IpcStream { + internal: this.internal.as_ref().unwrap().accept().await?.0, + }) + }) + } + + /// Returns a stream of incoming connections. + pub fn incoming<'a>(&'a mut self) -> io::Result> { + if self.path.is_none() { + return Err(io::Error::from(io::ErrorKind::NotConnected)); + } + Ok(IpcIncoming { + path: self.path.take().unwrap(), + internal: self.internal.as_ref().unwrap().incoming(), + }) + } +} + +impl Drop for IpcListener { + fn drop(&mut self) { + // Clean up IPC path + if let Some(path) = &self.path { + if let Err(e) = std::fs::remove_file(path) { + warn!("Unable to remove IPC socket: {}", e); + } + } + } +} diff --git a/veilid-tools/src/ipc/ipc_tokio/unix.rs b/veilid-tools/src/ipc/ipc_tokio/unix.rs index 8abc434b..7606ec13 100644 --- a/veilid-tools/src/ipc/ipc_tokio/unix.rs +++ b/veilid-tools/src/ipc/ipc_tokio/unix.rs @@ -65,12 +65,13 @@ impl FuturesAsyncWrite for IpcStream { ///////////////////////////////////////////////////////////// -pub struct IpcIncoming { +pub struct IpcIncoming<'a> { path: PathBuf, internal: UnixListenerStream, + phantom: std::marker::PhantomData<&'a ()>, } -impl Stream for IpcIncoming { +impl<'a> Stream for IpcIncoming<'a> { type Item = io::Result; fn poll_next( @@ -87,7 +88,7 @@ impl Stream for IpcIncoming { } } -impl Drop for IpcIncoming { +impl<'a> Drop for IpcIncoming<'a> { fn drop(&mut self) { // Clean up IPC path if let Err(e) = std::fs::remove_file(&self.path) { @@ -126,13 +127,17 @@ impl IpcListener { } /// Returns a stream of incoming connections. - pub fn incoming(mut self) -> IpcIncoming { - IpcIncoming { + pub fn incoming(&mut self) -> io::Result> { + if self.path.is_none() { + return Err(io::Error::from(io::ErrorKind::NotConnected)); + } + Ok(IpcIncoming { path: self.path.take().unwrap(), internal: UnixListenerStream::new( Arc::into_inner(self.internal.take().unwrap()).unwrap(), ), - } + phantom: std::marker::PhantomData, + }) } }