diff --git a/Cargo.lock b/Cargo.lock index 9866cef7..556f52af 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6329,9 +6329,6 @@ dependencies = [ "async-tungstenite 0.22.2", "backtrace", "bugsalot", - "capnp", - "capnp-rpc", - "capnpc", "cfg-if 1.0.0", "clap 3.2.25", "color-eyre", diff --git a/veilid-server/Cargo.toml b/veilid-server/Cargo.toml index 977e55a1..57dd2ba1 100644 --- a/veilid-server/Cargo.toml +++ b/veilid-server/Cargo.toml @@ -3,7 +3,6 @@ name = "veilid-server" version = "0.1.0" authors = ["John Smith "] edition = "2021" -build = "build.rs" license = "LGPL-2.0-or-later OR MPL-2.0 OR (MIT AND BSD-3-Clause)" [[bin]] @@ -39,9 +38,7 @@ color-eyre = { version = "^0", default-features = false } backtrace = "^0" clap = "^3" directories = "^4" -capnp = "^0" parking_lot = "^0" -capnp-rpc = "^0" config = { version = "^0", features = ["yaml"] } cfg-if = "^1" serde = "^1" @@ -73,7 +70,4 @@ nix = "^0" tracing-journald = "^0" [dev-dependencies] -serial_test = "^0" - -[build-dependencies] -capnpc = "^0" +serial_test = "^0" \ No newline at end of file diff --git a/veilid-server/build.rs b/veilid-server/build.rs deleted file mode 100644 index a9e4db80..00000000 --- a/veilid-server/build.rs +++ /dev/null @@ -1,6 +0,0 @@ -fn main() { - ::capnpc::CompilerCommand::new() - .file("proto/veilid-client.capnp") - .run() - .expect("compiling schema"); -} diff --git a/veilid-server/proto/veilid-client.capnp b/veilid-server/proto/veilid-client.capnp deleted file mode 100644 index 469a3b09..00000000 --- a/veilid-server/proto/veilid-client.capnp +++ /dev/null @@ -1,25 +0,0 @@ -@0xd29582d26b2fb073; - -struct ApiResult @0x8111724bdb812929 { - union { - ok @0 :Text; - err @1 :Text; - } -} - -interface Registration @0xdd45f30a7c22e391 {} - -interface VeilidServer @0xcb2c699f14537f94 { - register @0 (veilidClient :VeilidClient) -> (registration :Registration, state :Text, settings :Text); - debug @1 (command :Text) -> (result :ApiResult); - attach @2 () -> (result :ApiResult); - detach @3 () -> (result :ApiResult); - shutdown @4 (); - getState @5 () -> (result :ApiResult); - changeLogLevel @6 (layer :Text, logLevel :Text) -> (result :ApiResult); - appCallReply @7 (id :UInt64, message :Data) -> (result :ApiResult); -} - -interface VeilidClient @0xbfcea60fb2ba4736 { - update @0 (veilidUpdate :Text); -} \ No newline at end of file diff --git a/veilid-server/src/client_api.rs b/veilid-server/src/client_api.rs index 269dce3a..b3bc15cc 100644 --- a/veilid-server/src/client_api.rs +++ b/veilid-server/src/client_api.rs @@ -2,8 +2,6 @@ use crate::settings::*; use crate::tools::*; use crate::veilid_client_capnp::*; use crate::veilid_logs::VeilidLogs; -use capnp::capability::Promise; -use capnp_rpc::{pry, rpc_twoparty_capnp, twoparty, RpcSystem}; use cfg_if::*; use futures_util::{future::try_join_all, FutureExt as FuturesFutureExt, StreamExt}; use serde::*; @@ -11,267 +9,72 @@ use std::cell::RefCell; use std::collections::HashMap; use std::fmt; use std::net::SocketAddr; -use std::rc::Rc; use stop_token::future::FutureExt; use stop_token::*; use tracing::*; use veilid_core::*; -// Encoding for ApiResult -fn encode_api_result( - result: &VeilidAPIResult, - builder: &mut api_result::Builder, -) { - match result { - Ok(v) => { - builder.set_ok(&serialize_json(v)); - } - Err(e) => { - builder.set_err(&serialize_json(e)); - } - } -} +// struct VeilidServerImpl { +// veilid_api: veilid_core::VeilidAPI, +// veilid_logs: VeilidLogs, +// settings: Settings, +// next_id: u64, +// } -// --- interface Registration --------------------------------- +// impl VeilidServerImpl { +// #[instrument(level = "trace", skip_all)] +// pub fn new( +// veilid_api: veilid_core::VeilidAPI, +// veilid_logs: VeilidLogs, +// settings: Settings, +// ) -> Self { +// Self { +// next_id: 0, +// veilid_api, +// veilid_logs, +// settings, +// } +// } -struct RegistrationHandle { - client: veilid_client::Client, - requests_in_flight: i32, -} +// #[instrument(level = "trace", skip_all)] +// fn shutdown( +// &mut self, +// _params: veilid_server::ShutdownParams, +// mut _results: veilid_server::ShutdownResults, +// ) -> Promise<(), ::capnp::Error> { +// trace!("VeilidServerImpl::shutdown"); -struct RegistrationMap { - registrations: HashMap, -} +// cfg_if::cfg_if! { +// if #[cfg(windows)] { +// assert!(false, "write me!"); +// } +// else { +// crate::server::shutdown(); +// } +// } -impl RegistrationMap { - fn new() -> Self { - Self { - registrations: HashMap::new(), - } - } -} +// Promise::ok(()) +// } -struct RegistrationImpl { - id: u64, - registration_map: Rc>, -} +// #[instrument(level = "trace", skip_all)] +// fn change_log_level( +// &mut self, +// params: veilid_server::ChangeLogLevelParams, +// mut results: veilid_server::ChangeLogLevelResults, +// ) -> Promise<(), ::capnp::Error> { +// trace!("VeilidServerImpl::change_log_level"); -impl RegistrationImpl { - fn new(id: u64, registrations: Rc>) -> Self { - Self { - id, - registration_map: registrations, - } - } -} +// let layer = pry!(pry!(params.get()).get_layer()).to_owned(); +// let log_level_json = pry!(pry!(params.get()).get_log_level()).to_owned(); +// let log_level: veilid_core::VeilidConfigLogLevel = +// pry!(veilid_core::deserialize_json(&log_level_json) +// .map_err(|e| ::capnp::Error::failed(format!("{:?}", e)))); -impl Drop for RegistrationImpl { - fn drop(&mut self) { - debug!("Registration dropped"); - self.registration_map - .borrow_mut() - .registrations - .remove(&self.id); - } -} - -impl registration::Server for RegistrationImpl {} - -// --- interface VeilidServer --------------------------------- - -struct VeilidServerImpl { - veilid_api: veilid_core::VeilidAPI, - veilid_logs: VeilidLogs, - settings: Settings, - next_id: u64, - pub registration_map: Rc>, -} - -impl VeilidServerImpl { - #[instrument(level = "trace", skip_all)] - pub fn new( - veilid_api: veilid_core::VeilidAPI, - veilid_logs: VeilidLogs, - settings: Settings, - ) -> Self { - Self { - next_id: 0, - registration_map: Rc::new(RefCell::new(RegistrationMap::new())), - veilid_api, - veilid_logs, - settings, - } - } -} - -impl veilid_server::Server for VeilidServerImpl { - #[instrument(level = "trace", skip_all)] - fn register( - &mut self, - params: veilid_server::RegisterParams, - mut results: veilid_server::RegisterResults, - ) -> Promise<(), ::capnp::Error> { - trace!("VeilidServerImpl::register"); - - self.registration_map.borrow_mut().registrations.insert( - self.next_id, - RegistrationHandle { - client: pry!(pry!(params.get()).get_veilid_client()), - requests_in_flight: 0, - }, - ); - - let veilid_api = self.veilid_api.clone(); - let settings = self.settings.clone(); - let registration = capnp_rpc::new_client(RegistrationImpl::new( - self.next_id, - self.registration_map.clone(), - )); - self.next_id += 1; - - Promise::from_future(async move { - let state = veilid_api - .get_state() - .await - .map_err(|e| ::capnp::Error::failed(format!("{:?}", e)))?; - let state = serialize_json(state); - - let mut res = results.get(); - res.set_registration(registration); - res.set_state(&state); - - let settings = &*settings.read(); - let settings_json_string = serialize_json(settings); - let mut settings_json = json::parse(&settings_json_string) - .map_err(|e| ::capnp::Error::failed(format!("{:?}", e)))?; - settings_json["core"]["network"].remove("node_id_secret"); - let safe_settings_json = settings_json.to_string(); - res.set_settings(&safe_settings_json); - - Ok(()) - }) - } - - #[instrument(level = "trace", skip_all)] - fn debug( - &mut self, - params: veilid_server::DebugParams, - mut results: veilid_server::DebugResults, - ) -> Promise<(), ::capnp::Error> { - trace!("VeilidServerImpl::debug"); - let veilid_api = self.veilid_api.clone(); - let command = pry!(pry!(params.get()).get_command()).to_owned(); - - Promise::from_future(async move { - let result = veilid_api.debug(command).await; - encode_api_result(&result, &mut results.get().init_result()); - Ok(()) - }) - } - - #[instrument(level = "trace", skip_all)] - fn attach( - &mut self, - _params: veilid_server::AttachParams, - mut results: veilid_server::AttachResults, - ) -> Promise<(), ::capnp::Error> { - trace!("VeilidServerImpl::attach"); - let veilid_api = self.veilid_api.clone(); - Promise::from_future(async move { - let result = veilid_api.attach().await; - encode_api_result(&result, &mut results.get().init_result()); - Ok(()) - }) - } - - #[instrument(level = "trace", skip_all)] - fn detach( - &mut self, - _params: veilid_server::DetachParams, - mut results: veilid_server::DetachResults, - ) -> Promise<(), ::capnp::Error> { - trace!("VeilidServerImpl::detach"); - let veilid_api = self.veilid_api.clone(); - Promise::from_future(async move { - let result = veilid_api.detach().await; - encode_api_result(&result, &mut results.get().init_result()); - Ok(()) - }) - } - - #[instrument(level = "trace", skip_all)] - fn shutdown( - &mut self, - _params: veilid_server::ShutdownParams, - mut _results: veilid_server::ShutdownResults, - ) -> Promise<(), ::capnp::Error> { - trace!("VeilidServerImpl::shutdown"); - - cfg_if::cfg_if! { - if #[cfg(windows)] { - assert!(false, "write me!"); - } - else { - crate::server::shutdown(); - } - } - - Promise::ok(()) - } - - #[instrument(level = "trace", skip_all)] - fn get_state( - &mut self, - _params: veilid_server::GetStateParams, - mut results: veilid_server::GetStateResults, - ) -> Promise<(), ::capnp::Error> { - trace!("VeilidServerImpl::get_state"); - let veilid_api = self.veilid_api.clone(); - Promise::from_future(async move { - let result = veilid_api.get_state().await; - encode_api_result(&result, &mut results.get().init_result()); - Ok(()) - }) - } - - #[instrument(level = "trace", skip_all)] - fn change_log_level( - &mut self, - params: veilid_server::ChangeLogLevelParams, - mut results: veilid_server::ChangeLogLevelResults, - ) -> Promise<(), ::capnp::Error> { - trace!("VeilidServerImpl::change_log_level"); - - let layer = pry!(pry!(params.get()).get_layer()).to_owned(); - let log_level_json = pry!(pry!(params.get()).get_log_level()).to_owned(); - let log_level: veilid_core::VeilidConfigLogLevel = - pry!(veilid_core::deserialize_json(&log_level_json) - .map_err(|e| ::capnp::Error::failed(format!("{:?}", e)))); - - let result = self.veilid_logs.change_log_level(layer, log_level); - encode_api_result(&result, &mut results.get().init_result()); - Promise::ok(()) - } - - #[instrument(level = "trace", skip_all)] - fn app_call_reply( - &mut self, - params: veilid_server::AppCallReplyParams, - mut results: veilid_server::AppCallReplyResults, - ) -> Promise<(), ::capnp::Error> { - trace!("VeilidServerImpl::app_call_reply"); - - let id = OperationId::new(pry!(params.get()).get_id()); - let message = pry!(pry!(params.get()).get_message()).to_owned(); - - let veilid_api = self.veilid_api.clone(); - Promise::from_future(async move { - let result = veilid_api.app_call_reply(id, message).await; - encode_api_result(&result, &mut results.get().init_result()); - Ok(()) - }) - } -} +// let result = self.veilid_logs.change_log_level(layer, log_level); +// encode_api_result(&result, &mut results.get().init_result()); +// Promise::ok(()) +// } +// } // --- Client API Server-Side --------------------------------- @@ -282,13 +85,12 @@ struct ClientApiInner { veilid_api: veilid_core::VeilidAPI, veilid_logs: VeilidLogs, settings: Settings, - registration_map: Rc>, stop: Option, join_handle: Option, } pub struct ClientApi { - inner: RefCell, + inner: Arc>, } impl ClientApi { @@ -303,7 +105,6 @@ impl ClientApi { veilid_api, veilid_logs, settings, - registration_map: Rc::new(RefCell::new(RegistrationMap::new())), stop: Some(StopSource::new()), join_handle: None, }), @@ -329,11 +130,10 @@ impl ClientApi { trace!("ClientApi::stop: stopped"); } - #[instrument(level = "trace", skip(self, client), err)] + #[instrument(level = "trace", skip(self), err)] async fn handle_incoming( - self: Rc, + self, bind_addr: SocketAddr, - client: veilid_server::Client, ) -> Result<(), Box> { let listener = TcpListener::bind(bind_addr).await?; debug!("Client API listening on: {:?}", bind_addr); @@ -347,7 +147,7 @@ impl ClientApi { } } - let stop_token = self.inner.borrow().stop.as_ref().unwrap().token(); + let stop_token = self.inner.lock().stop.as_ref().unwrap().token(); let incoming_loop = async move { while let Ok(Some(stream_result)) = incoming_stream.next().timeout_at(stop_token.clone()).await @@ -365,15 +165,8 @@ impl ClientApi { let writer = writer.compat_write(); } } - let network = twoparty::VatNetwork::new( - reader, - writer, - rpc_twoparty_capnp::Side::Server, - Default::default(), - ); - - let rpc_system = RpcSystem::new(Box::new(network), Some(client.clone().client)); + xxx spawn json_api handler spawn_local(rpc_system.map(drop)); } Ok::<(), Box>(()) @@ -382,44 +175,6 @@ impl ClientApi { incoming_loop.await } - #[instrument(level = "trace", skip_all)] - fn send_request_to_all_clients(self: Rc, request: F) - where - F: Fn(u64, &mut RegistrationHandle) -> Option<::capnp::capability::RemotePromise>, - T: capnp::traits::Pipelined + capnp::traits::Owned + 'static + Unpin, - { - // Send status update to each registered client - let registration_map = self.inner.borrow().registration_map.clone(); - let registration_map1 = registration_map.clone(); - let regs = &mut registration_map.borrow_mut().registrations; - for (&id, mut registration) in regs.iter_mut() { - if registration.requests_in_flight >= 256 { - println!( - "too many requests in flight: {}", - registration.requests_in_flight - ); - } - registration.requests_in_flight += 1; - - if let Some(request_promise) = request(id, registration) { - let registration_map2 = registration_map1.clone(); - spawn_local(request_promise.promise.map(move |r| match r { - Ok(_) => { - if let Some(ref mut s) = - registration_map2.borrow_mut().registrations.get_mut(&id) - { - s.requests_in_flight -= 1; - } - } - Err(e) => { - println!("Got error: {:?}. Dropping registation.", e); - registration_map2.borrow_mut().registrations.remove(&id); - } - })); - } - } - } - #[instrument(level = "trace", skip(self))] pub fn handle_update(self: Rc, veilid_update: veilid_core::VeilidUpdate) { // serialize update @@ -444,21 +199,10 @@ impl ClientApi { } #[instrument(level = "trace", skip(self))] - pub fn run(self: Rc, bind_addrs: Vec) { - // Create client api VeilidServer - let veilid_server_impl = VeilidServerImpl::new( - self.inner.borrow().veilid_api.clone(), - self.inner.borrow().veilid_logs.clone(), - self.inner.borrow().settings.clone(), - ); - self.inner.borrow_mut().registration_map = veilid_server_impl.registration_map.clone(); - - // Make a client object for the server to send to each rpc client - let client: veilid_server::Client = capnp_rpc::new_client(veilid_server_impl); - + pub fn run(&self, bind_addrs: Vec) { let bind_futures = bind_addrs .iter() - .map(|addr| self.clone().handle_incoming(*addr, client.clone())); + .map(|addr| self.clone().handle_incoming(*addr)); let bind_futures_join = try_join_all(bind_futures); self.inner.borrow_mut().join_handle = Some(spawn_local(bind_futures_join)); } diff --git a/veilid-server/src/main.rs b/veilid-server/src/main.rs index 31a60cc3..b228a68e 100644 --- a/veilid-server/src/main.rs +++ b/veilid-server/src/main.rs @@ -24,11 +24,6 @@ use tools::*; use tracing::*; use veilid_logs::*; -#[allow(clippy::all)] -pub mod veilid_client_capnp { - include!(concat!(env!("OUT_DIR"), "/proto/veilid_client_capnp.rs")); -} - #[instrument(err)] fn main() -> EyreResult<()> { #[cfg(windows)] @@ -76,7 +71,7 @@ fn main() -> EyreResult<()> { if matches.occurrences_of("emit-schema") != 0 { if let Some(esstr) = matches.value_of("emit-schema") { let mut schemas = HashMap::::new(); - veilid_core::emit_schemas(&mut schemas); + veilid_core::json_api::emit_schemas(&mut schemas); if let Some(schema) = schemas.get(esstr) { println!("{}", schema);