diff --git a/scripts/debug_main_node.sh b/scripts/debug_main_node.sh index f768c7bf..22a99e2a 100755 --- a/scripts/debug_main_node.sh +++ b/scripts/debug_main_node.sh @@ -1,2 +1,2 @@ #!/bin/bash -exec ./run_local_test.py 20 -w 0 --config-file ./no-timeout.cfg $1 +exec ./run_local_test.py 20 -w 0 --config-file ./local-test.yml $1 diff --git a/scripts/debug_subnode_1.sh b/scripts/debug_subnode_1.sh index cee6a50e..6274ed59 100755 --- a/scripts/debug_subnode_1.sh +++ b/scripts/debug_subnode_1.sh @@ -1,2 +1,2 @@ #!/bin/bash -exec ./run_local_test.py 2 -w 1 --log_trace --config-file ./no-timeout.cfg +exec ./run_local_test.py 2 -w 1 --log_trace --config-file ./local-test.yml diff --git a/scripts/local-test.yml b/scripts/local-test.yml new file mode 100644 index 00000000..bcbf8d5f --- /dev/null +++ b/scripts/local-test.yml @@ -0,0 +1,6 @@ +--- +core: + network: + dht: + min_peer_count: 1 + address_filter: false diff --git a/scripts/no-timeout.yml b/scripts/no-timeout.yml deleted file mode 100644 index 31294a68..00000000 --- a/scripts/no-timeout.yml +++ /dev/null @@ -1,12 +0,0 @@ ---- -core: - network: - rpc: - max_timestamp_behind: - max_timestamp_ahead: - timeout: 86400000000 - dht: - resolve_node_timeout: - get_value_timeout: - set_value_timeout: - address_filter: false diff --git a/scripts/run_2.sh b/scripts/run_2.sh new file mode 100755 index 00000000..7000b345 --- /dev/null +++ b/scripts/run_2.sh @@ -0,0 +1,4 @@ +#!/bin/bash +exec ./run_local_test.py 2 --config-file ./local-test.yml $1 + + diff --git a/scripts/run_20.sh b/scripts/run_20.sh new file mode 100755 index 00000000..195d9cfb --- /dev/null +++ b/scripts/run_20.sh @@ -0,0 +1,4 @@ +#!/bin/bash +exec ./run_local_test.py 20 --config-file ./local-test.yml $1 + + diff --git a/scripts/run_20_no_timeout.sh b/scripts/run_20_no_timeout.sh deleted file mode 100755 index 02d9199b..00000000 --- a/scripts/run_20_no_timeout.sh +++ /dev/null @@ -1,4 +0,0 @@ -#!/bin/bash -exec ./run_local_test.py 20 --config-file ./no-timeout.cfg $1 - - diff --git a/scripts/run_2_no_timeout.sh b/scripts/run_2_no_timeout.sh deleted file mode 100755 index b945d44f..00000000 --- a/scripts/run_2_no_timeout.sh +++ /dev/null @@ -1,4 +0,0 @@ -#!/bin/bash -exec ./run_local_test.py 2 --config-file ./no-timeout.cfg $1 - - diff --git a/scripts/run_4.sh b/scripts/run_4.sh new file mode 100755 index 00000000..dc008976 --- /dev/null +++ b/scripts/run_4.sh @@ -0,0 +1,4 @@ +#!/bin/bash +exec ./run_local_test.py 4 --config-file ./local-test.yml $1 + + diff --git a/scripts/run_4_no_timeout.sh b/scripts/run_4_no_timeout.sh deleted file mode 100755 index 829df0b9..00000000 --- a/scripts/run_4_no_timeout.sh +++ /dev/null @@ -1,4 +0,0 @@ -#!/bin/bash -exec ./run_local_test.py 4 --config-file ./no-timeout.cfg $1 - - diff --git a/veilid-cli/src/client_api_connection.rs b/veilid-cli/src/client_api_connection.rs index d2d16e82..363150a4 100644 --- a/veilid-cli/src/client_api_connection.rs +++ b/veilid-cli/src/client_api_connection.rs @@ -142,7 +142,7 @@ impl ClientApiConnection { } } - pub async fn server_attach(&mut self) -> Result { + pub async fn server_attach(&mut self) -> Result<()> { trace!("ClientApiConnection::server_attach"); let server = { let inner = self.inner.borrow(); @@ -154,10 +154,10 @@ impl ClientApiConnection { }; let request = server.borrow().attach_request(); let response = request.send().promise.await?; - Ok(response.get()?.get_result()) + response.get().map(drop).map_err(|e| anyhow!(e)) } - pub async fn server_detach(&mut self) -> Result { + pub async fn server_detach(&mut self) -> Result<()> { trace!("ClientApiConnection::server_detach"); let server = { let inner = self.inner.borrow(); @@ -169,10 +169,10 @@ impl ClientApiConnection { }; let request = server.borrow().detach_request(); let response = request.send().promise.await?; - Ok(response.get()?.get_result()) + response.get().map(drop).map_err(|e| anyhow!(e)) } - pub async fn server_shutdown(&mut self) -> Result { + pub async fn server_shutdown(&mut self) -> Result<()> { trace!("ClientApiConnection::server_shutdown"); let server = { let inner = self.inner.borrow(); @@ -184,7 +184,27 @@ impl ClientApiConnection { }; let request = server.borrow().shutdown_request(); let response = request.send().promise.await?; - Ok(response.get()?.get_result()) + response.get().map(drop).map_err(|e| anyhow!(e)) + } + + pub async fn server_debug(&mut self, what: String) -> Result { + trace!("ClientApiConnection::server_debug"); + let server = { + let inner = self.inner.borrow(); + inner + .server + .as_ref() + .ok_or(anyhow!("Not connected, ignoring attach request"))? + .clone() + }; + let mut request = server.borrow().debug_request(); + request.get().set_what(&what); + let response = request.send().promise.await?; + response + .get()? + .get_output() + .map(|o| o.to_owned()) + .map_err(|e| anyhow!(e)) } // Start Client API connection diff --git a/veilid-cli/src/command_processor.rs b/veilid-cli/src/command_processor.rs index 6897971e..8ade71f2 100644 --- a/veilid-cli/src/command_processor.rs +++ b/veilid-cli/src/command_processor.rs @@ -97,19 +97,19 @@ disconnect - disconnect the client from the Veilid node shutdown - shut the server down attach - attach the server to the Veilid network detach - detach the server from the Veilid network +debug - send a debugging command to the Veilid server "#, ); let ui = self.ui(); - callback(ui); + ui.send_callback(callback); Ok(()) } pub fn cmd_exit(&self, callback: UICallback) -> Result<(), String> { trace!("CommandProcessor::cmd_exit"); let ui = self.ui(); - callback(ui); - // - self.ui().quit(); + ui.send_callback(callback); + ui.quit(); Ok(()) } @@ -121,7 +121,7 @@ detach - detach the server from the Veilid network if let Err(e) = capi.server_shutdown().await { error!("Server command 'shutdown' failed to execute: {}", e); } - callback(ui); + ui.send_callback(callback); }); Ok(()) } @@ -134,7 +134,7 @@ detach - detach the server from the Veilid network if let Err(e) = capi.server_attach().await { error!("Server command 'attach' failed to execute: {}", e); } - callback(ui); + ui.send_callback(callback); }); Ok(()) } @@ -147,7 +147,7 @@ detach - detach the server from the Veilid network if let Err(e) = capi.server_detach().await { error!("Server command 'detach' failed to execute: {}", e); } - callback(ui); + ui.send_callback(callback); }); Ok(()) } @@ -158,7 +158,23 @@ detach - detach the server from the Veilid network let ui = self.ui(); async_std::task::spawn_local(async move { capi.disconnect().await; - callback(ui); + ui.send_callback(callback); + }); + Ok(()) + } + + pub fn cmd_debug(&self, rest: Option, callback: UICallback) -> Result<(), String> { + trace!("CommandProcessor::cmd_debug"); + let mut capi = self.capi(); + let ui = self.ui(); + async_std::task::spawn_local(async move { + match capi.server_debug(rest.unwrap_or_default()).await { + Ok(output) => ui.display_string_dialog("Debug Output", output, callback), + Err(e) => { + error!("Server command 'debug' failed to execute: {}", e); + ui.send_callback(callback); + } + } }); Ok(()) } @@ -166,7 +182,6 @@ detach - detach the server from the Veilid network pub fn run_command(&self, command_line: &str, callback: UICallback) -> Result<(), String> { // let (cmd, rest) = Self::word_split(command_line); - match cmd.as_str() { "help" => self.cmd_help(rest, callback), "exit" => self.cmd_exit(callback), @@ -175,8 +190,10 @@ detach - detach the server from the Veilid network "shutdown" => self.cmd_shutdown(callback), "attach" => self.cmd_attach(callback), "detach" => self.cmd_detach(callback), + "debug" => self.cmd_debug(rest, callback), _ => { - callback(self.ui()); + let ui = self.ui(); + ui.send_callback(callback); Err(format!("Invalid command: {}", cmd)) } } diff --git a/veilid-cli/src/ui.rs b/veilid-cli/src/ui.rs index d94d3d40..5e81c49c 100644 --- a/veilid-cli/src/ui.rs +++ b/veilid-cli/src/ui.rs @@ -44,7 +44,7 @@ impl Dirty { } } -pub type UICallback = Box; +pub type UICallback = Box; struct UIState { attachment_state: Dirty, @@ -84,126 +84,8 @@ pub struct UI { } impl UI { - pub fn new(node_log_scrollback: usize, settings: &Settings) -> Self { - cursive_flexi_logger_view::resize(node_log_scrollback); - - // Instantiate the cursive runnable - /* - // reduces flicker, but it costs cpu - let mut runnable = CursiveRunnable::new( - || -> Result, UIError> { - let crossterm_backend = cursive::backends::crossterm::Backend::init().unwrap(); - let buffered_backend = - cursive_buffered_backend::BufferedBackend::new(crossterm_backend); - - Ok(Box::new(buffered_backend)) - }, - ); - */ - let runnable = cursive::default(); - // Make the callback mechanism easily reachable - let cb_sink = runnable.cb_sink().clone(); - - // Create the UI object - let this = Self { - siv: Rc::new(RefCell::new(runnable)), - inner: Rc::new(RefCell::new(UIInner { - ui_state: UIState::new(), - log_colors: Default::default(), - cmdproc: None, - cmd_history: { - let mut vd = VecDeque::new(); - vd.push_back("".to_string()); - vd - }, - cmd_history_position: 0, - cmd_history_max_size: settings.interface.command_line.history_size, - connection_dialog_state: None, - cb_sink, - })), - }; - - let mut siv = this.siv.borrow_mut(); - let mut inner = this.inner.borrow_mut(); - - // Make the inner object accessible in callbacks easily - siv.set_user_data(this.inner.clone()); - - // Create layouts - let mut mainlayout = LinearLayout::vertical().with_name("main-layout"); - mainlayout.get_mut().add_child( - Panel::new( - FlexiLoggerView::new_scrollable() - .with_name("node-events") - .full_screen(), - ) - .title_position(HAlign::Left) - .title("Node Events"), - ); - mainlayout.get_mut().add_child( - Panel::new(ScrollView::new( - TextView::new("Peer Table") - .with_name("peers") - .fixed_height(8) - .scrollable(), - )) - .title_position(HAlign::Left) - .title("Peers"), - ); - let mut command = StyledString::new(); - command.append_styled("Command> ", ColorStyle::title_primary()); - // - mainlayout.get_mut().add_child( - LinearLayout::horizontal() - .child(TextView::new(command)) - .child( - EditView::new() - .on_submit(UI::on_command_line_entered) - .on_edit(UI::on_command_line_edit) - .on_up_down(UI::on_command_line_history) - .style(ColorStyle::new( - PaletteColor::Background, - PaletteColor::Secondary, - )) - .with_name("command-line") - .full_screen() - .fixed_height(1), - ) - .child( - Button::new("Attach", |s| { - UI::on_button_attach_pressed(s); - }) - .with_name("button-attach"), - ), - ); - let mut version = StyledString::new(); - version.append_styled( - concat!(" | veilid-cli v", env!("CARGO_PKG_VERSION")), - ColorStyle::highlight_inactive(), - ); - - mainlayout.get_mut().add_child( - LinearLayout::horizontal() - .color(Some(ColorStyle::highlight_inactive())) - .child( - TextView::new("") - .with_name("status-bar") - .full_screen() - .fixed_height(1), - ) - .child(TextView::new(version)), - ); - - siv.add_fullscreen_layer(mainlayout); - - UI::setup_colors(&mut siv, &mut inner, settings); - UI::setup_quit_handler(&mut siv); - - drop(inner); - drop(siv); - - this - } + ///////////////////////////////////////////////////////////////////////////////////// + // Private functions fn command_processor(s: &mut Cursive) -> CommandProcessor { let inner = Self::inner(s); @@ -317,53 +199,6 @@ impl UI { }); } - pub fn cursive_flexi_logger(&mut self) -> Box { - let mut flv = - cursive_flexi_logger_view::cursive_flexi_logger(self.siv.borrow().cb_sink().clone()); - flv.set_colors(self.inner.borrow().log_colors.clone()); - flv - } - pub fn set_command_processor(&mut self, cmdproc: CommandProcessor) { - let mut inner = self.inner.borrow_mut(); - inner.cmdproc = Some(cmdproc); - let _ = inner.cb_sink.send(Box::new(UI::update_cb)); - } - pub fn set_attachment_state(&mut self, state: AttachmentState) { - let mut inner = self.inner.borrow_mut(); - inner.ui_state.attachment_state.set(state); - let _ = inner.cb_sink.send(Box::new(UI::update_cb)); - } - pub fn set_connection_state(&mut self, state: ConnectionState) { - let mut inner = self.inner.borrow_mut(); - inner.ui_state.connection_state.set(state); - let _ = inner.cb_sink.send(Box::new(UI::update_cb)); - } - pub fn add_node_event(&mut self, event: &str) { - let inner = self.inner.borrow_mut(); - let color = *inner.log_colors.get(&Level::Info).unwrap(); - for line in event.lines() { - cursive_flexi_logger_view::push_to_log(StyledString::styled(line, color)); - } - let _ = inner.cb_sink.send(Box::new(UI::update_cb)); - } - pub fn quit(&mut self) { - let inner = self.inner.borrow_mut(); - let _ = inner.cb_sink.send(Box::new(|s| { - s.quit(); - })); - } - // Note: Cursive is not re-entrant, can't borrow_mut self.siv again after this - pub async fn run_async(&mut self) { - let mut siv = self.siv.borrow_mut(); - siv.run_async().await; - } - // pub fn run(&mut self) { - // let mut siv = self.siv.borrow_mut(); - // siv.run(); - // } - - ///////////////////////////////////////////////////////////////////////////////////// - // Private functions // fn main_layout(s: &mut Cursive) -> ViewRef { // s.find_name("main-layout").unwrap() // } @@ -426,11 +261,30 @@ impl UI { inner.cmd_history[hlen - 1] = text.to_owned(); } - pub fn enable_command_ui(s: &mut Cursive, enabled: bool) { + fn enable_command_ui(s: &mut Cursive, enabled: bool) { Self::command_line(s).set_enabled(enabled); Self::button_attach(s).set_enabled(enabled); } + fn display_string_dialog_cb( + s: &mut Cursive, + title: String, + contents: String, + close_cb: UICallback, + ) { + // Creates a dialog around some text with a single button + s.add_layer( + Dialog::around(TextView::new(contents)) + .title(title) + .button("Close", move |s| { + s.pop_layer(); + close_cb(s); + }) + //.wrap_with(CircularFocus::new) + //.wrap_tab(), + ); + } + fn run_command(s: &mut Cursive, text: &str) -> Result<(), String> { // disable ui Self::enable_command_ui(s, false); @@ -438,15 +292,8 @@ impl UI { let cmdproc = Self::command_processor(s); cmdproc.run_command( text, - Box::new(|ui: UI| { - let _ = ui - .inner - .borrow() - .cb_sink - .send(Box::new(|s| { - Self::enable_command_ui(s, true); - })) - .unwrap(); + Box::new(|s| { + Self::enable_command_ui(s, true); }), ) } @@ -772,4 +619,193 @@ impl UI { Self::refresh_connection_dialog(s); } } + + //////////////////////////////////////////////////////////////////////////// + // Public functions + + pub fn new(node_log_scrollback: usize, settings: &Settings) -> Self { + cursive_flexi_logger_view::resize(node_log_scrollback); + + // Instantiate the cursive runnable + /* + // reduces flicker, but it costs cpu + let mut runnable = CursiveRunnable::new( + || -> Result, UIError> { + let crossterm_backend = cursive::backends::crossterm::Backend::init().unwrap(); + let buffered_backend = + cursive_buffered_backend::BufferedBackend::new(crossterm_backend); + + Ok(Box::new(buffered_backend)) + }, + ); + */ + let runnable = cursive::default(); + // Make the callback mechanism easily reachable + let cb_sink = runnable.cb_sink().clone(); + + // Create the UI object + let this = Self { + siv: Rc::new(RefCell::new(runnable)), + inner: Rc::new(RefCell::new(UIInner { + ui_state: UIState::new(), + log_colors: Default::default(), + cmdproc: None, + cmd_history: { + let mut vd = VecDeque::new(); + vd.push_back("".to_string()); + vd + }, + cmd_history_position: 0, + cmd_history_max_size: settings.interface.command_line.history_size, + connection_dialog_state: None, + cb_sink, + })), + }; + + let mut siv = this.siv.borrow_mut(); + let mut inner = this.inner.borrow_mut(); + + // Make the inner object accessible in callbacks easily + siv.set_user_data(this.inner.clone()); + + // Create layouts + let mut mainlayout = LinearLayout::vertical().with_name("main-layout"); + mainlayout.get_mut().add_child( + Panel::new( + FlexiLoggerView::new_scrollable() + .with_name("node-events") + .full_screen(), + ) + .title_position(HAlign::Left) + .title("Node Events"), + ); + mainlayout.get_mut().add_child( + Panel::new(ScrollView::new( + TextView::new("Peer Table") + .with_name("peers") + .fixed_height(8) + .scrollable(), + )) + .title_position(HAlign::Left) + .title("Peers"), + ); + let mut command = StyledString::new(); + command.append_styled("Command> ", ColorStyle::title_primary()); + // + mainlayout.get_mut().add_child( + LinearLayout::horizontal() + .child(TextView::new(command)) + .child( + EditView::new() + .on_submit(UI::on_command_line_entered) + .on_edit(UI::on_command_line_edit) + .on_up_down(UI::on_command_line_history) + .style(ColorStyle::new( + PaletteColor::Background, + PaletteColor::Secondary, + )) + .with_name("command-line") + .full_screen() + .fixed_height(1), + ) + .child( + Button::new("Attach", |s| { + UI::on_button_attach_pressed(s); + }) + .with_name("button-attach"), + ), + ); + let mut version = StyledString::new(); + version.append_styled( + concat!(" | veilid-cli v", env!("CARGO_PKG_VERSION")), + ColorStyle::highlight_inactive(), + ); + + mainlayout.get_mut().add_child( + LinearLayout::horizontal() + .color(Some(ColorStyle::highlight_inactive())) + .child( + TextView::new("") + .with_name("status-bar") + .full_screen() + .fixed_height(1), + ) + .child(TextView::new(version)), + ); + + siv.add_fullscreen_layer(mainlayout); + + UI::setup_colors(&mut siv, &mut inner, settings); + UI::setup_quit_handler(&mut siv); + + drop(inner); + drop(siv); + + this + } + pub fn cursive_flexi_logger(&self) -> Box { + let mut flv = + cursive_flexi_logger_view::cursive_flexi_logger(self.siv.borrow().cb_sink().clone()); + flv.set_colors(self.inner.borrow().log_colors.clone()); + flv + } + pub fn set_command_processor(&mut self, cmdproc: CommandProcessor) { + let mut inner = self.inner.borrow_mut(); + inner.cmdproc = Some(cmdproc); + let _ = inner.cb_sink.send(Box::new(UI::update_cb)); + } + pub fn set_attachment_state(&mut self, state: AttachmentState) { + let mut inner = self.inner.borrow_mut(); + inner.ui_state.attachment_state.set(state); + let _ = inner.cb_sink.send(Box::new(UI::update_cb)); + } + pub fn set_connection_state(&mut self, state: ConnectionState) { + let mut inner = self.inner.borrow_mut(); + inner.ui_state.connection_state.set(state); + let _ = inner.cb_sink.send(Box::new(UI::update_cb)); + } + pub fn add_node_event(&self, event: &str) { + let inner = self.inner.borrow(); + let color = *inner.log_colors.get(&Level::Info).unwrap(); + for line in event.lines() { + cursive_flexi_logger_view::push_to_log(StyledString::styled(line, color)); + } + let _ = inner.cb_sink.send(Box::new(UI::update_cb)); + } + + pub fn display_string_dialog( + &self, + title: T, + text: S, + close_cb: UICallback, + ) { + let title = title.to_string(); + let text = text.to_string(); + let inner = self.inner.borrow(); + let _ = inner.cb_sink.send(Box::new(move |s| { + UI::display_string_dialog_cb(s, title, text, close_cb) + })); + } + + pub fn quit(&self) { + let inner = self.inner.borrow(); + let _ = inner.cb_sink.send(Box::new(|s| { + s.quit(); + })); + } + + pub fn send_callback(&self, callback: UICallback) { + let inner = self.inner.borrow(); + let _ = inner.cb_sink.send(Box::new(move |s| callback(s))); + } + + // Note: Cursive is not re-entrant, can't borrow_mut self.siv again after this + pub async fn run_async(&mut self) { + let mut siv = self.siv.borrow_mut(); + siv.run_async().await; + } + // pub fn run(&mut self) { + // let mut siv = self.siv.borrow_mut(); + // siv.run(); + // } } diff --git a/veilid-core/src/routing_table/bucket.rs b/veilid-core/src/routing_table/bucket.rs index 685a0973..023c9d8c 100644 --- a/veilid-core/src/routing_table/bucket.rs +++ b/veilid-core/src/routing_table/bucket.rs @@ -8,7 +8,7 @@ pub struct Bucket { } pub(super) type EntriesIterMut<'a> = alloc::collections::btree_map::IterMut<'a, DHTKey, BucketEntry>; -//pub(super) type EntriesIter<'a> = alloc::collections::btree_map::Iter<'a, DHTKey, BucketEntry>; +pub(super) type EntriesIter<'a> = alloc::collections::btree_map::Iter<'a, DHTKey, BucketEntry>; fn state_ordering(state: BucketEntryState) -> usize { match state { @@ -61,9 +61,9 @@ impl Bucket { self.entries.get_mut(key) } - // pub(super) fn entries(&self) -> EntriesIter { - // self.entries.iter() - // } + pub(super) fn entries(&self) -> EntriesIter { + self.entries.iter() + } pub(super) fn entries_mut(&mut self) -> EntriesIterMut { self.entries.iter_mut() @@ -72,9 +72,12 @@ impl Bucket { pub(super) fn kick(&mut self, bucket_depth: usize) -> Option> { // Get number of entries to attempt to purge from bucket let bucket_len = self.entries.len(); + + // Don't bother kicking bucket unless it is full if bucket_len <= bucket_depth { return None; } + // Try to purge the newest entries that overflow the bucket let mut dead_node_ids: BTreeSet = BTreeSet::new(); let mut extra_entries = bucket_len - bucket_depth; diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index 18a9ffea..d05b4782 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -19,11 +19,11 @@ const RELIABLE_PING_INTERVAL_MULTIPLIER: f64 = 2.0; const UNRELIABLE_PING_SPAN_SECS: u32 = 60; const UNRELIABLE_PING_INTERVAL_SECS: u32 = 5; -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] pub enum BucketEntryState { - Reliable, - Unreliable, Dead, + Unreliable, + Reliable, } #[derive(Debug, Clone)] @@ -243,14 +243,18 @@ impl BucketEntry { // if we have had consecutive ping replies for longer that UNRELIABLE_PING_SPAN_SECS match self.peer_stats.ping_stats.first_consecutive_pong_time { None => false, - Some(ts) => (cur_ts - ts) >= (UNRELIABLE_PING_SPAN_SECS as u64 * 1000000u64), + Some(ts) => { + cur_ts.saturating_sub(ts) >= (UNRELIABLE_PING_SPAN_SECS as u64 * 1000000u64) + } } } pub(super) fn check_dead(&self, cur_ts: u64) -> bool { // if we have not heard from the node at all for the duration of the unreliable ping span match self.peer_stats.last_seen { None => true, - Some(ts) => (cur_ts - ts) >= (UNRELIABLE_PING_SPAN_SECS as u64 * 1000000u64), + Some(ts) => { + cur_ts.saturating_sub(ts) >= (UNRELIABLE_PING_SPAN_SECS as u64 * 1000000u64) + } } } @@ -268,9 +272,10 @@ impl BucketEntry { .first_consecutive_pong_time .unwrap(); let start_of_reliable_time = first_consecutive_pong_time - + (UNRELIABLE_PING_SPAN_SECS as u64 * 1000000u64); - let reliable_cur = cur_ts - start_of_reliable_time; - let reliable_last = last_pinged - start_of_reliable_time; + + ((UNRELIABLE_PING_SPAN_SECS - UNRELIABLE_PING_INTERVAL_SECS) as u64 + * 1000000u64); + let reliable_cur = cur_ts.saturating_sub(start_of_reliable_time); + let reliable_last = last_pinged.saturating_sub(start_of_reliable_time); retry_falloff_log( reliable_last, @@ -287,7 +292,7 @@ impl BucketEntry { match self.peer_stats.ping_stats.last_pinged { None => true, Some(last_pinged) => { - (cur_ts - last_pinged) + cur_ts.saturating_sub(last_pinged) >= (UNRELIABLE_PING_INTERVAL_SECS as u64 * 1000000u64) } } @@ -303,6 +308,43 @@ impl BucketEntry { self.peer_stats.last_seen = Some(ts); } + pub(super) fn state_debug_info(&self, cur_ts: u64) -> String { + let last_pinged = if let Some(last_pinged) = self.peer_stats.ping_stats.last_pinged { + format!( + "{}s ago", + timestamp_to_secs(cur_ts.saturating_sub(last_pinged)) + ) + } else { + "never".to_owned() + }; + let first_consecutive_pong_time = if let Some(first_consecutive_pong_time) = + self.peer_stats.ping_stats.first_consecutive_pong_time + { + format!( + "{}s ago", + timestamp_to_secs(cur_ts.saturating_sub(first_consecutive_pong_time)) + ) + } else { + "never".to_owned() + }; + let last_seen = if let Some(last_seen) = self.peer_stats.last_seen { + format!( + "{}s ago", + timestamp_to_secs(cur_ts.saturating_sub(last_seen)) + ) + } else { + "never".to_owned() + }; + + format!( + "state: {:?}, first_consecutive_pong_time: {}, last_pinged: {}, last_seen: {}", + self.state(cur_ts), + first_consecutive_pong_time, + last_pinged, + last_seen + ) + } + //////////////////////////////////////////////////////////////// /// Called when rpc processor things happen diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index ac8ab4da..581d20b7 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -213,10 +213,14 @@ impl RoutingTable { .to_string(), ); debug!(" Origin: {:?}", origin); + + Self::trigger_changed_dial_info(&mut *inner); } pub fn clear_local_dial_info(&self) { - self.inner.lock().local_dial_info.clear(); + let mut inner = self.inner.lock(); + inner.local_dial_info.clear(); + Self::trigger_changed_dial_info(&mut *inner); } pub fn has_global_dial_info(&self) -> bool { @@ -290,10 +294,13 @@ impl RoutingTable { ); debug!(" Origin: {:?}", origin); debug!(" Network Class: {:?}", network_class); + Self::trigger_changed_dial_info(&mut *inner); } pub fn clear_global_dial_info(&self) { - self.inner.lock().global_dial_info.clear(); + let mut inner = self.inner.lock(); + inner.global_dial_info.clear(); + Self::trigger_changed_dial_info(&mut *inner); } pub async fn wait_changed_dial_info(&self) { @@ -304,14 +311,10 @@ impl RoutingTable { .instance_empty(); inst.await; } - pub async fn trigger_changed_dial_info(&self) { - let eventual = { - let mut inner = self.inner.lock(); - let mut new_eventual = Eventual::new(); - core::mem::swap(&mut inner.eventual_changed_dial_info, &mut new_eventual); - new_eventual - }; - eventual.resolve().await; + fn trigger_changed_dial_info(inner: &mut RoutingTableInner) { + let mut new_eventual = Eventual::new(); + core::mem::swap(&mut inner.eventual_changed_dial_info, &mut new_eventual); + spawn(new_eventual.resolve()).detach(); } fn bucket_depth(index: usize) -> usize { @@ -351,6 +354,38 @@ impl RoutingTable { *self.inner.lock() = Self::new_inner(self.network_manager()); } + // debugging info + pub fn debug_info(&self, min_state: BucketEntryState) -> String { + let inner = self.inner.lock(); + let cur_ts = get_timestamp(); + + let mut out = String::new(); + const COLS: usize = 16; + let rows = inner.buckets.len() / COLS; + let mut r = 0; + let mut b = 0; + out += "Buckets:\n"; + while r < rows { + let mut c = 0; + out += format!(" {:>3}: ", b).as_str(); + while c < COLS { + let mut cnt = 0; + for e in inner.buckets[b].entries() { + if e.1.state(cur_ts) >= min_state { + cnt += 1; + } + } + out += format!("{:>3} ", cnt).as_str(); + b += 1; + c += 1; + } + out += "\n"; + r += 1; + } + + out + } + // Just match address and port to help sort dialinfoentries for buckets // because inbound connections will not have dialinfo associated with them // but should have ip addresses if they have changed @@ -613,7 +648,7 @@ impl RoutingTable { c.network.bootstrap.clone() }; - trace!("Bootstrap task with: {:?}", bootstrap); + debug!("--- bootstrap_task"); // Map all bootstrap entries to a single key with multiple dialinfo let mut bsmap: BTreeMap> = BTreeMap::new(); @@ -630,6 +665,7 @@ impl RoutingTable { .or_insert_with(Vec::new) .push(ndis.dial_info); } + trace!(" bootstrap list: {:?}", bsmap); // Run all bootstrap operations concurrently let mut unord = FuturesUnordered::new(); @@ -641,7 +677,7 @@ impl RoutingTable { } }; - info!("Bootstrapping {} with {:?}", k.encode(), &v); + trace!(" bootstrapping {} with {:?}", k.encode(), &v); unord.push(self.reverse_find_node(nr, true)); } while unord.next().await.is_some() {} @@ -654,6 +690,8 @@ impl RoutingTable { // Ask our remaining peers to give us more peers before we go // back to the bootstrap servers to keep us from bothering them too much async fn peer_minimum_refresh_task_routine(self) -> Result<(), String> { + trace!("--- peer_minimum_refresh task"); + // get list of all peers we know about, even the unreliable ones, and ask them to bootstrap too let noderefs = { let mut inner = self.inner.lock(); @@ -665,11 +703,12 @@ impl RoutingTable { } noderefs }; + trace!(" refreshing with nodes: {:?}", noderefs); // do peer minimum search concurrently let mut unord = FuturesUnordered::new(); for nr in noderefs { - debug!("Peer minimum search with {:?}", nr); + debug!(" --- peer minimum search with {:?}", nr); unord.push(self.reverse_find_node(nr, false)); } while unord.next().await.is_some() {} @@ -680,12 +719,18 @@ impl RoutingTable { // Ping each node in the routing table if they need to be pinged // to determine their reliability async fn ping_validator_task_routine(self, _last_ts: u64, cur_ts: u64) -> Result<(), String> { + trace!("--- ping_validator task"); let rpc = self.rpc_processor(); let mut inner = self.inner.lock(); for b in &mut inner.buckets { for (k, entry) in b.entries_mut() { if entry.needs_ping(cur_ts) { let nr = NodeRef::new(self.clone(), *k, entry); + debug!( + " --- ping validating: {:?} ({})", + nr, + entry.state_debug_info(cur_ts) + ); intf::spawn_local(rpc.clone().rpc_call_info(nr)).detach(); } } @@ -695,6 +740,7 @@ impl RoutingTable { // Compute transfer statistics to determine how 'fast' a node is async fn rolling_transfers_task_routine(self, last_ts: u64, cur_ts: u64) -> Result<(), String> { + trace!("--- rolling_transfers task"); let inner = &mut *self.inner.lock(); // Roll our own node's transfers diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 08c46497..41f71961 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -1502,7 +1502,7 @@ impl RPCProcessor { body: Vec, peer_noderef: NodeRef, ) -> Result<(), ()> { - debug!("enqueue_message: body len = {}", body.len()); + trace!("enqueue_message: body len = {}", body.len()); let msg = RPCMessage { header: RPCMessageHeader { timestamp: get_timestamp(), diff --git a/veilid-core/src/veilid_api.rs b/veilid-core/src/veilid_api.rs index 9eb7acdb..55b09764 100644 --- a/veilid-core/src/veilid_api.rs +++ b/veilid-core/src/veilid_api.rs @@ -3,6 +3,7 @@ use crate::*; use attachment_manager::AttachmentManager; use core::fmt; use network_manager::NetworkManager; +use routing_table::*; use rpc_processor::{RPCError, RPCProcessor}; use xx::*; @@ -961,6 +962,51 @@ impl VeilidAPI { self.inner.lock().core.is_none() } + //////////////////////////////////////////////////////////////// + // Debugging + + async fn debug_buckets(&self, mut debug_args: Vec) -> Result { + let min_state = { + if let Some(min_state) = debug_args.pop() { + if min_state == "dead" { + BucketEntryState::Dead + } else if min_state == "reliable" { + BucketEntryState::Reliable + } else { + return Err(VeilidAPIError::Internal(format!( + "Invalid argument '{}'", + min_state + ))); + } + } else { + BucketEntryState::Unreliable + } + }; + // Dump routing table bucket info + let rpc = self.rpc_processor()?; + let routing_table = rpc.routing_table(); + Ok(routing_table.debug_info(min_state)) + } + + pub async fn debug(&self, what: String) -> Result { + trace!("VeilidCore::debug"); + let mut out = String::new(); + let mut debug_args: Vec = what + .split_ascii_whitespace() + .map(|s| s.to_owned()) + .collect(); + if let Some(arg) = debug_args.pop() { + if arg == "buckets" { + out += self.debug_buckets(debug_args).await?.as_str(); + } else { + out += ">>> Unknown command\n"; + } + } else { + out += ">>> Debug commands:\n buckets [dead|reliable]\n"; + } + Ok(out) + } + //////////////////////////////////////////////////////////////// // Attach/Detach diff --git a/veilid-server/proto/veilid-client.capnp b/veilid-server/proto/veilid-client.capnp index 8322a358..bead2659 100644 --- a/veilid-server/proto/veilid-client.capnp +++ b/veilid-server/proto/veilid-client.capnp @@ -34,14 +34,17 @@ interface VeilidServer { register @0 (veilidClient: VeilidClient) -> (registration: Registration); - attach @1 () -> (result: Bool); - detach @2 () -> (result: Bool); - shutdown @3 () -> (result: Bool); + attach @1 (); + detach @2 (); + shutdown @3 (); + + debug @4 (what: Text) -> (output: Text); } interface VeilidClient { stateChanged @0 (changed: VeilidStateChange); + logMessage @1 (message: Text); } \ No newline at end of file diff --git a/veilid-server/src/client_api.rs b/veilid-server/src/client_api.rs index aab1b81f..3d6b695a 100644 --- a/veilid-server/src/client_api.rs +++ b/veilid-server/src/client_api.rs @@ -160,6 +160,25 @@ impl veilid_server::Server for VeilidServerImpl { Promise::ok(()) } + + fn debug( + &mut self, + params: veilid_server::DebugParams, + mut results: veilid_server::DebugResults, + ) -> Promise<(), ::capnp::Error> { + trace!("VeilidServerImpl::attach"); + let veilid_api = self.veilid_api.clone(); + let what = pry!(pry!(params.get()).get_what()).to_owned(); + + Promise::from_future(async move { + let output = veilid_api + .debug(what) + .await + .map_err(|e| ::capnp::Error::failed(format!("{:?}", e)))?; + results.get().set_output(output.as_str()); + Ok(()) + }) + } } // --- Client API Server-Side --------------------------------- @@ -268,9 +287,11 @@ impl ClientApi { } } - pub fn handle_state_change(self: Rc, changed: veilid_core::VeilidStateChange) { - trace!("state changed: {:?}", changed); - + fn send_request_to_all_clients(self: Rc, request: F) + where + F: Fn(u64, &mut RegistrationHandle) -> ::capnp::capability::RemotePromise, + T: capnp::traits::Pipelined + for<'a> capnp::traits::Owned<'a> + 'static + Unpin, + { // Send status update to each registered client let registration_map = self.inner.borrow().registration_map.clone(); let registration_map1 = registration_map.clone(); @@ -278,17 +299,16 @@ impl ClientApi { for (&id, mut registration) in regs.iter_mut() { if registration.requests_in_flight > 5 { debug!( - "too many requests in flight for status updates: {}", + "too many requests in flight: {}", registration.requests_in_flight ); } registration.requests_in_flight += 1; - // Make a state changed request - let mut request = registration.client.state_changed_request(); - let rpc_changed = request.get().init_changed(); - ClientApi::convert_state_changed(&changed, rpc_changed); + + let request_promise = request(id, registration); + let registration_map2 = registration_map1.clone(); - async_std::task::spawn_local(request.send().promise.map(move |r| match r { + async_std::task::spawn_local(request_promise.promise.map(move |r| match r { Ok(_) => { if let Some(ref mut s) = registration_map2.borrow_mut().registrations.get_mut(&id) @@ -304,6 +324,23 @@ impl ClientApi { } } + pub fn handle_state_change(self: Rc, changed: veilid_core::VeilidStateChange) { + self.send_request_to_all_clients(|_id, registration| { + let mut request = registration.client.state_changed_request(); + let rpc_changed = request.get().init_changed(); + ClientApi::convert_state_changed(&changed, rpc_changed); + request.send() + }); + } + + pub fn handle_client_log(self: Rc, message: String) { + self.send_request_to_all_clients(|_id, registration| { + let mut request = registration.client.log_message_request(); + request.get().set_message(&message); + request.send() + }); + } + pub fn run(self: Rc, bind_addrs: Vec) { // Create client api VeilidServer let veilid_server_impl = VeilidServerImpl::new(self.inner.borrow().veilid_api.clone()); diff --git a/veilid-server/src/client_log_channel.rs b/veilid-server/src/client_log_channel.rs new file mode 100644 index 00000000..5e370e10 --- /dev/null +++ b/veilid-server/src/client_log_channel.rs @@ -0,0 +1,48 @@ +use async_std::channel::{bounded, Receiver, RecvError, Sender, TrySendError}; +use std::sync::Arc; + +#[derive(Debug)] +struct ClientLogChannelInner { + sender: Sender, + receiver: Receiver, +} + +#[derive(Debug, Clone)] +pub struct ClientLogChannel { + inner: Arc, +} + +impl ClientLogChannel { + pub fn new() -> Self { + let (sender, receiver) = bounded(1); + Self { + inner: Arc::new(ClientLogChannelInner { sender, receiver }), + } + } + + pub async fn recv(&self) -> Result { + self.inner.receiver.recv().await + } +} + +impl std::io::Write for ClientLogChannel { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + if let Err(e) = self + .inner + .sender + .try_send(String::from_utf8_lossy(buf).to_string()) + { + match e { + TrySendError::Full(_) => Err(std::io::Error::from(std::io::ErrorKind::WouldBlock)), + TrySendError::Closed(_) => { + Err(std::io::Error::from(std::io::ErrorKind::BrokenPipe)) + } + } + } else { + Ok(buf.len()) + } + } + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } +} diff --git a/veilid-server/src/main.rs b/veilid-server/src/main.rs index 52785164..2605afea 100644 --- a/veilid-server/src/main.rs +++ b/veilid-server/src/main.rs @@ -3,6 +3,7 @@ #![deny(unused_must_use)] mod client_api; +mod client_log_channel; mod settings; #[allow(clippy::all)] diff --git a/veilid-server/src/settings.rs b/veilid-server/src/settings.rs index 55eabdab..3d4bbc22 100644 --- a/veilid-server/src/settings.rs +++ b/veilid-server/src/settings.rs @@ -29,6 +29,9 @@ logging: path: "" append: true level: "info" + client: + enable: false + level: "info" testing: subnode_index: 0 core: @@ -283,6 +286,12 @@ pub struct File { pub level: LogLevel, } +#[derive(Debug, Deserialize)] +pub struct Client { + pub enabled: bool, + pub level: LogLevel, +} + #[derive(Debug, Deserialize)] pub struct ClientApi { pub enabled: bool, @@ -293,6 +302,7 @@ pub struct ClientApi { pub struct Logging { pub terminal: Terminal, pub file: File, + pub client: Client, } #[derive(Debug, Deserialize)] @@ -928,6 +938,8 @@ mod tests { assert_eq!(s.logging.file.path, ""); assert_eq!(s.logging.file.append, true); assert_eq!(s.logging.file.level, LogLevel::Info); + assert_eq!(s.logging.client.enabled, true); + assert_eq!(s.logging.client.level, LogLevel::Info); assert_eq!(s.testing.subnode_index, 0); assert_eq!( s.core.tablestore.directory, diff --git a/veilid-server/src/unix.rs b/veilid-server/src/unix.rs index 1f919011..d2edf933 100644 --- a/veilid-server/src/unix.rs +++ b/veilid-server/src/unix.rs @@ -1,8 +1,10 @@ #[cfg(unix)] use crate::client_api; +use crate::client_log_channel::*; use crate::settings; use async_std::channel::{bounded, Receiver, Sender}; use clap::{App, Arg}; +use futures::*; use lazy_static::*; use log::*; use parking_lot::Mutex; @@ -187,7 +189,7 @@ pub async fn main() -> Result<(), String> { // Set up loggers let mut logs: Vec> = Vec::new(); - + let mut client_log_channel: Option = None; let mut cb = ConfigBuilder::new(); cb.add_filter_ignore_str("async_std"); cb.add_filter_ignore_str("async_io"); @@ -228,6 +230,15 @@ pub async fn main() -> Result<(), String> { logfile, )) } + if settingsr.logging.client.enabled { + let clog = ClientLogChannel::new(); + client_log_channel = Some(clog.clone()); + logs.push(WriteLogger::new( + settings::convert_loglevel(settingsr.logging.file.level), + cb.build(), + clog, + )) + } CombinedLogger::init(logs).map_err(|e| format!("failed to init logs: {}", e))?; // Create Veilid Core @@ -280,12 +291,27 @@ pub async fn main() -> Result<(), String> { let capi_jh = async_std::task::spawn_local(async move { trace!("state change processing started"); while let Ok(change) = receiver.recv().await { - if let Some(c) = capi2.borrow_mut().as_mut().cloned() { + if let Some(c) = capi2.borrow().as_ref().cloned() { c.handle_state_change(change); } } trace!("state change processing stopped"); }); + // Handle log messages on main thread for capnproto rpc + let capi2 = capi.clone(); + let capi_jh2 = if let Some(client_log_channel) = client_log_channel { + Some(async_std::task::spawn_local(async move { + trace!("client logging started"); + while let Ok(message) = client_log_channel.recv().await { + if let Some(c) = capi2.borrow().as_ref().cloned() { + c.handle_client_log(message); + } + } + trace!("client logging stopped") + })) + } else { + None + }; // Auto-attach if desired if auto_attach { @@ -313,8 +339,11 @@ pub async fn main() -> Result<(), String> { // Shut down Veilid API veilid_api.shutdown().await; - // Wait for statechanged handler to exit + // Wait for client api handlers to exit capi_jh.await; + if let Some(capi_jh2) = capi_jh2 { + capi_jh2.await; + } Ok(()) }