diff --git a/veilid-tools/src/ipc/ipc_async_std/unix.rs b/veilid-tools/src/ipc/ipc_async_std/unix.rs index 0bbe2565..4d6c1bbd 100644 --- a/veilid-tools/src/ipc/ipc_async_std/unix.rs +++ b/veilid-tools/src/ipc/ipc_async_std/unix.rs @@ -106,6 +106,11 @@ impl IpcListener { /// Accepts a new incoming connection to this listener. pub fn accept(&self) -> SendPinBoxFuture> { + if self.path.is_none() { + return Box::pin(std::future::ready(Err(io::Error::from( + io::ErrorKind::NotConnected, + )))); + } let this = IpcListener { path: self.path.clone(), internal: self.internal.clone(), diff --git a/veilid-tools/src/ipc/ipc_tokio/unix.rs b/veilid-tools/src/ipc/ipc_tokio/unix.rs index 7606ec13..0a2f1a44 100644 --- a/veilid-tools/src/ipc/ipc_tokio/unix.rs +++ b/veilid-tools/src/ipc/ipc_tokio/unix.rs @@ -115,6 +115,11 @@ impl IpcListener { /// Accepts a new incoming connection to this listener. pub fn accept(&self) -> SendPinBoxFuture> { + if self.path.is_none() { + return Box::pin(std::future::ready(Err(io::Error::from( + io::ErrorKind::NotConnected, + )))); + } let this = IpcListener { path: self.path.clone(), internal: self.internal.clone(), diff --git a/veilid-tools/src/ipc/ipc_tokio/windows.rs b/veilid-tools/src/ipc/ipc_tokio/windows.rs index 5e4ecdbe..406b050e 100644 --- a/veilid-tools/src/ipc/ipc_tokio/windows.rs +++ b/veilid-tools/src/ipc/ipc_tokio/windows.rs @@ -114,12 +114,13 @@ impl FuturesAsyncWrite for IpcStream { ///////////////////////////////////////////////////////////// -pub struct IpcIncoming { - listener: Arc, +pub struct IpcIncoming<'a> { + listener: IpcListener, unord: FuturesUnordered>>, + phantom: std::marker::PhantomData<&'a ()>, } -impl Stream for IpcIncoming { +impl<'t> Stream for IpcIncoming<'t> { type Item = io::Result; fn poll_next<'a>( @@ -143,7 +144,7 @@ impl Stream for IpcIncoming { pub struct IpcListener { path: Option, - internal: Mutex>, + internal: Option>>, } impl IpcListener { @@ -155,18 +156,20 @@ impl IpcListener { .create(&path)?; Ok(Self { path: Some(path), - internal: Mutex::new(Some(server)), + internal: Some(Mutex::new(Some(server))), }) } /// Accepts a new incoming connection to this listener. pub fn accept(&self) -> SendPinBoxFuture> { - let mut opt_server = self.internal.lock(); - let Some(server) = opt_server.take() else { + if self.path.is_none() { return Box::pin(std::future::ready(Err(io::Error::from( - io::ErrorKind::BrokenPipe, + io::ErrorKind::NotConnected, )))); - }; + } + let internal = self.internal.as_ref().unwrap(); + let mut opt_server = internal.lock(); + let server = opt_server.take().unwrap(); let path = self.path.clone().unwrap(); *opt_server = match ServerOptions::new().create(path) { Ok(v) => Some(v), @@ -183,10 +186,17 @@ impl IpcListener { } /// Returns a stream of incoming connections. - pub fn incoming(self) -> IpcIncoming { - IpcIncoming { - listener: Arc::new(self), - unord: FuturesUnordered::new(), + pub fn incoming(&mut self) -> io::Result> { + if self.path.is_none() { + return Err(io::Error::from(io::ErrorKind::NotConnected)); } + Ok(IpcIncoming { + listener: IpcListener { + path: self.path.take(), + internal: self.internal.take(), + }, + unord: FuturesUnordered::new(), + phantom: std::marker::PhantomData, + }) } }