io read write

This commit is contained in:
Christien Rioux 2024-03-03 18:38:25 -05:00
parent 774abe2225
commit ec71c9631c
10 changed files with 537 additions and 191 deletions

7
Cargo.lock generated
View File

@ -5770,9 +5770,12 @@ dependencies = [
[[package]]
name = "veilid-bugsalot"
version = "0.1.0"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9ee584edf237fac328b891dd06c21e7914a1db3762907edc366a13803451fe3"
checksum = "2836acd414bd560c55c906a636c3bca7f080a8fc21802f18616d6be380819ddc"
dependencies = [
"libc",
]
[[package]]
name = "veilid-cli"

View File

@ -50,7 +50,7 @@ serde_derive = "^1"
parking_lot = "^0"
cfg-if = "^1"
config = { version = "^0", features = ["yaml"] }
bugsalot = { package = "veilid-bugsalot", version = "0.1.0" }
bugsalot = { package = "veilid-bugsalot", version = "0.2.0" }
flexi_logger = { version = "^0", features = ["use_chrono_for_offset"] }
thiserror = "^1"
crossbeam-channel = "^0"

View File

@ -414,7 +414,7 @@ Server Debug Commands:
////////////////////////////////////////////
pub fn log_message(&self, log_level: Level, message: &str) {
self.inner().ui_sender.add_node_event(log_level, message);
self.inner().ui_sender.add_log_event(log_level, message);
}
pub fn update_attachment(&self, attachment: &json::JsonValue) {
@ -481,7 +481,7 @@ Server Debug Commands:
pub fn update_log(&self, log: &json::JsonValue) {
let log_level =
Level::from_str(log["log_level"].as_str().unwrap_or("error")).unwrap_or(Level::Error);
self.inner().ui_sender.add_node_event(
self.inner().ui_sender.add_log_event(
log_level,
&format!(
"{}: {}{}",

View File

@ -1370,6 +1370,22 @@ impl UISender for CursiveUISender {
),
);
}
fn add_log_event(&self, log_color: Level, event: &str) {
let color = {
let inner = self.inner.lock();
*inner.log_colors.get(&log_color).unwrap()
};
let _ = self.push_styled_lines(
color.into(),
format!(
"{}: {}\n",
CursiveUI::cli_ts(CursiveUI::get_start_time()),
event
),
);
}
}
impl CursiveUISender {
pub fn push_styled(&self, styled_string: StyledString) -> std::io::Result<()> {

View File

@ -114,7 +114,6 @@ impl UI for InteractiveUI {
let mut inner = self.inner.lock();
inner.cmdproc = Some(cmdproc);
}
// Note: Cursive is not re-entrant, can't borrow_mut self.siv again after this
fn run_async(&mut self) -> Pin<Box<dyn core::future::Future<Output = ()>>> {
let this = self.clone();
Box::pin(async move {
@ -139,7 +138,12 @@ impl UISender for InteractiveUISender {
}
fn display_string_dialog(&self, title: &str, text: &str, close_cb: UICallback) {
println!("{}: {}", title, text);
let Some(mut stdout) = self.inner.lock().stdout.clone() else {
return;
};
if let Err(e) = writeln!(stdout, "{}: {}", title, text) {
self.inner.lock().error = Some(e.to_string());
}
if let UICallback::Interactive(mut close_cb) = close_cb {
close_cb()
}
@ -186,4 +190,5 @@ impl UISender for InteractiveUISender {
self.inner.lock().error = Some(e.to_string());
}
}
fn add_log_event(&self, _log_color: Level, _event: &str) {}
}

View File

@ -0,0 +1,268 @@
use crate::command_processor::*;
use crate::settings::*;
use crate::tools::*;
use crate::ui::*;
use futures::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, BufWriter};
use stop_token::future::FutureExt as StopTokenFutureExt;
use stop_token::*;
use veilid_tools::AsyncMutex;
use flexi_logger::writers::LogWriter;
static FINISHED_LINE: &str = "\x7F ===FINISHED=== \x7F";
pub type IOReadWriteUICallback = Box<dyn FnMut() + Send>;
pub struct IOReadWriteUIInner<R: AsyncRead + Unpin + Send, W: AsyncWrite + Unpin + Send> {
cmdproc: Option<CommandProcessor>,
in_io: Arc<AsyncMutex<BufReader<R>>>,
out_io: Arc<AsyncMutex<BufWriter<W>>>,
out_receiver: flume::Receiver<String>,
out_sender: flume::Sender<String>,
done: Option<StopSource>,
connection_state_receiver: flume::Receiver<ConnectionState>,
}
pub struct IOReadWriteUI<R: AsyncRead + Unpin + Send, W: AsyncWrite + Unpin + Send> {
inner: Arc<Mutex<IOReadWriteUIInner<R, W>>>,
}
impl<R: AsyncRead + Unpin + Send, W: AsyncWrite + Unpin + Send> Clone for IOReadWriteUI<R, W> {
fn clone(&self) -> Self {
IOReadWriteUI {
inner: self.inner.clone(),
}
}
}
impl<R: AsyncRead + Unpin + Send, W: AsyncWrite + Unpin + Send> IOReadWriteUI<R, W> {
pub fn new(_settings: &Settings, in_io: R, out_io: W) -> (Self, IOReadWriteUISender<R, W>) {
// Create the UI object
let (sender, receiver) = flume::unbounded::<String>();
let (cssender, csreceiver) = flume::unbounded::<ConnectionState>();
let this = Self {
inner: Arc::new(Mutex::new(IOReadWriteUIInner {
cmdproc: None,
in_io: Arc::new(AsyncMutex::new(BufReader::new(in_io))),
out_io: Arc::new(AsyncMutex::new(BufWriter::new(out_io))),
out_receiver: receiver,
out_sender: sender.clone(),
connection_state_receiver: csreceiver,
done: Some(StopSource::new()),
})),
};
let ui_sender = IOReadWriteUISender {
inner: this.inner.clone(),
out_sender: sender,
connection_state_sender: cssender,
};
(this, ui_sender)
}
pub async fn output_loop(&self) {
let out_receiver = self.inner.lock().out_receiver.clone();
let out_io = self.inner.lock().out_io.clone();
let mut out = out_io.lock().await;
let done = self.inner.lock().done.as_ref().unwrap().token();
while let Ok(Ok(line)) = out_receiver.recv_async().timeout_at(done.clone()).await {
if line == FINISHED_LINE {
break;
}
let line = format!("{}\n", line);
if let Err(e) = out.write_all(line.as_bytes()).await {
eprintln!("Error: {:?}", e);
break;
}
if let Err(e) = out.flush().await {
eprintln!("Error: {:?}", e);
break;
}
}
}
pub async fn command_loop(&self) {
let (in_io, out_sender, connection_state_receiver) = {
let inner = self.inner.lock();
(
inner.in_io.clone(),
inner.out_sender.clone(),
inner.connection_state_receiver.clone(),
)
};
let mut in_io = in_io.lock().await;
let done = self.inner.lock().done.as_ref().unwrap().token();
let (exec_sender, exec_receiver) = flume::bounded(1);
// Wait for connection to be established
loop {
match connection_state_receiver.recv_async().await {
Ok(ConnectionState::ConnectedTCP(_, _))
| Ok(ConnectionState::ConnectedIPC(_, _)) => {
break;
}
Ok(ConnectionState::RetryingTCP(_, _)) | Ok(ConnectionState::RetryingIPC(_, _)) => {
}
Ok(ConnectionState::Disconnected) => {}
Err(e) => {
eprintln!("Error: {:?}", e);
self.inner.lock().done.take();
break;
}
}
}
// Process the input
loop {
let mut line = String::new();
match in_io.read_line(&mut line).timeout_at(done.clone()).await {
Ok(Ok(bytes)) => {
if bytes == 0 {
// Clean exit after everything else is sent
if let Err(e) = out_sender.send(FINISHED_LINE.to_string()) {
eprintln!("Error: {:?}", e);
self.inner.lock().done.take();
}
break;
}
let line = line.trim();
if !line.is_empty() {
let cmdproc = self.inner.lock().cmdproc.clone();
if let Some(cmdproc) = &cmdproc {
// Run command
if let Err(e) = cmdproc.run_command(
line,
UICallback::IOReadWrite(Box::new({
let exec_sender = exec_sender.clone();
move || {
// Let the next command execute
if let Err(e) = exec_sender.send(()) {
eprintln!("Error: {:?}", e);
}
}
})),
) {
eprintln!("Error: {:?}", e);
self.inner.lock().done.take();
break;
}
// Wait until command is done executing before running the next line
if let Err(e) = exec_receiver.recv_async().await {
eprintln!("Error: {:?}", e);
self.inner.lock().done.take();
break;
}
}
}
}
Ok(Err(e)) => {
eprintln!("IO Error: {:?}", e);
self.inner.lock().done.take();
break;
}
Err(_) => {
break;
}
}
}
}
}
impl<R: AsyncRead + Unpin + Send + 'static, W: AsyncWrite + Unpin + Send + 'static> UI
for IOReadWriteUI<R, W>
{
fn set_command_processor(&mut self, cmdproc: CommandProcessor) {
let mut inner = self.inner.lock();
inner.cmdproc = Some(cmdproc);
}
fn run_async(&mut self) -> Pin<Box<dyn core::future::Future<Output = ()>>> {
let this = self.clone();
Box::pin(async move {
let out_fut = this.output_loop();
let cmd_fut = this.command_loop();
futures::join!(out_fut, cmd_fut);
})
}
}
//////////////////////////////////////////////////////////////////////////////
#[derive(Clone)]
pub struct IOReadWriteUISender<R: AsyncRead + Unpin + Send, W: AsyncWrite + Unpin + Send> {
inner: Arc<Mutex<IOReadWriteUIInner<R, W>>>,
out_sender: flume::Sender<String>,
connection_state_sender: flume::Sender<ConnectionState>,
}
impl<R: AsyncRead + Unpin + Send + 'static, W: AsyncWrite + Unpin + Send + 'static> UISender
for IOReadWriteUISender<R, W>
{
fn clone_uisender(&self) -> Box<dyn UISender> {
Box::new(IOReadWriteUISender {
inner: self.inner.clone(),
out_sender: self.out_sender.clone(),
connection_state_sender: self.connection_state_sender.clone(),
})
}
fn as_logwriter(&self) -> Option<Box<dyn LogWriter>> {
None
}
fn display_string_dialog(&self, title: &str, text: &str, close_cb: UICallback) {
if let Err(e) = self.out_sender.send(format!("{}: {}", title, text)) {
eprintln!("Error: {:?}", e);
self.inner.lock().done.take();
}
if let UICallback::IOReadWrite(mut close_cb) = close_cb {
close_cb()
}
}
fn quit(&self) {
self.inner.lock().done.take();
}
fn send_callback(&self, callback: UICallback) {
if let UICallback::IOReadWrite(mut callback) = callback {
callback();
}
}
fn set_attachment_state(
&mut self,
_state: &str,
_public_internet_ready: bool,
_local_network_ready: bool,
) {
//
}
fn set_network_status(
&mut self,
_started: bool,
_bps_down: u64,
_bps_up: u64,
mut _peers: Vec<json::JsonValue>,
) {
//
}
fn set_config(&mut self, _config: &json::JsonValue) {
//
}
fn set_connection_state(&mut self, state: ConnectionState) {
if let Err(e) = self.connection_state_sender.send(state) {
eprintln!("Error: {:?}", e);
self.inner.lock().done.take();
}
}
fn add_node_event(&self, _log_color: Level, event: &str) {
if let Err(e) = self.out_sender.send(format!("{}\n", event)) {
eprintln!("Error: {:?}", e);
self.inner.lock().done.take();
}
}
fn add_log_event(&self, _log_color: Level, _event: &str) {}
}

View File

@ -8,11 +8,13 @@ use crate::{settings::NamedSocketAddrs, tools::*, ui::*};
use clap::{Parser, ValueEnum};
use flexi_logger::*;
use std::path::PathBuf;
mod cached_text_view;
mod client_api_connection;
mod command_processor;
mod cursive_ui;
mod interactive_ui;
mod io_read_write_ui;
mod peers_table_view;
mod settings;
mod tools;
@ -67,6 +69,8 @@ struct CmdlineArgs {
}
fn main() -> Result<(), String> {
// Start async
block_on(async move {
// Get command line options
let default_config_path = settings::Settings::get_default_config_path();
let args = CmdlineArgs::parse();
@ -99,7 +103,11 @@ fn main() -> Result<(), String> {
// If we are running in interactive mode disable some things
let mut enable_cursive = true;
if args.interactive || args.show_log || args.command_file.is_some() || args.evaluate.is_some() {
if args.interactive
|| args.show_log
|| args.command_file.is_some()
|| args.evaluate.is_some()
{
settings.logging.terminal.enabled = false;
enable_cursive = false;
}
@ -117,6 +125,52 @@ fn main() -> Result<(), String> {
Box::new(ui) as Box<dyn UI>,
Box::new(uisender) as Box<dyn UISender>,
)
} else if let Some(command_file) = args.command_file {
cfg_if! {
if #[cfg(feature="rt-async-std")] {
use async_std::prelude::*;
} else if #[cfg(feature="rt-tokio")] {
use tokio_util::compat::{TokioAsyncWriteCompatExt, TokioAsyncReadCompatExt};
let (in_obj, out_obj) =
if command_file.to_string_lossy() == "-" {
(Box::pin(tokio::io::stdin().compat()) as Pin<Box<dyn futures::AsyncRead + Send>>, tokio::io::stdout().compat_write())
} else {
let f = match tokio::fs::File::open(command_file).await {
Ok(v) => v,
Err(e) => {
return Err(e.to_string());
}
};
(Box::pin(f.compat()) as Pin<Box<dyn futures::AsyncRead + Send>>, tokio::io::stdout().compat_write())
};
} else {
compile_error!("needs executor implementation")
}
}
let (ui, uisender) = io_read_write_ui::IOReadWriteUI::new(&settings, in_obj, out_obj);
(
Box::new(ui) as Box<dyn UI>,
Box::new(uisender) as Box<dyn UISender>,
)
} else if let Some(evaluate) = args.evaluate {
cfg_if! {
if #[cfg(feature="rt-async-std")] {
use async_std::prelude::*;
} else if #[cfg(feature="rt-tokio")] {
use tokio_util::compat::{TokioAsyncWriteCompatExt};
let in_str = format!("{}\n", evaluate);
let (in_obj, out_obj) = (futures::io::Cursor::new(in_str), tokio::io::stdout().compat_write());
} else {
compile_error!("needs executor implementation")
}
}
let (ui, uisender) = io_read_write_ui::IOReadWriteUI::new(&settings, in_obj, out_obj);
(
Box::new(ui) as Box<dyn UI>,
Box::new(uisender) as Box<dyn UISender>,
)
} else {
panic!("unknown ui mode");
};
@ -258,8 +312,6 @@ fn main() -> Result<(), String> {
let comproc2 = comproc.clone();
let connection_future = comproc.connection_manager();
// Start async
block_on(async move {
// Start UI
let ui_future = async move {
ui.run_async().await;
@ -281,7 +333,6 @@ fn main() -> Result<(), String> {
compile_error!("needs executor implementation")
}
}
});
Ok(())
})
}

View File

@ -1,6 +1,7 @@
use crate::command_processor::*;
use crate::cursive_ui::CursiveUICallback;
use crate::interactive_ui::InteractiveUICallback;
use crate::io_read_write_ui::IOReadWriteUICallback;
use crate::tools::*;
use flexi_logger::writers::LogWriter;
use log::Level;
@ -8,6 +9,7 @@ use log::Level;
pub enum UICallback {
Cursive(CursiveUICallback),
Interactive(InteractiveUICallback),
IOReadWrite(IOReadWriteUICallback),
}
pub trait UISender: Send {
@ -33,6 +35,7 @@ pub trait UISender: Send {
fn set_config(&mut self, config: &json::JsonValue);
fn set_connection_state(&mut self, state: ConnectionState);
fn add_node_event(&self, log_color: Level, event: &str);
fn add_log_event(&self, log_color: Level, event: &str);
}
pub trait UI {

View File

@ -144,7 +144,7 @@ lz4_flex = { version = "0.11.1", default-features = false, features = [
# Tools
config = { version = "0.13.4", features = ["yaml"] }
bugsalot = { package = "veilid-bugsalot", version = "0.1.0" }
bugsalot = { package = "veilid-bugsalot", version = "0.2.0" }
chrono = "0.4.31"
libc = "0.2.151"
nix = "0.27.1"

View File

@ -76,7 +76,7 @@ futures-util = { version = "^0", default-features = false, features = [
url = "^2"
ctrlc = "^3"
lazy_static = "^1"
bugsalot = { package = "veilid-bugsalot", version = "0.1.0" }
bugsalot = { package = "veilid-bugsalot", version = "0.2.0" }
flume = { version = "^0", features = ["async"] }
rpassword = "^7"
hostname = "^0"