diff --git a/veilid-core/src/rpc_processor/coders/operations/answer.rs b/veilid-core/src/rpc_processor/coders/operations/answer.rs index c0b305ed..eb727600 100644 --- a/veilid-core/src/rpc_processor/coders/operations/answer.rs +++ b/veilid-core/src/rpc_processor/coders/operations/answer.rs @@ -14,6 +14,9 @@ impl RPCAnswer { pub fn detail(&self) -> &RPCAnswerDetail { &self.detail } + pub fn into_detail(self) -> RPCAnswerDetail { + self.detail + } pub fn desc(&self) -> &'static str { self.detail.desc() } diff --git a/veilid-core/src/rpc_processor/coders/operations/operation.rs b/veilid-core/src/rpc_processor/coders/operations/operation.rs index 1d0d0cf6..06a7eae9 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation.rs @@ -93,6 +93,10 @@ impl RPCOperation { &self.kind } + pub fn into_kind(self) -> RPCOperationKind { + self.kind + } + pub fn decode( operation_reader: &veilid_capnp::operation::Reader, sender_node_id: &DHTKey, diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_cancel_tunnel.rs b/veilid-core/src/rpc_processor/coders/operations/operation_cancel_tunnel.rs index becd3fea..54741954 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_cancel_tunnel.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_cancel_tunnel.rs @@ -3,7 +3,7 @@ use rpc_processor::*; #[derive(Debug, Clone)] pub struct RPCOperationCancelTunnelQ { - id: TunnelId, + pub id: TunnelId, } impl RPCOperationCancelTunnelQ { diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_complete_tunnel.rs b/veilid-core/src/rpc_processor/coders/operations/operation_complete_tunnel.rs index bc11b474..66e1b21e 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_complete_tunnel.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_complete_tunnel.rs @@ -3,10 +3,10 @@ use rpc_processor::*; #[derive(Debug, Clone)] pub struct 
RPCOperationCompleteTunnelQ { - id: TunnelId, - local_mode: TunnelMode, - depth: u8, - endpoint: TunnelEndpoint, + pub id: TunnelId, + pub local_mode: TunnelMode, + pub depth: u8, + pub endpoint: TunnelEndpoint, } impl RPCOperationCompleteTunnelQ { diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_find_block.rs b/veilid-core/src/rpc_processor/coders/operations/operation_find_block.rs index b6fab81c..389a9db1 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_find_block.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_find_block.rs @@ -3,7 +3,7 @@ use rpc_processor::*; #[derive(Debug, Clone)] pub struct RPCOperationFindBlockQ { - block_id: DHTKey, + pub block_id: DHTKey, } impl RPCOperationFindBlockQ { @@ -28,9 +28,9 @@ impl RPCOperationFindBlockQ { #[derive(Debug, Clone)] pub struct RPCOperationFindBlockA { - data: Vec, - suppliers: Vec, - peers: Vec, + pub data: Vec, + pub suppliers: Vec, + pub peers: Vec, } impl RPCOperationFindBlockA { diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_find_node.rs b/veilid-core/src/rpc_processor/coders/operations/operation_find_node.rs index 06f30455..972fe08a 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_find_node.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_find_node.rs @@ -3,7 +3,7 @@ use rpc_processor::*; #[derive(Debug, Clone)] pub struct RPCOperationFindNodeQ { - node_id: DHTKey, + pub node_id: DHTKey, } impl RPCOperationFindNodeQ { @@ -26,7 +26,7 @@ impl RPCOperationFindNodeQ { #[derive(Debug, Clone)] pub struct RPCOperationFindNodeA { - peers: Vec, + pub peers: Vec, } impl RPCOperationFindNodeA { diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs b/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs index 92070adb..af10bf4c 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs +++ 
b/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs @@ -3,7 +3,7 @@ use rpc_processor::*; #[derive(Debug, Clone)] pub struct RPCOperationGetValueQ { - key: ValueKey, + pub key: ValueKey, } impl RPCOperationGetValueQ { diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_node_info_update.rs b/veilid-core/src/rpc_processor/coders/operations/operation_node_info_update.rs index fc9d954d..0e92c0cc 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_node_info_update.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_node_info_update.rs @@ -3,7 +3,7 @@ use rpc_processor::*; #[derive(Debug, Clone)] pub struct RPCOperationNodeInfoUpdate { - signed_node_info: SignedNodeInfo, + pub signed_node_info: SignedNodeInfo, } impl RPCOperationNodeInfoUpdate { diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_return_receipt.rs b/veilid-core/src/rpc_processor/coders/operations/operation_return_receipt.rs index 9e93cf29..70acb47a 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_return_receipt.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_return_receipt.rs @@ -3,7 +3,7 @@ use rpc_processor::*; #[derive(Debug, Clone)] pub struct RPCOperationReturnReceipt { - receipt: Vec, + pub receipt: Vec, } impl RPCOperationReturnReceipt { diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_route.rs b/veilid-core/src/rpc_processor/coders/operations/operation_route.rs index 33de91a6..4691dd46 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_route.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_route.rs @@ -3,9 +3,9 @@ use rpc_processor::*; #[derive(Debug, Clone)] struct RoutedOperation { - signatures: Vec, - nonce: Nonce, - data: Vec, + pub signatures: Vec, + pub nonce: Nonce, + pub data: Vec, } impl RoutedOperation { @@ -62,8 +62,8 @@ impl RoutedOperation { #[derive(Debug, Clone)] pub struct RPCOperationRoute 
{ - safety_route: SafetyRoute, - operation: RoutedOperation, + pub safety_route: SafetyRoute, + pub operation: RoutedOperation, } impl RPCOperationRoute { diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_set_value.rs b/veilid-core/src/rpc_processor/coders/operations/operation_set_value.rs index 9b2494f3..dac59058 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_set_value.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_set_value.rs @@ -3,8 +3,8 @@ use rpc_processor::*; #[derive(Debug, Clone)] pub struct RPCOperationSetValueQ { - key: ValueKey, - value: ValueData, + pub key: ValueKey, + pub value: ValueData, } impl RPCOperationSetValueQ { diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_signal.rs b/veilid-core/src/rpc_processor/coders/operations/operation_signal.rs index 43c47aff..7feea158 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_signal.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_signal.rs @@ -3,7 +3,7 @@ use rpc_processor::*; #[derive(Debug, Clone)] pub struct RPCOperationSignal { - signal_info: SignalInfo, + pub signal_info: SignalInfo, } impl RPCOperationSignal { diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_start_tunnel.rs b/veilid-core/src/rpc_processor/coders/operations/operation_start_tunnel.rs index 2600775a..ca211b00 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_start_tunnel.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_start_tunnel.rs @@ -3,9 +3,9 @@ use rpc_processor::*; #[derive(Debug, Clone)] pub struct RPCOperationStartTunnelQ { - id: TunnelId, - local_mode: TunnelMode, - depth: u8, + pub id: TunnelId, + pub local_mode: TunnelMode, + pub depth: u8, } impl RPCOperationStartTunnelQ { diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_status.rs b/veilid-core/src/rpc_processor/coders/operations/operation_status.rs index 
09bf7a7a..7599484e 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_status.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_status.rs @@ -3,7 +3,7 @@ use rpc_processor::*; #[derive(Debug, Clone)] pub struct RPCOperationStatusQ { - node_status: NodeStatus, + pub node_status: NodeStatus, } impl RPCOperationStatusQ { @@ -26,8 +26,8 @@ impl RPCOperationStatusQ { #[derive(Debug, Clone)] pub struct RPCOperationStatusA { - node_status: NodeStatus, - sender_info: SenderInfo, + pub node_status: NodeStatus, + pub sender_info: SenderInfo, } impl RPCOperationStatusA { diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_supply_block.rs b/veilid-core/src/rpc_processor/coders/operations/operation_supply_block.rs index 8b3b8191..6bc09053 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_supply_block.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_supply_block.rs @@ -3,7 +3,7 @@ use rpc_processor::*; #[derive(Debug, Clone)] pub struct RPCOperationSupplyBlockQ { - block_id: DHTKey, + pub block_id: DHTKey, } impl RPCOperationSupplyBlockQ { diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_validate_dial_info.rs b/veilid-core/src/rpc_processor/coders/operations/operation_validate_dial_info.rs index 6a6e8456..86684185 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_validate_dial_info.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_validate_dial_info.rs @@ -3,9 +3,9 @@ use rpc_processor::*; #[derive(Debug, Clone)] pub struct RPCOperationValidateDialInfo { - dial_info: DialInfo, - receipt: Vec, - redirect: bool, + pub dial_info: DialInfo, + pub receipt: Vec, + pub redirect: bool, } impl RPCOperationValidateDialInfo { diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_value_changed.rs b/veilid-core/src/rpc_processor/coders/operations/operation_value_changed.rs index b41e31d2..97515d4a 100644 --- 
a/veilid-core/src/rpc_processor/coders/operations/operation_value_changed.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_value_changed.rs @@ -3,8 +3,8 @@ use rpc_processor::*; #[derive(Debug, Clone)] pub struct RPCOperationValueChanged { - key: ValueKey, - value: ValueData, + pub key: ValueKey, + pub value: ValueData, } impl RPCOperationValueChanged { diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_watch_value.rs b/veilid-core/src/rpc_processor/coders/operations/operation_watch_value.rs index 372e7dac..c8dcb237 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_watch_value.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_watch_value.rs @@ -3,7 +3,7 @@ use rpc_processor::*; #[derive(Debug, Clone)] pub struct RPCOperationWatchValueQ { - key: ValueKey, + pub key: ValueKey, } impl RPCOperationWatchValueQ { @@ -26,8 +26,8 @@ impl RPCOperationWatchValueQ { #[derive(Debug, Clone)] pub struct RPCOperationWatchValueA { - expiration: u64, - peers: Vec, + pub expiration: u64, + pub peers: Vec, } impl RPCOperationWatchValueA { diff --git a/veilid-core/src/rpc_processor/coders/operations/question.rs b/veilid-core/src/rpc_processor/coders/operations/question.rs index 570b3677..165c822b 100644 --- a/veilid-core/src/rpc_processor/coders/operations/question.rs +++ b/veilid-core/src/rpc_processor/coders/operations/question.rs @@ -18,6 +18,12 @@ impl RPCQuestion { pub fn detail(&self) -> &RPCQuestionDetail { &self.detail } + pub fn into_detail(self) -> RPCQuestionDetail { + self.detail + } + pub fn into_respond_to_detail(self) -> (RespondTo, RPCQuestionDetail) { + (self.respond_to, self.detail) + } pub fn desc(&self) -> &'static str { self.detail.desc() } diff --git a/veilid-core/src/rpc_processor/coders/operations/statement.rs b/veilid-core/src/rpc_processor/coders/operations/statement.rs index c5947702..9dfffa83 100644 --- a/veilid-core/src/rpc_processor/coders/operations/statement.rs +++ 
b/veilid-core/src/rpc_processor/coders/operations/statement.rs @@ -14,6 +14,9 @@ impl RPCStatement { pub fn detail(&self) -> &RPCStatementDetail { &self.detail } + pub fn into_detail(self) -> RPCStatementDetail { + self.detail + } pub fn desc(&self) -> &'static str { self.detail.desc() } diff --git a/veilid-core/src/rpc_processor/coders/tunnel.rs b/veilid-core/src/rpc_processor/coders/tunnel.rs index 2f413fa7..90beef94 100644 --- a/veilid-core/src/rpc_processor/coders/tunnel.rs +++ b/veilid-core/src/rpc_processor/coders/tunnel.rs @@ -47,7 +47,10 @@ pub fn decode_tunnel_endpoint( reader: &veilid_capnp::tunnel_endpoint::Reader, ) -> Result { let mode = decode_tunnel_mode(reader.get_mode().map_err(map_error_capnp_notinschema!())?); - let description = reader.get_description(); + let description = reader + .get_description() + .map_err(map_error_capnp_error!())? + .to_owned(); Ok(TunnelEndpoint { mode, description }) } diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 717ccff2..7bff1129 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -1,10 +1,27 @@ mod coders; mod debug; mod private_route; +mod rpc_cancel_tunnel; +mod rpc_complete_tunnel; +mod rpc_find_block; +mod rpc_find_node; +mod rpc_get_value; +mod rpc_node_info_update; +mod rpc_return_receipt; +mod rpc_route; +mod rpc_set_value; +mod rpc_signal; +mod rpc_start_tunnel; +mod rpc_status; +mod rpc_supply_block; +mod rpc_validate_dial_info; +mod rpc_value_changed; +mod rpc_watch_value; pub use debug::*; pub use private_route::*; +use super::*; use crate::dht::*; use crate::xx::*; use capnp::message::ReaderSegments; @@ -14,7 +31,6 @@ use network_manager::*; use receipt_manager::*; use routing_table::*; use stop_token::future::FutureExt; -use super::*; ///////////////////////////////////////////////////////////////////// @@ -45,7 +61,7 @@ impl fmt::Display for Destination { #[derive(Debug, Clone)] struct RPCMessageHeader { - 
timestamp: u64, // time the message was received, not sent + timestamp: u64, // time the message was received, not sent envelope: Envelope, body_len: u64, peer_noderef: NodeRef, // ensures node doesn't get evicted from routing table until we're done with it @@ -53,7 +69,7 @@ struct RPCMessageHeader { #[derive(Debug)] struct RPCMessageData { - contents: Vec, // rpc messages must be a canonicalized single segment + contents: Vec, // rpc messages must be a canonicalized single segment } impl ReaderSegments for RPCMessageData { @@ -67,14 +83,14 @@ impl ReaderSegments for RPCMessageData { } #[derive(Debug)] -struct RPCMessage { +struct RPCMessageEncoded { header: RPCMessageHeader, data: RPCMessageData, } -struct RPCMessageReader { +struct RPCMessage { header: RPCMessageHeader, - reader: capnp::message::Reader, + operation: RPCOperation, opt_sender_nr: Option, } @@ -103,7 +119,7 @@ where #[derive(Debug)] struct WaitableReply { op_id: OperationId, - eventual: EventualValue, + eventual: EventualValue, timeout: u64, node_ref: NodeRef, send_ts: u64, @@ -113,23 +129,21 @@ struct WaitableReply { ///////////////////////////////////////////////////////////////////// #[derive(Clone, Debug, Default)] -pub struct StatusAnswer { - pub latency: u64, - pub node_status: NodeStatus, - pub sender_info: SenderInfo, +pub struct Answer { + pub latency: u64, // how long it took to get this answer + pub answer: T, // the answer itself } - -#[derive(Clone, Debug, Default)] -pub struct FindNodeAnswer { - pub latency: u64, // how long it took to get this answer - pub peers: Vec, // the list of closer peers +impl Answer { + pub fn new(latency: u64, answer: T) -> Self { + Self { latency, answer } + } } struct RenderedOperation { - out: Vec, // The rendered operation bytes - out_node_id: DHTKey, // Node id we're sending to - out_noderef: Option, // Node to send envelope to (may not be destination node id in case of relay) - hopcount: usize, // Total safety + private route hop count + out: Vec, // 
The rendered operation bytes + out_node_id: DHTKey, // Node id we're sending to + out_noderef: Option, // Node to send envelope to (may not be destination node id in case of relay) + hopcount: usize, // Total safety + private route hop count } ///////////////////////////////////////////////////////////////////// @@ -138,10 +152,10 @@ pub struct RPCProcessorInner { routing_table: RoutingTable, node_id: DHTKey, node_id_secret: DHTKeySecret, - send_channel: Option>, + send_channel: Option>, timeout: u64, max_route_hop_count: usize, - waiting_rpc_table: BTreeMap>, + waiting_rpc_table: BTreeMap>, stop_source: Option, worker_join_handles: Vec>, } @@ -255,10 +269,7 @@ impl RPCProcessor { // Search the DHT for a specific node corresponding to a key unless we have that node in our routing table already, and return the node reference // Note: This routine can possible be recursive, hence the SystemPinBoxFuture async form - pub fn resolve_node( - &self, - node_id: DHTKey, - ) -> SystemPinBoxFuture> { + pub fn resolve_node(&self, node_id: DHTKey) -> SystemPinBoxFuture> { let this = self.clone(); Box::pin(async move { let routing_table = this.routing_table(); @@ -296,7 +307,7 @@ impl RPCProcessor { } // set up wait for reply - fn add_op_id_waiter(&self, op_id: OperationId) -> EventualValue { + fn add_op_id_waiter(&self, op_id: OperationId) -> EventualValue { let mut inner = self.inner.lock(); let e = EventualValue::new(); inner.waiting_rpc_table.insert(op_id, e.clone()); @@ -310,11 +321,8 @@ impl RPCProcessor { } // complete the reply - async fn complete_op_id_waiter( - &self, - op_id: OperationId, - rpcreader: RPCMessageReader, - ) -> Result<(), RPCError> { + async fn complete_op_id_waiter(&self, msg: RPCMessage) -> Result<(), RPCError> { + let op_id = msg.operation.op_id(); let eventual = { let mut inner = self.inner.lock(); inner @@ -322,7 +330,7 @@ impl RPCProcessor { .remove(&op_id) .ok_or_else(|| rpc_error_internal("Unmatched operation id"))? 
}; - eventual.resolve(rpcreader).await; + eventual.resolve(msg).await; Ok(()) } @@ -330,7 +338,7 @@ impl RPCProcessor { async fn do_wait_for_reply( &self, waitable_reply: &WaitableReply, - ) -> Result<(RPCMessageReader, u64), RPCError> { + ) -> Result<(RPCMessage, u64), RPCError> { let timeout_ms = u32::try_from(waitable_reply.timeout / 1000u64) .map_err(map_error_internal!("invalid timeout"))?; // wait for eventualvalue @@ -342,10 +350,11 @@ impl RPCProcessor { let end_ts = intf::get_timestamp(); Ok((rpcreader, end_ts - start_ts)) } + async fn wait_for_reply( &self, waitable_reply: WaitableReply, - ) -> Result<(RPCMessageReader, u64), RPCError> { + ) -> Result<(RPCMessage, u64), RPCError> { let out = self.do_wait_for_reply(&waitable_reply).await; match &out { Err(_) => { @@ -372,9 +381,32 @@ impl RPCProcessor { out } + // Gets a 'RespondTo::Sender' that contains either our dial info, + // or None if the peer has seen our dial info before or our node info is not yet valid + // because of an unknown network class + pub fn make_respond_to_sender(&self, peer: NodeRef) -> RespondTo { + if peer.has_seen_our_node_info() + || matches!( + self.network_manager() + .get_network_class() + .unwrap_or(NetworkClass::Invalid), + NetworkClass::Invalid + ) + { + RespondTo::Sender(None) + } else { + let our_sni = self.routing_table().get_own_signed_node_info(); + RespondTo::Sender(Some(our_sni)) + } + } + #[instrument(level = "debug", skip(self, operation, safety_route_spec), err)] - fn render_operation(&self, dest: Destination, operation: &RPCOperation, safety_route_spec: Option<&SafetyRouteSpec>) -> Result - { + fn render_operation( + &self, + dest: &Destination, + operation: &RPCOperation, + safety_route_spec: Option<&SafetyRouteSpec>, + ) -> Result { // Encode message to a builder and make a message reader for it let mut msg_builder = ::capnp::message::Builder::new_default(); let mut op_builder = msg_builder.init_root::(); @@ -395,7 +427,7 @@ impl RPCProcessor { // Get the 
actual destination node id accounting for relays let (node_ref, node_id) = if let Destination::Relay(_, dht_key) = dest { - (node_ref.clone(), dht_key) + (node_ref.clone(), dht_key.clone()) } else { let node_id = node_ref.node_id(); (node_ref.clone(), node_id) @@ -481,7 +513,7 @@ impl RPCProcessor { if hopcount > self.inner.lock().max_route_hop_count { return Err(rpc_error_internal("hop count too long for route")) .map_err(logthru_rpc!(warn)); - } + } Ok(RenderedOperation { out, @@ -499,20 +531,22 @@ impl RPCProcessor { question: RPCQuestion, safety_route_spec: Option<&SafetyRouteSpec>, ) -> Result { - // Wrap question in operation let operation = RPCOperation::new_question(question); // Produce rendered operation let RenderedOperation { - out, out_node_id, out_noderef, hopcount - } = self.render_operation(dest, &operation, safety_route_spec)?; + out, + out_node_id, + out_noderef, + hopcount, + } = self.render_operation(&dest, &operation, safety_route_spec)?; // Calculate answer timeout // Timeout is number of hops times the timeout per hop let timeout = self.inner.lock().timeout * (hopcount as u64); - // if we need to resolve the first hop, do it + // If we need to resolve the first hop, do it let node_ref = match out_noderef { None => { // resolve node @@ -531,7 +565,7 @@ impl RPCProcessor { let eventual = self.add_op_id_waiter(op_id); // Log rpc send - debug!(target: "rpc_message", dir = "send", kind = "question", op_id, desc = operation.kind().desc()); + debug!(target: "rpc_message", dir = "send", kind = "question", op_id, desc = operation.kind().desc(), ?dest); // Send question let bytes = out.len() as u64; @@ -569,137 +603,125 @@ impl RPCProcessor { }) } - xxxx continue here, make 'statement' sender + // Issue a statement over the network, possibly using an anonymized route + #[instrument(level = "debug", skip(self, statement, safety_route_spec), err)] + async fn statement( + &self, + dest: Destination, + statement: RPCStatement, + safety_route_spec: 
Option<&SafetyRouteSpec>, + ) -> Result<(), RPCError> { + // Wrap statement in operation + let operation = RPCOperation::new_statement(statement); + + // Produce rendered operation + let RenderedOperation { + out, + out_node_id, + out_noderef, + hopcount, + } = self.render_operation(&dest, &operation, safety_route_spec)?; + + // Calculate answer timeout + // Timeout is number of hops times the timeout per hop + let timeout = self.inner.lock().timeout * (hopcount as u64); + + // If we need to resolve the first hop, do it + let node_ref = match out_noderef { + None => { + // resolve node + self.resolve_node(out_node_id) + .await + .map_err(logthru_rpc!(error))? + } + Some(nr) => { + // got the node in the routing table already + nr + } + }; + + // Log rpc send + debug!(target: "rpc_message", dir = "send", kind = "statement", op_id = operation.op_id(), desc = operation.kind().desc(), ?dest); + + // Send statement + let bytes = out.len() as u64; + let send_ts = intf::get_timestamp(); + let send_data_kind = match self + .network_manager() + .send_envelope(node_ref.clone(), Some(out_node_id), out) + .await + .map_err(RPCError::Internal) + { + Ok(v) => v, + Err(e) => { + self.routing_table() + .stats_failed_to_send(node_ref, send_ts, true); + return Err(e); + } + }; + + // Successfully sent + self.routing_table() + .stats_question_sent(node_ref.clone(), send_ts, bytes, true); + + Ok(()) + } + + // Convert the 'RespondTo' into a 'Destination' for a response + fn get_respond_to_destination(&self, request: &RPCMessage) -> Destination { + // Get the question 'respond to' + let respond_to = match request.operation.kind() { + RPCOperationKind::Question(q) => q.respond_to(), + _ => { + panic!("not a question"); + } + }; + + // To where should we respond? 
+ match respond_to { + RespondTo::Sender(_) => { + // Reply directly to the request's source + let sender_id = request.header.envelope.get_sender_id(); + + // This may be a different node's reference than the 'sender' in the case of a relay + let peer_noderef = request.header.peer_noderef.clone(); + + // If the sender_id is that of the peer, then this is a direct reply + // else it is a relayed reply through the peer + if peer_noderef.node_id() == sender_id { + Destination::Direct(peer_noderef) + } else { + Destination::Relay(peer_noderef, sender_id) + } + } + RespondTo::PrivateRoute(pr) => Destination::PrivateRoute(pr.clone()), + } + } // Issue a reply over the network, possibly using an anonymized route // The request must want a response, or this routine fails - #[instrument(level = "debug", skip(self, request_rpcreader, reply_msg, safety_route_spec), err)] - async fn answer( + #[instrument(level = "debug", skip(self, request, answer, safety_route_spec), err)] + async fn answer( &self, - request_rpcreader: RPCMessageReader, - reply_msg: capnp::message::Reader, + request: RPCMessage, + answer: RPCAnswer, safety_route_spec: Option<&SafetyRouteSpec>, ) -> Result<(), RPCError> { - // - let out_node_id; - let mut out_noderef: Option = None; + // Wrap answer in operation + let operation = RPCOperation::new_answer(&request.operation, answer); - let out = { - let out; + // Extract destination from respond_to + let dest = self.get_respond_to_destination(&request); - let request_operation = request_rpcreader - .reader - .get_root::() - .map_err(map_error_capnp_error!()) - .map_err(logthru_rpc!())?; + // Produce rendered operation + let RenderedOperation { + out, + out_node_id, + out_noderef, + hopcount, + } = self.render_operation(&dest, &operation, safety_route_spec)?; - let reply_vec = reader_to_vec(&reply_msg)?; - - // To where should we respond? 
- match request_operation - .get_respond_to() - .which() - .map_err(map_error_internal!("invalid request operation")) - .map_err(logthru_rpc!())? - { - veilid_capnp::operation::respond_to::None(_) => { - // Do not respond - // -------------- - return Err(rpc_error_internal("no response requested")) - .map_err(logthru_rpc!()); - } - veilid_capnp::operation::respond_to::SenderWithInfo(_) - | veilid_capnp::operation::respond_to::Sender(_) => { - // Respond to envelope source node, possibly through a relay if the request arrived that way - // ------------------------------- - match safety_route_spec { - None => { - // If no safety route is being used, and we're not replying to a private - // route, we can use a direct envelope instead of routing - out = reply_vec; - - // Reply directly to the request's source - out_node_id = request_rpcreader.header.envelope.get_sender_id(); - - // This may be a different node's reference than the 'sender' in the case of a relay - // But in that case replies to inbound requests are returned through the inbound relay anyway - out_noderef = Some(request_rpcreader.header.peer_noderef.clone()); - } - Some(sr) => { - // No private route was specified for the return - // but we are using a safety route, so we must create an empty private route - let mut pr_builder = ::capnp::message::Builder::new_default(); - let private_route = self - .new_stub_private_route( - request_rpcreader.header.envelope.get_sender_id(), - &mut pr_builder, - ) - .map_err(logthru_rpc!())?; - - out = self.wrap_with_route(Some(sr), private_route, reply_vec)?; - // first - out_node_id = sr - .hops - .first() - .ok_or_else(|| rpc_error_internal("no hop in safety route")) - .map_err(logthru_rpc!())? 
- .dial_info - .node_id - .key; - } - }; - } - veilid_capnp::operation::respond_to::PrivateRoute(pr) => { - // Respond to private route - // ------------------------ - - // Extract private route for reply - let private_route = match pr { - Ok(v) => v, - Err(_) => { - return Err(rpc_error_internal("invalid private route")) - .map_err(logthru_rpc!()) - } - }; - - // Reply with 'route' operation - out = self.wrap_with_route(safety_route_spec, private_route, reply_vec)?; - out_node_id = match safety_route_spec { - None => { - // If no safety route, the first node is the first hop of the private route - if !private_route.has_first_hop() { - return Err(rpc_error_internal("private route has no hops")) - .map_err(logthru_rpc!()); - } - let hop = private_route - .get_first_hop() - .map_err(map_error_internal!("not a valid first hop"))?; - decode_public_key( - &hop.get_dial_info() - .map_err(map_error_internal!("not a valid dial info")) - .map_err(logthru_rpc!())? - .get_node_id() - .map_err(map_error_internal!("not a valid node id")) - .map_err(logthru_rpc!())?, - ) - } - Some(sr) => { - // If safety route is in use, first node is the first hop of the safety route - sr.hops - .first() - .ok_or_else(|| rpc_error_internal("no hop in safety route")) - .map_err(logthru_rpc!())? 
- .dial_info - .node_id - .key - } - } - } - } - out - }; - - // if we need to resolve the first hop, do it + // If we need to resolve the first hop, do it let node_ref = match out_noderef { None => { // resolve node @@ -711,15 +733,10 @@ impl RPCProcessor { } }; + // Log rpc send + debug!(target: "rpc_message", dir = "send", kind = "answer", op_id = operation.op_id(), desc = operation.kind().desc(), ?dest); + // Send the reply - log_rpc!(debug "==>> REPLY({}) -> {}{:?}", - self.get_rpc_message_debug_info(&reply_msg), - if out_node_id == node_ref.node_id() { - "".to_owned() - } else { - format!("{} via ", out_node_id) - }, - node_ref); let bytes = out.len() as u64; let send_ts = intf::get_timestamp(); self.network_manager() @@ -738,8 +755,6 @@ impl RPCProcessor { Ok(()) } - ////////////////////////////////////////////////////////////////////// - async fn generate_sender_info(peer_noderef: NodeRef) -> SenderInfo { let socket_address = peer_noderef .last_connection() @@ -748,564 +763,117 @@ impl RPCProcessor { SenderInfo { socket_address } } - async fn process_status_q(&self, rpcreader: RPCMessageReader) -> Result<(), RPCError> { - let peer_noderef = rpcreader.header.peer_noderef.clone(); - let sender_info = Self::generate_sender_info(peer_noderef).await; - - let reply_msg = { - let operation = rpcreader - .reader - .get_root::() - .map_err(map_error_capnp_error!()) - .map_err(logthru_rpc!())?; - - // Don't bother unless we are going to answer - if !Self::wants_answer(&operation)? 
{ - return Ok(()); - } - - // get StatusQ reader - let which_reader = operation.get_detail().which().expect("missing which operation"); - let statusq_reader = match which_reader { - veilid_capnp::operation::detail::Which::StatusQ(Ok(x)) => x, - _ => panic!("invalid operation type in process_status_q"), - }; - - // Parse out fields - let node_status = decode_node_status( - &statusq_reader - .get_node_status() - .map_err(map_error_internal!("no valid node status"))?, - )?; - - // update node status for the requesting node to our routing table - if let Some(sender_nr) = rpcreader.opt_sender_nr.clone() { - // Update latest node status in routing table for the statusq sender - sender_nr.operate_mut(|e| { - e.update_node_status(node_status); - }); - } - - // Send info answer - let mut reply_msg = ::capnp::message::Builder::new_default(); - let mut answer = reply_msg.init_root::(); - answer.set_op_id(operation.get_op_id()); - let mut respond_to = answer.reborrow().init_respond_to(); - respond_to.set_none(()); - let detail = answer.reborrow().init_detail(); - let mut status_a = detail.init_status_a(); - - // Add node status - let node_status = self.network_manager().generate_node_status(); - let mut nsb = status_a.reborrow().init_node_status(); - encode_node_status(&node_status, &mut nsb)?; - - // Add sender info - let mut sib = status_a.reborrow().init_sender_info(); - encode_sender_info(&sender_info, &mut sib)?; - - reply_msg.into_reader() - }; - - self.reply(rpcreader, reply_msg, None).await - } - - async fn process_validate_dial_info( - &self, - rpcreader: RPCMessageReader, - ) -> Result<(), RPCError> { - // - let (redirect, dial_info, receipt) = { - let operation = rpcreader - .reader - .get_root::() - .map_err(map_error_capnp_error!()) - .map_err(logthru_rpc!())?; - - // This should never want an answer - if self.wants_answer(&operation)? 
{ - return Err(rpc_error_invalid_format( - "validate dial info should not want answer", - )); - } - - // get validateDialInfo reader - let vdi_reader = match operation.get_detail().which() { - Ok(veilid_capnp::operation::detail::Which::ValidateDialInfo(Ok(x))) => x, - _ => panic!("invalid operation type in process_validate_dial_info"), - }; - - // Parse out fields - let redirect = vdi_reader.get_redirect(); - let dial_info = decode_dial_info( - &vdi_reader - .get_dial_info() - .map_err(map_error_internal!("no valid dial info in process_validate_dial_info"))?, - )?; - let receipt = vdi_reader - .get_receipt() - .map_err(map_error_internal!("no valid receipt in process_validate_dial_info"))?.to_vec(); - - (redirect, dial_info, receipt) - }; - - // Redirect this request if we are asked to - if redirect { - - // Find peers capable of validating this dial info - // We filter on the -outgoing- protocol capability status not the node's dial info - // Use the address type though, to ensure we reach an ipv6 capable node if this is - // an ipv6 address - let routing_table = self.routing_table(); - let filter = DialInfoFilter::global().with_address_type(dial_info.address_type()); - let sender_id = rpcreader.header.envelope.get_sender_id(); - let node_count = { - let c = self.config.get(); - c.network.dht.max_find_node_count as usize - }; - let mut peers = routing_table.find_fast_public_nodes_filtered(node_count, &filter); - if peers.is_empty() { - return Err(rpc_error_internal(format!( - "no peers matching filter '{:?}'", - filter - ))); - } - for peer in &mut peers { - - // Ensure the peer is not the one asking for the validation - if peer.node_id() == sender_id { - continue; - } - - // Release the filter on the peer because we don't need to send the redirect with the filter - // we just wanted to make sure we only selected nodes that were capable of - // using the correct protocol for the dial info being validated - peer.set_filter(None); - - // Ensure the peer's status is 
known and that it is capable of - // making outbound connections for the dial info we want to verify - // and if this peer can validate dial info - let can_contact_dial_info = peer.operate(|e: &BucketEntryInner| { - if let Some(ni) = e.node_info() { - ni.outbound_protocols.contains(dial_info.protocol_type()) && ni.can_validate_dial_info() - } else { - false - } - }); - if !can_contact_dial_info { - continue; - } - - // See if this peer will validate dial info - let will_validate_dial_info = peer.operate(|e: &BucketEntryInner| { - if let Some(status) = &e.peer_stats().status { - status.will_validate_dial_info - } else { - true - } - }); - if !will_validate_dial_info { - continue; - } - - // Make a copy of the request, without the redirect flag - let vdi_msg_reader = { - let mut vdi_msg = ::capnp::message::Builder::new_default(); - let mut question = vdi_msg.init_root::(); - question.set_op_id(self.get_next_op_id()); - let mut respond_to = question.reborrow().init_respond_to(); - respond_to.set_none(()); - let detail = question.reborrow().init_detail(); - let mut vdi_builder = detail.init_validate_dial_info(); - vdi_builder.set_redirect(false); - let mut di_builder = vdi_builder.reborrow().init_dial_info(); - encode_dial_info(&dial_info, &mut di_builder)?; - let r_builder = vdi_builder.init_receipt(receipt.len().try_into().map_err( - map_error_protocol!("invalid receipt length in process_validate_dial_info"), - )?); - r_builder.copy_from_slice(&receipt); - vdi_msg.into_reader() - }; - - // Send the validate_dial_info request until we succeed - self.request(Destination::Direct(peer.clone()), vdi_msg_reader, None) - .await?; - } - return Ok(()); - }; - - // Otherwise send a return receipt directly - // Possibly from an alternate port - let network_manager = self.network_manager(); - network_manager - .send_out_of_band_receipt(dial_info.clone(), receipt) - .await - .map_err(map_error_string!()) - .map_err( - logthru_net!(error "failed to send direct receipt to dial 
info: {}", dial_info), - )?; - - Ok(()) - } - - async fn process_find_node_q(&self, rpcreader: RPCMessageReader) -> Result<(), RPCError> { - // - let reply_msg = { - let operation = rpcreader - .reader - .get_root::() - .map_err(map_error_capnp_error!()) - .map_err(logthru_rpc!())?; - - // find_node must always want an answer - if !self.wants_answer(&operation)? { - return Err(rpc_error_invalid_format("find_node_q should want answer")); - } - - // get findNodeQ reader - let fnq_reader = match operation.get_detail().which() { - Ok(veilid_capnp::operation::detail::Which::FindNodeQ(Ok(x))) => x, - _ => panic!("invalid operation type in process_find_node_q"), - }; - - // get the node id we want to look up - let target_node_id = decode_public_key( - &fnq_reader - .get_node_id() - .map_err(map_error_capnp_error!()) - .map_err(logthru_rpc!())?, - ); - - // add node information for the requesting node to our routing table - let routing_table = self.routing_table(); - - // find N nodes closest to the target node in our routing table - let own_peer_info = routing_table.get_own_peer_info(); - let own_peer_info_is_valid = own_peer_info.signed_node_info.is_valid(); - - let closest_nodes = routing_table.find_closest_nodes( - target_node_id, - // filter - Some(move |_k, v| { - RoutingTable::filter_has_valid_signed_node_info(v, own_peer_info_is_valid) - }), - // transform - move |k, v| RoutingTable::transform_to_peer_info(k, v, &own_peer_info), - ); - log_rpc!(">>>> Returning {} closest peers", closest_nodes.len()); - - // Send find_node answer - let mut reply_msg = ::capnp::message::Builder::new_default(); - let mut answer = reply_msg.init_root::(); - answer.set_op_id(operation.get_op_id()); - let mut respond_to = answer.reborrow().init_respond_to(); - respond_to.set_none(()); - let detail = answer.reborrow().init_detail(); - let fna = detail.init_find_node_a(); - let mut peers_builder = fna.init_peers( - closest_nodes - .len() - .try_into() - 
.map_err(map_error_internal!("invalid closest nodes list length"))?, - ); - for (i, closest_node) in closest_nodes.iter().enumerate() { - let mut pi_builder = peers_builder.reborrow().get(i as u32); - encode_peer_info(closest_node, &mut pi_builder)?; - } - reply_msg.into_reader() - }; - - self.reply(rpcreader, reply_msg, None).await - } - - async fn process_route(&self, _rpcreader: RPCMessageReader) -> Result<(), RPCError> { - // xxx do not process latency for routed messages - Err(rpc_error_unimplemented("process_route")) - } - - async fn process_node_info_update(&self, rpcreader: RPCMessageReader) -> Result<(), RPCError> { - // - let sender_node_id = rpcreader.header.envelope.get_sender_id(); - let signed_node_info = { - let operation = rpcreader - .reader - .get_root::() - .map_err(map_error_capnp_error!()) - .map_err(logthru_rpc!())?; - - // This should never want an answer - if self.wants_answer(&operation)? { - return Err(rpc_error_invalid_format( - "node_info_update should not want answer", - )); - } - - // get nodeInfoUpdate reader - let niumsg_reader = match operation.get_detail().which() { - Ok(veilid_capnp::operation::detail::Which::NodeInfoUpdate(Ok(x))) => x, - _ => panic!("invalid operation type in process_node_info_update"), - }; - - // Parse out fields - let sni_reader = niumsg_reader - .get_signed_node_info() - .map_err(map_error_internal!("no valid signed node info"))?; - decode_signed_node_info(&sni_reader, &sender_node_id, true)? 
- }; - - // Update our routing table with signed node info - if !self.filter_peer_scope(&signed_node_info.node_info) { - return Err(rpc_error_invalid_format( - "node_info_update has invalid peer scope", - )); - } - let _ = self - .routing_table() - .register_node_with_signed_node_info(sender_node_id, signed_node_info) - .map_err(RPCError::Internal)?; - - Ok(()) - } - - async fn process_get_value_q(&self, _rpcreader: RPCMessageReader) -> Result<(), RPCError> { - Err(rpc_error_unimplemented("process_get_value_q")) - } - - async fn process_set_value_q(&self, _rpcreader: RPCMessageReader) -> Result<(), RPCError> { - Err(rpc_error_unimplemented("process_set_value_q")) - } - - async fn process_watch_value_q(&self, _rpcreader: RPCMessageReader) -> Result<(), RPCError> { - Err(rpc_error_unimplemented("process_watch_value_q")) - } - - async fn process_value_changed(&self, _rpcreader: RPCMessageReader) -> Result<(), RPCError> { - Err(rpc_error_unimplemented("process_value_changed")) - } - - async fn process_supply_block_q(&self, _rpcreader: RPCMessageReader) -> Result<(), RPCError> { - Err(rpc_error_unimplemented("process_supply_block_q")) - } - - async fn process_find_block_q(&self, _rpcreader: RPCMessageReader) -> Result<(), RPCError> { - Err(rpc_error_unimplemented("process_find_block_q")) - } - - async fn process_signal(&self, rpcreader: RPCMessageReader) -> Result<(), RPCError> { - let signal_info = { - let operation = rpcreader - .reader - .get_root::() - .map_err(map_error_capnp_error!()) - .map_err(logthru_rpc!())?; - - // This should never want an answer - if self.wants_answer(&operation)? { - return Err(rpc_error_invalid_format("signal should not want answer")); - } - - // get signal reader - let sig_reader = match operation.get_detail().which() { - Ok(veilid_capnp::operation::detail::Which::Signal(Ok(x))) => x, - _ => panic!("invalid operation type in process_signal"), - }; - - // Get signal info - decode_signal_info(&sig_reader)? 
- }; - - // Handle it - let network_manager = self.network_manager(); - network_manager - .handle_signal(signal_info) - .await - .map_err(map_error_string!()) - } - - async fn process_return_receipt(&self, rpcreader: RPCMessageReader) -> Result<(), RPCError> { - let receipt = { - let operation = rpcreader - .reader - .get_root::() - .map_err(map_error_capnp_error!()) - .map_err(logthru_rpc!())?; - - // This should never want an answer - if self.wants_answer(&operation)? { - return Err(rpc_error_invalid_format( - "return receipt should not want answer", - )); - } - - // get returnReceipt reader - let rr_reader = match operation.get_detail().which() { - Ok(veilid_capnp::operation::detail::Which::ReturnReceipt(Ok(x))) => x, - _ => panic!("invalid operation type in process_return_receipt"), - }; - - // Get receipt - rr_reader - .get_receipt() - .map_err(map_error_internal!("no valid receipt in process_return_receipt"))?.to_vec() - }; - - // Handle it - let network_manager = self.network_manager(); - network_manager - .handle_in_band_receipt(receipt, rpcreader.header.peer_noderef) - .await - .map_err(map_error_string!()) - } - - async fn process_start_tunnel_q(&self, _rpcreader: RPCMessageReader) -> Result<(), RPCError> { - Err(rpc_error_unimplemented("process_start_tunnel_q")) - } - - async fn process_complete_tunnel_q( - &self, - _rpcreader: RPCMessageReader, - ) -> Result<(), RPCError> { - Err(rpc_error_unimplemented("process_complete_tunnel_q")) - } - - async fn process_cancel_tunnel_q(&self, _rpcreader: RPCMessageReader) -> Result<(), RPCError> { - Err(rpc_error_unimplemented("process_cancel_tunnel_q")) - } - - async fn process_answer(&self, rpcreader: RPCMessageReader) -> Result<(), RPCError> { - // pass answer to the appropriate rpc waiter - let op_id = { - let operation = rpcreader - .reader - .get_root::() - .map_err(map_error_capnp_error!()) - .map_err(logthru_rpc!())?; - operation.get_op_id() - }; - - Ok(self.complete_op_id_waiter(op_id, rpcreader).await?) 
- } - ////////////////////////////////////////////////////////////////////// - async fn process_rpc_message_version_0(&self, msg: RPCMessage) -> Result<(), RPCError> { - let reader = capnp::message::Reader::new(msg.data, Default::default()); - let sender_node_id = msg.header.envelope.get_sender_id(); + async fn process_rpc_message_version_0( + &self, + encoded_msg: RPCMessageEncoded, + ) -> Result<(), RPCError> { + // Make an operation reader + let reader = capnp::message::Reader::new(encoded_msg.data, Default::default()); + let sender_node_id = encoded_msg.header.envelope.get_sender_id(); + let operation = reader + .get_root::() + .map_err(map_error_capnp_error!()) + .map_err(logthru_rpc!())?; + + // Decode the RPC message + let operation = RPCOperation::decode(&operation, &sender_node_id)?; + + // Get the sender noderef, incorporating and 'sender node info' we have from a question let mut opt_sender_nr: Option = None; - let which = { - let operation = reader - .get_root::() - .map_err(map_error_capnp_error!()) - .map_err(logthru_rpc!())?; - let which_reader = operation - .get_detail() - .which() - .map_err(map_error_capnp_notinschema!())?; - - let (which, is_q) = match which_reader - { - veilid_capnp::operation::detail::StatusQ(_) => (0u32, true), - veilid_capnp::operation::detail::StatusA(_) => (1u32, false), - veilid_capnp::operation::detail::ValidateDialInfo(_) => (2u32, true), - veilid_capnp::operation::detail::FindNodeQ(_) => (3u32, true), - veilid_capnp::operation::detail::FindNodeA(_) => (4u32, false), - veilid_capnp::operation::detail::Route(_) => (5u32, true), - veilid_capnp::operation::detail::NodeInfoUpdate(_) => (6u32, true), - veilid_capnp::operation::detail::GetValueQ(_) => (7u32, true), - veilid_capnp::operation::detail::GetValueA(_) => (8u32, false), - veilid_capnp::operation::detail::SetValueQ(_) => (9u32, true), - veilid_capnp::operation::detail::SetValueA(_) => (10u32, false), - veilid_capnp::operation::detail::WatchValueQ(_) => (11u32, true), 
- veilid_capnp::operation::detail::WatchValueA(_) => (12u32, false), - veilid_capnp::operation::detail::ValueChanged(_) => (13u32, true), - veilid_capnp::operation::detail::SupplyBlockQ(_) => (14u32, true), - veilid_capnp::operation::detail::SupplyBlockA(_) => (15u32, false), - veilid_capnp::operation::detail::FindBlockQ(_) => (16u32, true), - veilid_capnp::operation::detail::FindBlockA(_) => (17u32, false), - veilid_capnp::operation::detail::Signal(_) => (18u32, true), - veilid_capnp::operation::detail::ReturnReceipt(_) => (19u32, true), - veilid_capnp::operation::detail::StartTunnelQ(_) => (20u32, true), - veilid_capnp::operation::detail::StartTunnelA(_) => (21u32, false), - veilid_capnp::operation::detail::CompleteTunnelQ(_) => (22u32, true), - veilid_capnp::operation::detail::CompleteTunnelA(_) => (23u32, false), - veilid_capnp::operation::detail::CancelTunnelQ(_) => (24u32, true), - veilid_capnp::operation::detail::CancelTunnelA(_) => (25u32, false), - }; - - // Log rpc receive - debug!(target: "rpc_message", dir = "recv", is_q, kind = Self::get_rpc_operation_detail_debug_info(&which_reader), op_id = operation.get_op_id(), sender_id = msg.header.envelope.get_sender_id().encode()); - - // Accounting for questions we receive - if is_q { - // See if we have some Sender NodeInfo to incorporate - opt_sender_nr = if let Some(sender_ni) = - self.get_respond_to_sender_signed_node_info(&operation, &sender_node_id)? 
- { - // Sender NodeInfo was specified, update our routing table with it - if !self.filter_peer_scope(&sender_ni.node_info) { - return Err(rpc_error_invalid_format( - "respond_to_sender_signed_node_info has invalid peer scope", - )); + match operation.kind() { + RPCOperationKind::Question(q) => { + match q.respond_to() { + RespondTo::Sender(Some(sender_ni)) => { + // Sender NodeInfo was specified, update our routing table with it + if !self.filter_peer_scope(&sender_ni.node_info) { + return Err(rpc_error_invalid_format( + "respond_to_sender_signed_node_info has invalid peer scope", + )); + } + let nr = self + .routing_table() + .register_node_with_signed_node_info(sender_node_id, sender_ni.clone()) + .map_err(RPCError::Internal)?; + opt_sender_nr = Some(nr); } - let nr = self - .routing_table() - .register_node_with_signed_node_info(sender_node_id, sender_ni) - .map_err(RPCError::Internal)?; - Some(nr) - } else { - // look up sender node, in case it's different than our peer due to relaying - self.routing_table().lookup_node_ref(sender_node_id) - }; + _ => {} + } + } + _ => {} + }; + if opt_sender_nr.is_none() { + // look up sender node, in case it's different than our peer due to relaying + opt_sender_nr = self.routing_table().lookup_node_ref(sender_node_id) + } - if let Some(sender_nr) = opt_sender_nr.clone() { + // Make the RPC message + let msg = RPCMessage { + header: encoded_msg.header, + operation, + opt_sender_nr, + }; + + // Process stats + let kind = match msg.operation.kind() { + RPCOperationKind::Question(_) => { + if let Some(sender_nr) = msg.opt_sender_nr.clone() { self.routing_table().stats_question_rcvd( sender_nr, msg.header.timestamp, msg.header.body_len, ); } - }; - - which + "question" + } + RPCOperationKind::Statement(_) => { + if let Some(sender_nr) = msg.opt_sender_nr.clone() { + self.routing_table().stats_question_rcvd( + sender_nr, + msg.header.timestamp, + msg.header.body_len, + ); + } + "statement" + } + RPCOperationKind::Answer(_) => { + 
// Answer stats are processed in wait_for_reply + "answer" + } }; - let rpcreader = RPCMessageReader { - header: msg.header, - reader, - opt_sender_nr, - }; + // Log rpc receive + debug!(target: "rpc_message", dir = "recv", kind, op_id = msg.operation.op_id(), desc = msg.operation.kind().desc(), sender_id = ?sender_node_id); - match which { - 0 => self.process_status_q(rpcreader).await, // StatusQ - 1 => self.process_answer(rpcreader).await, // StatusA - 2 => self.process_validate_dial_info(rpcreader).await, // ValidateDialInfo - 3 => self.process_find_node_q(rpcreader).await, // FindNodeQ - 4 => self.process_answer(rpcreader).await, // FindNodeA - 5 => self.process_route(rpcreader).await, // Route - 6 => self.process_node_info_update(rpcreader).await, // NodeInfoUpdate - 7 => self.process_get_value_q(rpcreader).await, // GetValueQ - 8 => self.process_answer(rpcreader).await, // GetValueA - 9 => self.process_set_value_q(rpcreader).await, // SetValueQ - 10 => self.process_answer(rpcreader).await, // SetValueA - 11 => self.process_watch_value_q(rpcreader).await, // WatchValueQ - 12 => self.process_answer(rpcreader).await, // WatchValueA - 13 => self.process_value_changed(rpcreader).await, // ValueChanged - 14 => self.process_supply_block_q(rpcreader).await, // SupplyBlockQ - 15 => self.process_answer(rpcreader).await, // SupplyBlockA - 16 => self.process_find_block_q(rpcreader).await, // FindBlockQ - 17 => self.process_answer(rpcreader).await, // FindBlockA - 18 => self.process_signal(rpcreader).await, // SignalQ - 19 => self.process_return_receipt(rpcreader).await, // ReturnReceipt - 20 => self.process_start_tunnel_q(rpcreader).await, // StartTunnelQ - 21 => self.process_answer(rpcreader).await, // StartTunnelA - 22 => self.process_complete_tunnel_q(rpcreader).await, // CompleteTunnelQ - 23 => self.process_answer(rpcreader).await, // CompleteTunnelA - 24 => self.process_cancel_tunnel_q(rpcreader).await, // CancelTunnelQ - 25 => self.process_answer(rpcreader).await, 
// CancelTunnelA - _ => panic!("must update rpc table"), + // Process specific message kind + match msg.operation.kind() { + RPCOperationKind::Question(q) => match q.detail() { + RPCQuestionDetail::StatusQ(_) => self.process_status_q(msg).await, + RPCQuestionDetail::FindNodeQ(_) => self.process_find_node_q(msg).await, + RPCQuestionDetail::GetValueQ(_) => self.process_get_value_q(msg).await, + RPCQuestionDetail::SetValueQ(_) => self.process_set_value_q(msg).await, + RPCQuestionDetail::WatchValueQ(_) => self.process_watch_value_q(msg).await, + RPCQuestionDetail::SupplyBlockQ(_) => self.process_supply_block_q(msg).await, + RPCQuestionDetail::FindBlockQ(_) => self.process_find_block_q(msg).await, + RPCQuestionDetail::StartTunnelQ(_) => self.process_start_tunnel_q(msg).await, + RPCQuestionDetail::CompleteTunnelQ(_) => self.process_complete_tunnel_q(msg).await, + RPCQuestionDetail::CancelTunnelQ(_) => self.process_cancel_tunnel_q(msg).await, + }, + RPCOperationKind::Statement(s) => match s.detail() { + RPCStatementDetail::ValidateDialInfo(_) => { + self.process_validate_dial_info(msg).await + } + RPCStatementDetail::Route(_) => self.process_route(msg).await, + RPCStatementDetail::NodeInfoUpdate(_) => self.process_node_info_update(msg).await, + RPCStatementDetail::ValueChanged(_) => self.process_value_changed(msg).await, + RPCStatementDetail::Signal(_) => self.process_signal(msg).await, + RPCStatementDetail::ReturnReceipt(_) => self.process_return_receipt(msg).await, + }, + RPCOperationKind::Answer(a) => self.complete_op_id_waiter(msg).await, } } - async fn process_rpc_message(&self, msg: RPCMessage) -> Result<(), RPCError> { + async fn process_rpc_message(&self, msg: RPCMessageEncoded) -> Result<(), RPCError> { if msg.header.envelope.get_version() == 0 { self.process_rpc_message_version_0(msg).await } else { @@ -1316,7 +884,7 @@ impl RPCProcessor { } } - async fn rpc_worker(self, stop_token: StopToken, receiver: flume::Receiver) { + async fn rpc_worker(self, stop_token: 
StopToken, receiver: flume::Receiver) { while let Ok(Ok(msg)) = receiver.recv_async().timeout_at(stop_token.clone()).await { let _ = self .process_rpc_message(msg) @@ -1365,7 +933,11 @@ impl RPCProcessor { for _ in 0..concurrency { let this = self.clone(); let receiver = channel.1.clone(); - let jh = intf::spawn(Self::rpc_worker(this, inner.stop_source.as_ref().unwrap().token(), receiver)); + let jh = intf::spawn(Self::rpc_worker( + this, + inner.stop_source.as_ref().unwrap().token(), + receiver, + )); inner.worker_join_handles.push(jh); } @@ -1375,7 +947,7 @@ impl RPCProcessor { #[instrument(level = "debug", skip_all)] pub async fn shutdown(&self) { debug!("starting rpc processor shutdown"); - + // Stop the rpc workers let mut unord = FuturesUnordered::new(); { @@ -1388,12 +960,12 @@ impl RPCProcessor { drop(inner.stop_source.take()); } debug!("stopping {} rpc worker tasks", unord.len()); - + // Wait for them to complete while unord.next().await.is_some() {} - + debug!("resetting rpc processor state"); - + // Release the rpc processor *self.inner.lock() = Self::new_inner(self.network_manager()); @@ -1406,7 +978,7 @@ impl RPCProcessor { body: Vec, peer_noderef: NodeRef, ) -> Result<(), String> { - let msg = RPCMessage { + let msg = RPCMessageEncoded { header: RPCMessageHeader { timestamp: intf::get_timestamp(), envelope, @@ -1424,352 +996,4 @@ impl RPCProcessor { .map_err(|e| format!("failed to enqueue received RPC message: {:?}", e))?; Ok(()) } - - // Gets a 'RespondTo::Sender' that contains either our dial info, - // or None if the peer has seen our dial info before or our node info is not yet valid - // because of an unknown network class - pub fn make_respond_to_sender(&self, peer: NodeRef) -> RespondTo { - if peer.has_seen_our_node_info() - || matches!( - self.network_manager() - .get_network_class() - .unwrap_or(NetworkClass::Invalid), - NetworkClass::Invalid - ) - { - RespondTo::Sender(None) - } else { - let our_sni = 
self.routing_table().get_own_signed_node_info(); - RespondTo::Sender(Some(our_sni)) - } - } - - // Send StatusQ RPC request, receive StatusA answer - // Can be sent via relays, but not via routes - pub async fn rpc_call_status(self, peer: NodeRef) -> Result { - - let node_status = self.network_manager().generate_node_status(); - let status_q = RPCOperationStatusQ { - node_status - }; - let respond_to = self.make_respond_to_sender(peer.clone()); - let operation = RPCOperation::new_question(RPCQuestion::new(respond_to, RPCQuestionDetail::StatusQ(status_q))); - - // Send the info request - let waitable_reply = self - .request(Destination::Direct(peer.clone()), operation, None) - .await? - .unwrap(); - - // Note what kind of ping this was and to what peer scope - let send_data_kind = waitable_reply.send_data_kind; - - // Wait for reply - let (rpcreader, latency) = self.wait_for_reply(waitable_reply).await?; - - let (sender_info, node_status) = { - let response_operation = rpcreader - .reader - .get_root::() - .map_err(map_error_capnp_error!()) - .map_err(logthru_rpc!())?; - let status_a = match response_operation - .get_detail() - .which() - .map_err(map_error_capnp_notinschema!()) - .map_err(logthru_rpc!())? - { - veilid_capnp::operation::detail::StatusA(a) => { - a.map_err(map_error_internal!("Invalid StatusA"))? - } - _ => return Err(rpc_error_internal("Incorrect RPC answer for question")), - }; - - // Decode node info - if !status_a.has_node_status() { - return Err(rpc_error_internal("Missing node status")); - } - let nsr = status_a - .get_node_status() - .map_err(map_error_internal!("Broken node status"))?; - let node_status = decode_node_status(&nsr)?; - - // Decode sender info - let sender_info = if status_a.has_sender_info() { - let sir = status_a - .get_sender_info() - .map_err(map_error_internal!("Broken sender info"))?; - decode_sender_info(&sir)? 
- } else { - SenderInfo::default() - }; - - // Update latest node status in routing table - peer.operate_mut(|e| { - e.update_node_status(node_status.clone()); - }); - - (sender_info, node_status) - }; - - // Report sender_info IP addresses to network manager - if let Some(socket_address) = sender_info.socket_address { - match send_data_kind { - SendDataKind::LocalDirect => { - self.network_manager() - .report_local_socket_address(socket_address, peer) - .await; - } - SendDataKind::GlobalDirect => { - self.network_manager() - .report_global_socket_address(socket_address, peer) - .await; - } - SendDataKind::GlobalIndirect => { - // Do nothing in this case, as the socket address returned here would be for any node other than ours - } - } - } - - // Return the answer for anyone who may care - let out = StatusAnswer { - latency, - node_status, - sender_info, - }; - - Ok(out) - } - - // Can only be sent directly, not via relays or routes - pub async fn rpc_call_validate_dial_info( - &self, - peer: NodeRef, - dial_info: DialInfo, - redirect: bool, - ) -> Result { - let network_manager = self.network_manager(); - let receipt_time = ms_to_us( - self.config - .get() - .network - .dht - .validate_dial_info_receipt_time_ms, - ); - // - let (vdi_msg, eventual_value) = { - let mut vdi_msg = ::capnp::message::Builder::new_default(); - let mut question = vdi_msg.init_root::(); - question.set_op_id(self.get_next_op_id()); - let mut respond_to = question.reborrow().init_respond_to(); - respond_to.set_none(()); - let detail = question.reborrow().init_detail(); - let mut vdi_builder = detail.init_validate_dial_info(); - - // Generate receipt and waitable eventual so we can see if we get the receipt back - let (receipt, eventual_value) = network_manager - .generate_single_shot_receipt(receipt_time, []) - .map_err(map_error_string!())?; - - vdi_builder.set_redirect(redirect); - let mut di_builder = vdi_builder.reborrow().init_dial_info(); - encode_dial_info(&dial_info, &mut 
di_builder)?; - let r_builder = vdi_builder.init_receipt(receipt.len().try_into().map_err( - map_error_protocol!("invalid receipt length in validate dial info"), - )?); - r_builder.copy_from_slice(&receipt); - - (vdi_msg.into_reader(), eventual_value) - }; - - // Send the validate_dial_info request - // This can only be sent directly, as relays can not validate dial info - self.request(Destination::Direct(peer), vdi_msg, None) - .await?; - - log_net!(debug "waiting for validate_dial_info receipt"); - // Wait for receipt - match eventual_value.await.take_value().unwrap() { - ReceiptEvent::ReturnedInBand { - inbound_noderef: _, - } => { - Err(rpc_error_internal("validate_dial_info receipt should be returned out-of-band")) - } - ReceiptEvent::ReturnedOutOfBand => { - log_net!(debug "validate_dial_info receipt returned"); - Ok(true) - } - ReceiptEvent::Expired => { - log_net!(debug "validate_dial_info receipt expired"); - Ok(false) - } - ReceiptEvent::Cancelled => { - Err(rpc_error_internal("receipt was dropped before expiration")) - } - } - } - - // Send FindNodeQ RPC request, receive FindNodeA answer - // Can be sent via all methods including relays and routes - pub async fn rpc_call_find_node( - self, - dest: Destination, - key: DHTKey, - safety_route: Option<&SafetyRouteSpec>, - respond_to: RespondTo, - ) -> Result { - let find_node_q_msg = { - let mut find_node_q_msg = ::capnp::message::Builder::new_default(); - let mut question = find_node_q_msg.init_root::(); - question.set_op_id(self.get_next_op_id()); - let mut respond_to_builder = question.reborrow().init_respond_to(); - respond_to.encode(&mut respond_to_builder)?; - let detail = question.reborrow().init_detail(); - let mut fnq = detail.init_find_node_q(); - let mut node_id_builder = fnq.reborrow().init_node_id(); - encode_public_key(&key, &mut node_id_builder)?; - - find_node_q_msg.into_reader() - }; - - // Send the find_node request - let waitable_reply = self - .request(dest, find_node_q_msg, safety_route) 
- .await? - .unwrap(); - - // Wait for reply - let (rpcreader, latency) = self.wait_for_reply(waitable_reply).await?; - - let response_operation = rpcreader - .reader - .get_root::() - .map_err(map_error_capnp_error!()) - .map_err(logthru_rpc!())?; - let find_node_a = match response_operation - .get_detail() - .which() - .map_err(map_error_capnp_notinschema!()) - .map_err(logthru_rpc!())? - { - veilid_capnp::operation::detail::FindNodeA(a) => { - a.map_err(map_error_internal!("Invalid FindNodeA"))? - } - _ => return Err(rpc_error_internal("Incorrect RPC answer for question")), - }; - - let peers_reader = find_node_a - .get_peers() - .map_err(map_error_internal!("Missing peers"))?; - let mut peers = Vec::::with_capacity( - peers_reader - .len() - .try_into() - .map_err(map_error_internal!("too many peers"))?, - ); - for p in peers_reader.iter() { - let peer_info = decode_peer_info(&p, true)?; - - if !self.filter_peer_scope(&peer_info.signed_node_info.node_info) { - return Err(rpc_error_invalid_format( - "find_node response has invalid peer scope", - )); - } - - peers.push(peer_info); - } - - let out = FindNodeAnswer { latency, peers }; - - Ok(out) - } - - // Sends a our node info to another node - // Can be sent via all methods including relays and routes - pub async fn rpc_call_node_info_update( - &self, - dest: Destination, - safety_route: Option<&SafetyRouteSpec>, - ) -> Result<(), RPCError> { - let sni_msg = { - let mut sni_msg = ::capnp::message::Builder::new_default(); - let mut question = sni_msg.init_root::(); - question.set_op_id(self.get_next_op_id()); - let mut respond_to = question.reborrow().init_respond_to(); - respond_to.set_none(()); - let detail = question.reborrow().init_detail(); - let niu_builder = detail.init_node_info_update(); - let mut sni_builder = niu_builder.init_signed_node_info(); - let sni = self.routing_table().get_own_signed_node_info(); - encode_signed_node_info(&sni, &mut sni_builder)?; - - sni_msg.into_reader() - }; - - // Send the 
node_info_update request - self.request(dest, sni_msg, safety_route).await?; - - Ok(()) - } - - // Sends a unidirectional signal to a node - // Can be sent via all methods including relays and routes - pub async fn rpc_call_signal( - &self, - dest: Destination, - safety_route: Option<&SafetyRouteSpec>, - signal_info: SignalInfo, - ) -> Result<(), RPCError> { - let sig_msg = { - let mut sig_msg = ::capnp::message::Builder::new_default(); - let mut question = sig_msg.init_root::(); - question.set_op_id(self.get_next_op_id()); - let mut respond_to = question.reborrow().init_respond_to(); - respond_to.set_none(()); - let detail = question.reborrow().init_detail(); - let mut sig_builder = detail.init_signal(); - encode_signal_info(&signal_info, &mut sig_builder)?; - - sig_msg.into_reader() - }; - - // Send the signal request - self.request(dest, sig_msg, safety_route).await?; - - Ok(()) - } - - // Sends a unidirectional in-band return receipt - // Can be sent via all methods including relays and routes - pub async fn rpc_call_return_receipt>( - &self, - dest: Destination, - safety_route: Option<&SafetyRouteSpec>, - receipt: D, - ) -> Result<(), RPCError> { - let receipt = receipt.as_ref(); - - let rr_msg = { - let mut rr_msg = ::capnp::message::Builder::new_default(); - let mut question = rr_msg.init_root::(); - question.set_op_id(self.get_next_op_id()); - let mut respond_to = question.reborrow().init_respond_to(); - respond_to.set_none(()); - let detail = question.reborrow().init_detail(); - let rr_builder = detail.init_return_receipt(); - let r_builder = rr_builder.init_receipt(receipt.len().try_into().map_err( - map_error_protocol!("invalid receipt length in return receipt"), - )?); - r_builder.copy_from_slice(receipt); - - rr_msg.into_reader() - }; - - // Send the return receipt request - self.request(dest, rr_msg, safety_route).await?; - - Ok(()) - } - - // xxx do not process latency for routed messages } diff --git 
a/veilid-core/src/rpc_processor/rpc_cancel_tunnel.rs b/veilid-core/src/rpc_processor/rpc_cancel_tunnel.rs new file mode 100644 index 00000000..62ff55b9 --- /dev/null +++ b/veilid-core/src/rpc_processor/rpc_cancel_tunnel.rs @@ -0,0 +1,7 @@ +use super::*; + +impl RPCProcessor { + pub(crate) async fn process_cancel_tunnel_q(&self, msg: RPCMessage) -> Result<(), RPCError> { + Err(rpc_error_unimplemented("process_cancel_tunnel_q")) + } +} diff --git a/veilid-core/src/rpc_processor/rpc_complete_tunnel.rs b/veilid-core/src/rpc_processor/rpc_complete_tunnel.rs new file mode 100644 index 00000000..aa73f303 --- /dev/null +++ b/veilid-core/src/rpc_processor/rpc_complete_tunnel.rs @@ -0,0 +1,7 @@ +use super::*; + +impl RPCProcessor { + pub(crate) async fn process_complete_tunnel_q(&self, msg: RPCMessage) -> Result<(), RPCError> { + Err(rpc_error_unimplemented("process_complete_tunnel_q")) + } +} diff --git a/veilid-core/src/rpc_processor/rpc_find_block.rs b/veilid-core/src/rpc_processor/rpc_find_block.rs new file mode 100644 index 00000000..2a4c4eff --- /dev/null +++ b/veilid-core/src/rpc_processor/rpc_find_block.rs @@ -0,0 +1,7 @@ +use super::*; + +impl RPCProcessor { + pub(crate) async fn process_find_block_q(&self, msg: RPCMessage) -> Result<(), RPCError> { + Err(rpc_error_unimplemented("process_find_block_q")) + } +} diff --git a/veilid-core/src/rpc_processor/rpc_find_node.rs b/veilid-core/src/rpc_processor/rpc_find_node.rs new file mode 100644 index 00000000..dbe37dfc --- /dev/null +++ b/veilid-core/src/rpc_processor/rpc_find_node.rs @@ -0,0 +1,148 @@ +use super::*; + +impl RPCProcessor { + // Send FindNodeQ RPC request, receive FindNodeA answer + // Can be sent via all methods including relays and routes + pub async fn rpc_call_find_node( + self, + dest: Destination, + key: DHTKey, + safety_route: Option<&SafetyRouteSpec>, + respond_to: RespondTo, + ) -> Result { + let find_node_q_msg = { + let mut find_node_q_msg = ::capnp::message::Builder::new_default(); + let mut 
question = find_node_q_msg.init_root::(); + question.set_op_id(self.get_next_op_id()); + let mut respond_to_builder = question.reborrow().init_respond_to(); + respond_to.encode(&mut respond_to_builder)?; + let detail = question.reborrow().init_detail(); + let mut fnq = detail.init_find_node_q(); + let mut node_id_builder = fnq.reborrow().init_node_id(); + encode_public_key(&key, &mut node_id_builder)?; + + find_node_q_msg.into_reader() + }; + + // Send the find_node request + let waitable_reply = self + .request(dest, find_node_q_msg, safety_route) + .await? + .unwrap(); + + // Wait for reply + let (rpcreader, latency) = self.wait_for_reply(waitable_reply).await?; + + let response_operation = rpcreader + .reader + .get_root::() + .map_err(map_error_capnp_error!()) + .map_err(logthru_rpc!())?; + let find_node_a = match response_operation + .get_detail() + .which() + .map_err(map_error_capnp_notinschema!()) + .map_err(logthru_rpc!())? + { + veilid_capnp::operation::detail::FindNodeA(a) => { + a.map_err(map_error_internal!("Invalid FindNodeA"))? 
+ } + _ => return Err(rpc_error_internal("Incorrect RPC answer for question")), + }; + + let peers_reader = find_node_a + .get_peers() + .map_err(map_error_internal!("Missing peers"))?; + let mut peers = Vec::::with_capacity( + peers_reader + .len() + .try_into() + .map_err(map_error_internal!("too many peers"))?, + ); + for p in peers_reader.iter() { + let peer_info = decode_peer_info(&p, true)?; + + if !self.filter_peer_scope(&peer_info.signed_node_info.node_info) { + return Err(rpc_error_invalid_format( + "find_node response has invalid peer scope", + )); + } + + peers.push(peer_info); + } + + let out = FindNodeAnswer { latency, peers }; + + Ok(out) + } + + pub(crate) async fn process_find_node_q(&self, rpcreader: RPCMessage) -> Result<(), RPCError> { + // + let reply_msg = { + let operation = rpcreader + .reader + .get_root::() + .map_err(map_error_capnp_error!()) + .map_err(logthru_rpc!())?; + + // find_node must always want an answer + if !self.wants_answer(&operation)? { + return Err(rpc_error_invalid_format("find_node_q should want answer")); + } + + // get findNodeQ reader + let fnq_reader = match operation.get_detail().which() { + Ok(veilid_capnp::operation::detail::Which::FindNodeQ(Ok(x))) => x, + _ => panic!("invalid operation type in process_find_node_q"), + }; + + // get the node id we want to look up + let target_node_id = decode_public_key( + &fnq_reader + .get_node_id() + .map_err(map_error_capnp_error!()) + .map_err(logthru_rpc!())?, + ); + + // add node information for the requesting node to our routing table + let routing_table = self.routing_table(); + + // find N nodes closest to the target node in our routing table + let own_peer_info = routing_table.get_own_peer_info(); + let own_peer_info_is_valid = own_peer_info.signed_node_info.is_valid(); + + let closest_nodes = routing_table.find_closest_nodes( + target_node_id, + // filter + Some(move |_k, v| { + RoutingTable::filter_has_valid_signed_node_info(v, own_peer_info_is_valid) + }), + // 
transform + move |k, v| RoutingTable::transform_to_peer_info(k, v, &own_peer_info), + ); + log_rpc!(">>>> Returning {} closest peers", closest_nodes.len()); + + // Send find_node answer + let mut reply_msg = ::capnp::message::Builder::new_default(); + let mut answer = reply_msg.init_root::(); + answer.set_op_id(operation.get_op_id()); + let mut respond_to = answer.reborrow().init_respond_to(); + respond_to.set_none(()); + let detail = answer.reborrow().init_detail(); + let fna = detail.init_find_node_a(); + let mut peers_builder = fna.init_peers( + closest_nodes + .len() + .try_into() + .map_err(map_error_internal!("invalid closest nodes list length"))?, + ); + for (i, closest_node) in closest_nodes.iter().enumerate() { + let mut pi_builder = peers_builder.reborrow().get(i as u32); + encode_peer_info(closest_node, &mut pi_builder)?; + } + reply_msg.into_reader() + }; + + self.reply(rpcreader, reply_msg, None).await + } +} diff --git a/veilid-core/src/rpc_processor/rpc_get_value.rs b/veilid-core/src/rpc_processor/rpc_get_value.rs new file mode 100644 index 00000000..dfc0a7c2 --- /dev/null +++ b/veilid-core/src/rpc_processor/rpc_get_value.rs @@ -0,0 +1,7 @@ +use super::*; + +impl RPCProcessor { + pub(crate) async fn process_get_value_q(&self, msg: RPCMessage) -> Result<(), RPCError> { + Err(rpc_error_unimplemented("process_get_value_q")) + } +} diff --git a/veilid-core/src/rpc_processor/rpc_node_info_update.rs b/veilid-core/src/rpc_processor/rpc_node_info_update.rs new file mode 100644 index 00000000..70a577fc --- /dev/null +++ b/veilid-core/src/rpc_processor/rpc_node_info_update.rs @@ -0,0 +1,78 @@ +use super::*; + +impl RPCProcessor { + // Sends a our node info to another node + // Can be sent via all methods including relays and routes + pub async fn rpc_call_node_info_update( + &self, + dest: Destination, + safety_route: Option<&SafetyRouteSpec>, + ) -> Result<(), RPCError> { + let sni_msg = { + let mut sni_msg = ::capnp::message::Builder::new_default(); + let 
mut question = sni_msg.init_root::(); + question.set_op_id(self.get_next_op_id()); + let mut respond_to = question.reborrow().init_respond_to(); + respond_to.set_none(()); + let detail = question.reborrow().init_detail(); + let niu_builder = detail.init_node_info_update(); + let mut sni_builder = niu_builder.init_signed_node_info(); + let sni = self.routing_table().get_own_signed_node_info(); + encode_signed_node_info(&sni, &mut sni_builder)?; + + sni_msg.into_reader() + }; + + // Send the node_info_update request + self.request(dest, sni_msg, safety_route).await?; + + Ok(()) + } + + pub(crate) async fn process_node_info_update( + &self, + rpcreader: RPCMessage, + ) -> Result<(), RPCError> { + // + let sender_node_id = rpcreader.header.envelope.get_sender_id(); + let signed_node_info = { + let operation = rpcreader + .reader + .get_root::() + .map_err(map_error_capnp_error!()) + .map_err(logthru_rpc!())?; + + // This should never want an answer + if self.wants_answer(&operation)? { + return Err(rpc_error_invalid_format( + "node_info_update should not want answer", + )); + } + + // get nodeInfoUpdate reader + let niumsg_reader = match operation.get_detail().which() { + Ok(veilid_capnp::operation::detail::Which::NodeInfoUpdate(Ok(x))) => x, + _ => panic!("invalid operation type in process_node_info_update"), + }; + + // Parse out fields + let sni_reader = niumsg_reader + .get_signed_node_info() + .map_err(map_error_internal!("no valid signed node info"))?; + decode_signed_node_info(&sni_reader, &sender_node_id, true)? 
+ }; + + // Update our routing table with signed node info + if !self.filter_peer_scope(&signed_node_info.node_info) { + return Err(rpc_error_invalid_format( + "node_info_update has invalid peer scope", + )); + } + let _ = self + .routing_table() + .register_node_with_signed_node_info(sender_node_id, signed_node_info) + .map_err(RPCError::Internal)?; + + Ok(()) + } +} diff --git a/veilid-core/src/rpc_processor/rpc_return_receipt.rs b/veilid-core/src/rpc_processor/rpc_return_receipt.rs new file mode 100644 index 00000000..51d47889 --- /dev/null +++ b/veilid-core/src/rpc_processor/rpc_return_receipt.rs @@ -0,0 +1,76 @@ +use super::*; + +impl RPCProcessor { + // Sends a unidirectional in-band return receipt + // Can be sent via all methods including relays and routes + pub async fn rpc_call_return_receipt>( + &self, + dest: Destination, + safety_route: Option<&SafetyRouteSpec>, + receipt: D, + ) -> Result<(), RPCError> { + let receipt = receipt.as_ref(); + + let rr_msg = { + let mut rr_msg = ::capnp::message::Builder::new_default(); + let mut question = rr_msg.init_root::(); + question.set_op_id(self.get_next_op_id()); + let mut respond_to = question.reborrow().init_respond_to(); + respond_to.set_none(()); + let detail = question.reborrow().init_detail(); + let rr_builder = detail.init_return_receipt(); + let r_builder = rr_builder.init_receipt(receipt.len().try_into().map_err( + map_error_protocol!("invalid receipt length in return receipt"), + )?); + r_builder.copy_from_slice(receipt); + + rr_msg.into_reader() + }; + + // Send the return receipt request + self.request(dest, rr_msg, safety_route).await?; + + Ok(()) + } + + pub(crate) async fn process_return_receipt( + &self, + rpcreader: RPCMessage, + ) -> Result<(), RPCError> { + let receipt = { + let operation = rpcreader + .reader + .get_root::() + .map_err(map_error_capnp_error!()) + .map_err(logthru_rpc!())?; + + // This should never want an answer + if self.wants_answer(&operation)? 
{ + return Err(rpc_error_invalid_format( + "return receipt should not want answer", + )); + } + + // get returnReceipt reader + let rr_reader = match operation.get_detail().which() { + Ok(veilid_capnp::operation::detail::Which::ReturnReceipt(Ok(x))) => x, + _ => panic!("invalid operation type in process_return_receipt"), + }; + + // Get receipt + rr_reader + .get_receipt() + .map_err(map_error_internal!( + "no valid receipt in process_return_receipt" + ))? + .to_vec() + }; + + // Handle it + let network_manager = self.network_manager(); + network_manager + .handle_in_band_receipt(receipt, rpcreader.header.peer_noderef) + .await + .map_err(map_error_string!()) + } +} diff --git a/veilid-core/src/rpc_processor/rpc_route.rs b/veilid-core/src/rpc_processor/rpc_route.rs new file mode 100644 index 00000000..a211c3b9 --- /dev/null +++ b/veilid-core/src/rpc_processor/rpc_route.rs @@ -0,0 +1,10 @@ +use super::*; + +impl RPCProcessor { + // xxx do not process latency for routed messages + + pub(crate) async fn process_route(&self, _rpcreader: RPCMessage) -> Result<(), RPCError> { + // xxx do not process latency for routed messages + Err(rpc_error_unimplemented("process_route")) + } +} diff --git a/veilid-core/src/rpc_processor/rpc_set_value.rs b/veilid-core/src/rpc_processor/rpc_set_value.rs new file mode 100644 index 00000000..f3d37041 --- /dev/null +++ b/veilid-core/src/rpc_processor/rpc_set_value.rs @@ -0,0 +1,7 @@ +use super::*; + +impl RPCProcessor { + pub(crate) async fn process_set_value_q(&self, msg: RPCMessage) -> Result<(), RPCError> { + Err(rpc_error_unimplemented("process_set_value_q")) + } +} diff --git a/veilid-core/src/rpc_processor/rpc_signal.rs b/veilid-core/src/rpc_processor/rpc_signal.rs new file mode 100644 index 00000000..172e9912 --- /dev/null +++ b/veilid-core/src/rpc_processor/rpc_signal.rs @@ -0,0 +1,61 @@ +use super::*; + +impl RPCProcessor { + // Sends a unidirectional signal to a node + // Can be sent via all methods including relays and routes + 
pub async fn rpc_call_signal( + &self, + dest: Destination, + safety_route: Option<&SafetyRouteSpec>, + signal_info: SignalInfo, + ) -> Result<(), RPCError> { + let sig_msg = { + let mut sig_msg = ::capnp::message::Builder::new_default(); + let mut question = sig_msg.init_root::(); + question.set_op_id(self.get_next_op_id()); + let mut respond_to = question.reborrow().init_respond_to(); + respond_to.set_none(()); + let detail = question.reborrow().init_detail(); + let mut sig_builder = detail.init_signal(); + encode_signal_info(&signal_info, &mut sig_builder)?; + + sig_msg.into_reader() + }; + + // Send the signal request + self.request(dest, sig_msg, safety_route).await?; + + Ok(()) + } + + pub(crate) async fn process_signal(&self, rpcreader: RPCMessage) -> Result<(), RPCError> { + let signal_info = { + let operation = rpcreader + .reader + .get_root::() + .map_err(map_error_capnp_error!()) + .map_err(logthru_rpc!())?; + + // This should never want an answer + if self.wants_answer(&operation)? { + return Err(rpc_error_invalid_format("signal should not want answer")); + } + + // get signal reader + let sig_reader = match operation.get_detail().which() { + Ok(veilid_capnp::operation::detail::Which::Signal(Ok(x))) => x, + _ => panic!("invalid operation type in process_signal"), + }; + + // Get signal info + decode_signal_info(&sig_reader)? 
+ }; + + // Handle it + let network_manager = self.network_manager(); + network_manager + .handle_signal(signal_info) + .await + .map_err(map_error_string!()) + } +} diff --git a/veilid-core/src/rpc_processor/rpc_start_tunnel.rs b/veilid-core/src/rpc_processor/rpc_start_tunnel.rs new file mode 100644 index 00000000..13b6d55d --- /dev/null +++ b/veilid-core/src/rpc_processor/rpc_start_tunnel.rs @@ -0,0 +1,7 @@ +use super::*; + +impl RPCProcessor { + pub(crate) async fn process_start_tunnel_q(&self, msg: RPCMessage) -> Result<(), RPCError> { + Err(rpc_error_unimplemented("process_start_tunnel_q")) + } +} diff --git a/veilid-core/src/rpc_processor/rpc_status.rs b/veilid-core/src/rpc_processor/rpc_status.rs new file mode 100644 index 00000000..c3f5492c --- /dev/null +++ b/veilid-core/src/rpc_processor/rpc_status.rs @@ -0,0 +1,98 @@ +use super::*; + +impl RPCProcessor { + // Send StatusQ RPC request, receive StatusA answer + // Can be sent via relays, but not via routes + pub async fn rpc_call_status( + self, + peer: NodeRef, + ) -> Result, RPCError> { + let node_status = self.network_manager().generate_node_status(); + let status_q = RPCOperationStatusQ { node_status }; + let respond_to = self.make_respond_to_sender(peer.clone()); + let question = RPCQuestion::new(respond_to, RPCQuestionDetail::StatusQ(status_q)); + + // Send the info request + let waitable_reply = self + .question(Destination::Direct(peer.clone()), question, None) + .await?; + + // Note what kind of ping this was and to what peer scope + let send_data_kind = waitable_reply.send_data_kind; + + // Wait for reply + let (msg, latency) = self.wait_for_reply(waitable_reply).await?; + + // Get the right answer type + let status_a = match msg.operation.into_kind() { + RPCOperationKind::Answer(a) => match a.into_detail() { + RPCAnswerDetail::StatusA(a) => a, + _ => return Err(rpc_error_invalid_format("not a status answer")), + }, + _ => return Err(rpc_error_invalid_format("not an answer")), + }; + + // Update 
latest node status in routing table + peer.operate_mut(|e| { + e.update_node_status(status_a.node_status.clone()); + }); + + // Report sender_info IP addresses to network manager + if let Some(socket_address) = status_a.sender_info.socket_address { + match send_data_kind { + SendDataKind::LocalDirect => { + self.network_manager() + .report_local_socket_address(socket_address, peer) + .await; + } + SendDataKind::GlobalDirect => { + self.network_manager() + .report_global_socket_address(socket_address, peer) + .await; + } + SendDataKind::GlobalIndirect => { + // Do nothing in this case, as the socket address returned here would be for any node other than ours + } + } + } + + Ok(Answer::new(latency, status_a)) + } + + pub(crate) async fn process_status_q(&self, msg: RPCMessage) -> Result<(), RPCError> { + let peer_noderef = msg.header.peer_noderef.clone(); + + // Get the question + let status_q = match msg.operation.kind() { + RPCOperationKind::Question(q) => match q.detail() { + RPCQuestionDetail::StatusQ(q) => q, + _ => panic!("not a status question"), + }, + _ => panic!("not a question"), + }; + + // update node status for the requesting node to our routing table + if let Some(sender_nr) = msg.opt_sender_nr.clone() { + // Update latest node status in routing table for the statusq sender + sender_nr.operate_mut(|e| { + e.update_node_status(status_q.node_status.clone()); + }); + } + + // Make status answer + let node_status = self.network_manager().generate_node_status(); + let sender_info = Self::generate_sender_info(peer_noderef).await; + let status_a = RPCOperationStatusA { + node_status, + sender_info, + }; + + // Send status answer + self.answer( + msg, + RPCAnswer::new(RPCAnswerDetail::StatusA(status_a)), + None, + ) + .await + } +} diff --git a/veilid-core/src/rpc_processor/rpc_supply_block.rs b/veilid-core/src/rpc_processor/rpc_supply_block.rs new file mode 100644 index 00000000..a187817e --- /dev/null +++ b/veilid-core/src/rpc_processor/rpc_supply_block.rs 
@@ -0,0 +1,7 @@ +use super::*; + +impl RPCProcessor { + pub(crate) async fn process_supply_block_q(&self, msg: RPCMessage) -> Result<(), RPCError> { + Err(rpc_error_unimplemented("process_supply_block_q")) + } +} diff --git a/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs b/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs new file mode 100644 index 00000000..4003c735 --- /dev/null +++ b/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs @@ -0,0 +1,207 @@ +use super::*; + +impl RPCProcessor { + // Can only be sent directly, not via relays or routes + pub async fn rpc_call_validate_dial_info( + &self, + peer: NodeRef, + dial_info: DialInfo, + redirect: bool, + ) -> Result { + let network_manager = self.network_manager(); + let receipt_time = ms_to_us( + self.config + .get() + .network + .dht + .validate_dial_info_receipt_time_ms, + ); + // + let (vdi_msg, eventual_value) = { + let mut vdi_msg = ::capnp::message::Builder::new_default(); + let mut question = vdi_msg.init_root::(); + question.set_op_id(self.get_next_op_id()); + let mut respond_to = question.reborrow().init_respond_to(); + respond_to.set_none(()); + let detail = question.reborrow().init_detail(); + let mut vdi_builder = detail.init_validate_dial_info(); + + // Generate receipt and waitable eventual so we can see if we get the receipt back + let (receipt, eventual_value) = network_manager + .generate_single_shot_receipt(receipt_time, []) + .map_err(map_error_string!())?; + + vdi_builder.set_redirect(redirect); + let mut di_builder = vdi_builder.reborrow().init_dial_info(); + encode_dial_info(&dial_info, &mut di_builder)?; + let r_builder = vdi_builder.init_receipt(receipt.len().try_into().map_err( + map_error_protocol!("invalid receipt length in validate dial info"), + )?); + r_builder.copy_from_slice(&receipt); + + (vdi_msg.into_reader(), eventual_value) + }; + + // Send the validate_dial_info request + // This can only be sent directly, as relays can not validate dial info + 
self.request(Destination::Direct(peer), vdi_msg, None) + .await?; + + log_net!(debug "waiting for validate_dial_info receipt"); + // Wait for receipt + match eventual_value.await.take_value().unwrap() { + ReceiptEvent::ReturnedInBand { inbound_noderef: _ } => Err(rpc_error_internal( + "validate_dial_info receipt should be returned out-of-band", + )), + ReceiptEvent::ReturnedOutOfBand => { + log_net!(debug "validate_dial_info receipt returned"); + Ok(true) + } + ReceiptEvent::Expired => { + log_net!(debug "validate_dial_info receipt expired"); + Ok(false) + } + ReceiptEvent::Cancelled => { + Err(rpc_error_internal("receipt was dropped before expiration")) + } + } + } + + pub(crate) async fn process_validate_dial_info( + &self, + rpcreader: RPCMessage, + ) -> Result<(), RPCError> { + // + let (redirect, dial_info, receipt) = { + let operation = rpcreader + .reader + .get_root::() + .map_err(map_error_capnp_error!()) + .map_err(logthru_rpc!())?; + + // This should never want an answer + if self.wants_answer(&operation)? { + return Err(rpc_error_invalid_format( + "validate dial info should not want answer", + )); + } + + // get validateDialInfo reader + let vdi_reader = match operation.get_detail().which() { + Ok(veilid_capnp::operation::detail::Which::ValidateDialInfo(Ok(x))) => x, + _ => panic!("invalid operation type in process_validate_dial_info"), + }; + + // Parse out fields + let redirect = vdi_reader.get_redirect(); + let dial_info = decode_dial_info(&vdi_reader.get_dial_info().map_err( + map_error_internal!("no valid dial info in process_validate_dial_info"), + )?)?; + let receipt = vdi_reader + .get_receipt() + .map_err(map_error_internal!( + "no valid receipt in process_validate_dial_info" + ))? 
+ .to_vec(); + + (redirect, dial_info, receipt) + }; + + // Redirect this request if we are asked to + if redirect { + // Find peers capable of validating this dial info + // We filter on the -outgoing- protocol capability status not the node's dial info + // Use the address type though, to ensure we reach an ipv6 capable node if this is + // an ipv6 address + let routing_table = self.routing_table(); + let filter = DialInfoFilter::global().with_address_type(dial_info.address_type()); + let sender_id = rpcreader.header.envelope.get_sender_id(); + let node_count = { + let c = self.config.get(); + c.network.dht.max_find_node_count as usize + }; + let mut peers = routing_table.find_fast_public_nodes_filtered(node_count, &filter); + if peers.is_empty() { + return Err(rpc_error_internal(format!( + "no peers matching filter '{:?}'", + filter + ))); + } + for peer in &mut peers { + // Ensure the peer is not the one asking for the validation + if peer.node_id() == sender_id { + continue; + } + + // Release the filter on the peer because we don't need to send the redirect with the filter + // we just wanted to make sure we only selected nodes that were capable of + // using the correct protocol for the dial info being validated + peer.set_filter(None); + + // Ensure the peer's status is known and that it is capable of + // making outbound connections for the dial info we want to verify + // and if this peer can validate dial info + let can_contact_dial_info = peer.operate(|e: &BucketEntryInner| { + if let Some(ni) = e.node_info() { + ni.outbound_protocols.contains(dial_info.protocol_type()) + && ni.can_validate_dial_info() + } else { + false + } + }); + if !can_contact_dial_info { + continue; + } + + // See if this peer will validate dial info + let will_validate_dial_info = peer.operate(|e: &BucketEntryInner| { + if let Some(status) = &e.peer_stats().status { + status.will_validate_dial_info + } else { + true + } + }); + if !will_validate_dial_info { + continue; + } + + // 
Make a copy of the request, without the redirect flag + let vdi_msg_reader = { + let mut vdi_msg = ::capnp::message::Builder::new_default(); + let mut question = vdi_msg.init_root::(); + question.set_op_id(self.get_next_op_id()); + let mut respond_to = question.reborrow().init_respond_to(); + respond_to.set_none(()); + let detail = question.reborrow().init_detail(); + let mut vdi_builder = detail.init_validate_dial_info(); + vdi_builder.set_redirect(false); + let mut di_builder = vdi_builder.reborrow().init_dial_info(); + encode_dial_info(&dial_info, &mut di_builder)?; + let r_builder = vdi_builder.init_receipt(receipt.len().try_into().map_err( + map_error_protocol!("invalid receipt length in process_validate_dial_info"), + )?); + r_builder.copy_from_slice(&receipt); + vdi_msg.into_reader() + }; + + // Send the validate_dial_info request until we succeed + self.request(Destination::Direct(peer.clone()), vdi_msg_reader, None) + .await?; + } + return Ok(()); + }; + + // Otherwise send a return receipt directly + // Possibly from an alternate port + let network_manager = self.network_manager(); + network_manager + .send_out_of_band_receipt(dial_info.clone(), receipt) + .await + .map_err(map_error_string!()) + .map_err( + logthru_net!(error "failed to send direct receipt to dial info: {}", dial_info), + )?; + + Ok(()) + } +} diff --git a/veilid-core/src/rpc_processor/rpc_value_changed.rs b/veilid-core/src/rpc_processor/rpc_value_changed.rs new file mode 100644 index 00000000..2cfa185d --- /dev/null +++ b/veilid-core/src/rpc_processor/rpc_value_changed.rs @@ -0,0 +1,7 @@ +use super::*; + +impl RPCProcessor { + pub(crate) async fn process_value_changed(&self, msg: RPCMessage) -> Result<(), RPCError> { + Err(rpc_error_unimplemented("process_value_changed")) + } +} diff --git a/veilid-core/src/rpc_processor/rpc_watch_value.rs b/veilid-core/src/rpc_processor/rpc_watch_value.rs new file mode 100644 index 00000000..1d5c9a04 --- /dev/null +++ 
b/veilid-core/src/rpc_processor/rpc_watch_value.rs @@ -0,0 +1,7 @@ +use super::*; + +impl RPCProcessor { + pub(crate) async fn process_watch_value_q(&self, msg: RPCMessage) -> Result<(), RPCError> { + Err(rpc_error_unimplemented("process_watch_value_q")) + } +} diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index 073fc706..73b04650 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -1887,89 +1887,56 @@ impl VeilidAPI { //////////////////////////////////////////////////////////////// // Direct Node Access (pretty much for testing only) - #[instrument(level = "debug", err, skip(self))] - pub async fn status(&self, node_id: NodeId) -> Result { - let rpc = self.rpc_processor()?; - let routing_table = rpc.routing_table(); - let node_ref = match routing_table.lookup_node_ref(node_id.key) { - None => return Err(VeilidAPIError::NodeNotFound { node_id }), - Some(nr) => nr, - }; - let status_answer = rpc - .rpc_call_status(node_ref) - .await - .map_err(map_rpc_error!())?; - Ok(status_answer) - } + // #[instrument(level = "debug", err, skip(self))] + // pub async fn search_dht(&self, node_id: NodeId) -> Result { + // let rpc_processor = self.rpc_processor()?; + // let config = self.config()?; + // let (count, fanout, timeout) = { + // let c = config.get(); + // ( + // c.network.dht.resolve_node_count, + // c.network.dht.resolve_node_fanout, + // c.network.dht.resolve_node_timeout_ms.map(ms_to_us), + // ) + // }; - #[instrument(level = "debug", err, skip(self))] - pub async fn validate_dial_info( - &self, - node_id: NodeId, - dial_info: DialInfo, - redirect: bool, - ) -> Result { - let rpc = self.rpc_processor()?; - let routing_table = rpc.routing_table(); - let node_ref = match routing_table.lookup_node_ref(node_id.key) { - None => return Err(VeilidAPIError::NodeNotFound { node_id }), - Some(nr) => nr, - }; - rpc.rpc_call_validate_dial_info(node_ref.clone(), dial_info, redirect) - .await - 
.map_err(map_rpc_error!()) - } + // let node_ref = rpc_processor + // .search_dht_single_key(node_id.key, count, fanout, timeout) + // .await + // .map_err(map_rpc_error!())?; - #[instrument(level = "debug", err, skip(self))] - pub async fn search_dht(&self, node_id: NodeId) -> Result { - let rpc_processor = self.rpc_processor()?; - let config = self.config()?; - let (count, fanout, timeout) = { - let c = config.get(); - ( - c.network.dht.resolve_node_count, - c.network.dht.resolve_node_fanout, - c.network.dht.resolve_node_timeout_ms.map(ms_to_us), - ) - }; + // let answer = node_ref.peer_info(); + // if let Some(answer) = answer { + // Ok(answer) + // } else { + // Err(VeilidAPIError::NoPeerInfo { + // node_id: NodeId::new(node_ref.node_id()), + // }) + // } + // } - let node_ref = rpc_processor - .search_dht_single_key(node_id.key, count, fanout, timeout) - .await - .map_err(map_rpc_error!())?; + // #[instrument(level = "debug", err, skip(self))] + // pub async fn search_dht_multi(&self, node_id: NodeId) -> Result, VeilidAPIError> { + // let rpc_processor = self.rpc_processor()?; + // let config = self.config()?; + // let (count, fanout, timeout) = { + // let c = config.get(); + // ( + // c.network.dht.resolve_node_count, + // c.network.dht.resolve_node_fanout, + // c.network.dht.resolve_node_timeout_ms.map(ms_to_us), + // ) + // }; - let answer = node_ref.peer_info(); - if let Some(answer) = answer { - Ok(answer) - } else { - Err(VeilidAPIError::NoPeerInfo { - node_id: NodeId::new(node_ref.node_id()), - }) - } - } + // let node_refs = rpc_processor + // .search_dht_multi_key(node_id.key, count, fanout, timeout) + // .await + // .map_err(map_rpc_error!())?; - #[instrument(level = "debug", err, skip(self))] - pub async fn search_dht_multi(&self, node_id: NodeId) -> Result, VeilidAPIError> { - let rpc_processor = self.rpc_processor()?; - let config = self.config()?; - let (count, fanout, timeout) = { - let c = config.get(); - ( - c.network.dht.resolve_node_count, - 
c.network.dht.resolve_node_fanout, - c.network.dht.resolve_node_timeout_ms.map(ms_to_us), - ) - }; + // let answer = node_refs.iter().filter_map(|x| x.peer_info()).collect(); - let node_refs = rpc_processor - .search_dht_multi_key(node_id.key, count, fanout, timeout) - .await - .map_err(map_rpc_error!())?; - - let answer = node_refs.iter().filter_map(|x| x.peer_info()).collect(); - - Ok(answer) - } + // Ok(answer) + // } //////////////////////////////////////////////////////////////// // Safety / Private Route Handling