debugging

This commit is contained in:
John Smith 2021-12-14 09:48:33 -05:00
parent 8fe99f6090
commit c4b66aad36
23 changed files with 411 additions and 143 deletions

1
Cargo.lock generated
View File

@ -3646,6 +3646,7 @@ dependencies = [
"async_executors", "async_executors",
"backtrace", "backtrace",
"blake3", "blake3",
"bugsalot",
"capnp", "capnp",
"capnpc", "capnpc",
"cfg-if 0.1.10", "cfg-if 0.1.10",

4
scripts/run_8.sh Executable file
View File

@ -0,0 +1,4 @@
#!/bin/bash
exec ./run_local_test.py 8 --config-file ./local-test.yml $1

View File

@ -141,7 +141,7 @@ def main():
# Run all secondaries and add primary to bootstrap # Run all secondaries and add primary to bootstrap
for n in range(1, args.count): for n in range(1, args.count):
# time.sleep(2) time.sleep(1)
sub_args = base_args.copy() sub_args = base_args.copy()
sub_args.append("--subnode_index={}".format(n)) sub_args.append("--subnode_index={}".format(n))

View File

@ -15,7 +15,8 @@ use log::*;
use std::cell::RefCell; use std::cell::RefCell;
use std::collections::{HashMap, VecDeque}; use std::collections::{HashMap, VecDeque};
use std::rc::Rc; use std::rc::Rc;
use thiserror::Error; // use thiserror::Error;
////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////
/// ///
struct Dirty<T> { struct Dirty<T> {
@ -60,9 +61,9 @@ impl UIState {
} }
} }
#[derive(Error, Debug)] //#[derive(Error, Debug)]
#[error("???")] //#[error("???")]
struct UIError; //struct UIError;
pub struct UIInner { pub struct UIInner {
ui_state: UIState, ui_state: UIState,
@ -273,8 +274,10 @@ impl UI {
close_cb: UICallback, close_cb: UICallback,
) { ) {
// Creates a dialog around some text with a single button // Creates a dialog around some text with a single button
let close_cb = Rc::new(close_cb);
let close_cb2 = close_cb.clone();
s.add_layer( s.add_layer(
Dialog::around(TextView::new(contents)) Dialog::around(TextView::new(contents).scrollable())
.title(title) .title(title)
.button("Close", move |s| { .button("Close", move |s| {
s.pop_layer(); s.pop_layer();
@ -283,6 +286,11 @@ impl UI {
//.wrap_with(CircularFocus::new) //.wrap_with(CircularFocus::new)
//.wrap_tab(), //.wrap_tab(),
); );
s.set_global_callback(cursive::event::Event::Key(Key::Esc), move |s| {
s.set_global_callback(cursive::event::Event::Key(Key::Esc), UI::quit_handler);
s.pop_layer();
close_cb2(s);
});
} }
fn run_command(s: &mut Cursive, text: &str) -> Result<(), String> { fn run_command(s: &mut Cursive, text: &str) -> Result<(), String> {

View File

@ -65,6 +65,7 @@ serde_cbor = { version = "^0" }
if-addrs = { path = "../external/if-addrs" } if-addrs = { path = "../external/if-addrs" }
async_executors = { version = "^0", features = [ "async_std" ]} async_executors = { version = "^0", features = [ "async_std" ]}
socket2 = "^0" socket2 = "^0"
bugsalot = "^0"
# Dependencies for WASM builds only # Dependencies for WASM builds only
[target.'cfg(target_arch = "wasm32")'.dependencies] [target.'cfg(target_arch = "wasm32")'.dependencies]

View File

@ -64,7 +64,9 @@ impl ConnectionTable {
&self, &self,
descriptor: ConnectionDescriptor, descriptor: ConnectionDescriptor,
conn: NetworkConnection, conn: NetworkConnection,
) -> Result<ConnectionTableEntry, ()> { ) -> Result<ConnectionTableEntry, String> {
trace!("descriptor: {:?}", descriptor);
assert_ne!( assert_ne!(
descriptor.protocol_type(), descriptor.protocol_type(),
ProtocolType::UDP, ProtocolType::UDP,
@ -73,7 +75,10 @@ impl ConnectionTable {
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
if inner.conn_by_addr.contains_key(&descriptor) { if inner.conn_by_addr.contains_key(&descriptor) {
return Err(()); return Err(format!(
"Connection already added to table: {:?}",
descriptor
));
} }
let timestamp = get_timestamp(); let timestamp = get_timestamp();
@ -106,13 +111,15 @@ impl ConnectionTable {
pub fn remove_connection( pub fn remove_connection(
&self, &self,
descriptor: &ConnectionDescriptor, descriptor: &ConnectionDescriptor,
) -> Result<ConnectionTableEntry, ()> { ) -> Result<ConnectionTableEntry, String> {
trace!("descriptor: {:?}", descriptor);
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
let res = inner.conn_by_addr.remove(descriptor); let res = inner.conn_by_addr.remove(descriptor);
match res { match res {
Some(v) => Ok(v), Some(v) => Ok(v),
None => Err(()), None => Err(format!("Connection not in table: {:?}", descriptor)),
} }
} }
} }

View File

@ -221,13 +221,13 @@ impl Crypto {
pub fn get_random_nonce() -> Nonce { pub fn get_random_nonce() -> Nonce {
let mut nonce = [0u8; 24]; let mut nonce = [0u8; 24];
let _ = random_bytes(&mut nonce).unwrap(); random_bytes(&mut nonce).unwrap();
nonce nonce
} }
pub fn get_random_secret() -> SharedSecret { pub fn get_random_secret() -> SharedSecret {
let mut s = [0u8; 32]; let mut s = [0u8; 32];
let _ = random_bytes(&mut s).unwrap(); random_bytes(&mut s).unwrap();
s s
} }
@ -251,7 +251,7 @@ impl Crypto {
associated_data: Option<&[u8]>, associated_data: Option<&[u8]>,
) -> Result<Vec<u8>, ()> { ) -> Result<Vec<u8>, ()> {
let mut out = body.to_vec(); let mut out = body.to_vec();
let _ = Self::decrypt_in_place(&mut out, nonce, shared_secret, associated_data)?; Self::decrypt_in_place(&mut out, nonce, shared_secret, associated_data)?;
Ok(out) Ok(out)
} }
@ -276,7 +276,7 @@ impl Crypto {
associated_data: Option<&[u8]>, associated_data: Option<&[u8]>,
) -> Result<Vec<u8>, ()> { ) -> Result<Vec<u8>, ()> {
let mut out = body.to_vec(); let mut out = body.to_vec();
let _ = Self::encrypt_in_place(&mut out, nonce, shared_secret, associated_data)?; Self::encrypt_in_place(&mut out, nonce, shared_secret, associated_data)?;
Ok(out) Ok(out)
} }
} }

View File

@ -219,7 +219,15 @@ macro_rules! byte_array_type {
impl fmt::Debug for $name { impl fmt::Debug for $name {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, concat!(stringify!($name), "("))?; write!(f, concat!(stringify!($name), "("))?;
write!(f, "{}", String::from(self))?; write!(
f,
"{}",
if self.valid {
self.encode()
} else {
"".to_owned()
}
)?;
write!(f, ")") write!(f, ")")
} }
} }

View File

@ -586,10 +586,7 @@ impl Network {
// Return local dial infos we listen on // Return local dial infos we listen on
for ldi_addr in ldi_addrs { for ldi_addr in ldi_addrs {
out.push(DialInfo::udp( out.push(DialInfo::udp_from_socketaddr(ldi_addr));
Address::from_socket_addr(ldi_addr),
ldi_addr.port(),
));
} }
} }
} }
@ -748,7 +745,7 @@ impl Network {
.await .await
.map_err(|_| "failed to send message to UDP dial info".to_owned()); .map_err(|_| "failed to send message to UDP dial info".to_owned());
} else { } else {
return Err("no appropriate udp protocol handler for dial_info".to_owned()); return Err("no appropriate UDP protocol handler for dial_info".to_owned());
} }
} }
DialInfo::TCP(_) => { DialInfo::TCP(_) => {
@ -987,7 +984,7 @@ impl Network {
let mut dial_infos: Vec<DialInfo> = Vec::new(); let mut dial_infos: Vec<DialInfo> = Vec::new();
for (a, p) in addresses { for (a, p) in addresses {
let di = DialInfo::tcp(a, p); let di = DialInfo::tcp(a.to_canonical(), p);
dial_infos.push(di.clone()); dial_infos.push(di.clone());
routing_table.register_local_dial_info(di, DialInfoOrigin::Static); routing_table.register_local_dial_info(di, DialInfoOrigin::Static);
} }

View File

@ -135,7 +135,7 @@ impl RawTcpProtocolHandler {
let conn = NetworkConnection::RawTcp(RawTcpNetworkConnection::new(stream)); let conn = NetworkConnection::RawTcp(RawTcpNetworkConnection::new(stream));
let peer_addr = PeerAddress::new( let peer_addr = PeerAddress::new(
Address::from_socket_addr(socket_addr), Address::from_socket_addr(socket_addr).to_canonical(),
socket_addr.port(), socket_addr.port(),
ProtocolType::TCP, ProtocolType::TCP,
); );
@ -199,7 +199,7 @@ impl RawTcpProtocolHandler {
.map_err(|e| format!("couldn't get local address for tcp socket: {}", e))?; .map_err(|e| format!("couldn't get local address for tcp socket: {}", e))?;
let ps = AsyncPeekStream::new(ts); let ps = AsyncPeekStream::new(ts);
let peer_addr = PeerAddress::new( let peer_addr = PeerAddress::new(
Address::from_socket_addr(remote_socket_addr), Address::from_socket_addr(remote_socket_addr).to_canonical(),
remote_socket_addr.port(), remote_socket_addr.port(),
ProtocolType::TCP, ProtocolType::TCP,
); );

View File

@ -31,8 +31,7 @@ pub async fn save_user_secret_string(
let krname = keyring_name(namespace); let krname = keyring_name(namespace);
let kr = get_keyring(krname.as_str(), key); let kr = get_keyring(krname.as_str(), key);
let existed = kr.get_password().is_ok(); let existed = kr.get_password().is_ok();
let _ = kr kr.set_password(value)
.set_password(value)
.map_err(|e| format!("Failed to save user secret: {}", e))?; .map_err(|e| format!("Failed to save user secret: {}", e))?;
Ok(existed) Ok(existed)
} }

View File

@ -298,7 +298,10 @@ impl NetworkManager {
.add_connection(descriptor.clone(), conn.clone()) .add_connection(descriptor.clone(), conn.clone())
{ {
Ok(e) => e, Ok(e) => e,
Err(_) => return, Err(err) => {
error!("{}", err);
return;
}
}; };
// //
@ -324,7 +327,9 @@ impl NetworkManager {
}; };
} }
let _ = this.connection_table().remove_connection(&descriptor); if let Err(err) = this.connection_table().remove_connection(&descriptor) {
error!("{}", err);
}
}) })
} }

View File

@ -38,6 +38,7 @@ pub struct BucketEntry {
impl BucketEntry { impl BucketEntry {
pub(super) fn new() -> Self { pub(super) fn new() -> Self {
let now = get_timestamp();
Self { Self {
ref_count: 0, ref_count: 0,
min_max_version: None, min_max_version: None,
@ -45,7 +46,7 @@ impl BucketEntry {
dial_info_entries: VecDeque::new(), dial_info_entries: VecDeque::new(),
stats_accounting: StatsAccounting::new(), stats_accounting: StatsAccounting::new(),
peer_stats: PeerStats { peer_stats: PeerStats {
time_added: get_timestamp(), time_added: now,
last_seen: None, last_seen: None,
ping_stats: PingStats::default(), ping_stats: PingStats::default(),
latency: None, latency: None,
@ -250,8 +251,9 @@ impl BucketEntry {
} }
pub(super) fn check_dead(&self, cur_ts: u64) -> bool { pub(super) fn check_dead(&self, cur_ts: u64) -> bool {
// if we have not heard from the node at all for the duration of the unreliable ping span // if we have not heard from the node at all for the duration of the unreliable ping span
// a node is not dead if we haven't heard from it yet
match self.peer_stats.last_seen { match self.peer_stats.last_seen {
None => true, None => false,
Some(ts) => { Some(ts) => {
cur_ts.saturating_sub(ts) >= (UNRELIABLE_PING_SPAN_SECS as u64 * 1000000u64) cur_ts.saturating_sub(ts) >= (UNRELIABLE_PING_SPAN_SECS as u64 * 1000000u64)
} }
@ -353,6 +355,11 @@ impl BucketEntry {
self.stats_accounting.add_up(bytes); self.stats_accounting.add_up(bytes);
self.peer_stats.ping_stats.in_flight += 1; self.peer_stats.ping_stats.in_flight += 1;
self.peer_stats.ping_stats.last_pinged = Some(ts); self.peer_stats.ping_stats.last_pinged = Some(ts);
// if we haven't heard from this node yet and it's our first attempt at contacting it
// then we set the last_seen time
if self.peer_stats.last_seen.is_none() {
self.peer_stats.last_seen = Some(ts);
}
} }
pub(super) fn ping_rcvd(&mut self, ts: u64, bytes: u64) { pub(super) fn ping_rcvd(&mut self, ts: u64, bytes: u64) {
self.stats_accounting.add_down(bytes); self.stats_accounting.add_down(bytes);
@ -383,8 +390,13 @@ impl BucketEntry {
self.peer_stats.ping_stats.consecutive_pongs = 0; self.peer_stats.ping_stats.consecutive_pongs = 0;
self.peer_stats.ping_stats.first_consecutive_pong_time = None; self.peer_stats.ping_stats.first_consecutive_pong_time = None;
} }
pub(super) fn question_sent(&mut self, _ts: u64, bytes: u64) { pub(super) fn question_sent(&mut self, ts: u64, bytes: u64) {
self.stats_accounting.add_up(bytes); self.stats_accounting.add_up(bytes);
// if we haven't heard from this node yet and it's our first attempt at contacting it
// then we set the last_seen time
if self.peer_stats.last_seen.is_none() {
self.peer_stats.last_seen = Some(ts);
}
} }
pub(super) fn question_rcvd(&mut self, ts: u64, bytes: u64) { pub(super) fn question_rcvd(&mut self, ts: u64, bytes: u64) {
self.stats_accounting.add_down(bytes); self.stats_accounting.add_down(bytes);

View File

@ -0,0 +1,114 @@
use super::*;
impl RoutingTable {
pub fn debug_info_nodeinfo(&self) -> String {
let mut out = String::new();
let inner = self.inner.lock();
out += "Routing Table Info:\n";
out += &format!(" Node Id: {}\n", inner.node_id.encode());
out += &format!(" Stats Accounting: {:#?}\n\n", inner.stats_accounting);
out += &format!(" Transfer Stats: {:#?}\n\n", inner.transfer_stats);
out
}
pub fn debug_info_dialinfo(&self) -> String {
let ldis = self.local_dial_info();
let gdis = self.global_dial_info();
let mut out = String::new();
out += "Local Dial Info:\n";
for (n, ldi) in ldis.iter().enumerate() {
out += &format!(" {:>2}: {:?}\n", n, ldi);
}
out += "Global Dial Info:\n";
for (n, gdi) in gdis.iter().enumerate() {
out += &format!(" {:>2}: {:?}\n", n, gdi);
}
out
}
pub fn debug_info_entries(&self, limit: usize, min_state: BucketEntryState) -> String {
let inner = self.inner.lock();
let cur_ts = get_timestamp();
let mut out = String::new();
let blen = inner.buckets.len();
let mut b = 0;
let mut cnt = 0;
out += &format!("Entries: {}\n", inner.bucket_entry_count);
while b < blen {
if inner.buckets[b].entries().len() > 0 {
out += &format!(" Bucket #{}:\n", b);
for e in inner.buckets[b].entries() {
let state = e.1.state(cur_ts);
if state >= min_state {
out += &format!(
" {} [{}]\n",
e.0.encode(),
match state {
BucketEntryState::Reliable => "R",
BucketEntryState::Unreliable => "U",
BucketEntryState::Dead => "D",
}
);
cnt += 1;
if cnt >= limit {
break;
}
}
}
if cnt >= limit {
break;
}
}
b += 1;
}
out
}
pub fn debug_info_entry(&self, node_id: DHTKey) -> String {
let mut out = String::new();
out += &format!("Entry {:?}:\n", node_id);
if let Some(nr) = self.lookup_node_ref(node_id) {
out += &nr.operate(|e| format!("{:#?}\n", e));
} else {
out += "Entry not found\n";
}
out
}
pub fn debug_info_buckets(&self, min_state: BucketEntryState) -> String {
let inner = self.inner.lock();
let cur_ts = get_timestamp();
let mut out = String::new();
const COLS: usize = 16;
let rows = inner.buckets.len() / COLS;
let mut r = 0;
let mut b = 0;
out += "Buckets:\n";
while r < rows {
let mut c = 0;
out += format!(" {:>3}: ", b).as_str();
while c < COLS {
let mut cnt = 0;
for e in inner.buckets[b].entries() {
if e.1.state(cur_ts) >= min_state {
cnt += 1;
}
}
out += format!("{:>3} ", cnt).as_str();
b += 1;
c += 1;
}
out += "\n";
r += 1;
}
out
}
}

View File

@ -1,5 +1,6 @@
mod bucket; mod bucket;
mod bucket_entry; mod bucket_entry;
mod debug;
mod dial_info_entry; mod dial_info_entry;
mod find_nodes; mod find_nodes;
mod node_ref; mod node_ref;
@ -15,6 +16,7 @@ use alloc::collections::VecDeque;
use alloc::str::FromStr; use alloc::str::FromStr;
use bucket::*; use bucket::*;
pub use bucket_entry::*; pub use bucket_entry::*;
pub use debug::*;
pub use dial_info_entry::*; pub use dial_info_entry::*;
pub use find_nodes::*; pub use find_nodes::*;
use futures_util::stream::{FuturesUnordered, StreamExt}; use futures_util::stream::{FuturesUnordered, StreamExt};
@ -354,38 +356,6 @@ impl RoutingTable {
*self.inner.lock() = Self::new_inner(self.network_manager()); *self.inner.lock() = Self::new_inner(self.network_manager());
} }
// debugging info
pub fn debug_info(&self, min_state: BucketEntryState) -> String {
let inner = self.inner.lock();
let cur_ts = get_timestamp();
let mut out = String::new();
const COLS: usize = 16;
let rows = inner.buckets.len() / COLS;
let mut r = 0;
let mut b = 0;
out += "Buckets:\n";
while r < rows {
let mut c = 0;
out += format!(" {:>3}: ", b).as_str();
while c < COLS {
let mut cnt = 0;
for e in inner.buckets[b].entries() {
if e.1.state(cur_ts) >= min_state {
cnt += 1;
}
}
out += format!("{:>3} ", cnt).as_str();
b += 1;
c += 1;
}
out += "\n";
r += 1;
}
out
}
// Just match address and port to help sort dialinfoentries for buckets // Just match address and port to help sort dialinfoentries for buckets
// because inbound connections will not have dialinfo associated with them // because inbound connections will not have dialinfo associated with them
// but should have ip addresses if they have changed // but should have ip addresses if they have changed
@ -590,7 +560,6 @@ impl RoutingTable {
for p in res.peers { for p in res.peers {
// if our own node if is in the list then ignore it, as we don't add ourselves to our own routing table // if our own node if is in the list then ignore it, as we don't add ourselves to our own routing table
if p.node_id.key == node_id { if p.node_id.key == node_id {
// however, it is useful to note when
continue; continue;
} }

View File

@ -1188,7 +1188,7 @@ impl RPCProcessor {
// add node information for the requesting node to our routing table // add node information for the requesting node to our routing table
let routing_table = self.routing_table(); let routing_table = self.routing_table();
let _ = routing_table let _requesting_node_ref = routing_table
.register_node_with_dial_info(peer_info.node_id.key, &peer_info.dial_infos) .register_node_with_dial_info(peer_info.node_id.key, &peer_info.dial_infos)
.map_err(map_error_string!())?; .map_err(map_error_string!())?;
@ -1529,7 +1529,7 @@ impl RPCProcessor {
let mut respond_to = question.reborrow().init_respond_to(); let mut respond_to = question.reborrow().init_respond_to();
respond_to.set_sender(()); respond_to.set_sender(());
let detail = question.reborrow().init_detail(); let detail = question.reborrow().init_detail();
let _ = detail.init_info_q(); detail.init_info_q();
info_q_msg.into_reader() info_q_msg.into_reader()
}; };

View File

@ -69,27 +69,27 @@ pub async fn test_add_get_remove() {
let entry1 = table.add_connection(a1.clone(), c1.clone()).unwrap(); let entry1 = table.add_connection(a1.clone(), c1.clone()).unwrap();
assert_eq!(table.connection_count(), 1); assert_eq!(table.connection_count(), 1);
assert_eq!(table.remove_connection(&a3), Err(())); assert_err!(table.remove_connection(&a3));
assert_eq!(table.remove_connection(&a4), Err(())); assert_err!(table.remove_connection(&a4));
assert_eq!(table.connection_count(), 1); assert_eq!(table.connection_count(), 1);
assert_eq!(table.get_connection(&a1), Some(entry1.clone())); assert_eq!(table.get_connection(&a1), Some(entry1.clone()));
assert_eq!(table.get_connection(&a1), Some(entry1.clone())); assert_eq!(table.get_connection(&a1), Some(entry1.clone()));
assert_eq!(table.connection_count(), 1); assert_eq!(table.connection_count(), 1);
assert_eq!(table.add_connection(a1.clone(), c1.clone()), Err(())); assert_err!(table.add_connection(a1.clone(), c1.clone()));
assert_eq!(table.add_connection(a1.clone(), c2.clone()), Err(())); assert_err!(table.add_connection(a1.clone(), c2.clone()));
assert_eq!(table.connection_count(), 1); assert_eq!(table.connection_count(), 1);
assert_eq!(table.get_connection(&a1), Some(entry1.clone())); assert_eq!(table.get_connection(&a1), Some(entry1.clone()));
assert_eq!(table.get_connection(&a1), Some(entry1.clone())); assert_eq!(table.get_connection(&a1), Some(entry1.clone()));
assert_eq!(table.connection_count(), 1); assert_eq!(table.connection_count(), 1);
assert_eq!(table.remove_connection(&a2), Ok(entry1)); assert_eq!(table.remove_connection(&a2), Ok(entry1));
assert_eq!(table.connection_count(), 0); assert_eq!(table.connection_count(), 0);
assert_eq!(table.remove_connection(&a2), Err(())); assert_err!(table.remove_connection(&a2));
assert_eq!(table.connection_count(), 0); assert_eq!(table.connection_count(), 0);
assert_eq!(table.get_connection(&a2), None); assert_eq!(table.get_connection(&a2), None);
assert_eq!(table.get_connection(&a1), None); assert_eq!(table.get_connection(&a1), None);
assert_eq!(table.connection_count(), 0); assert_eq!(table.connection_count(), 0);
let entry2 = table.add_connection(a1, c1.clone()).unwrap(); let entry2 = table.add_connection(a1, c1.clone()).unwrap();
assert_eq!(table.add_connection(a2.clone(), c1), Err(())); assert_err!(table.add_connection(a2.clone(), c1));
let entry3 = table.add_connection(a3.clone(), c2).unwrap(); let entry3 = table.add_connection(a3.clone(), c2).unwrap();
let entry4 = table.add_connection(a4.clone(), c3).unwrap(); let entry4 = table.add_connection(a4.clone(), c3).unwrap();
assert_eq!(table.connection_count(), 3); assert_eq!(table.connection_count(), 3);

View File

@ -361,13 +361,6 @@ macro_rules! assert_split_url_parse {
assert_eq!(su1.to_string(), url); assert_eq!(su1.to_string(), url);
}; };
} }
macro_rules! assert_err {
($ex:expr) => {
if let Ok(v) = $ex {
panic!("assertion failed, expected Err(..), got {:?}", v);
}
};
}
pub async fn test_split_url() { pub async fn test_split_url() {
info!("testing split_url"); info!("testing split_url");

View File

@ -0,0 +1,160 @@
////////////////////////////////////////////////////////////////
// Debugging
use super::*;
fn get_bucket_entry_state(text: &str) -> Option<BucketEntryState> {
if text == "dead" {
Some(BucketEntryState::Dead)
} else if text == "reliable" {
Some(BucketEntryState::Reliable)
} else if text == "unreliable" {
Some(BucketEntryState::Unreliable)
} else {
None
}
}
fn get_number(text: &str) -> Option<usize> {
usize::from_str(text).ok()
}
fn get_dht_key(text: &str) -> Option<DHTKey> {
DHTKey::try_decode(text).ok()
}
fn get_debug_argument<T, G: FnOnce(&str) -> Option<T>>(
value: &str,
context: &str,
argument: &str,
getter: G,
) -> Result<T, VeilidAPIError> {
if let Some(val) = getter(value) {
Ok(val)
} else {
Err(VeilidAPIError::InvalidArgument {
context: context.to_owned(),
argument: argument.to_owned(),
value: value.to_owned(),
})
}
}
fn get_debug_argument_at<T, G: FnOnce(&str) -> Option<T>>(
debug_args: &[String],
pos: usize,
context: &str,
argument: &str,
getter: G,
) -> Result<T, VeilidAPIError> {
if pos >= debug_args.len() {
return Err(VeilidAPIError::MissingArgument {
context: context.to_owned(),
argument: argument.to_owned(),
});
}
let value = &debug_args[pos];
if let Some(val) = getter(value) {
Ok(val)
} else {
Err(VeilidAPIError::InvalidArgument {
context: context.to_owned(),
argument: argument.to_owned(),
value: value.to_owned(),
})
}
}
impl VeilidAPI {
async fn debug_buckets(&self, debug_args: &[String]) -> Result<String, VeilidAPIError> {
let mut min_state = BucketEntryState::Unreliable;
if debug_args.len() == 1 {
min_state = get_debug_argument(
&debug_args[0],
"debug_buckets",
"min_state",
get_bucket_entry_state,
)?;
}
// Dump routing table bucket info
let rpc = self.rpc_processor()?;
let routing_table = rpc.routing_table();
Ok(routing_table.debug_info_buckets(min_state))
}
async fn debug_dialinfo(&self, _debug_args: &[String]) -> Result<String, VeilidAPIError> {
// Dump routing table dialinfo
let rpc = self.rpc_processor()?;
let routing_table = rpc.routing_table();
Ok(routing_table.debug_info_dialinfo())
}
async fn debug_entries(&self, debug_args: &[String]) -> Result<String, VeilidAPIError> {
let mut min_state = BucketEntryState::Unreliable;
let mut limit = 20;
for arg in debug_args {
if let Some(ms) = get_bucket_entry_state(arg) {
min_state = ms;
} else if let Some(lim) = get_number(arg) {
limit = lim;
} else {
return Err(VeilidAPIError::InvalidArgument {
context: "debug_entries".to_owned(),
argument: "unknown".to_owned(),
value: arg.clone(),
});
}
}
// Dump routing table entries
let rpc = self.rpc_processor()?;
let routing_table = rpc.routing_table();
Ok(routing_table.debug_info_entries(limit, min_state))
}
async fn debug_entry(&self, debug_args: &[String]) -> Result<String, VeilidAPIError> {
let node_id = get_debug_argument_at(debug_args, 0, "debug_entry", "node_id", get_dht_key)?;
// Dump routing table entry
let rpc = self.rpc_processor()?;
let routing_table = rpc.routing_table();
Ok(routing_table.debug_info_entry(node_id))
}
async fn debug_nodeinfo(&self, _debug_args: &[String]) -> Result<String, VeilidAPIError> {
// Dump routing table entry
let rpc = self.rpc_processor()?;
let routing_table = rpc.routing_table();
Ok(routing_table.debug_info_nodeinfo())
}
pub async fn debug(&self, what: String) -> Result<String, VeilidAPIError> {
trace!("VeilidCore::debug");
let debug_args: Vec<String> = what
.split_ascii_whitespace()
.map(|s| s.to_owned())
.collect();
if debug_args.is_empty() {
return Ok(r#">>> Debug commands:
buckets [dead|reliable]
dialinfo
entries [dead|reliable] [limit]
entry [node_id]
nodeinfo
"#
.to_owned());
}
let mut out = String::new();
let arg = &debug_args[0];
if arg == "buckets" {
out += self.debug_buckets(&debug_args[1..]).await?.as_str();
} else if arg == "dialinfo" {
out += self.debug_dialinfo(&debug_args[1..]).await?.as_str();
} else if arg == "entries" {
out += self.debug_entries(&debug_args[1..]).await?.as_str();
} else if arg == "entry" {
out += self.debug_entry(&debug_args[1..]).await?.as_str();
} else if arg == "nodeinfo" {
out += self.debug_nodeinfo(&debug_args[1..]).await?.as_str();
} else {
out += ">>> Unknown command\n";
}
Ok(out)
}
}

View File

@ -1,3 +1,6 @@
mod debug;
pub use debug::*;
pub use crate::rpc_processor::InfoAnswer; pub use crate::rpc_processor::InfoAnswer;
use crate::*; use crate::*;
use attachment_manager::AttachmentManager; use attachment_manager::AttachmentManager;
@ -105,6 +108,16 @@ pub enum Address {
} }
impl Address { impl Address {
pub fn to_canonical(&self) -> Address {
match self {
Address::IPV4(v4) => Address::IPV4(*v4),
Address::IPV6(v6) => match v6.to_ipv4() {
Some(v4) => Address::IPV4(v4),
None => Address::IPV6(*v6),
},
Address::Hostname(h) => Address::Hostname(h.clone()),
}
}
pub fn from_socket_addr(sa: SocketAddr) -> Address { pub fn from_socket_addr(sa: SocketAddr) -> Address {
match sa { match sa {
SocketAddr::V4(v4) => Address::IPV4(*v4.ip()), SocketAddr::V4(v4) => Address::IPV4(*v4.ip()),
@ -199,23 +212,25 @@ pub enum DialInfo {
impl DialInfo { impl DialInfo {
pub fn udp_from_socketaddr(socketaddr: SocketAddr) -> Self { pub fn udp_from_socketaddr(socketaddr: SocketAddr) -> Self {
Self::UDP(DialInfoUDP { Self::UDP(DialInfoUDP {
address: Address::from_socket_addr(socketaddr), address: Address::from_socket_addr(socketaddr).to_canonical(),
port: socketaddr.port(), port: socketaddr.port(),
}) })
} }
pub fn tcp_from_socketaddr(socketaddr: SocketAddr) -> Self { pub fn tcp_from_socketaddr(socketaddr: SocketAddr) -> Self {
Self::TCP(DialInfoTCP { Self::TCP(DialInfoTCP {
address: Address::from_socket_addr(socketaddr), address: Address::from_socket_addr(socketaddr).to_canonical(),
port: socketaddr.port(), port: socketaddr.port(),
}) })
} }
pub fn udp(address: Address, port: u16) -> Self { pub fn udp(address: Address, port: u16) -> Self {
let address = address.to_canonical();
if let Address::Hostname(_) = address { if let Address::Hostname(_) = address {
panic!("invalid address type for protocol") panic!("invalid address type for protocol")
} }
Self::UDP(DialInfoUDP { address, port }) Self::UDP(DialInfoUDP { address, port })
} }
pub fn tcp(address: Address, port: u16) -> Self { pub fn tcp(address: Address, port: u16) -> Self {
let address = address.to_canonical();
if let Address::Hostname(_) = address { if let Address::Hostname(_) = address {
panic!("invalid address type for protocol") panic!("invalid address type for protocol")
} }
@ -673,11 +688,11 @@ pub struct PingStats {
#[derive(Clone, Debug, Default)] #[derive(Clone, Debug, Default)]
pub struct PeerStats { pub struct PeerStats {
pub time_added: u64, // when the peer was added to the routing table pub time_added: u64, // when the peer was added to the routing table
pub last_seen: Option<u64>, // when the peer was last seen for any reason pub last_seen: Option<u64>, // when the peer was last seen for any reason, including when we first attempted to reach out to it
pub ping_stats: PingStats, // information about pings pub ping_stats: PingStats, // information about pings
pub latency: Option<LatencyStats>, // latencies for communications with the peer pub latency: Option<LatencyStats>, // latencies for communications with the peer
pub transfer: TransferStatsDownUp, // Stats for communications with the peer pub transfer: TransferStatsDownUp, // Stats for communications with the peer
pub node_info: Option<NodeInfo>, // Last known node info pub node_info: Option<NodeInfo>, // Last known node info
} }
cfg_if! { cfg_if! {
@ -698,6 +713,15 @@ pub enum VeilidAPIError {
NoDialInfo(NodeId), NoDialInfo(NodeId),
Internal(String), Internal(String),
Unimplemented(String), Unimplemented(String),
InvalidArgument {
context: String,
argument: String,
value: String,
},
MissingArgument {
context: String,
argument: String,
},
} }
fn convert_rpc_error(x: RPCError) -> VeilidAPIError { fn convert_rpc_error(x: RPCError) -> VeilidAPIError {
@ -962,51 +986,6 @@ impl VeilidAPI {
self.inner.lock().core.is_none() self.inner.lock().core.is_none()
} }
////////////////////////////////////////////////////////////////
// Debugging
async fn debug_buckets(&self, mut debug_args: Vec<String>) -> Result<String, VeilidAPIError> {
let min_state = {
if let Some(min_state) = debug_args.pop() {
if min_state == "dead" {
BucketEntryState::Dead
} else if min_state == "reliable" {
BucketEntryState::Reliable
} else {
return Err(VeilidAPIError::Internal(format!(
"Invalid argument '{}'",
min_state
)));
}
} else {
BucketEntryState::Unreliable
}
};
// Dump routing table bucket info
let rpc = self.rpc_processor()?;
let routing_table = rpc.routing_table();
Ok(routing_table.debug_info(min_state))
}
pub async fn debug(&self, what: String) -> Result<String, VeilidAPIError> {
trace!("VeilidCore::debug");
let mut out = String::new();
let mut debug_args: Vec<String> = what
.split_ascii_whitespace()
.map(|s| s.to_owned())
.collect();
if let Some(arg) = debug_args.pop() {
if arg == "buckets" {
out += self.debug_buckets(debug_args).await?.as_str();
} else {
out += ">>> Unknown command\n";
}
} else {
out += ">>> Debug commands:\n buckets [dead|reliable]\n";
}
Ok(out)
}
//////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////
// Attach/Detach // Attach/Detach

View File

@ -1,6 +1,15 @@
use crate::xx::*; use crate::xx::*;
use alloc::string::ToString; use alloc::string::ToString;
#[macro_export]
macro_rules! assert_err {
($ex:expr) => {
if let Ok(v) = $ex {
panic!("assertion failed, expected Err(..), got {:?}", v);
}
};
}
pub fn split_port(name: &str) -> Result<(String, Option<u16>), String> { pub fn split_port(name: &str) -> Result<(String, Option<u16>), String> {
if let Some(split) = name.rfind(':') { if let Some(split) = name.rfind(':') {
let hoststr = &name[0..split]; let hoststr = &name[0..split];

View File

@ -221,7 +221,9 @@ impl ClientApi {
inner.join_handle.take().unwrap() inner.join_handle.take().unwrap()
}; };
trace!("ClientApi::stop: waiting for stop"); trace!("ClientApi::stop: waiting for stop");
let _ = jh.await; if let Err(err) = jh.await {
error!("{}", err);
}
trace!("ClientApi::stop: stopped"); trace!("ClientApi::stop: stopped");
} }

View File

@ -35,13 +35,13 @@ fn parse_command_line(default_config_path: &OsStr) -> Result<clap::ArgMatches, c
.arg( .arg(
Arg::with_name("debug") Arg::with_name("debug")
.long("debug") .long("debug")
.help("Turn on debug logging"), .help("Turn on debug logging on the terminal"),
) )
.arg( .arg(
Arg::with_name("trace") Arg::with_name("trace")
.long("trace") .long("trace")
.conflicts_with("debug") .conflicts_with("debug")
.help("Turn on trace logging"), .help("Turn on trace logging on the terminal"),
) )
.arg( .arg(
Arg::with_name("generate-id") Arg::with_name("generate-id")
@ -72,15 +72,14 @@ fn parse_command_line(default_config_path: &OsStr) -> Result<clap::ArgMatches, c
.possible_values(&["false", "true"]) .possible_values(&["false", "true"])
.help("Automatically attach the server to the Veilid network"), .help("Automatically attach the server to the Veilid network"),
) )
.arg( ;
Arg::with_name("wait-for-debug") #[cfg(debug_assertions)]
.long("wait-for-debug") let matches = matches.arg(
.help("Wait for debugger to attach"), Arg::with_name("wait-for-debug")
) .long("wait-for-debug")
.help("Wait for debugger to attach"),
.get_matches(); );
Ok(matches.get_matches())
Ok(matches)
} }
lazy_static! { lazy_static! {
@ -108,6 +107,7 @@ pub async fn main() -> Result<(), String> {
.map_err(|e| format!("failed to parse command line: {}", e))?; .map_err(|e| format!("failed to parse command line: {}", e))?;
// Check for one-off commands // Check for one-off commands
#[cfg(debug_assertions)]
if matches.occurrences_of("wait-for-debug") != 0 { if matches.occurrences_of("wait-for-debug") != 0 {
use bugsalot::debugger; use bugsalot::debugger;
debugger::wait_until_attached(None).expect("state() not implemented on this platform"); debugger::wait_until_attached(None).expect("state() not implemented on this platform");
@ -148,12 +148,12 @@ pub async fn main() -> Result<(), String> {
settingsrw.testing.subnode_index = subnode_index; settingsrw.testing.subnode_index = subnode_index;
} }
if matches.occurrences_of("debug") != 0 { if matches.occurrences_of("debug") != 0 {
settingsrw.logging.terminal.enabled = true;
settingsrw.logging.terminal.level = settings::LogLevel::Debug; settingsrw.logging.terminal.level = settings::LogLevel::Debug;
settingsrw.logging.file.level = settings::LogLevel::Debug;
} }
if matches.occurrences_of("trace") != 0 { if matches.occurrences_of("trace") != 0 {
settingsrw.logging.terminal.enabled = true;
settingsrw.logging.terminal.level = settings::LogLevel::Trace; settingsrw.logging.terminal.level = settings::LogLevel::Trace;
settingsrw.logging.file.level = settings::LogLevel::Trace;
} }
if matches.is_present("attach") { if matches.is_present("attach") {
settingsrw.auto_attach = !matches!(matches.value_of("attach"), Some("false")); settingsrw.auto_attach = !matches!(matches.value_of("attach"), Some("false"));
@ -234,7 +234,7 @@ pub async fn main() -> Result<(), String> {
client_log_channel = Some(clog); client_log_channel = Some(clog);
client_log_channel_closer = Some(clogcloser); client_log_channel_closer = Some(clogcloser);
logs.push(WriteLogger::new( logs.push(WriteLogger::new(
settings::convert_loglevel(settingsr.logging.file.level), settings::convert_loglevel(settingsr.logging.client.level),
cb.build(), cb.build(),
clogwriter, clogwriter,
)) ))