diff --git a/veilid-core/src/routing_table/route_spec_store.rs b/veilid-core/src/routing_table/route_spec_store.rs index 3044b122..e0345437 100644 --- a/veilid-core/src/routing_table/route_spec_store.rs +++ b/veilid-core/src/routing_table/route_spec_store.rs @@ -699,7 +699,11 @@ impl RouteSpecStore { } } // Remove from end nodes cache - match inner.cache.used_nodes.entry(*detail.hops.last().unwrap()) { + match inner + .cache + .used_end_nodes + .entry(*detail.hops.last().unwrap()) + { std::collections::hash_map::Entry::Occupied(mut o) => { *o.get_mut() -= 1; if *o.get() == 0 { @@ -707,7 +711,7 @@ impl RouteSpecStore { } } std::collections::hash_map::Entry::Vacant(_) => { - panic!("used_nodes cache should have contained hop"); + panic!("used_end_nodes cache should have contained hop"); } } } else { @@ -1161,12 +1165,24 @@ impl RouteSpecStore { let mut pr_builder = pr_message.init_root::(); encode_private_route(&private_route, &mut pr_builder) .wrap_err("failed to encode private route")?; - builder_to_vec(pr_message).wrap_err("failed to convert builder to vec") + + let mut buffer = vec![]; + capnp::serialize_packed::write_message(&mut buffer, &pr_message) + .wrap_err("failed to convert builder to vec")?; + Ok(buffer) + + // builder_to_vec(pr_message).wrap_err("failed to convert builder to vec") } /// Convert binary blob to private route pub fn blob_to_private_route(blob: Vec) -> EyreResult { - let reader = ::capnp::message::Reader::new(RPCMessageData::new(blob), Default::default()); + let reader = capnp::serialize_packed::read_message( + blob.as_slice(), + capnp::message::ReaderOptions::new(), + ) + .wrap_err("failed to make message reader")?; + + //let reader = ::capnp::message::Reader::new(RPCMessageData::new(blob), Default::default()); let pr_reader = reader .get_root::() .wrap_err("failed to make reader for private_route")?; diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index e0fc17a6..f2186b05 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -30,7 +30,6 @@ pub use rpc_status::*; use super::*; use crate::crypto::*; use crate::xx::*; -use capnp::message::ReaderSegments; use futures_util::StreamExt; use network_manager::*; use receipt_manager::*; @@ -98,18 +97,28 @@ impl RPCMessageData { pub fn new(contents: Vec) -> Self { Self { contents } } -} -impl ReaderSegments for RPCMessageData { - fn get_segment(&self, idx: u32) -> Option<&[u8]> { - if idx > 0 { - None - } else { - Some(self.contents.as_slice()) - } + pub fn get_reader( + &self, + ) -> Result, RPCError> { + capnp::serialize_packed::read_message( + self.contents.as_slice(), + capnp::message::ReaderOptions::new(), + ) + .map_err(RPCError::protocol) } } +// impl ReaderSegments for RPCMessageData { +// fn get_segment(&self, idx: u32) -> Option<&[u8]> { +// if idx > 0 { +// None +// } else { +// Some(self.contents.as_slice()) +// } +// } +// } + #[derive(Debug)] struct RPCMessageEncoded { header: RPCMessageHeader, @@ -127,12 +136,17 @@ pub fn builder_to_vec<'a, T>(builder: capnp::message::Builder) -> Result(reader: &capnp::message::Reader) -> Result, RPCError> @@ -899,7 +913,7 @@ impl RPCProcessor { // Decode the RPC message let operation = { - let reader = capnp::message::Reader::new(encoded_msg.data, Default::default()); + let reader = encoded_msg.data.get_reader()?; let op_reader = reader .get_root::() .map_err(RPCError::protocol) @@ -945,7 +959,7 @@ impl RPCProcessor { RPCMessageHeaderDetail::SafetyRouted(_) | RPCMessageHeaderDetail::PrivateRouted(_) => { // Decode the RPC message let operation = { - let reader = capnp::message::Reader::new(encoded_msg.data, Default::default()); + let reader = encoded_msg.data.get_reader()?; let op_reader = reader .get_root::() .map_err(RPCError::protocol) diff --git a/veilid-core/src/rpc_processor/rpc_route.rs b/veilid-core/src/rpc_processor/rpc_route.rs index 257a2b71..56848b18 100644 --- a/veilid-core/src/rpc_processor/rpc_route.rs +++ b/veilid-core/src/rpc_processor/rpc_route.rs @@ -325,15 +325,8 @@ impl RPCProcessor { .cached_dh(&route.safety_route.public_key, &node_id_secret) .map_err(RPCError::protocol)?; let dec_blob_data = Crypto::decrypt_aead(blob_data, &d.nonce, &dh_secret, None) - .map_err(RPCError::map_internal( - "decryption of safety route hop failed", - ))?; - let dec_blob_reader = capnp::message::Reader::new( - RPCMessageData { - contents: dec_blob_data, - }, - Default::default(), - ); + .map_err(RPCError::protocol)?; + let dec_blob_reader = RPCMessageData::new(dec_blob_data).get_reader()?; // Decode the blob appropriately if blob_tag == 1 { @@ -387,15 +380,8 @@ impl RPCProcessor { .map_err(RPCError::protocol)?; let dec_blob_data = Crypto::decrypt_aead(&next_hop.blob, &next_hop.nonce, &dh_secret, None) - .map_err(RPCError::map_internal( - "decryption of private route hop failed", - ))?; - let dec_blob_reader = capnp::message::Reader::new( - RPCMessageData { - contents: dec_blob_data, - }, - Default::default(), - ); + .map_err(RPCError::protocol)?; + let dec_blob_reader = RPCMessageData::new(dec_blob_data).get_reader()?; // Decode next RouteHop let route_hop = { diff --git a/veilid-core/src/veilid_api/debug.rs b/veilid-core/src/veilid_api/debug.rs index 4ae72f28..ee00b36e 100644 --- a/veilid-core/src/veilid_api/debug.rs +++ b/veilid-core/src/veilid_api/debug.rs @@ -2,9 +2,19 @@ // Debugging use super::*; +use data_encoding::BASE64URL_NOPAD; use routing_table::*; use rpc_processor::*; +#[derive(Default, Debug)] +struct DebugCache { + imported_routes: Vec, +} + +static DEBUG_CACHE: Mutex = Mutex::new(DebugCache { + imported_routes: Vec::new(), +}); + fn get_bucket_entry_state(text: &str) -> Option { if text == "dead" { Some(BucketEntryState::Dead) @@ -44,6 +54,126 @@ fn get_route_id(rss: RouteSpecStore) -> impl Fn(&str) -> Option { }; } +fn get_safety_selection(text: &str, rss: RouteSpecStore) -> Option { + if text.len() == 0 { + return None; + } + if &text[0..1] == "-" { + // Unsafe + let text = &text[1..]; + let seq = get_sequencing(text).unwrap_or(Sequencing::NoPreference); + Some(SafetySelection::Unsafe(seq)) + } else { + // Safe + let mut preferred_route = None; + let mut hop_count = 2; + let mut stability = Stability::LowLatency; + let mut sequencing = Sequencing::NoPreference; + for x in text.split(",") { + let x = x.trim(); + if let Some(pr) = get_route_id(rss.clone())(x) { + preferred_route = Some(pr) + } + if let Some(n) = get_number(x) { + hop_count = n; + } + if let Some(s) = get_stability(x) { + stability = s; + } + if let Some(s) = get_sequencing(x) { + sequencing = s; + } + } + let ss = SafetySpec { + preferred_route, + hop_count, + stability, + sequencing, + }; + Some(SafetySelection::Safe(ss)) + } +} + +fn get_node_ref_modifiers(mut node_ref: NodeRef) -> impl FnOnce(&str) -> Option { + move |text| { + for m in text.split("/") { + if let Some(pt) = get_protocol_type(m) { + node_ref.merge_filter(NodeRefFilter::new().with_protocol_type(pt)); + } else if let Some(at) = get_address_type(m) { + node_ref.merge_filter(NodeRefFilter::new().with_address_type(at)); + } else if let Some(rd) = get_routing_domain(m) { + node_ref.merge_filter(NodeRefFilter::new().with_routing_domain(rd)); + } else { + return None; + } + } + Some(node_ref) + } +} + +fn get_destination(routing_table: RoutingTable) -> impl FnOnce(&str) -> Option { + move |text| { + // Safety selection + let (text, ss) = if let Some((first, second)) = text.split_once('+') { + let ss = get_safety_selection(second, routing_table.route_spec_store())?; + (first, Some(ss)) + } else { + (text, None) + }; + if text.len() == 0 { + return None; + } + if &text[0..1] == "#" { + // Private route + let text = &text[1..]; + let n = get_number(text)?; + let dc = DEBUG_CACHE.lock(); + let r = dc.imported_routes.get(n)?; + Some(Destination::private_route( + r.clone(), + ss.unwrap_or(SafetySelection::Unsafe(Sequencing::NoPreference)), + )) + } else { + let (text, mods) = text + .split_once('/') + .map(|x| (x.0, Some(x.1))) + .unwrap_or((text, None)); + if let Some((first, second)) = text.split_once('@') { + // Relay + let relay_id = get_dht_key(second)?; + let mut relay_nr = routing_table.lookup_node_ref(relay_id)?; + let target_id = get_dht_key(first)?; + + if let Some(mods) = mods { + relay_nr = get_node_ref_modifiers(relay_nr)(mods)?; + } + + let mut d = Destination::relay(relay_nr, target_id); + if let Some(ss) = ss { + d = d.with_safety(ss) + } + + Some(d) + } else { + // Direct + let target_id = get_dht_key(text)?; + let mut target_nr = routing_table.lookup_node_ref(target_id)?; + + if let Some(mods) = mods { + target_nr = get_node_ref_modifiers(target_nr)(mods)?; + } + + let mut d = Destination::direct(target_nr); + if let Some(ss) = ss { + d = d.with_safety(ss) + } + + Some(d) + } + } + } +} + fn get_number(text: &str) -> Option { usize::from_str(text).ok() } @@ -51,6 +181,22 @@ fn get_dht_key(text: &str) -> Option { DHTKey::try_decode(text).ok() } +fn get_node_ref(routing_table: RoutingTable) -> impl FnOnce(&str) -> Option { + move |text| { + let (text, mods) = text + .split_once('/') + .map(|x| (x.0, Some(x.1))) + .unwrap_or((text, None)); + + let node_id = get_dht_key(text)?; + let mut nr = routing_table.lookup_node_ref(node_id)?; + if let Some(mods) = mods { + nr = get_node_ref_modifiers(nr)(mods)?; + } + Some(nr) + } +} + fn get_protocol_type(text: &str) -> Option { let lctext = text.to_ascii_lowercase(); if lctext == "udp" { @@ -366,55 +512,19 @@ impl VeilidAPI { async fn debug_contact(&self, args: String) -> Result { let args: Vec = args.split_whitespace().map(|s| s.to_owned()).collect(); - let node_id = get_debug_argument_at(&args, 0, "debug_contact", "node_id", get_dht_key)?; - let network_manager = self.network_manager()?; let routing_table = network_manager.routing_table(); - let mut nr = match routing_table.lookup_node_ref(node_id) { - Some(nr) => nr, - None => return Ok("Node id not found in routing table".to_owned()), - }; - - let mut ai = 1; - let mut routing_domain = None; - while ai < args.len() { - if let Ok(pt) = get_debug_argument_at( - &args, - ai, - "debug_contact", - "protocol_type", - get_protocol_type, - ) { - nr.merge_filter(NodeRefFilter::new().with_protocol_type(pt)); - } else if let Ok(at) = - get_debug_argument_at(&args, ai, "debug_contact", "address_type", get_address_type) - { - nr.merge_filter(NodeRefFilter::new().with_address_type(at)); - } else if let Ok(rd) = get_debug_argument_at( - &args, - ai, - "debug_contact", - "routing_domain", - get_routing_domain, - ) { - if routing_domain.is_none() { - routing_domain = Some(rd); - } else { - return Ok("Multiple routing domains specified".to_owned()); - } - } else { - return Ok(format!("Invalid argument specified: {}", args[ai])); - } - ai += 1; - } - - if let Some(routing_domain) = routing_domain { - nr.merge_filter(NodeRefFilter::new().with_routing_domain(routing_domain)) - } + let node_ref = get_debug_argument_at( + &args, + 0, + "debug_contact", + "node_ref", + get_node_ref(routing_table), + )?; let cm = network_manager - .get_node_contact_method(nr) + .get_node_contact_method(node_ref) .map_err(VeilidAPIError::internal)?; Ok(format!("{:#?}", cm)) @@ -427,49 +537,17 @@ impl VeilidAPI { let args: Vec = args.split_whitespace().map(|s| s.to_owned()).collect(); - let node_id = get_debug_argument_at(&args, 0, "debug_ping", "node_id", get_dht_key)?; - - let mut nr = match routing_table.lookup_node_ref(node_id) { - Some(nr) => nr, - None => return Ok("Node id not found in routing table".to_owned()), - }; - - let mut ai = 1; - let mut routing_domain = None; - while ai < args.len() { - if let Ok(pt) = - get_debug_argument_at(&args, ai, "debug_ping", "protocol_type", get_protocol_type) - { - nr.merge_filter(NodeRefFilter::new().with_protocol_type(pt)); - } else if let Ok(at) = - get_debug_argument_at(&args, ai, "debug_ping", "address_type", get_address_type) - { - nr.merge_filter(NodeRefFilter::new().with_address_type(at)); - } else if let Ok(rd) = get_debug_argument_at( - &args, - ai, - "debug_ping", - "routing_domain", - get_routing_domain, - ) { - if routing_domain.is_none() { - routing_domain = Some(rd); - } else { - return Ok("Multiple routing domains specified".to_owned()); - } - } else { - return Ok(format!("Invalid argument specified: {}", args[ai])); - } - ai += 1; - } - - if let Some(routing_domain) = routing_domain { - nr.merge_filter(NodeRefFilter::new().with_routing_domain(routing_domain)) - } + let dest = get_debug_argument_at( + &args, + 0, + "debug_ping", + "destination", + get_destination(routing_table), + )?; // Dump routing table entry let out = match rpc - .rpc_call_status(Destination::direct(nr)) + .rpc_call_status(dest) .await .map_err(VeilidAPIError::internal)? { @@ -595,7 +673,14 @@ impl VeilidAPI { // Convert to blob let blob_data = RouteSpecStore::private_route_to_blob(&private_route) .map_err(VeilidAPIError::internal)?; - data_encoding::BASE64URL_NOPAD.encode(&blob_data) + let out = BASE64URL_NOPAD.encode(&blob_data); + info!( + "Published route {} as {} bytes:\n{}", + route_id.encode(), + blob_data.len(), + out + ); + format!("Published route {}", route_id.encode()) } Err(e) => { format!("Couldn't assemble private route: {}", e) @@ -646,9 +731,21 @@ impl VeilidAPI { } Ok(out) } - async fn debug_route_import(&self, _args: Vec) -> Result { + async fn debug_route_import(&self, args: Vec) -> Result { // - let out = format!(""); + + let blob = get_debug_argument_at(&args, 1, "debug_route", "blob", get_string)?; + let blob_dec = BASE64URL_NOPAD + .decode(blob.as_bytes()) + .map_err(VeilidAPIError::generic)?; + let pr = + RouteSpecStore::blob_to_private_route(blob_dec).map_err(VeilidAPIError::generic)?; + + let mut dc = DEBUG_CACHE.lock(); + let n = dc.imported_routes.len(); + let out = format!("Private route #{} imported: {}", n, pr.public_key); + dc.imported_routes.push(pr); + return Ok(out); } @@ -682,22 +779,34 @@ impl VeilidAPI { buckets [dead|reliable] dialinfo entries [dead|reliable] [limit] - entry + entry nodeinfo config [key [new value]] purge attach detach restart network - ping [protocol_type][address_type][routing_domain] - contact [protocol_type [address_type]] + ping + contact [] route allocate [ord|*ord] [rel] [] [in|out] - route release - route publish [full] - route unpublish - route print - route list - route import + release + publish [full] + unpublish + print + list + import + + is: + * direct: [+][] + * relay: @[+][] + * private: #[+] + is: + * unsafe: -[ord|*ord] + * safe: [route][,ord|*ord][,rel][,] + is: [/][/][/] + is: udp|tcp|ws|wss + is: ipv4|ipv6 + is: public|local "# .to_owned()) }