clean up interactive code

This commit is contained in:
Christien Rioux 2024-03-03 18:48:14 -05:00
parent ec71c9631c
commit e24f0ac8b2
2 changed files with 56 additions and 16 deletions

View File

@ -8,6 +8,8 @@ use crate::ui::*;
use flexi_logger::writers::LogWriter;
use rustyline_async::SharedWriter;
use rustyline_async::{Readline, ReadlineError, ReadlineEvent};
use stop_token::future::FutureExt as StopTokenFutureExt;
use stop_token::*;
pub type InteractiveUICallback = Box<dyn FnMut() + Send>;
@ -15,7 +17,8 @@ pub struct InteractiveUIInner {
cmdproc: Option<CommandProcessor>,
stdout: Option<SharedWriter>,
error: Option<String>,
done: bool,
done: Option<StopSource>,
connection_state_receiver: flume::Receiver<ConnectionState>,
}
#[derive(Clone)]
@ -25,18 +28,22 @@ pub struct InteractiveUI {
impl InteractiveUI {
pub fn new(_settings: &Settings) -> (Self, InteractiveUISender) {
let (cssender, csreceiver) = flume::unbounded::<ConnectionState>();
// Create the UI object
let this = Self {
inner: Arc::new(Mutex::new(InteractiveUIInner {
cmdproc: None,
stdout: None,
error: None,
done: false,
done: Some(StopSource::new()),
connection_state_receiver: csreceiver,
})),
};
let ui_sender = InteractiveUISender {
inner: this.inner.clone(),
connection_state_sender: cssender,
};
(this, ui_sender)
@ -52,18 +59,41 @@ impl InteractiveUI {
}
};
let (connection_state_receiver, done) = {
let inner = self.inner.lock();
(
inner.connection_state_receiver.clone(),
inner.done.as_ref().unwrap().token(),
)
};
self.inner.lock().stdout = Some(stdout.clone());
// Wait for connection to be established
loop {
if self.inner.lock().done {
break;
match connection_state_receiver.recv_async().await {
Ok(ConnectionState::ConnectedTCP(_, _))
| Ok(ConnectionState::ConnectedIPC(_, _)) => {
break;
}
Ok(ConnectionState::RetryingTCP(_, _)) | Ok(ConnectionState::RetryingIPC(_, _)) => {
}
Ok(ConnectionState::Disconnected) => {}
Err(e) => {
eprintln!("Error: {:?}", e);
self.inner.lock().done.take();
break;
}
}
}
loop {
if let Some(e) = self.inner.lock().error.clone() {
println!("Error: {:?}", e);
break;
}
match readline.readline().await {
Ok(ReadlineEvent::Line(line)) => {
match readline.readline().timeout_at(done.clone()).await {
Ok(Ok(ReadlineEvent::Line(line))) => {
let line = line.trim();
if line == "clear" {
if let Err(e) = readline.clear() {
@ -92,17 +122,20 @@ impl InteractiveUI {
}
}
}
Ok(ReadlineEvent::Interrupted) => {
Ok(Ok(ReadlineEvent::Interrupted)) => {
break;
}
Ok(ReadlineEvent::Eof) => {
Ok(Ok(ReadlineEvent::Eof)) => {
break;
}
Err(ReadlineError::Closed) => {}
Err(ReadlineError::IO(e)) => {
Ok(Err(ReadlineError::Closed)) => {}
Ok(Err(ReadlineError::IO(e))) => {
println!("IO Error: {:?}", e);
break;
}
Err(_) => {
break;
}
}
}
let _ = readline.flush();
@ -127,11 +160,15 @@ impl UI for InteractiveUI {
#[derive(Clone)]
pub struct InteractiveUISender {
inner: Arc<Mutex<InteractiveUIInner>>,
connection_state_sender: flume::Sender<ConnectionState>,
}
impl UISender for InteractiveUISender {
fn clone_uisender(&self) -> Box<dyn UISender> {
Box::new(self.clone())
Box::new(InteractiveUISender {
inner: self.inner.clone(),
connection_state_sender: self.connection_state_sender.clone(),
})
}
fn as_logwriter(&self) -> Option<Box<dyn LogWriter>> {
None
@ -150,7 +187,7 @@ impl UISender for InteractiveUISender {
}
fn quit(&self) {
self.inner.lock().done = true;
self.inner.lock().done.take();
}
fn send_callback(&self, callback: UICallback) {
@ -178,8 +215,11 @@ impl UISender for InteractiveUISender {
fn set_config(&mut self, _config: &json::JsonValue) {
//
}
fn set_connection_state(&mut self, _state: ConnectionState) {
//
fn set_connection_state(&mut self, state: ConnectionState) {
if let Err(e) = self.connection_state_sender.send(state) {
eprintln!("Error: {:?}", e);
self.inner.lock().done.take();
}
}
fn add_node_event(&self, _log_color: Level, event: &str) {

View File

@ -85,17 +85,17 @@ impl<R: AsyncRead + Unpin + Send, W: AsyncWrite + Unpin + Send> IOReadWriteUI<R,
}
pub async fn command_loop(&self) {
let (in_io, out_sender, connection_state_receiver) = {
let (in_io, out_sender, connection_state_receiver, done) = {
let inner = self.inner.lock();
(
inner.in_io.clone(),
inner.out_sender.clone(),
inner.connection_state_receiver.clone(),
inner.done.as_ref().unwrap().token(),
)
};
let mut in_io = in_io.lock().await;
let done = self.inner.lock().done.as_ref().unwrap().token();
let (exec_sender, exec_receiver) = flume::bounded(1);
// Wait for connection to be established