better debug commands for dht

This commit is contained in:
John Smith 2023-12-08 10:57:58 -05:00 committed by Christien Rioux
parent 99fb135b5b
commit cdf823c1cc
4 changed files with 222 additions and 105 deletions

View File

@ -137,10 +137,8 @@ impl RoutingDomainEditor {
None None
}; };
// Debug print
log_rtab!(debug "[{:?}] COMMIT: {:?}", self.routing_domain, self.changes);
// Apply changes // Apply changes
log_rtab!("[{:?}] COMMIT: {:?}", self.routing_domain, self.changes);
let mut peer_info_changed = false; let mut peer_info_changed = false;
{ {
let mut inner = self.routing_table.inner.write(); let mut inner = self.routing_table.inner.write();
@ -181,7 +179,7 @@ impl RoutingDomainEditor {
peer_info_changed = true; peer_info_changed = true;
} }
RoutingDomainChange::SetRelayNodeKeepalive { ts } => { RoutingDomainChange::SetRelayNodeKeepalive { ts } => {
debug!("[{:?}] relay node keepalive: {:?}", self.routing_domain, ts); trace!("[{:?}] relay node keepalive: {:?}", self.routing_domain, ts);
detail.common_mut().set_relay_node_last_keepalive(ts); detail.common_mut().set_relay_node_last_keepalive(ts);
} }
RoutingDomainChange::AddDialInfoDetail { dial_info_detail } => { RoutingDomainChange::AddDialInfoDetail { dial_info_detail } => {

View File

@ -100,7 +100,7 @@ impl RPCProcessor {
if opt_watcher.is_some() { "+W " } else { "" }, if opt_watcher.is_some() { "+W " } else { "" },
subkeys, subkeys,
expiration, expiration,
peer.len() peer.len(),
dest dest
); );

View File

@ -3,16 +3,20 @@
use super::*; use super::*;
use data_encoding::BASE64URL_NOPAD; use data_encoding::BASE64URL_NOPAD;
use hashlink::LinkedHashMap;
use network_manager::*; use network_manager::*;
use once_cell::sync::Lazy;
use routing_table::*; use routing_table::*;
#[derive(Default, Debug)] #[derive(Default)]
struct DebugCache { struct DebugCache {
imported_routes: Vec<RouteId>, imported_routes: Vec<RouteId>,
opened_record_contexts: Lazy<LinkedHashMap<TypedKey, RoutingContext>>,
} }
static DEBUG_CACHE: Mutex<DebugCache> = Mutex::new(DebugCache { static DEBUG_CACHE: Mutex<DebugCache> = Mutex::new(DebugCache {
imported_routes: Vec::new(), imported_routes: Vec::new(),
opened_record_contexts: Lazy::new(LinkedHashMap::new),
}); });
fn format_opt_ts(ts: Option<TimestampDuration>) -> String { fn format_opt_ts(ts: Option<TimestampDuration>) -> String {
@ -283,6 +287,7 @@ fn get_destination(
fn get_number(text: &str) -> Option<usize> { fn get_number(text: &str) -> Option<usize> {
usize::from_str(text).ok() usize::from_str(text).ok()
} }
fn get_typed_key(text: &str) -> Option<TypedKey> { fn get_typed_key(text: &str) -> Option<TypedKey> {
TypedKey::from_str(text).ok() TypedKey::from_str(text).ok()
} }
@ -301,6 +306,18 @@ fn get_crypto_system_version(crypto: Crypto) -> impl FnOnce(&str) -> Option<Cryp
} }
} }
fn get_dht_key_no_safety(text: &str) -> Option<TypedKey> {
let key = if let Some(key) = get_public_key(text) {
TypedKey::new(best_crypto_kind(), key)
} else if let Some(key) = get_typed_key(text) {
key
} else {
return None;
};
Some(key)
}
fn get_dht_key( fn get_dht_key(
routing_table: RoutingTable, routing_table: RoutingTable,
) -> impl FnOnce(&str) -> Option<(TypedKey, Option<SafetySelection>)> { ) -> impl FnOnce(&str) -> Option<(TypedKey, Option<SafetySelection>)> {
@ -515,6 +532,34 @@ async fn async_get_debug_argument_at<T, G: FnOnce(&str) -> SendPinBoxFuture<Opti
Ok(val) Ok(val)
} }
fn get_opened_dht_record_context(
args: &[String],
context: &str,
key: &str,
arg: usize,
) -> VeilidAPIResult<(TypedKey, RoutingContext)> {
let dc = DEBUG_CACHE.lock();
let key = match get_debug_argument_at(args, arg, context, key, get_dht_key_no_safety)
.ok()
.or_else(|| {
// If unspecified, use the most recent key opened or created
dc.opened_record_contexts.back().map(|kv| kv.0).copied()
}) {
Some(k) => k,
None => {
apibail_missing_argument!("no keys are opened", "key");
}
};
// Get routing context for record
let Some(rc) = dc.opened_record_contexts.get(&key).cloned() else {
apibail_missing_argument!("key is not opened", "key");
};
Ok((key, rc))
}
pub fn print_data(data: &[u8], truncate_len: Option<usize>) -> String { pub fn print_data(data: &[u8], truncate_len: Option<usize>) -> String {
// check if message body is ascii printable // check if message body is ascii printable
let mut printable = true; let mut printable = true;
@ -1410,25 +1455,88 @@ impl VeilidAPI {
Err(e) => return Ok(format!("Can't open DHT record: {}", e)), Err(e) => return Ok(format!("Can't open DHT record: {}", e)),
Ok(v) => v, Ok(v) => v,
}; };
match rc.close_dht_record(*record.key()).await {
Err(e) => return Ok(format!("Can't close DHT record: {}", e)), // Save routing context for record
Ok(v) => v, let mut dc = DEBUG_CACHE.lock();
}; dc.opened_record_contexts.insert(*record.key(), rc);
debug!("DHT Record Created:\n{:#?}", record); debug!("DHT Record Created:\n{:#?}", record);
Ok(format!("{:?}", record)) Ok(format!("Created: {:?}", record))
} }
async fn debug_record_get(&self, args: Vec<String>) -> VeilidAPIResult<String> { async fn debug_record_open(&self, args: Vec<String>) -> VeilidAPIResult<String> {
let netman = self.network_manager()?; let netman = self.network_manager()?;
let routing_table = netman.routing_table(); let routing_table = netman.routing_table();
let (key, ss) = get_debug_argument_at( let (key, ss) = get_debug_argument_at(
&args, &args,
1, 1,
"debug_record_get", "debug_record_open",
"key", "key",
get_dht_key(routing_table), get_dht_key(routing_table),
)?; )?;
let writer =
get_debug_argument_at(&args, 2, "debug_record_open", "writer", get_keypair).ok();
// Get routing context with optional safety
let rc = self.routing_context()?;
let rc = if let Some(ss) = ss {
match rc.with_safety(ss) {
Err(e) => return Ok(format!("Can't use safety selection: {}", e)),
Ok(v) => v,
}
} else {
rc
};
// Do a record open
let record = match rc.open_dht_record(key, writer).await {
Err(e) => return Ok(format!("Can't open DHT record: {}", e)),
Ok(v) => v,
};
// Save routing context for record
let mut dc = DEBUG_CACHE.lock();
dc.opened_record_contexts.insert(*record.key(), rc);
debug!("DHT Record Opened:\n{:#?}", record);
Ok(format!("Opened: {:?}", record))
}
async fn debug_record_close(&self, args: Vec<String>) -> VeilidAPIResult<String> {
let (key, rc) = get_opened_dht_record_context(&args, "debug_record_close", "key", 1)?;
// Do a record close
if let Err(e) = rc.close_dht_record(key).await {
return Ok(format!("Can't close DHT record: {}", e));
};
debug!("DHT Record Closed:\n{:#?}", key);
Ok(format!("Closed: {:?}", key))
}
async fn debug_record_set(&self, args: Vec<String>) -> VeilidAPIResult<String> {
let (key, rc) = get_opened_dht_record_context(&args, "debug_record_set", "key", 1)?;
let subkey = get_debug_argument_at(&args, 2, "debug_record_set", "subkey", get_number)?;
let data = get_debug_argument_at(&args, 3, "debug_record_set", "data", get_data)?;
// Do a record set
let value = match rc.set_dht_value(key, subkey as ValueSubkey, data).await {
Err(e) => {
return Ok(format!("Can't set DHT value: {}", e));
}
Ok(v) => v,
};
let out = if let Some(value) = value {
format!("{:?}", value)
} else {
"No value data returned".to_owned()
};
Ok(out)
}
async fn debug_record_get(&self, args: Vec<String>) -> VeilidAPIResult<String> {
let (key, rc) = get_opened_dht_record_context(&args, "debug_record_get", "key", 1)?;
let subkey = get_debug_argument_at(&args, 2, "debug_record_get", "subkey", get_number)?; let subkey = get_debug_argument_at(&args, 2, "debug_record_get", "subkey", get_number)?;
let force_refresh = if args.len() >= 4 { let force_refresh = if args.len() >= 4 {
Some(get_debug_argument_at( Some(get_debug_argument_at(
@ -1452,36 +1560,12 @@ impl VeilidAPI {
false false
}; };
// Get routing context with optional privacy
let rc = self.routing_context()?;
let rc = if let Some(ss) = ss {
match rc.with_safety(ss) {
Err(e) => return Ok(format!("Can't use safety selection: {}", e)),
Ok(v) => v,
}
} else {
rc
};
// Do a record get // Do a record get
let _record = match rc.open_dht_record(key, None).await {
Err(e) => return Ok(format!("Can't open DHT record: {}", e)),
Ok(v) => v,
};
let value = match rc let value = match rc
.get_dht_value(key, subkey as ValueSubkey, force_refresh) .get_dht_value(key, subkey as ValueSubkey, force_refresh)
.await .await
{ {
Err(e) => { Err(e) => {
match rc.close_dht_record(key).await {
Err(e) => {
return Ok(format!(
"Can't get DHT value and can't close DHT record: {}",
e
))
}
Ok(v) => v,
};
return Ok(format!("Can't get DHT value: {}", e)); return Ok(format!("Can't get DHT value: {}", e));
} }
Ok(v) => v, Ok(v) => v,
@ -1491,75 +1575,19 @@ impl VeilidAPI {
} else { } else {
"No value data returned".to_owned() "No value data returned".to_owned()
}; };
match rc.close_dht_record(key).await {
Err(e) => return Ok(format!("Can't close DHT record: {}", e)),
Ok(v) => v,
};
Ok(out)
}
async fn debug_record_set(&self, args: Vec<String>) -> VeilidAPIResult<String> {
let netman = self.network_manager()?;
let routing_table = netman.routing_table();
let (key, ss) = get_debug_argument_at(
&args,
1,
"debug_record_set",
"key",
get_dht_key(routing_table),
)?;
let subkey = get_debug_argument_at(&args, 2, "debug_record_set", "subkey", get_number)?;
let writer = get_debug_argument_at(&args, 3, "debug_record_set", "writer", get_keypair)?;
let data = get_debug_argument_at(&args, 4, "debug_record_set", "data", get_data)?;
// Get routing context with optional privacy
let rc = self.routing_context()?;
let rc = if let Some(ss) = ss {
match rc.with_safety(ss) {
Err(e) => return Ok(format!("Can't use safety selection: {}", e)),
Ok(v) => v,
}
} else {
rc
};
// Do a record get
let _record = match rc.open_dht_record(key, Some(writer)).await {
Err(e) => return Ok(format!("Can't open DHT record: {}", e)),
Ok(v) => v,
};
let value = match rc.set_dht_value(key, subkey as ValueSubkey, data).await {
Err(e) => {
match rc.close_dht_record(key).await {
Err(e) => {
return Ok(format!(
"Can't set DHT value and can't close DHT record: {}",
e
))
}
Ok(v) => v,
};
return Ok(format!("Can't set DHT value: {}", e));
}
Ok(v) => v,
};
let out = if let Some(value) = value {
format!("{:?}", value)
} else {
"No value data returned".to_owned()
};
match rc.close_dht_record(key).await {
Err(e) => return Ok(format!("Can't close DHT record: {}", e)),
Ok(v) => v,
};
Ok(out) Ok(out)
} }
async fn debug_record_delete(&self, args: Vec<String>) -> VeilidAPIResult<String> { async fn debug_record_delete(&self, args: Vec<String>) -> VeilidAPIResult<String> {
let key = get_debug_argument_at(&args, 1, "debug_record_delete", "key", get_typed_key)?; let key = get_debug_argument_at(
&args,
1,
"debug_record_delete",
"key",
get_dht_key_no_safety,
)?;
// Do a record delete // Do a record delete (can use any routing context here)
let rc = self.routing_context()?; let rc = self.routing_context()?;
match rc.delete_dht_record(key).await { match rc.delete_dht_record(key).await {
Err(e) => return Ok(format!("Can't delete DHT record: {}", e)), Err(e) => return Ok(format!("Can't delete DHT record: {}", e)),
@ -1571,7 +1599,8 @@ impl VeilidAPI {
async fn debug_record_info(&self, args: Vec<String>) -> VeilidAPIResult<String> { async fn debug_record_info(&self, args: Vec<String>) -> VeilidAPIResult<String> {
let storage_manager = self.storage_manager()?; let storage_manager = self.storage_manager()?;
let key = get_debug_argument_at(&args, 1, "debug_record_info", "key", get_typed_key)?; let key =
get_debug_argument_at(&args, 1, "debug_record_info", "key", get_dht_key_no_safety)?;
let subkey = let subkey =
get_debug_argument_at(&args, 2, "debug_record_info", "subkey", get_number).ok(); get_debug_argument_at(&args, 2, "debug_record_info", "subkey", get_number).ok();
@ -1595,6 +1624,53 @@ impl VeilidAPI {
Ok(out) Ok(out)
} }
async fn debug_record_watch(&self, args: Vec<String>) -> VeilidAPIResult<String> {
let (key, rc) = get_opened_dht_record_context(&args, "debug_record_watch", "key", 1)?;
let subkeys = get_debug_argument_at(&args, 2, "debug_record_watch", "subkeys", get_subkeys)
.ok()
.unwrap_or_default();
let expiration =
get_debug_argument_at(&args, 3, "debug_record_watch", "expiration", parse_duration)
.ok()
.unwrap_or_default();
let count = get_debug_argument_at(&args, 4, "debug_record_watch", "count", get_number)
.ok()
.unwrap_or(usize::MAX) as u32;
// Do a record watch
let ts = match rc
.watch_dht_values(key, subkeys, Timestamp::new(expiration), count)
.await
{
Err(e) => {
return Ok(format!("Can't watch DHT value: {}", e));
}
Ok(v) => v,
};
Ok(format!("Expiration at: {:?}", debug_ts(ts.as_u64())))
}
async fn debug_record_cancel(&self, args: Vec<String>) -> VeilidAPIResult<String> {
let (key, rc) = get_opened_dht_record_context(&args, "debug_record_watch", "key", 1)?;
let subkeys = get_debug_argument_at(&args, 2, "debug_record_watch", "subkeys", get_subkeys)
.ok()
.unwrap_or_default();
// Do a record watch cancel
let still_active = match rc.cancel_dht_watch(key, subkeys).await {
Err(e) => {
return Ok(format!("Can't cancel DHT watch: {}", e));
}
Ok(v) => v,
};
Ok(format!(
"Still Active: {:?}",
if still_active { "true" } else { "false" }
))
}
async fn debug_record(&self, args: String) -> VeilidAPIResult<String> { async fn debug_record(&self, args: String) -> VeilidAPIResult<String> {
let args: Vec<String> = let args: Vec<String> =
shell_words::split(&args).map_err(|e| VeilidAPIError::parse_error(e, args))?; shell_words::split(&args).map_err(|e| VeilidAPIError::parse_error(e, args))?;
@ -1607,6 +1683,10 @@ impl VeilidAPI {
self.debug_record_purge(args).await self.debug_record_purge(args).await
} else if command == "create" { } else if command == "create" {
self.debug_record_create(args).await self.debug_record_create(args).await
} else if command == "open" {
self.debug_record_open(args).await
} else if command == "close" {
self.debug_record_close(args).await
} else if command == "get" { } else if command == "get" {
self.debug_record_get(args).await self.debug_record_get(args).await
} else if command == "set" { } else if command == "set" {
@ -1615,6 +1695,10 @@ impl VeilidAPI {
self.debug_record_delete(args).await self.debug_record_delete(args).await
} else if command == "info" { } else if command == "info" {
self.debug_record_info(args).await self.debug_record_info(args).await
} else if command == "watch" {
self.debug_record_info(args).await
} else if command == "cancel" {
self.debug_record_info(args).await
} else { } else {
Ok(">>> Unknown command\n".to_owned()) Ok(">>> Unknown command\n".to_owned())
} }
@ -1676,10 +1760,14 @@ route allocate [ord|*ord] [rel] [<count>] [in|out]
record list <local|remote> record list <local|remote>
purge <local|remote> [bytes] purge <local|remote> [bytes]
create <dhtschema> [<cryptokind> [<safety>]] create <dhtschema> [<cryptokind> [<safety>]]
set <key>[+<safety>] <subkey> <writer> <data> open <key>[+<safety>] [<writer>]
get <key>[+<safety>] <subkey> [force] close [<key>]
set [<key>] <subkey> <data>
get [<key>] <subkey> [force]
delete <key> delete <key>
info <key> [subkey] info [<key>] [subkey]
watch [<key>] [<subkeys>] [<expiration>] [<count>]
cancel [<key>] [<subkeys>]
-------------------------------------------------------------------- --------------------------------------------------------------------
<key> is: VLD0:GsgXCRPrzSK6oBNgxhNpm-rTYFd02R0ySx6j9vbQBG4 <key> is: VLD0:GsgXCRPrzSK6oBNgxhNpm-rTYFd02R0ySx6j9vbQBG4
* also <node>, <relay>, <target>, <route> * also <node>, <relay>, <target>, <route>

View File

@ -142,3 +142,34 @@ pub fn debug_duration(dur: u64) -> String {
msecs msecs
) )
} }
pub fn parse_duration(s: &str) -> Option<u64> {
let mut dur_total: u64 = 0;
let mut dur: u64 = 0;
for c in s.as_bytes() {
match c {
b'0'..=b'9' => {
dur *= 10;
dur += (c - b'0') as u64;
}
b'h' => {
dur *= 3_600_000u64;
dur_total += dur;
dur = 0;
}
b'm' => {
dur *= 60_000u64;
dur_total += dur;
dur = 0;
}
b's' => {
dur *= 1_000u64;
dur_total += dur;
dur = 0;
}
_ => return None,
}
}
dur_total += dur;
Some(dur_total * 1_000u64)
}