veilid/veilid-cli/src/command_processor.rs

326 lines
10 KiB
Rust
Raw Normal View History

2021-11-22 11:28:30 -05:00
use crate::client_api_connection::*;
use crate::settings::Settings;
use crate::ui::*;
use crate::veilid_client_capnp::*;
use async_std::prelude::FutureExt;
use log::*;
use std::cell::*;
use std::net::SocketAddr;
use std::rc::Rc;
use std::time::{Duration, SystemTime};
use veilid_core::xx::{Eventual, EventualCommon};
/// Connection status between this client and the Veilid server, as shown by the UI.
///
/// `Connected` and `Retrying` carry the server address plus a `SystemTime` stamp.
/// For `Retrying` the stamp is the moment the state was entered; for `Connected`
/// it is presumably the time the connection was established (set outside this
/// file — confirm against `client_api_connection`).
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum ConnectionState {
    /// No connection exists and none is being attempted.
    Disconnected,
    /// A connection to the given server address is established.
    Connected(SocketAddr, SystemTime),
    /// A connection to the given server address is being attempted or re-attempted.
    Retrying(SocketAddr, SystemTime),
}

impl ConnectionState {
    /// Returns `true` when the state is `Disconnected`.
    pub fn is_disconnected(&self) -> bool {
        matches!(self, Self::Disconnected)
    }

    /// Returns `true` when the state is `Connected`, regardless of its payload.
    pub fn is_connected(&self) -> bool {
        matches!(self, Self::Connected(..))
    }

    /// Returns `true` when the state is `Retrying`, regardless of its payload.
    pub fn is_retrying(&self) -> bool {
        matches!(self, Self::Retrying(..))
    }
}
/// Mutable state shared by every clone of a `CommandProcessor`.
struct CommandProcessorInner {
/// UI handle; cloned for command callbacks and used to push attachment/connection state.
ui: UI,
/// Client API connection; `None` until `set_client_api_connection` is called.
capi: Option<ClientApiConnection>,
/// While true, `connection_manager` keeps (re)trying the connection. Set by
/// `start_connection`, cleared by `cancel_reconnect` and `quit`, and re-armed by
/// `connection_manager` after each disconnect.
reconnect: bool,
/// Set by `quit`; ends the outer loop of `connection_manager`.
finished: bool,
/// When true, the first pass of `connection_manager` does not wait for a connection
/// request; the flag is cleared during that pass.
autoconnect: bool,
/// When false, `connection_manager` stops retrying after a connection attempt fails.
autoreconnect: bool,
/// Server to connect to; `connection_manager` leaves its connect loop when this is `None`.
server_addr: Option<SocketAddr>,
/// Resolved by `start_connection`, `cancel_reconnect` and `quit` to wake `connection_manager`.
connection_waker: Eventual,
}
/// Reference-counted, interior-mutable handle (`Rc<RefCell<T>>`).
type Handle<T> = Rc<RefCell<T>>;
/// Command processor for the client: runs user commands and manages the server connection.
///
/// Cloning copies the `Rc` handle, so all clones share one `CommandProcessorInner`.
#[derive(Clone)]
pub struct CommandProcessor {
// Shared state; accessed through `inner()` / `inner_mut()`.
inner: Handle<CommandProcessorInner>,
}
impl CommandProcessor {
/// Creates a command processor bound to `ui`, seeded from `settings`.
///
/// The client API connection and the server address start out unset; supply them
/// with `set_client_api_connection` and `set_server_address`.
pub fn new(ui: UI, settings: &Settings) -> Self {
    let inner = CommandProcessorInner {
        ui,
        capi: None,
        // NOTE(review): `reconnect` starts out equal to `autoreconnect`, so with
        // `autoconnect` on and `autoreconnect` off the first pass of
        // `connection_manager` skips its connect loop — confirm this is intended.
        reconnect: settings.autoreconnect,
        finished: false,
        autoconnect: settings.autoconnect,
        autoreconnect: settings.autoreconnect,
        server_addr: None,
        connection_waker: Eventual::new(),
    };
    Self {
        inner: Rc::new(RefCell::new(inner)),
    }
}
/// Stores the client API connection that `capi()` hands out to server commands,
/// replacing any previously stored one.
pub fn set_client_api_connection(&mut self, capi: ClientApiConnection) {
    let mut state = self.inner.borrow_mut();
    state.capi = Some(capi);
}
fn inner(&self) -> Ref<CommandProcessorInner> {
self.inner.borrow()
}
fn inner_mut(&self) -> RefMut<CommandProcessorInner> {
self.inner.borrow_mut()
}
/// Returns a clone of the UI handle so callers do not hold a borrow on the state.
fn ui(&self) -> UI {
    let state = self.inner.borrow();
    state.ui.clone()
}
/// Returns a clone of the client API connection.
///
/// # Panics
///
/// Panics if no connection has been installed with `set_client_api_connection`.
fn capi(&self) -> ClientApiConnection {
    self.inner
        .borrow()
        .capi
        .as_ref()
        .expect("client API connection must be set with set_client_api_connection before use")
        .clone()
}
/// Splits a command line into its first word and the remaining text.
///
/// Surrounding whitespace is ignored. The second element is `None` when the line
/// holds at most one word; otherwise it is the remainder with the separating
/// whitespace removed.
fn word_split(line: &str) -> (String, Option<String>) {
    let line = line.trim();
    match line.find(char::is_whitespace) {
        Some(boundary) => {
            let (head, tail) = line.split_at(boundary);
            (head.to_owned(), Some(tail.trim_start().to_owned()))
        }
        None => (line.to_owned(), None),
    }
}
/// Sends the list of supported commands to the UI via `add_node_event`, then
/// invokes `callback` with a UI handle.
///
/// `_rest` (any text after the command word) is ignored. Always returns `Ok(())`.
pub fn cmd_help(&self, _rest: Option<String>, callback: UICallback) -> Result<(), String> {
    const HELP_TEXT: &str = r#"Commands:
exit/quit - exit the client
disconnect - disconnect the client from the Veilid node
shutdown - shut the server down
attach - attach the server to the Veilid network
detach - detach the server from the Veilid network
"#;

    trace!("CommandProcessor::cmd_help");
    self.ui().add_node_event(HELP_TEXT);
    callback(self.ui());
    Ok(())
}
/// Handles `exit`/`quit`: invokes `callback`, then asks the UI to quit.
///
/// Always returns `Ok(())`.
pub fn cmd_exit(&self, callback: UICallback) -> Result<(), String> {
    trace!("CommandProcessor::cmd_exit");
    // The completion callback runs before the UI is told to quit.
    callback(self.ui());
    self.ui().quit();
    Ok(())
}
pub fn cmd_shutdown(&self, callback: UICallback) -> Result<(), String> {
trace!("CommandProcessor::cmd_shutdown");
let mut capi = self.capi();
let ui = self.ui();
async_std::task::spawn_local(async move {
if let Err(e) = capi.server_shutdown().await {
error!("Server command 'shutdown' failed to execute: {}", e);
}
callback(ui);
});
Ok(())
}
pub fn cmd_attach(&self, callback: UICallback) -> Result<(), String> {
trace!("CommandProcessor::cmd_attach");
let mut capi = self.capi();
let ui = self.ui();
async_std::task::spawn_local(async move {
if let Err(e) = capi.server_attach().await {
error!("Server command 'attach' failed to execute: {}", e);
}
callback(ui);
});
Ok(())
}
pub fn cmd_detach(&self, callback: UICallback) -> Result<(), String> {
trace!("CommandProcessor::cmd_detach");
let mut capi = self.capi();
let ui = self.ui();
async_std::task::spawn_local(async move {
if let Err(e) = capi.server_detach().await {
error!("Server command 'detach' failed to execute: {}", e);
}
callback(ui);
});
Ok(())
}
/// Handles `disconnect`: spawns a local task that calls `disconnect` on the
/// client API connection and then invokes `callback`.
///
/// Always returns `Ok(())`.
pub fn cmd_disconnect(&self, callback: UICallback) -> Result<(), String> {
    trace!("CommandProcessor::cmd_disconnect");
    let mut api = self.capi();
    let ui_handle = self.ui();
    let task = async move {
        api.disconnect().await;
        callback(ui_handle);
    };
    async_std::task::spawn_local(task);
    Ok(())
}
/// Parses `command_line` and dispatches to the matching `cmd_*` handler.
///
/// The first word selects the command; the remaining text is passed on to `help` only.
///
/// # Errors
///
/// Returns `Err("Invalid command: …")` for an unrecognised command word. `callback`
/// is still invoked in that case.
pub fn run_command(&self, command_line: &str, callback: UICallback) -> Result<(), String> {
    let (command, args) = Self::word_split(command_line);
    match command.as_str() {
        "help" => self.cmd_help(args, callback),
        "exit" | "quit" => self.cmd_exit(callback),
        "disconnect" => self.cmd_disconnect(callback),
        "shutdown" => self.cmd_shutdown(callback),
        "attach" => self.cmd_attach(callback),
        "detach" => self.cmd_detach(callback),
        unknown => {
            callback(self.ui());
            Err(format!("Invalid command: {}", unknown))
        }
    }
}
pub async fn connection_manager(&mut self) {
// Connect until we're done
while !self.inner_mut().finished {
// Wait for connection request
if !self.inner().autoconnect {
let waker = self.inner_mut().connection_waker.instance_clone(());
waker.await;
} else {
self.inner_mut().autoconnect = false;
}
self.inner_mut().connection_waker.reset();
// Loop while we want to keep the connection
let mut first = true;
while self.inner().reconnect {
2021-11-27 21:31:01 -05:00
let server_addr_opt = self.inner_mut().server_addr;
2021-11-22 11:28:30 -05:00
let server_addr = match server_addr_opt {
None => break,
Some(addr) => addr,
};
if first {
info!("Connecting to server at {}", server_addr);
self.set_connection_state(ConnectionState::Retrying(
2021-11-27 21:31:01 -05:00
server_addr,
2021-11-22 11:28:30 -05:00
SystemTime::now(),
));
} else {
debug!("Retrying connection to {}", server_addr);
}
let mut capi = self.capi();
2021-11-27 21:31:01 -05:00
let res = capi.connect(server_addr).await;
if res.is_ok() {
2021-11-22 11:28:30 -05:00
info!(
"Connection to server at {} terminated normally",
server_addr
);
break;
}
if !self.inner().autoreconnect {
info!("Connection to server lost.");
break;
}
self.set_connection_state(ConnectionState::Retrying(
2021-11-27 21:31:01 -05:00
server_addr,
2021-11-22 11:28:30 -05:00
SystemTime::now(),
));
debug!("Connection lost, retrying in 2 seconds");
{
let waker = self.inner_mut().connection_waker.instance_clone(());
waker
.race(async_std::task::sleep(Duration::from_millis(2000)))
.await;
}
self.inner_mut().connection_waker.reset();
first = false;
}
info!("Disconnected.");
self.set_connection_state(ConnectionState::Disconnected);
self.inner_mut().reconnect = true;
}
}
// called by ui
////////////////////////////////////////////
/// Sets (or clears, with `None`) the server address used by `connection_manager`.
pub fn set_server_address(&mut self, server_addr: Option<SocketAddr>) {
    let mut state = self.inner_mut();
    state.server_addr = server_addr;
}
pub fn get_server_address(&self) -> Option<SocketAddr> {
2021-11-27 21:31:01 -05:00
self.inner().server_addr
2021-11-22 11:28:30 -05:00
}
// called by client_api_connection
// calls into ui
////////////////////////////////////////////
/// Forwards a new attachment state to the UI.
pub fn set_attachment_state(&mut self, state: AttachmentState) {
    let mut inner = self.inner_mut();
    inner.ui.set_attachment_state(state);
}
// called by client_api_connection
// calls into ui
////////////////////////////////////////////
/// Forwards a new connection state to the UI.
pub fn set_connection_state(&mut self, state: ConnectionState) {
    let mut inner = self.inner_mut();
    inner.ui.set_connection_state(state);
}
// called by ui
////////////////////////////////////////////
/// Requests a connection: sets the `reconnect` flag and resolves the waker that
/// `connection_manager` waits on.
pub fn start_connection(&mut self) {
    let mut inner = self.inner_mut();
    inner.reconnect = true;
    inner.connection_waker.resolve();
}
// pub fn stop_connection(&mut self) {
// self.inner_mut().reconnect = false;
// let mut capi = self.capi().clone();
// async_std::task::spawn_local(async move {
// capi.disconnect().await;
// });
// }
/// Stops further reconnection attempts: clears the `reconnect` flag and resolves
/// the waker, which ends any retry wait `connection_manager` is in.
pub fn cancel_reconnect(&mut self) {
    let mut inner = self.inner_mut();
    inner.reconnect = false;
    inner.connection_waker.resolve();
}
/// Marks the processor as finished, disables reconnection and resolves the waker
/// so `connection_manager` can observe the flags and exit its loops.
pub fn quit(&mut self) {
    let mut inner = self.inner_mut();
    inner.finished = true;
    inner.reconnect = false;
    inner.connection_waker.resolve();
}
// called by ui
// calls into client_api_connection
////////////////////////////////////////////
pub fn attach(&mut self) {
trace!("CommandProcessor::attach");
let mut capi = self.capi();
async_std::task::spawn_local(async move {
if let Err(e) = capi.server_attach().await {
error!("Server command 'attach' failed to execute: {}", e);
}
});
}
pub fn detach(&mut self) {
trace!("CommandProcessor::detach");
let mut capi = self.capi();
async_std::task::spawn_local(async move {
if let Err(e) = capi.server_detach().await {
error!("Server command 'detach' failed to execute: {}", e);
}
});
}
}