diff --git a/veilid-core/src/network_manager.rs b/veilid-core/src/network_manager.rs index f9e70dcb..831e910b 100644 --- a/veilid-core/src/network_manager.rs +++ b/veilid-core/src/network_manager.rs @@ -221,7 +221,9 @@ impl NetworkManager { } // reset the state - *self.inner.lock() = Self::new_inner(); + let mut inner = self.inner.lock(); + inner.components = None; + inner.network_class = None; trace!("NetworkManager::shutdown end"); } diff --git a/veilid-server/src/client_log_channel.rs b/veilid-server/src/client_log_channel.rs index 8fc84da9..3e60645f 100644 --- a/veilid-server/src/client_log_channel.rs +++ b/veilid-server/src/client_log_channel.rs @@ -1,45 +1,103 @@ -use async_std::channel::{bounded, Receiver, RecvError, Sender, TrySendError}; +use parking_lot::Mutex; use std::sync::Arc; -#[derive(Debug)] -struct ClientLogChannelInner { - sender: Sender, - receiver: Receiver, +// Must use async_std channel to send to main thread from blocking thread +use async_std::channel::bounded as async_bounded; +use async_std::channel::Receiver as AsyncReceiver; +pub use async_std::channel::RecvError; + +// Must use std mpsc so no logs are generated by async code +use std::sync::mpsc::sync_channel as std_sync_channel; +use std::sync::mpsc::SyncSender as StdSender; +use std::sync::mpsc::TrySendError as StdTrySendError; + +////////////////////////////////////////// + +pub struct ClientLogChannelCloser { + sender: Arc>>>, } -#[derive(Debug, Clone)] -pub struct ClientLogChannel { - inner: Arc, -} - -impl ClientLogChannel { - pub fn new() -> Self { - let (sender, receiver) = bounded(1024); - Self { - inner: Arc::new(ClientLogChannelInner { sender, receiver }), - } - } - - pub async fn recv(&self) -> Result { - self.inner.receiver.recv().await +impl ClientLogChannelCloser { + pub fn close(&self) { + // Drop the sender + self.sender.lock().take(); } } -impl std::io::Write for ClientLogChannel { +////////////////////////////////////////// +pub struct ClientLogChannelWriterShim { + sender: Arc>>>, +} + +impl std::io::Write for ClientLogChannelWriterShim { fn write(&mut self, buf: &[u8]) -> std::io::Result { let bufstr = String::from_utf8_lossy(buf).to_string(); - if let Err(e) = self.inner.sender.try_send(bufstr) { - match e { - TrySendError::Full(_) => Err(std::io::Error::from(std::io::ErrorKind::WouldBlock)), - TrySendError::Closed(_) => { - Err(std::io::Error::from(std::io::ErrorKind::BrokenPipe)) + let sender = self.sender.lock(); + if let Some(sender) = &*sender { + if let Err(e) = sender.try_send(bufstr) { + match e { + StdTrySendError::Full(_) => { + Err(std::io::Error::from(std::io::ErrorKind::WouldBlock)) + } + StdTrySendError::Disconnected(_) => { + Err(std::io::Error::from(std::io::ErrorKind::BrokenPipe)) + } } + } else { + Ok(buf.len()) } } else { - Ok(buf.len()) + Err(std::io::Error::from(std::io::ErrorKind::BrokenPipe)) } } fn flush(&mut self) -> std::io::Result<()> { Ok(()) } } + +pub type ClientLogChannelWriter = std::io::LineWriter; + +////////////////////////////////////////// + +pub struct ClientLogChannel { + async_receiver: AsyncReceiver, +} + +impl ClientLogChannel { + pub fn new() -> (Self, ClientLogChannelWriter, ClientLogChannelCloser) { + let (async_sender, async_receiver) = async_bounded(1024); + let (std_sender, std_receiver) = std_sync_channel(1024); + let shared_std_sender = Arc::new(Mutex::new(Some(std_sender))); + + // Spawn a processing thread for the blocking std sender + async_std::task::spawn(async move { + #[allow(clippy::while_let_loop)] + loop { + let message = match std_receiver.recv() { + Ok(v) => v, + Err(_) => break, + }; + if async_sender.send(message).await.is_err() { + break; + } + } + }); + + ( + Self { async_receiver }, + ClientLogChannelWriter::with_capacity( + 65536, + ClientLogChannelWriterShim { + sender: shared_std_sender.clone(), + }, + ), + ClientLogChannelCloser { + sender: shared_std_sender, + }, + ) + } + + pub async fn recv(&mut self) -> Result { + self.async_receiver.recv().await + } +} diff --git a/veilid-server/src/unix.rs b/veilid-server/src/unix.rs index 8c2761b4..f8959a61 100644 --- a/veilid-server/src/unix.rs +++ b/veilid-server/src/unix.rs @@ -13,6 +13,7 @@ use std::fs::OpenOptions; use std::path::Path; use std::str::FromStr; use std::sync::Arc; +use std::time::{Duration, Instant}; use veilid_core::xx::SingleShotEventual; fn parse_command_line(default_config_path: &OsStr) -> Result { @@ -187,6 +188,7 @@ pub async fn main() -> Result<(), String> { // Set up loggers let mut logs: Vec> = Vec::new(); let mut client_log_channel: Option = None; + let mut client_log_channel_closer: Option = None; let mut cb = ConfigBuilder::new(); cb.add_filter_ignore_str("async_std"); cb.add_filter_ignore_str("async_io"); @@ -228,12 +230,13 @@ pub async fn main() -> Result<(), String> { )) } if settingsr.logging.client.enabled { - let clog = ClientLogChannel::new(); - client_log_channel = Some(clog.clone()); + let (clog, clogwriter, clogcloser) = ClientLogChannel::new(); + client_log_channel = Some(clog); + client_log_channel_closer = Some(clogcloser); logs.push(WriteLogger::new( settings::convert_loglevel(settingsr.logging.file.level), cb.build(), - std::io::LineWriter::with_capacity(65536, clog), + clogwriter, )) } CombinedLogger::init(logs).map_err(|e| format!("failed to init logs: {}", e))?; @@ -284,26 +287,45 @@ pub async fn main() -> Result<(), String> { drop(settingsr); // Handle state changes on main thread for capnproto rpc - let capi_jh = capi.clone().map(|capi| { + let state_change_receiver_jh = capi.clone().map(|capi| { async_std::task::spawn_local(async move { - trace!("state change processing started"); while let Ok(change) = receiver.recv().await { capi.clone().handle_state_change(change); } - trace!("state change processing stopped"); }) }); // Handle log messages on main thread for capnproto rpc - let capi_jh2 = capi + let client_log_receiver_jh = capi .clone() .map(|capi| { - client_log_channel.map(|client_log_channel| { + client_log_channel.take().map(|mut client_log_channel| { async_std::task::spawn_local(async move { - trace!("client logging started"); - while let Ok(message) = client_log_channel.recv().await { - capi.clone().handle_client_log(message); + // Batch messages to either 16384 chars at once or every second to minimize packets + let rate = Duration::from_secs(1); + let mut start = Instant::now(); + let mut messages = String::new(); + loop { + let timeout_dur = + rate.checked_sub(start.elapsed()).unwrap_or(Duration::ZERO); + match async_std::future::timeout(timeout_dur, client_log_channel.recv()) + .await + { + Ok(Ok(message)) => { + messages += &message; + if messages.len() > 16384 { + capi.clone() + .handle_client_log(core::mem::take(&mut messages)); + start = Instant::now(); + } + } + Ok(Err(_)) => break, + Err(_) => { + capi.clone() + .handle_client_log(core::mem::take(&mut messages)); + start = Instant::now(); + } + } } - trace!("client logging stopped") }) }) }) @@ -332,15 +354,22 @@ pub async fn main() -> Result<(), String> { c.stop().await; } - // Shut down Veilid API + // Shut down Veilid API to release state change sender veilid_api.shutdown().await; - // Wait for client api handlers to exit - if let Some(capi_jh) = capi_jh { - capi_jh.await; + // Close the client api log channel if it is open to release client log sender + if let Some(client_log_channel_closer) = client_log_channel_closer { + client_log_channel_closer.close(); } - if let Some(capi_jh2) = capi_jh2 { - capi_jh2.await; + + // Wait for state change receiver to exit + if let Some(state_change_receiver_jh) = state_change_receiver_jh { + state_change_receiver_jh.await; + } + + // Wait for client api log receiver to exit + if let Some(client_log_receiver_jh) = client_log_receiver_jh { + client_log_receiver_jh.await; } Ok(())