diff --git a/veilid-cli/src/client_api_connection.rs b/veilid-cli/src/client_api_connection.rs index b7e12670..e790dcd9 100644 --- a/veilid-cli/src/client_api_connection.rs +++ b/veilid-cli/src/client_api_connection.rs @@ -1,6 +1,5 @@ use crate::command_processor::*; use crate::tools::*; -use core::str::FromStr; use futures::stream::FuturesUnordered; use futures::StreamExt; use std::net::SocketAddr; @@ -23,7 +22,6 @@ struct ClientApiConnectionInner { comproc: CommandProcessor, connect_addr: Option, request_sender: Option>, - server_settings: Option, disconnector: Option, disconnect_requested: bool, reply_channels: HashMap>, @@ -42,7 +40,6 @@ impl ClientApiConnection { comproc, connect_addr: None, request_sender: None, - server_settings: None, disconnector: None, disconnect_requested: false, reply_channels: HashMap::new(), @@ -56,28 +53,19 @@ impl ClientApiConnection { inner.reply_channels.clear(); } - // async fn process_veilid_state<'a>( - // &'a mut self, - // veilid_state: VeilidState, - // ) -> Result<(), String> { - // let mut inner = self.inner.borrow_mut(); - // inner.comproc.update_attachment(veilid_state.attachment); - // inner.comproc.update_network_status(veilid_state.network); - // inner.comproc.update_config(veilid_state.config); - // Ok(()) - // } + async fn process_veilid_state<'a>(&self, state: &json::JsonValue) { + let comproc = self.inner.lock().comproc.clone(); + comproc.update_attachment(&state["attachment"]); + comproc.update_network_status(&state["network"]); + comproc.update_config(&state["config"]); + } async fn process_response(&self, response: json::JsonValue) { // find the operation id and send the response to the channel for it - let Some(id_str) = response["id"].as_str() else { - error!("missing id: {}", response); - return; - }; - let Ok(id) = u32::from_str(id_str) else { + let Some(id) = response["id"].as_u32() else { error!("invalid id: {}", response); return; }; - let reply_channel = { let mut inner = self.inner.lock(); inner.reply_channels.remove(&id) @@ -92,7 +80,7 @@ impl ClientApiConnection { } } - async fn process_update(&self, update: json::JsonValue) { + async fn process_veilid_update(&self, update: json::JsonValue) { let comproc = self.inner.lock().comproc.clone(); let Some(kind) = update["kind"].as_str() else { comproc.log_message(format!("missing update kind: {}", update)); @@ -100,29 +88,29 @@ impl ClientApiConnection { }; match kind { "Log" => { - comproc.update_log(update); + comproc.update_log(&update); } "AppMessage" => { - comproc.update_app_message(update); + comproc.update_app_message(&update); } "AppCall" => { - comproc.update_app_call(update); + comproc.update_app_call(&update); } "Attachment" => { - comproc.update_attachment(update); + comproc.update_attachment(&update); } "Network" => { - comproc.update_network_status(update); + comproc.update_network_status(&update); } "Config" => { - comproc.update_config(update); + comproc.update_config(&update); } "RouteChange" => { - comproc.update_route(update); + comproc.update_route(&update); } "Shutdown" => comproc.update_shutdown(), "ValueChange" => { - comproc.update_value_change(update); + comproc.update_value_change(&update); } _ => { comproc.log_message(format!("unknown update kind: {}", update)); @@ -130,97 +118,6 @@ impl ClientApiConnection { } } - // async fn spawn_rpc_system( - // &self, - // connect_addr: SocketAddr, - // mut rpc_system: RpcSystem, - // ) -> Result<(), String> { - // let mut request; - // { - // let mut inner = self.inner.borrow_mut(); - - // // Get the bootstrap server connection object - // inner.server = Some(Rc::new(RefCell::new( - // rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server), - // ))); - - // // Store our disconnector future for later (must happen after bootstrap, contrary to documentation) - // inner.disconnector = Some(rpc_system.get_disconnector()); - - // // Get a client object to pass to the server for status update callbacks - // let client = capnp_rpc::new_client(VeilidClientImpl::new(inner.comproc.clone())); - - // // Register our client and get a registration object back - // request = inner - // .server - // .as_ref() - // .unwrap() - // .borrow_mut() - // .register_request(); - // request.get().set_veilid_client(client); - - // inner - // .comproc - // .set_connection_state(ConnectionState::Connected( - // connect_addr, - // std::time::SystemTime::now(), - // )); - // } - - // let rpc_jh = spawn_local(rpc_system); - - // let reg_res: Result = (async { - // // Send the request and get the state object and the registration object - // let response = request - // .send() - // .promise - // .await - // .map_err(|e| format!("failed to send register request: {}", e))?; - // let response = response - // .get() - // .map_err(|e| format!("failed to get register response: {}", e))?; - - // // Get the registration object, which drops our connection when it is dropped - // let registration = response - // .get_registration() - // .map_err(|e| format!("failed to get registration object: {}", e))?; - - // // Get the initial veilid state - // let veilid_state = response - // .get_state() - // .map_err(|e| format!("failed to get initial veilid state: {}", e))?; - - // // Set up our state for the first time - // let veilid_state: VeilidState = deserialize_json(veilid_state) - // .map_err(|e| format!("failed to get deserialize veilid state: {}", e))?; - // self.process_veilid_state(veilid_state).await?; - - // // Save server settings - // let server_settings = response - // .get_settings() - // .map_err(|e| format!("failed to get initial veilid server settings: {}", e))? - // .to_owned(); - // self.inner.borrow_mut().server_settings = Some(server_settings.clone()); - - // // Don't drop the registration, doing so will remove the client - // // object mapping from the server which we need for the update backchannel - // Ok(registration) - // }) - // .await; - - // let _registration = match reg_res { - // Ok(v) => v, - // Err(e) => { - // rpc_jh.abort().await; - // return Err(e); - // } - // }; - - // // Wait until rpc system completion or disconnect was requested - // let res = rpc_jh.await; - // res.map_err(|e| format!("client RPC system error: {}", e)) - // } - async fn handle_connection(&self, connect_addr: SocketAddr) -> Result<(), String> { trace!("ClientApiConnection::handle_connection"); @@ -251,6 +148,7 @@ impl ClientApiConnection { // Requests to send let (requests_tx, requests_rx) = flume::unbounded(); + // Create disconnection mechanism let stop_token = { let stop_source = StopSource::new(); let token = stop_source.token(); @@ -267,16 +165,19 @@ impl ClientApiConnection { // Process lines let this = self.clone(); let recv_messages_future = async move { - let mut line = String::new(); - while let Ok(size) = reader.read_line(&mut line).await { + let mut linebuf = String::new(); + while let Ok(size) = reader.read_line(&mut linebuf).await { // Exit on EOF if size == 0 { // Disconnected break; } + let line = linebuf.trim().to_owned(); + linebuf.clear(); + // Unmarshal json - let j = match json::parse(line.trim()) { + let j = match json::parse(&line) { Ok(v) => v, Err(e) => { error!("failed to parse server response: {}", e); @@ -285,7 +186,7 @@ impl ClientApiConnection { }; if j["type"] == "Update" { - this.process_update(j).await; + this.process_veilid_update(j).await; } else if j["type"] == "Response" { this.process_response(j).await; } @@ -306,13 +207,28 @@ impl ClientApiConnection { }; unord.push(system_boxed(send_requests_future)); + // Request initial server state + let capi = self.clone(); + spawn_detached_local(async move { + let mut req = json::JsonValue::new_object(); + req["op"] = "GetState".into(); + let Some(resp) = capi.perform_request(req).await else { + error!("failed to get state"); + return; + }; + if resp.has_key("error") { + error!("failed to get state: {}", resp["error"]); + return; + } + capi.process_veilid_state(&resp["value"]).await; + }); + // Send and receive until we're done or a stop is requested while let Ok(Some(())) = unord.next().timeout_at(stop_token.clone()).await {} // // Drop the server and disconnector too (if we still have it) let mut inner = self.inner.lock(); let disconnect_requested = inner.disconnect_requested; - inner.server_settings = None; inner.request_sender = None; inner.disconnector = None; inner.disconnect_requested = false; diff --git a/veilid-cli/src/command_processor.rs b/veilid-cli/src/command_processor.rs index 6cbc6331..636561d1 100644 --- a/veilid-cli/src/command_processor.rs +++ b/veilid-cli/src/command_processor.rs @@ -387,7 +387,7 @@ reply - reply to an AppCall not handled directly by the server self.inner().ui_sender.add_node_event(message); } - pub fn update_attachment(&self, attachment: json::JsonValue) { + pub fn update_attachment(&self, attachment: &json::JsonValue) { self.inner_mut().ui_sender.set_attachment_state( attachment["state"].as_str().unwrap_or_default().to_owned(), attachment["public_internet_ready"] @@ -399,7 +399,7 @@ reply - reply to an AppCall not handled directly by the server ); } - pub fn update_network_status(&self, network: json::JsonValue) { + pub fn update_network_status(&self, network: &json::JsonValue) { self.inner_mut().ui_sender.set_network_status( network["started"].as_bool().unwrap_or_default(), json_str_u64(&network["bps_down"]), @@ -410,10 +410,10 @@ reply - reply to an AppCall not handled directly by the server .collect::>(), ); } - pub fn update_config(&self, config: json::JsonValue) { + pub fn update_config(&self, config: &json::JsonValue) { self.inner_mut().ui_sender.set_config(&config["config"]) } - pub fn update_route(&self, route: json::JsonValue) { + pub fn update_route(&self, route: &json::JsonValue) { let mut out = String::new(); if route["dead_routes"].len() != 0 { out.push_str(&format!("Dead routes: {:?}", route["dead_routes"])); @@ -431,12 +431,12 @@ reply - reply to an AppCall not handled directly by the server self.inner().ui_sender.add_node_event(out); } } - pub fn update_value_change(&self, value_change: json::JsonValue) { + pub fn update_value_change(&self, value_change: &json::JsonValue) { let out = format!("Value change: {:?}", value_change.as_str().unwrap_or("???")); self.inner().ui_sender.add_node_event(out); } - pub fn update_log(&self, log: json::JsonValue) { + pub fn update_log(&self, log: &json::JsonValue) { self.inner().ui_sender.add_node_event(format!( "{}: {}{}", log["log_level"].as_str().unwrap_or("???"), @@ -449,7 +449,7 @@ reply - reply to an AppCall not handled directly by the server )); } - pub fn update_app_message(&self, msg: json::JsonValue) { + pub fn update_app_message(&self, msg: &json::JsonValue) { let message = json_str_vec_u8(&msg["message"]); // check is message body is ascii printable @@ -471,7 +471,7 @@ reply - reply to an AppCall not handled directly by the server .add_node_event(format!("AppMessage ({:?}): {}", msg["sender"], strmsg)); } - pub fn update_app_call(&self, call: json::JsonValue) { + pub fn update_app_call(&self, call: &json::JsonValue) { let message = json_str_vec_u8(&call["message"]); // check is message body is ascii printable diff --git a/veilid-core/src/veilid_api/types/veilid_log.rs b/veilid-core/src/veilid_api/types/veilid_log.rs index 66852e14..f377115c 100644 --- a/veilid-core/src/veilid_api/types/veilid_log.rs +++ b/veilid-core/src/veilid_api/types/veilid_log.rs @@ -64,19 +64,33 @@ impl VeilidLogLevel { } } +impl FromStr for VeilidLogLevel { + type Err = VeilidAPIError; + fn from_str(s: &str) -> Result { + Ok(match s { + "Error" => Self::Error, + "Warn" => Self::Warn, + "Info" => Self::Info, + "Debug" => Self::Debug, + "Trace" => Self::Trace, + _ => { + apibail_invalid_argument!("Can't convert str", "s", s); + } + }) + } +} impl fmt::Display for VeilidLogLevel { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { let text = match self { - Self::Error => "ERROR", - Self::Warn => "WARN", - Self::Info => "INFO", - Self::Debug => "DEBUG", - Self::Trace => "TRACE", + Self::Error => "Error", + Self::Warn => "Warn", + Self::Info => "Info", + Self::Debug => "Debug", + Self::Trace => "Trace", }; write!(f, "{}", text) } } - /// A VeilidCore log message with optional backtrace #[derive( Debug, diff --git a/veilid-core/src/veilid_config.rs b/veilid-core/src/veilid_config.rs index 8b3c3e7e..98badf13 100644 --- a/veilid-core/src/veilid_config.rs +++ b/veilid-core/src/veilid_config.rs @@ -545,6 +545,35 @@ impl Default for VeilidConfigLogLevel { Self::Off } } +impl FromStr for VeilidConfigLogLevel { + type Err = VeilidAPIError; + fn from_str(s: &str) -> Result { + Ok(match s { + "Off" => Self::Off, + "Error" => Self::Error, + "Warn" => Self::Warn, + "Info" => Self::Info, + "Debug" => Self::Debug, + "Trace" => Self::Trace, + _ => { + apibail_invalid_argument!("Can't convert str", "s", s); + } + }) + } +} +impl fmt::Display for VeilidConfigLogLevel { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + let text = match self { + Self::Off => "Off", + Self::Error => "Error", + Self::Warn => "Warn", + Self::Info => "Info", + Self::Debug => "Debug", + Self::Trace => "Trace", + }; + write!(f, "{}", text) + } +} #[derive( Default, diff --git a/veilid-server/src/client_api.rs b/veilid-server/src/client_api.rs index e63b5e19..821d06ef 100644 --- a/veilid-server/src/client_api.rs +++ b/veilid-server/src/client_api.rs @@ -3,14 +3,14 @@ use crate::tools::*; use crate::veilid_logs::VeilidLogs; use cfg_if::*; use futures_util::{future::try_join_all, stream::FuturesUnordered, StreamExt}; - use parking_lot::Mutex; use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; -use stop_token::future::FutureExt; +use stop_token::future::FutureExt as _; use stop_token::*; use tracing::*; +use veilid_core::json_api::JsonRequestProcessor; use veilid_core::tools::*; use veilid_core::*; use wg::AsyncWaitGroup; @@ -24,34 +24,18 @@ cfg_if! { use tokio::io::AsyncWriteExt; } } -// struct VeilidServerImpl { -// veilid_api: veilid_core::VeilidAPI, -// veilid_logs: VeilidLogs, -// settings: Settings, -// next_id: u64, -// } - -// impl VeilidServerImpl { -// #[instrument(level = "trace", skip_all)] -// pub fn new( -// veilid_api: veilid_core::VeilidAPI, -// veilid_logs: VeilidLogs, -// settings: Settings, -// ) -> Self { -// Self { -// next_id: 0, -// veilid_api, -// veilid_logs, -// settings, -// } -// } - -// } // --- Client API Server-Side --------------------------------- type ClientApiAllFuturesJoinHandle = MustJoinHandle>>; +struct RequestLine { + // Request to process + line: String, + // Where to send the response + responses_tx: flume::Sender, +} + struct ClientApiInner { veilid_api: veilid_core::VeilidAPI, veilid_logs: VeilidLogs, @@ -177,7 +161,7 @@ impl ClientApi { if args.len() != 3 { apibail_generic!("wrong number of arguments"); } - let log_level: VeilidConfigLogLevel = deserialize_json(&args[2])?; + let log_level = VeilidConfigLogLevel::from_str(&args[2])?; self.change_log_level(args[1].clone(), log_level)?; Ok("".to_owned()) } else if args[0] == "GetServerSettings" { @@ -212,6 +196,107 @@ impl ClientApi { } } + async fn process_request_line( + self, + jrp: JsonRequestProcessor, + request_line: RequestLine, + ) -> VeilidAPIResult> { + let line = request_line.line; + let responses_tx = request_line.responses_tx; + + // Unmarshal NDJSON - newline => json + // (trim all whitespace around input lines just to make things more permissive for API users) + let request: json_api::Request = deserialize_json(&line)?; + + // See if this is a control message or a veilid-core message + let response = if let json_api::RequestOp::Control { args } = request.op { + // Process control messages + json_api::Response { + id: request.id, + op: json_api::ResponseOp::Control { + result: json_api::to_json_api_result(self.process_control(args).await), + }, + } + } else { + // Process with ndjson api + jrp.clone().process_request(request).await + }; + + // Marshal json + newline => NDJSON + let response_string = serialize_json(json_api::RecvMessage::Response(response)) + "\n"; + if let Err(e) = responses_tx.send_async(response_string).await { + warn!("response not sent: {}", e) + } + VeilidAPIResult::Ok(None) + } + + async fn next_request_line( + requests_rx: flume::Receiver>, + ) -> VeilidAPIResult> { + Ok(requests_rx.recv_async().await.ok().flatten()) + } + + async fn receive_requests( + self, + conn_tuple: (SocketAddr, SocketAddr), + mut reader: R, + requests_tx: flume::Sender>, + responses_tx: flume::Sender, + ) -> VeilidAPIResult> { + // responses_tx becomes owned by recv_requests_future + // Start sending updates + self.inner + .lock() + .update_channels + .insert(conn_tuple, responses_tx.clone()); + + let mut linebuf = String::new(); + while let Ok(size) = reader.read_line(&mut linebuf).await { + // Eof? + if size == 0 { + break; + } + + // Put the processing in the async queue + let line = linebuf.trim().to_owned(); + linebuf.clear(); + + // Ignore newlines + if line.len() == 0 { + continue; + } + + // Enqueue the line for processing in parallel + let request_line = RequestLine { + line, + responses_tx: responses_tx.clone(), + }; + if let Err(e) = requests_tx.send_async(Some(request_line)).await { + error!("failed to enqueue request: {}", e); + break; + } + } + + // Stop sending updates + // Will cause send_responses_future to stop because we drop the responses_tx + self.inner.lock().update_channels.remove(&conn_tuple); + + VeilidAPIResult::Ok(None) + } + + async fn send_responses( + self, + responses_rx: flume::Receiver, + mut writer: W, + ) -> VeilidAPIResult> { + while let Ok(resp) = responses_rx.recv_async().await { + if let Err(e) = writer.write_all(resp.as_bytes()).await { + error!("failed to write response: {}", e) + } + } + VeilidAPIResult::Ok(None) + } + pub async fn handle_connection(self, stream: TcpStream, awg: AsyncWaitGroup) { // Get address of peer let peer_addr = match stream.peer_addr() { @@ -246,10 +331,10 @@ impl ClientApi { if #[cfg(feature="rt-async-std")] { use futures_util::AsyncReadExt; let (reader, mut writer) = stream.split(); - let mut reader = BufReader::new(reader); + let reader = BufReader::new(reader); } else if #[cfg(feature="rt-tokio")] { - let (reader, mut writer) = stream.into_split(); - let mut reader = BufReader::new(reader); + let (reader, writer) = stream.into_split(); + let reader = BufReader::new(reader); } } @@ -259,105 +344,58 @@ impl ClientApi { // Futures to process unordered let mut unord = FuturesUnordered::new(); - let (more_futures_tx, more_futures_rx) = flume::unbounded(); - // Output to serialize + // Requests and responses are done serially to the socket + // but the requests are processed in parallel by the FuturesUnordered + let (requests_tx, requests_rx) = flume::unbounded(); let (responses_tx, responses_rx) = flume::unbounded(); - // Request receive processor - let this = self.clone(); - let recv_requests_future = async move { - // Start sending updates - this.inner - .lock() - .update_channels - .insert(conn_tuple, responses_tx.clone()); - - let mut line = String::new(); - while let Ok(size) = reader.read_line(&mut line).await { - // Eof? - if size == 0 { - break; - } - - // Put the processing in the async queue - let jrp = jrp.clone(); - let line = line.trim().to_owned(); - // Ignore newlines - if line.len() == 0 { - continue; - } - let responses_tx = responses_tx.clone(); - let this2 = this.clone(); - let process_request = async move { - // Unmarshal NDJSON - newline => json - // (trim all whitespace around input lines just to make things more permissive for API users) - let request: json_api::Request = deserialize_json(&line)?; - - // See if this is a control message or a veilid-core message - let response = if let json_api::RequestOp::Control { args } = request.op { - // Process control messages - json_api::Response { - id: request.id, - op: json_api::ResponseOp::Control { - result: json_api::to_json_api_result( - this2.process_control(args).await, - ), - }, - } - } else { - // Process with ndjson api - jrp.clone().process_request(request).await - }; - - // Marshal json + newline => NDJSON - let response_string = - serialize_json(json_api::RecvMessage::Response(response)) + "\n"; - if let Err(e) = responses_tx.send_async(response_string).await { - warn!("response not sent: {}", e) - } - VeilidAPIResult::Ok(()) - }; - if let Err(e) = more_futures_tx - .send_async(system_boxed(process_request)) - .await - { - warn!("request dropped: {}", e) - } - } - - // Stop sending updates - // Will cause send_responses_future to stop because we drop the responses_tx - this.inner.lock().update_channels.remove(&conn_tuple); - - VeilidAPIResult::Ok(()) - }; - unord.push(system_boxed(recv_requests_future)); + // Request receive processor future + // Receives from socket and enqueues RequestLines + // Completes when the connection is closed or there is a failure + unord.push(system_boxed(self.clone().receive_requests( + conn_tuple, + reader, + requests_tx, + responses_tx, + ))); // Response send processor - let send_responses_future = async move { - while let Ok(resp) = responses_rx.recv_async().await { - if let Err(e) = writer.write_all(resp.as_bytes()).await { - error!("failed to write response: {}", e) - } - } - VeilidAPIResult::Ok(()) - }; - unord.push(system_boxed(send_responses_future)); + // Sends finished response strings out the socket + // Completes when the responses channel is closed + unord.push(system_boxed( + self.clone().send_responses(responses_rx, writer), + )); + + // Add future to process first request + unord.push(system_boxed(Self::next_request_line(requests_rx.clone()))); // Send and receive until we're done or a stop is requested while let Ok(Some(r)) = unord.next().timeout_at(stop_token.clone()).await { - match r { - Ok(()) => {} - Err(e) => { - warn!("JSON API Failure: {}", e); + // See if we got some work to do + let request_line = match r { + Ok(Some(request_line)) => { + // Add future to process next request + unord.push(system_boxed(Self::next_request_line(requests_rx.clone()))); + + // Socket receive future returned something to process + request_line } - } - // Add more futures if we had one that completed - // Allows processing requests in an async fashion - for fut in more_futures_rx.drain() { - unord.push(fut); - } + Ok(None) => { + // Non-request future finished + continue; + } + Err(e) => { + // Connection processing failure, abort + error!("Connection processing failure: {}", e); + break; + } + }; + + // Enqueue unordered future to process request line in parallel + unord.push(system_boxed( + self.clone().process_request_line(jrp.clone(), request_line), + )); } debug!(