better log channel and fix up message frequency

This commit is contained in:
John Smith 2021-12-11 18:14:24 -05:00
parent 43fd315932
commit 8fe99f6090
3 changed files with 135 additions and 46 deletions

View File

@ -221,7 +221,9 @@ impl NetworkManager {
} }
// reset the state // reset the state
*self.inner.lock() = Self::new_inner(); let mut inner = self.inner.lock();
inner.components = None;
inner.network_class = None;
trace!("NetworkManager::shutdown end"); trace!("NetworkManager::shutdown end");
} }

View File

@ -1,45 +1,103 @@
use async_std::channel::{bounded, Receiver, RecvError, Sender, TrySendError}; use parking_lot::Mutex;
use std::sync::Arc; use std::sync::Arc;
#[derive(Debug)] // Must use async_std channel to send to main thread from blocking thread
struct ClientLogChannelInner { use async_std::channel::bounded as async_bounded;
sender: Sender<String>, use async_std::channel::Receiver as AsyncReceiver;
receiver: Receiver<String>, pub use async_std::channel::RecvError;
// Must use std mpsc so no logs are generated by async code
use std::sync::mpsc::sync_channel as std_sync_channel;
use std::sync::mpsc::SyncSender as StdSender;
use std::sync::mpsc::TrySendError as StdTrySendError;
//////////////////////////////////////////
pub struct ClientLogChannelCloser {
sender: Arc<Mutex<Option<StdSender<String>>>>,
} }
#[derive(Debug, Clone)] impl ClientLogChannelCloser {
pub struct ClientLogChannel { pub fn close(&self) {
inner: Arc<ClientLogChannelInner>, // Drop the sender
} self.sender.lock().take();
impl ClientLogChannel {
pub fn new() -> Self {
let (sender, receiver) = bounded(1024);
Self {
inner: Arc::new(ClientLogChannelInner { sender, receiver }),
}
}
pub async fn recv(&self) -> Result<String, RecvError> {
self.inner.receiver.recv().await
} }
} }
impl std::io::Write for ClientLogChannel { //////////////////////////////////////////
pub struct ClientLogChannelWriterShim {
sender: Arc<Mutex<Option<StdSender<String>>>>,
}
impl std::io::Write for ClientLogChannelWriterShim {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let bufstr = String::from_utf8_lossy(buf).to_string(); let bufstr = String::from_utf8_lossy(buf).to_string();
if let Err(e) = self.inner.sender.try_send(bufstr) { let sender = self.sender.lock();
match e { if let Some(sender) = &*sender {
TrySendError::Full(_) => Err(std::io::Error::from(std::io::ErrorKind::WouldBlock)), if let Err(e) = sender.try_send(bufstr) {
TrySendError::Closed(_) => { match e {
Err(std::io::Error::from(std::io::ErrorKind::BrokenPipe)) StdTrySendError::Full(_) => {
Err(std::io::Error::from(std::io::ErrorKind::WouldBlock))
}
StdTrySendError::Disconnected(_) => {
Err(std::io::Error::from(std::io::ErrorKind::BrokenPipe))
}
} }
} else {
Ok(buf.len())
} }
} else { } else {
Ok(buf.len()) Err(std::io::Error::from(std::io::ErrorKind::BrokenPipe))
} }
} }
fn flush(&mut self) -> std::io::Result<()> { fn flush(&mut self) -> std::io::Result<()> {
Ok(()) Ok(())
} }
} }
pub type ClientLogChannelWriter = std::io::LineWriter<ClientLogChannelWriterShim>;
//////////////////////////////////////////
pub struct ClientLogChannel {
async_receiver: AsyncReceiver<String>,
}
impl ClientLogChannel {
pub fn new() -> (Self, ClientLogChannelWriter, ClientLogChannelCloser) {
let (async_sender, async_receiver) = async_bounded(1024);
let (std_sender, std_receiver) = std_sync_channel(1024);
let shared_std_sender = Arc::new(Mutex::new(Some(std_sender)));
// Spawn a processing thread for the blocking std sender
async_std::task::spawn(async move {
#[allow(clippy::while_let_loop)]
loop {
let message = match std_receiver.recv() {
Ok(v) => v,
Err(_) => break,
};
if async_sender.send(message).await.is_err() {
break;
}
}
});
(
Self { async_receiver },
ClientLogChannelWriter::with_capacity(
65536,
ClientLogChannelWriterShim {
sender: shared_std_sender.clone(),
},
),
ClientLogChannelCloser {
sender: shared_std_sender,
},
)
}
pub async fn recv(&mut self) -> Result<String, RecvError> {
self.async_receiver.recv().await
}
}

View File

@ -13,6 +13,7 @@ use std::fs::OpenOptions;
use std::path::Path; use std::path::Path;
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant};
use veilid_core::xx::SingleShotEventual; use veilid_core::xx::SingleShotEventual;
fn parse_command_line(default_config_path: &OsStr) -> Result<clap::ArgMatches, clap::Error> { fn parse_command_line(default_config_path: &OsStr) -> Result<clap::ArgMatches, clap::Error> {
@ -187,6 +188,7 @@ pub async fn main() -> Result<(), String> {
// Set up loggers // Set up loggers
let mut logs: Vec<Box<dyn SharedLogger>> = Vec::new(); let mut logs: Vec<Box<dyn SharedLogger>> = Vec::new();
let mut client_log_channel: Option<ClientLogChannel> = None; let mut client_log_channel: Option<ClientLogChannel> = None;
let mut client_log_channel_closer: Option<ClientLogChannelCloser> = None;
let mut cb = ConfigBuilder::new(); let mut cb = ConfigBuilder::new();
cb.add_filter_ignore_str("async_std"); cb.add_filter_ignore_str("async_std");
cb.add_filter_ignore_str("async_io"); cb.add_filter_ignore_str("async_io");
@ -228,12 +230,13 @@ pub async fn main() -> Result<(), String> {
)) ))
} }
if settingsr.logging.client.enabled { if settingsr.logging.client.enabled {
let clog = ClientLogChannel::new(); let (clog, clogwriter, clogcloser) = ClientLogChannel::new();
client_log_channel = Some(clog.clone()); client_log_channel = Some(clog);
client_log_channel_closer = Some(clogcloser);
logs.push(WriteLogger::new( logs.push(WriteLogger::new(
settings::convert_loglevel(settingsr.logging.file.level), settings::convert_loglevel(settingsr.logging.file.level),
cb.build(), cb.build(),
std::io::LineWriter::with_capacity(65536, clog), clogwriter,
)) ))
} }
CombinedLogger::init(logs).map_err(|e| format!("failed to init logs: {}", e))?; CombinedLogger::init(logs).map_err(|e| format!("failed to init logs: {}", e))?;
@ -284,26 +287,45 @@ pub async fn main() -> Result<(), String> {
drop(settingsr); drop(settingsr);
// Handle state changes on main thread for capnproto rpc // Handle state changes on main thread for capnproto rpc
let capi_jh = capi.clone().map(|capi| { let state_change_receiver_jh = capi.clone().map(|capi| {
async_std::task::spawn_local(async move { async_std::task::spawn_local(async move {
trace!("state change processing started");
while let Ok(change) = receiver.recv().await { while let Ok(change) = receiver.recv().await {
capi.clone().handle_state_change(change); capi.clone().handle_state_change(change);
} }
trace!("state change processing stopped");
}) })
}); });
// Handle log messages on main thread for capnproto rpc // Handle log messages on main thread for capnproto rpc
let capi_jh2 = capi let client_log_receiver_jh = capi
.clone() .clone()
.map(|capi| { .map(|capi| {
client_log_channel.map(|client_log_channel| { client_log_channel.take().map(|mut client_log_channel| {
async_std::task::spawn_local(async move { async_std::task::spawn_local(async move {
trace!("client logging started"); // Batch messages to either 16384 chars at once or every second to minimize packets
while let Ok(message) = client_log_channel.recv().await { let rate = Duration::from_secs(1);
capi.clone().handle_client_log(message); let mut start = Instant::now();
let mut messages = String::new();
loop {
let timeout_dur =
rate.checked_sub(start.elapsed()).unwrap_or(Duration::ZERO);
match async_std::future::timeout(timeout_dur, client_log_channel.recv())
.await
{
Ok(Ok(message)) => {
messages += &message;
if messages.len() > 16384 {
capi.clone()
.handle_client_log(core::mem::take(&mut messages));
start = Instant::now();
}
}
Ok(Err(_)) => break,
Err(_) => {
capi.clone()
.handle_client_log(core::mem::take(&mut messages));
start = Instant::now();
}
}
} }
trace!("client logging stopped")
}) })
}) })
}) })
@ -332,15 +354,22 @@ pub async fn main() -> Result<(), String> {
c.stop().await; c.stop().await;
} }
// Shut down Veilid API // Shut down Veilid API to release state change sender
veilid_api.shutdown().await; veilid_api.shutdown().await;
// Wait for client api handlers to exit // Close the client api log channel if it is open to release client log sender
if let Some(capi_jh) = capi_jh { if let Some(client_log_channel_closer) = client_log_channel_closer {
capi_jh.await; client_log_channel_closer.close();
} }
if let Some(capi_jh2) = capi_jh2 {
capi_jh2.await; // Wait for state change receiver to exit
if let Some(state_change_receiver_jh) = state_change_receiver_jh {
state_change_receiver_jh.await;
}
// Wait for client api log receiver to exit
if let Some(client_log_receiver_jh) = client_log_receiver_jh {
client_log_receiver_jh.await;
} }
Ok(()) Ok(())