async-std support

This commit is contained in:
John Smith 2023-12-16 15:49:53 -05:00 committed by Christien Rioux
parent 4e36524778
commit b11f404d3f
8 changed files with 160 additions and 17 deletions

1
Cargo.lock generated
View File

@ -5576,7 +5576,6 @@ dependencies = [
"nix 0.27.1", "nix 0.27.1",
"num-traits", "num-traits",
"once_cell", "once_cell",
"owning_ref",
"paranoid-android", "paranoid-android",
"parking_lot 0.12.1", "parking_lot 0.12.1",
"paste", "paste",

View File

@ -55,7 +55,7 @@ flexi_logger = { version = "^0", features = ["use_chrono_for_offset"] }
thiserror = "^1" thiserror = "^1"
crossbeam-channel = "^0" crossbeam-channel = "^0"
hex = "^0" hex = "^0"
veilid-tools = { version = "0.2.5", path = "../veilid-tools" } veilid-tools = { version = "0.2.5", path = "../veilid-tools", default-features = false}
json = "^0" json = "^0"
stop-token = { version = "^0", default-features = false } stop-token = { version = "^0", default-features = false }

View File

@ -9,8 +9,7 @@ use stop_token::{future::FutureExt as _, StopSource};
cfg_if! { cfg_if! {
if #[cfg(feature="rt-async-std")] { if #[cfg(feature="rt-async-std")] {
use async_std::io::prelude::BufReadExt; use futures::{AsyncBufReadExt, AsyncWriteExt};
use async_std::io::WriteExt;
use async_std::io::BufReader; use async_std::io::BufReader;
} else if #[cfg(feature="rt-tokio")] { } else if #[cfg(feature="rt-tokio")] {
use tokio::io::AsyncBufReadExt; use tokio::io::AsyncBufReadExt;
@ -243,7 +242,7 @@ impl ClientApiConnection {
cfg_if! { cfg_if! {
if #[cfg(feature="rt-async-std")] { if #[cfg(feature="rt-async-std")] {
use futures::AsyncReadExt; use futures::AsyncReadExt;
let (reader, mut writer) = stream.split(); let (reader, writer) = stream.split();
let reader = BufReader::new(reader); let reader = BufReader::new(reader);
} else { } else {
let (reader, writer) = stream.into_split(); let (reader, writer) = stream.into_split();

View File

@ -64,7 +64,6 @@ veilid-tools = { version = "0.2.5", path = "../veilid-tools", features = [
], default-features = false } ], default-features = false }
paste = "1.0.14" paste = "1.0.14"
once_cell = "1.19.0" once_cell = "1.19.0"
owning_ref = "0.4.1"
backtrace = "0.3.69" backtrace = "0.3.69"
num-traits = "0.2.17" num-traits = "0.2.17"
shell-words = "1.1.0" shell-words = "1.1.0"

View File

@ -15,6 +15,7 @@ path = "src/main.rs"
[features] [features]
default = ["rt-tokio", "veilid-core/default"] default = ["rt-tokio", "veilid-core/default"]
default-async-std = ["rt-async-std", "veilid-core/default-async-std"]
crypto-test = ["rt-tokio", "veilid-core/crypto-test"] crypto-test = ["rt-tokio", "veilid-core/crypto-test"]
crypto-test-none = ["rt-tokio", "veilid-core/crypto-test-none"] crypto-test-none = ["rt-tokio", "veilid-core/crypto-test-none"]

View File

@ -20,8 +20,7 @@ const MAX_NON_JSON_LOGGING: usize = 50;
cfg_if! { cfg_if! {
if #[cfg(feature="rt-async-std")] { if #[cfg(feature="rt-async-std")] {
use async_std::io::prelude::BufReadExt; use futures_util::{AsyncBufReadExt, AsyncWriteExt};
use async_std::io::WriteExt;
} else if #[cfg(feature="rt-tokio")] { } else if #[cfg(feature="rt-tokio")] {
use tokio::io::AsyncBufReadExt; use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
@ -116,11 +115,11 @@ impl ClientApi {
return Err(e); return Err(e);
} }
} }
let listener = IpcListener::bind(ipc_path.clone()).await?; let mut listener = IpcListener::bind(ipc_path.clone()).await?;
debug!("IPC Client API listening on: {:?}", ipc_path); debug!("IPC Client API listening on: {:?}", ipc_path);
// Process the incoming accept stream // Process the incoming accept stream
let mut incoming_stream = listener.incoming(); let mut incoming_stream = listener.incoming()?;
// Make wait group for all incoming connections // Make wait group for all incoming connections
let awg = AsyncWaitGroup::new(); let awg = AsyncWaitGroup::new();
@ -444,7 +443,7 @@ impl ClientApi {
cfg_if! { cfg_if! {
if #[cfg(feature="rt-async-std")] { if #[cfg(feature="rt-async-std")] {
use futures_util::AsyncReadExt; use futures_util::AsyncReadExt;
let (reader, mut writer) = stream.split(); let (reader, writer) = stream.split();
let reader = BufReader::new(reader); let reader = BufReader::new(reader);
} else { } else {
let (reader, writer) = stream.into_split(); let (reader, writer) = stream.into_split();

View File

@ -0,0 +1,141 @@
use crate::*;
use async_std::io::Read as AsyncRead;
use async_std::io::Write as AsyncWrite;
use async_std::os::unix::net::{Incoming, UnixListener, UnixStream};
use futures_util::AsyncRead as FuturesAsyncRead;
use futures_util::AsyncWrite as FuturesAsyncWrite;
use futures_util::Stream;
use std::path::PathBuf;
use std::{io, path::Path};
/////////////////////////////////////////////////////////////
pub struct IpcStream {
internal: UnixStream,
}
impl IpcStream {
pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<IpcStream> {
Ok(IpcStream {
internal: UnixStream::connect(path.as_ref()).await?,
})
}
}
impl FuturesAsyncRead for IpcStream {
fn poll_read(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut [u8],
) -> std::task::Poll<io::Result<usize>> {
<UnixStream as AsyncRead>::poll_read(std::pin::Pin::new(&mut self.internal), cx, buf)
}
}
impl FuturesAsyncWrite for IpcStream {
fn poll_write(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<io::Result<usize>> {
<UnixStream as AsyncWrite>::poll_write(std::pin::Pin::new(&mut self.internal), cx, buf)
}
fn poll_flush(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<io::Result<()>> {
<UnixStream as AsyncWrite>::poll_flush(std::pin::Pin::new(&mut self.internal), cx)
}
fn poll_close(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<io::Result<()>> {
<UnixStream as AsyncWrite>::poll_close(std::pin::Pin::new(&mut self.internal), cx)
}
}
/////////////////////////////////////////////////////////////
pub struct IpcIncoming<'a> {
path: PathBuf,
internal: Incoming<'a>,
}
impl<'a> Drop for IpcIncoming<'a> {
fn drop(&mut self) {
// Clean up IPC path
if let Err(e) = std::fs::remove_file(&self.path) {
warn!("Unable to remove IPC socket: {}", e);
}
}
}
impl<'a> Stream for IpcIncoming<'a> {
type Item = io::Result<IpcStream>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
match <Incoming as Stream>::poll_next(std::pin::Pin::new(&mut self.internal), cx) {
std::task::Poll::Ready(ro) => {
std::task::Poll::Ready(ro.map(|rr| rr.map(|s| IpcStream { internal: s })))
}
std::task::Poll::Pending => std::task::Poll::Pending,
}
}
}
/////////////////////////////////////////////////////////////
pub struct IpcListener {
path: Option<PathBuf>,
internal: Option<Arc<UnixListener>>,
}
impl IpcListener {
/// Creates a new `IpcListener` bound to the specified path.
pub async fn bind<P: AsRef<Path>>(path: P) -> io::Result<Self> {
Ok(Self {
path: Some(path.as_ref().to_path_buf()),
internal: Some(Arc::new(UnixListener::bind(path.as_ref()).await?)),
})
}
/// Accepts a new incoming connection to this listener.
pub fn accept(&self) -> SendPinBoxFuture<io::Result<IpcStream>> {
let this = IpcListener {
path: self.path.clone(),
internal: self.internal.clone(),
};
Box::pin(async move {
Ok(IpcStream {
internal: this.internal.as_ref().unwrap().accept().await?.0,
})
})
}
/// Returns a stream of incoming connections.
pub fn incoming<'a>(&'a mut self) -> io::Result<IpcIncoming<'a>> {
if self.path.is_none() {
return Err(io::Error::from(io::ErrorKind::NotConnected));
}
Ok(IpcIncoming {
path: self.path.take().unwrap(),
internal: self.internal.as_ref().unwrap().incoming(),
})
}
}
impl Drop for IpcListener {
fn drop(&mut self) {
// Clean up IPC path
if let Some(path) = &self.path {
if let Err(e) = std::fs::remove_file(path) {
warn!("Unable to remove IPC socket: {}", e);
}
}
}
}

View File

@ -65,12 +65,13 @@ impl FuturesAsyncWrite for IpcStream {
///////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////
pub struct IpcIncoming { pub struct IpcIncoming<'a> {
path: PathBuf, path: PathBuf,
internal: UnixListenerStream, internal: UnixListenerStream,
phantom: std::marker::PhantomData<&'a ()>,
} }
impl Stream for IpcIncoming { impl<'a> Stream for IpcIncoming<'a> {
type Item = io::Result<IpcStream>; type Item = io::Result<IpcStream>;
fn poll_next( fn poll_next(
@ -87,7 +88,7 @@ impl Stream for IpcIncoming {
} }
} }
impl Drop for IpcIncoming { impl<'a> Drop for IpcIncoming<'a> {
fn drop(&mut self) { fn drop(&mut self) {
// Clean up IPC path // Clean up IPC path
if let Err(e) = std::fs::remove_file(&self.path) { if let Err(e) = std::fs::remove_file(&self.path) {
@ -126,13 +127,17 @@ impl IpcListener {
} }
/// Returns a stream of incoming connections. /// Returns a stream of incoming connections.
pub fn incoming(mut self) -> IpcIncoming { pub fn incoming(&mut self) -> io::Result<IpcIncoming<'_>> {
IpcIncoming { if self.path.is_none() {
return Err(io::Error::from(io::ErrorKind::NotConnected));
}
Ok(IpcIncoming {
path: self.path.take().unwrap(), path: self.path.take().unwrap(),
internal: UnixListenerStream::new( internal: UnixListenerStream::new(
Arc::into_inner(self.internal.take().unwrap()).unwrap(), Arc::into_inner(self.internal.take().unwrap()).unwrap(),
), ),
} phantom: std::marker::PhantomData,
})
} }
} }