fix bug where messages sent to a private route without a safety route would not receive replies

fix verbose-tracing feature flag
improve route allocation to avoid co-located nodes
This commit is contained in:
Christien Rioux 2024-04-27 22:05:19 -04:00
parent 7c7ea4e3c7
commit bac12131c6
4 changed files with 90 additions and 57 deletions

View File

@ -215,6 +215,13 @@ impl RouteSpecStore {
) -> VeilidAPIResult<RouteId> {
use core::cmp::Ordering;
let ip6_prefix_size = rti
.unlocked_inner
.config
.get()
.network
.max_connections_per_ip6_prefix_size as usize;
if hop_count < 1 {
apibail_invalid_argument!(
"Not allocating route less than one hop in length",
@ -286,6 +293,39 @@ impl RouteSpecStore {
return false;
};
// Exclude nodes on our same ipblock, or their relay is on our same ipblock
// or our relay is on their ipblock, or their relay is on our relays same ipblock
// our node vs their node
if our_peer_info
.signed_node_info()
.node_info()
.node_is_on_same_ipblock(sni.node_info(), ip6_prefix_size)
{
return false;
}
if let Some(rni) = sni.relay_info() {
// our node vs their relay
if our_peer_info
.signed_node_info()
.node_info()
.node_is_on_same_ipblock(rni, ip6_prefix_size)
{
return false;
}
if let Some(our_rni) = our_peer_info.signed_node_info().relay_info() {
// our relay vs their relay
if our_rni.node_is_on_same_ipblock(rni, ip6_prefix_size) {
return false;
}
}
} else if let Some(our_rni) = our_peer_info.signed_node_info().relay_info() {
// our relay vs their node
if our_rni.node_is_on_same_ipblock(sni.node_info(), ip6_prefix_size) {
return false;
}
}
// Relay check
let relay_ids = sni.relay_ids();
if !relay_ids.is_empty() {
@ -1527,7 +1567,7 @@ impl RouteSpecStore {
/// Returns a route set id
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "trace", skip(self, blob), ret, err)
instrument(level = "trace", skip(self), ret, err)
)]
pub fn add_remote_private_route(
&self,

View File

@ -619,9 +619,15 @@ impl RPCProcessor {
// Ensure the reply comes over the private route that was requested
if let Some(reply_private_route) = waitable_reply.reply_private_route {
match &rpcreader.header.detail {
RPCMessageHeaderDetail::Direct(_) | RPCMessageHeaderDetail::SafetyRouted(_) => {
return Err(RPCError::protocol("should have received reply over private route"));
}
RPCMessageHeaderDetail::Direct(_) => {
return Err(RPCError::protocol("should have received reply over private route or stub"));
},
RPCMessageHeaderDetail::SafetyRouted(sr) => {
let node_id = self.routing_table.node_id(sr.direct.envelope.get_crypto_kind());
if node_id.value != reply_private_route {
return Err(RPCError::protocol("should have received reply from safety route to a stub"));
}
},
RPCMessageHeaderDetail::PrivateRouted(pr) => {
if pr.private_route != reply_private_route {
return Err(RPCError::protocol("received reply over the wrong private route"));

View File

@ -22,9 +22,7 @@ impl RPCProcessor {
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "trace", skip(self, last_descriptor),
fields(ret.value.data.len,
ret.seqs,
ret.peers.len,
fields(ret.peers.len,
ret.latency
),err)
)]
@ -64,7 +62,8 @@ impl RPCProcessor {
);
// Send the inspectvalue question
let inspect_value_q = RPCOperationInspectValueQ::new(key, subkeys.clone(), last_descriptor.is_none())?;
let inspect_value_q =
RPCOperationInspectValueQ::new(key, subkeys.clone(), last_descriptor.is_none())?;
let question = RPCQuestion::new(
network_result_try!(self.get_destination_respond_to(&dest)?),
RPCQuestionDetail::InspectValueQ(Box::new(inspect_value_q)),
@ -107,19 +106,18 @@ impl RPCProcessor {
let debug_string_answer = format!(
"OUT <== InspectValueA({} {} peers={}) <= {} seqs:\n{}",
key,
if descriptor.is_some() {
" +desc"
} else {
""
},
if descriptor.is_some() { " +desc" } else { "" },
peers.len(),
dest,
debug_seqs(&seqs)
);
log_dht!(debug "{}", debug_string_answer);
let peer_ids:Vec<String> = peers.iter().filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string())).collect();
let peer_ids: Vec<String> = peers
.iter()
.filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string()))
.collect();
log_dht!(debug "Peers: {:#?}", peer_ids);
}
@ -140,8 +138,6 @@ impl RPCProcessor {
#[cfg(feature = "verbose-tracing")]
tracing::Span::current().record("ret.latency", latency.as_u64());
#[cfg(feature = "verbose-tracing")]
tracing::Span::current().record("ret.seqs", seqs);
#[cfg(feature = "verbose-tracing")]
tracing::Span::current().record("ret.peers.len", peers.len());
Ok(NetworkResult::value(Answer::new(
@ -158,11 +154,7 @@ impl RPCProcessor {
////////////////////////////////////////////////////////////////////////////////////////////////
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
pub(crate) async fn process_inspect_value_q(
&self,
msg: RPCMessage,
) -> RPCNetworkResult<()> {
pub(crate) async fn process_inspect_value_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
// Ensure this never came over a private route, safety route is okay though
match &msg.header.detail {
RPCMessageHeaderDetail::Direct(_) | RPCMessageHeaderDetail::SafetyRouted(_) => {}
@ -175,14 +167,8 @@ impl RPCProcessor {
// Ignore if disabled
let routing_table = self.routing_table();
let opi = routing_table.get_own_peer_info(msg.header.routing_domain());
if !opi
.signed_node_info()
.node_info()
.has_capability(CAP_DHT)
{
return Ok(NetworkResult::service_unavailable(
"dht is not available",
));
if !opi.signed_node_info().node_info().has_capability(CAP_DHT) {
return Ok(NetworkResult::service_unavailable("dht is not available"));
}
// Get the question
@ -200,18 +186,16 @@ impl RPCProcessor {
// Get the nodes that we know about that are closer to the the key than our own node
let routing_table = self.routing_table();
let closer_to_key_peers = network_result_try!(routing_table.find_preferred_peers_closer_to_key(key, vec![CAP_DHT]));
let closer_to_key_peers = network_result_try!(
routing_table.find_preferred_peers_closer_to_key(key, vec![CAP_DHT])
);
if debug_target_enabled!("dht") {
let debug_string = format!(
"IN <=== InspectValueQ({} {}{}) <== {}",
key,
subkeys,
if want_descriptor {
" +wantdesc"
} else {
""
},
if want_descriptor { " +wantdesc" } else { "" },
msg.header.direct_sender_node_id()
);
@ -223,20 +207,21 @@ impl RPCProcessor {
let c = self.config.get();
c.network.dht.set_value_count as usize
};
let (inspect_result_seqs, inspect_result_descriptor) = if closer_to_key_peers.len() >= set_value_count {
// Not close enough
(Vec::new(), None)
} else {
// Close enough, lets get it
let (inspect_result_seqs, inspect_result_descriptor) =
if closer_to_key_peers.len() >= set_value_count {
// Not close enough
(Vec::new(), None)
} else {
// Close enough, lets get it
// See if we have this record ourselves
let storage_manager = self.storage_manager();
let inspect_result = network_result_try!(storage_manager
.inbound_inspect_value(key, subkeys, want_descriptor)
.await
.map_err(RPCError::internal)?);
(inspect_result.seqs, inspect_result.opt_descriptor)
};
// See if we have this record ourselves
let storage_manager = self.storage_manager();
let inspect_result = network_result_try!(storage_manager
.inbound_inspect_value(key, subkeys, want_descriptor)
.await
.map_err(RPCError::internal)?);
(inspect_result.seqs, inspect_result.opt_descriptor)
};
if debug_target_enabled!("dht") {
let debug_string_answer = format!(
@ -251,10 +236,10 @@ impl RPCProcessor {
closer_to_key_peers.len(),
msg.header.direct_sender_node_id()
);
log_dht!(debug "{}", debug_string_answer);
}
// Make InspectValue answer
let inspect_value_a = RPCOperationInspectValueA::new(
inspect_result_seqs,
@ -263,7 +248,10 @@ impl RPCProcessor {
)?;
// Send InspectValue answer
self.answer(msg, RPCAnswer::new(RPCAnswerDetail::InspectValueA(Box::new(inspect_value_a))))
.await
self.answer(
msg,
RPCAnswer::new(RPCAnswerDetail::InspectValueA(Box::new(inspect_value_a))),
)
.await
}
}

View File

@ -1,12 +1,11 @@
use super::*;
impl RPCProcessor {
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err))]
// Sends a high level app message
// Can be sent via all methods including relays and routes
// Sends a dht value change notification
// Can be sent via all methods including relays and routes but never over a safety route
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "trace", skip(self, message), fields(message.len = message.len()), err)
instrument(level = "trace", skip(self, value), err)
)]
pub async fn rpc_call_value_changed(
self,