private routing

This commit is contained in:
John Smith 2022-11-02 15:36:01 -04:00
parent ec58574a5e
commit 92b22d5af5
18 changed files with 426 additions and 245 deletions

View File

@ -249,7 +249,7 @@ struct SignedNodeInfo {
}
struct SenderInfo {
socketAddress @0 :SocketAddress; # socket address was available for peer
socketAddress @0 :SocketAddress; # socket address that for the sending peer
}
struct PeerInfo {
@ -265,12 +265,12 @@ struct RoutedOperation {
}
struct OperationStatusQ {
nodeStatus @0 :NodeStatus; # node status update about the statusq sender
nodeStatus @0 :NodeStatus; # Optional: node status update about the statusq sender
}
struct OperationStatusA {
nodeStatus @0 :NodeStatus; # returned node status
senderInfo @1 :SenderInfo; # info about StatusQ sender from the perspective of the replier
nodeStatus @0 :NodeStatus; # Optional: returned node status
senderInfo @1 :SenderInfo; # Optional: info about StatusQ sender from the perspective of the replier
}
struct OperationValidateDialInfo {

View File

@ -783,6 +783,26 @@ impl NetworkManager {
.await
}
/// Process a received safety receipt
#[instrument(level = "trace", skip(self, receipt_data), ret)]
pub async fn handle_safety_receipt<R: AsRef<[u8]>>(
&self,
receipt_data: R,
) -> NetworkResult<()> {
let receipt_manager = self.receipt_manager();
let receipt = match Receipt::from_signed_data(receipt_data.as_ref()) {
Err(e) => {
return NetworkResult::invalid_message(e.to_string());
}
Ok(v) => v,
};
receipt_manager
.handle_receipt(receipt, ReceiptReturned::Safety)
.await
}
/// Process a received private receipt
#[instrument(level = "trace", skip(self, receipt_data), ret)]
pub async fn handle_private_receipt<R: AsRef<[u8]>>(
@ -1025,7 +1045,8 @@ impl NetworkManager {
// Wait for the return receipt
let inbound_nr = match eventual_value.await.take_value().unwrap() {
ReceiptEvent::ReturnedPrivate { private_route: _ }
| ReceiptEvent::ReturnedOutOfBand => {
| ReceiptEvent::ReturnedOutOfBand
| ReceiptEvent::ReturnedSafety => {
return Ok(NetworkResult::invalid_message(
"reverse connect receipt should be returned in-band",
));
@ -1127,7 +1148,8 @@ impl NetworkManager {
// Wait for the return receipt
let inbound_nr = match eventual_value.await.take_value().unwrap() {
ReceiptEvent::ReturnedPrivate { private_route: _ }
| ReceiptEvent::ReturnedOutOfBand => {
| ReceiptEvent::ReturnedOutOfBand
| ReceiptEvent::ReturnedSafety => {
return Ok(NetworkResult::invalid_message(
"hole punch receipt should be returned in-band",
));

View File

@ -79,7 +79,7 @@ impl DiscoveryContext {
async fn request_public_address(&self, node_ref: NodeRef) -> Option<SocketAddress> {
let rpc = self.routing_table.rpc_processor();
let res = network_result_value_or_log!(debug match rpc.rpc_call_status(node_ref.clone()).await {
let res = network_result_value_or_log!(debug match rpc.rpc_call_status(Destination::direct(node_ref.clone())).await {
Ok(v) => v,
Err(e) => {
log_net!(error
@ -98,7 +98,7 @@ impl DiscoveryContext {
node_ref,
res.answer
);
res.answer.socket_address
res.answer.map(|si| si.socket_address)
}
// find fast peers with a particular address type, and ask them to tell us what our external address is

View File

@ -343,7 +343,7 @@ impl NetworkManager {
&self,
cur_ts: u64,
unord: &mut FuturesUnordered<
SendPinBoxFuture<Result<NetworkResult<Answer<SenderInfo>>, RPCError>>,
SendPinBoxFuture<Result<NetworkResult<Answer<Option<SenderInfo>>>, RPCError>>,
>,
) -> EyreResult<()> {
let rpc = self.rpc_processor();
@ -394,7 +394,7 @@ impl NetworkManager {
nr.filtered_clone(NodeRefFilter::new().with_dial_info_filter(dif));
log_net!("--> Keepalive ping to {:?}", nr_filtered);
unord.push(
async move { rpc.rpc_call_status(nr_filtered).await }
async move { rpc.rpc_call_status(Destination::direct(nr_filtered)).await }
.instrument(Span::current())
.boxed(),
);
@ -408,7 +408,7 @@ impl NetworkManager {
if !did_pings {
let rpc = rpc.clone();
unord.push(
async move { rpc.rpc_call_status(nr).await }
async move { rpc.rpc_call_status(Destination::direct(nr)).await }
.instrument(Span::current())
.boxed(),
);
@ -425,7 +425,7 @@ impl NetworkManager {
&self,
cur_ts: u64,
unord: &mut FuturesUnordered<
SendPinBoxFuture<Result<NetworkResult<Answer<SenderInfo>>, RPCError>>,
SendPinBoxFuture<Result<NetworkResult<Answer<Option<SenderInfo>>>, RPCError>>,
>,
) -> EyreResult<()> {
let rpc = self.rpc_processor();
@ -440,7 +440,7 @@ impl NetworkManager {
// Just do a single ping with the best protocol for all the nodes
unord.push(
async move { rpc.rpc_call_status(nr).await }
async move { rpc.rpc_call_status(Destination::direct(nr)).await }
.instrument(Span::current())
.boxed(),
);

View File

@ -11,6 +11,7 @@ use xx::*;
pub enum ReceiptEvent {
ReturnedOutOfBand,
ReturnedInBand { inbound_noderef: NodeRef },
ReturnedSafety,
ReturnedPrivate { private_route: DHTKey },
Expired,
Cancelled,
@ -20,6 +21,7 @@ pub enum ReceiptEvent {
pub enum ReceiptReturned {
OutOfBand,
InBand { inbound_noderef: NodeRef },
Safety,
Private { private_route: DHTKey },
}
@ -412,6 +414,7 @@ impl ReceiptManager {
match receipt_returned {
ReceiptReturned::OutOfBand => "OutOfBand".to_owned(),
ReceiptReturned::InBand { ref inbound_noderef } => format!("InBand({})", inbound_noderef),
ReceiptReturned::Safety => "Safety".to_owned(),
ReceiptReturned::Private { ref private_route } => format!("Private({})", private_route),
},
if extra_data.is_empty() {
@ -445,6 +448,7 @@ impl ReceiptManager {
// Get the receipt event to return
let receipt_event = match receipt_returned {
ReceiptReturned::OutOfBand => ReceiptEvent::ReturnedOutOfBand,
ReceiptReturned::Safety => ReceiptEvent::ReturnedSafety,
ReceiptReturned::InBand {
ref inbound_noderef,
} => ReceiptEvent::ReturnedInBand {

View File

@ -626,7 +626,7 @@ impl RouteSpecStore {
signatures: &[DHTSignature],
data: &[u8],
last_hop_id: DHTKey,
) -> EyreResult<Option<(DHTKeySecret, SafetySelection)>> {
) -> EyreResult<Option<(DHTKeySecret, SafetySpec)>> {
let inner = &*self.inner.lock();
let rsd = Self::detail(inner, &public_key).ok_or_else(|| eyre!("route does not exist"))?;
@ -656,12 +656,12 @@ impl RouteSpecStore {
// We got the correct signatures, return a key ans
Ok(Some((
rsd.secret_key,
SafetySelection::Safe(SafetySpec {
SafetySpec {
preferred_route: Some(*public_key),
hop_count: rsd.hops.len(),
stability: rsd.stability,
sequencing: rsd.sequencing,
}),
},
)))
}

View File

@ -3,42 +3,59 @@ use rpc_processor::*;
#[derive(Debug, Clone)]
pub struct RPCOperationStatusQ {
pub node_status: NodeStatus,
pub node_status: Option<NodeStatus>,
}
impl RPCOperationStatusQ {
pub fn decode(
reader: &veilid_capnp::operation_status_q::Reader,
) -> Result<RPCOperationStatusQ, RPCError> {
let ns_reader = reader.get_node_status().map_err(RPCError::protocol)?;
let node_status = decode_node_status(&ns_reader)?;
let node_status = if reader.has_node_status() {
let ns_reader = reader.get_node_status().map_err(RPCError::protocol)?;
let node_status = decode_node_status(&ns_reader)?;
Some(node_status)
} else {
None
};
Ok(RPCOperationStatusQ { node_status })
}
pub fn encode(
&self,
builder: &mut veilid_capnp::operation_status_q::Builder,
) -> Result<(), RPCError> {
let mut ns_builder = builder.reborrow().init_node_status();
encode_node_status(&self.node_status, &mut ns_builder)?;
if let Some(ns) = &self.node_status {
let mut ns_builder = builder.reborrow().init_node_status();
encode_node_status(&ns, &mut ns_builder)?;
}
Ok(())
}
}
#[derive(Debug, Clone)]
pub struct RPCOperationStatusA {
pub node_status: NodeStatus,
pub sender_info: SenderInfo,
pub node_status: Option<NodeStatus>,
pub sender_info: Option<SenderInfo>,
}
impl RPCOperationStatusA {
pub fn decode(
reader: &veilid_capnp::operation_status_a::Reader,
) -> Result<RPCOperationStatusA, RPCError> {
let ns_reader = reader.get_node_status().map_err(RPCError::protocol)?;
let node_status = decode_node_status(&ns_reader)?;
let node_status = if reader.has_node_status() {
let ns_reader = reader.get_node_status().map_err(RPCError::protocol)?;
let node_status = decode_node_status(&ns_reader)?;
Some(node_status)
} else {
None
};
let si_reader = reader.get_sender_info().map_err(RPCError::protocol)?;
let sender_info = decode_sender_info(&si_reader)?;
let sender_info = if reader.has_sender_info() {
let si_reader = reader.get_sender_info().map_err(RPCError::protocol)?;
let sender_info = decode_sender_info(&si_reader)?;
Some(sender_info)
} else {
None
};
Ok(RPCOperationStatusA {
node_status,
@ -49,10 +66,14 @@ impl RPCOperationStatusA {
&self,
builder: &mut veilid_capnp::operation_status_a::Builder,
) -> Result<(), RPCError> {
let mut ns_builder = builder.reborrow().init_node_status();
encode_node_status(&self.node_status, &mut ns_builder)?;
let mut si_builder = builder.reborrow().init_sender_info();
encode_sender_info(&self.sender_info, &mut si_builder)?;
if let Some(ns) = &self.node_status {
let mut ns_builder = builder.reborrow().init_node_status();
encode_node_status(&ns, &mut ns_builder)?;
}
if let Some(si) = &self.sender_info {
let mut si_builder = builder.reborrow().init_sender_info();
encode_sender_info(&si, &mut si_builder)?;
}
Ok(())
}
}

View File

@ -5,30 +5,21 @@ pub fn encode_sender_info(
sender_info: &SenderInfo,
builder: &mut veilid_capnp::sender_info::Builder,
) -> Result<(), RPCError> {
if let Some(socket_address) = &sender_info.socket_address {
let mut sab = builder.reborrow().init_socket_address();
encode_socket_address(socket_address, &mut sab)?;
}
let mut sab = builder.reborrow().init_socket_address();
encode_socket_address(&sender_info.socket_address, &mut sab)?;
Ok(())
}
pub fn decode_sender_info(
reader: &veilid_capnp::sender_info::Reader,
) -> Result<SenderInfo, RPCError> {
if !reader.has_socket_address() {
return Err(RPCError::internal("invalid socket address type"));
}
let socket_address = if reader.has_socket_address() {
Some(decode_socket_address(
&reader
.reborrow()
.get_socket_address()
.map_err(RPCError::map_internal(
"invalid socket address in sender_info",
))?,
)?)
} else {
None
};
let sa_reader = reader
.reborrow()
.get_socket_address()
.map_err(RPCError::map_internal(
"invalid socket address in sender_info",
))?;
let socket_address = decode_socket_address(&sa_reader)?;
Ok(SenderInfo { socket_address })
}

View File

@ -25,6 +25,7 @@ pub use coders::*;
pub use destination::*;
pub use operation_waiter::*;
pub use rpc_error::*;
pub use rpc_status::*;
use super::*;
use crate::crypto::*;
@ -64,8 +65,8 @@ struct RPCMessageHeaderDetailSafetyRouted {
struct RPCMessageHeaderDetailPrivateRouted {
/// The private route we received the rpc over
private_route: DHTKey,
// The safety selection for replying to this private routed rpc
safety_selection: SafetySelection,
// The safety spec for replying to this private routed rpc
safety_spec: SafetySpec,
}
#[derive(Debug, Clone)]
@ -807,7 +808,7 @@ impl RPCProcessor {
);
}
RPCMessageHeaderDetail::SafetyRouted(detail) => {
// If this was sent via a safety route, but no received over our private route, don't respond with a safety route,
// If this was sent via a safety route, but not received over our private route, don't respond with a safety route,
// it would give away which safety routes belong to this node
NetworkResult::value(Destination::private_route(
pr.clone(),
@ -818,7 +819,7 @@ impl RPCProcessor {
// If this was received over our private route, it's okay to respond to a private route via our safety route
NetworkResult::value(Destination::private_route(
pr.clone(),
detail.safety_selection.clone(),
SafetySelection::Safe(detail.safety_spec.clone()),
))
}
}
@ -1067,19 +1068,15 @@ impl RPCProcessor {
#[instrument(level = "trace", skip(self, body), err)]
pub fn enqueue_safety_routed_message(
&self, xxx keep pushing this through
private_route: DHTKey,
safety_selection: SafetySelection,
&self,
sequencing: Sequencing,
body: Vec<u8>,
) -> EyreResult<()> {
let msg = RPCMessageEncoded {
header: RPCMessageHeader {
detail: RPCMessageHeaderDetail::PrivateRouted(
RPCMessageHeaderDetailPrivateRouted {
private_route,
safety_selection,
},
),
detail: RPCMessageHeaderDetail::SafetyRouted(RPCMessageHeaderDetailSafetyRouted {
sequencing,
}),
timestamp: intf::get_timestamp(),
body_len: body.len() as u64,
},
@ -1100,7 +1097,7 @@ impl RPCProcessor {
pub fn enqueue_private_routed_message(
&self,
private_route: DHTKey,
safety_selection: SafetySelection,
safety_spec: SafetySpec,
body: Vec<u8>,
) -> EyreResult<()> {
let msg = RPCMessageEncoded {
@ -1108,7 +1105,7 @@ impl RPCProcessor {
detail: RPCMessageHeaderDetail::PrivateRouted(
RPCMessageHeaderDetailPrivateRouted {
private_route,
safety_selection,
safety_spec,
},
),
timestamp: intf::get_timestamp(),

View File

@ -68,21 +68,14 @@ impl RPCProcessor {
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), err)]
pub(crate) async fn process_find_node_q(&self, msg: RPCMessage) -> Result<(), RPCError> {
// Ensure this never came over a private route
match msg.header.detail {
RPCMessageHeaderDetail::Direct(_) => todo!(),
RPCMessageHeaderDetail::PrivateRouted(_) => todo!(),
}
if matches!(
dest,
Destination::PrivateRoute {
private_route: _,
safety_selection: _
// Ensure this never came over a private route, safety route is okay though
match &msg.header.detail {
RPCMessageHeaderDetail::Direct(_) | RPCMessageHeaderDetail::SafetyRouted(_) => {}
RPCMessageHeaderDetail::PrivateRouted(_) => {
return Err(RPCError::protocol(
"not processing find node request over private route",
))
}
) {
return Err(RPCError::internal(
"Never send find node requests over private routes",
));
}
// Get the question

View File

@ -33,7 +33,7 @@ impl RPCProcessor {
pub(crate) async fn process_node_info_update(&self, msg: RPCMessage) -> Result<(), RPCError> {
let detail = match msg.header.detail {
RPCMessageHeaderDetail::Direct(detail) => detail,
RPCMessageHeaderDetail::PrivateRouted(_) => {
RPCMessageHeaderDetail::SafetyRouted(_) | RPCMessageHeaderDetail::PrivateRouted(_) => {
return Err(RPCError::protocol("node_info_update must be direct"));
}
};

View File

@ -42,6 +42,13 @@ impl RPCProcessor {
.await => {}
);
}
RPCMessageHeaderDetail::SafetyRouted(_) => {
network_result_value_or_log!(debug
network_manager
.handle_safety_receipt(receipt)
.await => {}
);
}
RPCMessageHeaderDetail::PrivateRouted(detail) => {
network_result_value_or_log!(debug
network_manager

View File

@ -148,8 +148,102 @@ impl RPCProcessor {
Ok(())
}
/// Process a routed operation that came in over a safety route but no private route
///
/// Note: it is important that we never respond with a safety route to questions that come
/// in without a private route. Giving away a safety route when the node id is known is
/// a privacy violation!
#[instrument(level = "trace", skip_all, err)]
async fn process_routed_operation(
fn process_safety_routed_operation(
&self,
detail: RPCMessageHeaderDetailDirect,
routed_operation: RoutedOperation,
safety_route: &SafetyRoute,
) -> Result<(), RPCError> {
// Get sequencing preference
let sequencing = if detail
.connection_descriptor
.protocol_type()
.is_connection_oriented()
{
Sequencing::EnsureOrdered
} else {
Sequencing::NoPreference
};
// Now that things are valid, decrypt the routed operation with DEC(nonce, DH(the SR's public key, the PR's (or node's) secret)
// xxx: punish nodes that send messages that fail to decrypt eventually? How to do this for safety routes?
let node_id_secret = self.routing_table.node_id_secret();
let dh_secret = self
.crypto
.cached_dh(&safety_route.public_key, &node_id_secret)
.map_err(RPCError::protocol)?;
let body = Crypto::decrypt_aead(
&routed_operation.data,
&routed_operation.nonce,
&dh_secret,
None,
)
.map_err(RPCError::map_internal(
"decryption of routed operation failed",
))?;
// Pass message to RPC system
self.enqueue_safety_routed_message(sequencing, body)
.map_err(RPCError::internal)?;
Ok(())
}
/// Process a routed operation that came in over both a safety route and a private route
#[instrument(level = "trace", skip_all, err)]
fn process_private_routed_operation(
&self,
detail: RPCMessageHeaderDetailDirect,
routed_operation: RoutedOperation,
safety_route: &SafetyRoute,
private_route: &PrivateRoute,
) -> Result<(), RPCError> {
// Get sender id
let sender_id = detail.envelope.get_sender_id();
// Look up the private route and ensure it's one in our spec store
let rss = self.routing_table.route_spec_store();
let (secret_key, safety_spec) = rss
.validate_signatures(
&private_route.public_key,
&routed_operation.signatures,
&routed_operation.data,
sender_id,
)
.map_err(RPCError::protocol)?
.ok_or_else(|| RPCError::protocol("signatures did not validate for private route"))?;
// Now that things are valid, decrypt the routed operation with DEC(nonce, DH(the SR's public key, the PR's (or node's) secret)
// xxx: punish nodes that send messages that fail to decrypt eventually. How to do this for private routes?
let dh_secret = self
.crypto
.cached_dh(&safety_route.public_key, &secret_key)
.map_err(RPCError::protocol)?;
let body = Crypto::decrypt_aead(
&routed_operation.data,
&routed_operation.nonce,
&dh_secret,
None,
)
.map_err(RPCError::map_internal(
"decryption of routed operation failed",
))?;
// Pass message to RPC system
self.enqueue_private_routed_message(private_route.public_key, safety_spec, body)
.map_err(RPCError::internal)?;
Ok(())
}
#[instrument(level = "trace", skip_all, err)]
fn process_routed_operation(
&self,
detail: RPCMessageHeaderDetailDirect,
routed_operation: RoutedOperation,
@ -170,67 +264,18 @@ impl RPCProcessor {
// If the private route public key is our node id, then this was sent via safety route to our node directly
// so there will be no signatures to validate
let (secret_key, safety_selection) = if private_route.public_key
== self.routing_table.node_id()
{
if private_route.public_key == self.routing_table.node_id() {
// The private route was a stub
// Return our secret key and an appropriate safety selection
//
// Note: it is important that we never respond with a safety route to questions that come
// in without a private route. Giving away a safety route when the node id is known is
// a privacy violation!
// Get sequencing preference
let sequencing = if detail
.connection_descriptor
.protocol_type()
.is_connection_oriented()
{
Sequencing::EnsureOrdered
} else {
Sequencing::NoPreference
};
(
self.routing_table.node_id_secret(),
SafetySelection::Unsafe(sequencing),
)
self.process_safety_routed_operation(detail, routed_operation, safety_route)
} else {
// Get sender id
let sender_id = detail.envelope.get_sender_id();
// Look up the private route and ensure it's one in our spec store
let rss = self.routing_table.route_spec_store();
rss.validate_signatures(
&private_route.public_key,
&routed_operation.signatures,
&routed_operation.data,
sender_id,
// Both safety and private routes used, should reply with a safety route
self.process_private_routed_operation(
detail,
routed_operation,
safety_route,
private_route,
)
.map_err(RPCError::protocol)?
.ok_or_else(|| RPCError::protocol("signatures did not validate for private route"))?
};
// Now that things are valid, decrypt the routed operation with DEC(nonce, DH(the SR's public key, the PR's (or node's) secret)
// xxx: punish nodes that send messages that fail to decrypt eventually
let dh_secret = self
.crypto
.cached_dh(&safety_route.public_key, &secret_key)
.map_err(RPCError::protocol)?;
let body = Crypto::decrypt_aead(
&routed_operation.data,
&routed_operation.nonce,
&dh_secret,
None,
)
.map_err(RPCError::map_internal(
"decryption of routed operation failed",
))?;
// Pass message to RPC system
self.enqueue_private_routed_message(private_route.public_key, safety_selection, body)
.map_err(RPCError::internal)?;
Ok(())
}
}
#[instrument(level = "trace", skip(self, msg), err)]
@ -312,8 +357,7 @@ impl RPCProcessor {
route.operation,
&route.safety_route,
&private_route,
)
.await?;
)?;
}
} else if blob_tag == 0 {
// RouteHop
@ -383,8 +427,7 @@ impl RPCProcessor {
route.operation,
&route.safety_route,
private_route,
)
.await?;
)?;
}
}
}

View File

@ -2,13 +2,26 @@ use super::*;
impl RPCProcessor {
// Sends a unidirectional signal to a node
// Can be sent via all methods including relays and routes
// Can be sent via relays but not routes. For routed 'signal' like capabilities, use AppMessage.
#[instrument(level = "trace", skip(self), ret, err)]
pub async fn rpc_call_signal(
self,
dest: Destination,
signal_info: SignalInfo,
) -> Result<NetworkResult<()>, RPCError> {
// Ensure destination never has a private route
if matches!(
dest,
Destination::PrivateRoute {
private_route: _,
safety_selection: _
}
) {
return Err(RPCError::internal(
"Never send signal requests over private routes",
));
}
let signal = RPCOperationSignal { signal_info };
let statement = RPCStatement::new(RPCStatementDetail::Signal(signal));
@ -20,6 +33,15 @@ impl RPCProcessor {
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)]
pub(crate) async fn process_signal(&self, msg: RPCMessage) -> Result<(), RPCError> {
// Can't allow anything other than direct packets here, as handling reverse connections
// or anything like via signals over private routes would deanonymize the route
match &msg.header.detail {
RPCMessageHeaderDetail::Direct(_) => {}
RPCMessageHeaderDetail::SafetyRouted(_) | RPCMessageHeaderDetail::PrivateRouted(_) => {
return Err(RPCError::protocol("signal must be direct"));
}
};
// Get the statement
let signal = match msg.operation.into_kind() {
RPCOperationKind::Statement(s) => match s.into_detail() {

View File

@ -1,30 +1,79 @@
use super::*;
#[derive(Clone, Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Default)]
pub struct SenderInfo {
pub socket_address: SocketAddress,
}
impl RPCProcessor {
// Send StatusQ RPC request, receive StatusA answer
// Can be sent via relays, but not via routes
// Can be sent via relays or routes, but will have less information via routes
// sender:
// unsafe -> node status
// safe -> nothing
// receiver:
// direct -> node status + sender info
// safety -> node status
// private -> nothing
#[instrument(level = "trace", skip(self), ret, err)]
pub async fn rpc_call_status(
self,
peer: NodeRef,
) -> Result<NetworkResult<Answer<SenderInfo>>, RPCError> {
let routing_domain = match peer.best_routing_domain() {
Some(rd) => rd,
None => {
return Ok(NetworkResult::no_connection_other(
"no routing domain for peer",
))
dest: Destination,
) -> Result<NetworkResult<Answer<Option<SenderInfo>>>, RPCError> {
let (opt_target_nr, routing_domain, node_status) = match dest.get_safety_selection() {
SafetySelection::Unsafe(_) => {
let (opt_target_nr, routing_domain) = match &dest {
Destination::Direct {
target,
safety_selection: _,
} => {
let routing_domain = match target.best_routing_domain() {
Some(rd) => rd,
None => {
return Ok(NetworkResult::no_connection_other(
"no routing domain for target",
))
}
};
(Some(target.clone()), routing_domain)
}
Destination::Relay {
relay,
target,
safety_selection: _,
} => {
let opt_target_nr = self.routing_table.lookup_node_ref(*target);
let routing_domain = match relay.best_routing_domain() {
Some(rd) => rd,
None => {
return Ok(NetworkResult::no_connection_other(
"no routing domain for peer",
))
}
};
(opt_target_nr, routing_domain)
}
Destination::PrivateRoute {
private_route: _,
safety_selection: _,
} => (None, RoutingDomain::PublicInternet),
};
let node_status = Some(self.network_manager().generate_node_status(routing_domain));
(opt_target_nr, routing_domain, node_status)
}
SafetySelection::Safe(_) => {
let routing_domain = RoutingDomain::PublicInternet;
let node_status = None;
(None, routing_domain, node_status)
}
};
let node_status = self.network_manager().generate_node_status(routing_domain);
let status_q = RPCOperationStatusQ { node_status };
let question = RPCQuestion::new(RespondTo::Sender, RPCQuestionDetail::StatusQ(status_q));
// Send the info request
let waitable_reply = network_result_try!(
self.question(Destination::direct(peer.clone()), question)
.await?
);
let waitable_reply = network_result_try!(self.question(dest.clone(), question).await?);
// Note what kind of ping this was and to what peer scope
let send_data_kind = waitable_reply.send_data_kind;
@ -45,74 +94,90 @@ impl RPCProcessor {
};
// Ensure the returned node status is the kind for the routing domain we asked for
match routing_domain {
RoutingDomain::PublicInternet => {
if !matches!(status_a.node_status, NodeStatus::PublicInternet(_)) {
return Ok(NetworkResult::invalid_message(
"node status doesn't match PublicInternet routing domain",
));
}
}
RoutingDomain::LocalNetwork => {
if !matches!(status_a.node_status, NodeStatus::LocalNetwork(_)) {
return Ok(NetworkResult::invalid_message(
"node status doesn't match LocalNetwork routing domain",
));
if let Some(target_nr) = opt_target_nr {
if let Some(node_status) = status_a.node_status {
match routing_domain {
RoutingDomain::PublicInternet => {
if !matches!(node_status, NodeStatus::PublicInternet(_)) {
return Ok(NetworkResult::invalid_message(
"node status doesn't match PublicInternet routing domain",
));
}
}
RoutingDomain::LocalNetwork => {
if !matches!(node_status, NodeStatus::LocalNetwork(_)) {
return Ok(NetworkResult::invalid_message(
"node status doesn't match LocalNetwork routing domain",
));
}
}
}
// Update latest node status in routing table
target_nr.update_node_status(node_status);
}
}
// Update latest node status in routing table
peer.update_node_status(status_a.node_status);
// Report sender_info IP addresses to network manager
// Don't need to validate these addresses for the current routing domain
// the address itself is irrelevant, and the remote node can lie anyway
if let Some(socket_address) = status_a.sender_info.socket_address {
match send_data_kind {
SendDataKind::Direct(connection_descriptor) => match routing_domain {
RoutingDomain::PublicInternet => self
.network_manager()
.report_public_internet_socket_address(
socket_address,
connection_descriptor,
peer,
),
RoutingDomain::LocalNetwork => {
self.network_manager().report_local_network_socket_address(
socket_address,
connection_descriptor,
peer,
)
let mut opt_sender_info = None;
match dest {
Destination::Direct {
target,
safety_selection,
} => {
if matches!(safety_selection, SafetySelection::Unsafe(_)) {
if let Some(sender_info) = status_a.sender_info {
match send_data_kind {
SendDataKind::Direct(connection_descriptor) => {
// Directly requested status that actually gets sent directly and not over a relay will tell us what our IP address appears as
// If this changes, we'd want to know about that to reset the networking stack
match routing_domain {
RoutingDomain::PublicInternet => self
.network_manager()
.report_public_internet_socket_address(
sender_info.socket_address,
connection_descriptor,
target,
),
RoutingDomain::LocalNetwork => {
self.network_manager().report_local_network_socket_address(
sender_info.socket_address,
connection_descriptor,
target,
)
}
}
opt_sender_info = Some(sender_info.clone());
}
SendDataKind::Indirect => {
// Do nothing in this case, as the socket address returned here would be for any node other than ours
}
SendDataKind::Existing(_) => {
// Do nothing in this case, as an existing connection could not have a different public address or it would have been reset
}
};
}
},
SendDataKind::Indirect => {
// Do nothing in this case, as the socket address returned here would be for any node other than ours
}
SendDataKind::Existing(_) => {
// Do nothing in this case, as an existing connection could not have a different public address or it would have been reset
}
}
}
Ok(NetworkResult::value(Answer::new(
latency,
status_a.sender_info,
)))
Destination::Relay {
relay: _,
target: _,
safety_selection: _,
}
| Destination::PrivateRoute {
private_route: _,
safety_selection: _,
} => {
// sender info is irrelevant over relays and routes
}
};
Ok(NetworkResult::value(Answer::new(latency, opt_sender_info)))
}
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), err)]
pub(crate) async fn process_status_q(&self, msg: RPCMessage) -> Result<(), RPCError> {
let detail = match &msg.header.detail {
RPCMessageHeaderDetail::Direct(detail) => detail,
RPCMessageHeaderDetail::PrivateRouted(_) => {
return Err(RPCError::protocol("status_q must be direct"));
}
};
let connection_descriptor = detail.connection_descriptor;
let routing_domain = detail.routing_domain;
// Get the question
let status_q = match msg.operation.kind() {
RPCOperationKind::Question(q) => match q.detail() {
@ -122,36 +187,55 @@ impl RPCProcessor {
_ => panic!("not a question"),
};
// Ensure the node status from the question is the kind for the routing domain we received the request in
match routing_domain {
RoutingDomain::PublicInternet => {
if !matches!(status_q.node_status, NodeStatus::PublicInternet(_)) {
log_rpc!(debug "node status doesn't match PublicInternet routing domain");
return Ok(());
let (node_status, sender_info) = match &msg.header.detail {
RPCMessageHeaderDetail::Direct(detail) => {
let connection_descriptor = detail.connection_descriptor;
let routing_domain = detail.routing_domain;
// Ensure the node status from the question is the kind for the routing domain we received the request in
if let Some(node_status) = &status_q.node_status {
match routing_domain {
RoutingDomain::PublicInternet => {
if !matches!(node_status, NodeStatus::PublicInternet(_)) {
log_rpc!(debug "node status doesn't match PublicInternet routing domain");
return Ok(());
}
}
RoutingDomain::LocalNetwork => {
if !matches!(node_status, NodeStatus::LocalNetwork(_)) {
log_rpc!(debug "node status doesn't match LocalNetwork routing domain");
return Ok(());
}
}
}
// update node status for the requesting node to our routing table
if let Some(sender_nr) = msg.opt_sender_nr.clone() {
// Update latest node status in routing table for the statusq sender
sender_nr.update_node_status(node_status.clone());
}
}
// Get the peer address in the returned sender info
let sender_info = SenderInfo {
socket_address: *connection_descriptor.remote_address(),
};
// Make status answer
let node_status = self.network_manager().generate_node_status(routing_domain);
(Some(node_status), Some(sender_info))
}
RoutingDomain::LocalNetwork => {
if !matches!(status_q.node_status, NodeStatus::LocalNetwork(_)) {
log_rpc!(debug "node status doesn't match LocalNetwork routing domain");
return Ok(());
}
RPCMessageHeaderDetail::SafetyRouted(_) => {
// Make status answer
let node_status = self
.network_manager()
.generate_node_status(RoutingDomain::PublicInternet);
(Some(node_status), None)
}
}
// update node status for the requesting node to our routing table
if let Some(sender_nr) = msg.opt_sender_nr.clone() {
// Update latest node status in routing table for the statusq sender
sender_nr.update_node_status(status_q.node_status.clone());
}
// Make status answer
let node_status = self.network_manager().generate_node_status(routing_domain);
// Get the peer address in the returned sender info
let sender_info = SenderInfo {
socket_address: Some(*connection_descriptor.remote_address()),
RPCMessageHeaderDetail::PrivateRouted(_) => (None, None),
};
// Make status answer
let status_a = RPCOperationStatusA {
node_status,
sender_info,

View File

@ -35,7 +35,8 @@ impl RPCProcessor {
// Wait for receipt
match eventual_value.await.take_value().unwrap() {
ReceiptEvent::ReturnedPrivate { private_route: _ }
| ReceiptEvent::ReturnedInBand { inbound_noderef: _ } => {
| ReceiptEvent::ReturnedInBand { inbound_noderef: _ }
| ReceiptEvent::ReturnedSafety => {
log_net!(debug "validate_dial_info receipt should be returned out-of-band".green());
Ok(false)
}

View File

@ -3,6 +3,7 @@
use super::*;
use routing_table::*;
use rpc_processor::*;
fn get_bucket_entry_state(text: &str) -> Option<BucketEntryState> {
if text == "dead" {
@ -397,7 +398,7 @@ impl VeilidAPI {
// Dump routing table entry
let out = match rpc
.rpc_call_status(nr)
.rpc_call_status(Destination::direct(nr))
.await
.map_err(VeilidAPIError::internal)?
{

View File

@ -366,11 +366,6 @@ impl BlockId {
/////////////////////////////////////////////////////////////////////////////////////////////////////
#[derive(Clone, Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Default, Serialize, Deserialize)]
pub struct SenderInfo {
pub socket_address: Option<SocketAddress>,
}
// Keep member order appropriate for sorting < preference
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
pub enum DialInfoClass {
@ -420,7 +415,7 @@ pub enum Stability {
Reliable,
}
/// The choice of safety route including in compiled routes
/// The choice of safety route to include in compiled routes
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum SafetySelection {
/// Don't use a safety route, only specify the sequencing preference