mirror of
https://gitlab.com/veilid/veilid.git
synced 2025-03-26 15:48:13 -04:00
unify routing domain handling code, fix edge case generating sender peer info
This commit is contained in:
parent
38f4ff66de
commit
b37e2cc3c9
@ -1090,7 +1090,7 @@ impl NetworkManager {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// Cache the envelope information in the routing table
|
// Cache the envelope information in the routing table
|
||||||
let source_noderef = match routing_table.register_node_with_existing_connection(
|
let mut source_noderef = match routing_table.register_node_with_existing_connection(
|
||||||
envelope.get_sender_typed_id(),
|
envelope.get_sender_typed_id(),
|
||||||
flow,
|
flow,
|
||||||
ts,
|
ts,
|
||||||
@ -1104,6 +1104,9 @@ impl NetworkManager {
|
|||||||
};
|
};
|
||||||
source_noderef.add_envelope_version(envelope.get_version());
|
source_noderef.add_envelope_version(envelope.get_version());
|
||||||
|
|
||||||
|
// Enforce routing domain
|
||||||
|
source_noderef.merge_filter(NodeRefFilter::new().with_routing_domain(routing_domain));
|
||||||
|
|
||||||
// Pass message to RPC system
|
// Pass message to RPC system
|
||||||
rpc.enqueue_direct_message(envelope, source_noderef, flow, routing_domain, body)?;
|
rpc.enqueue_direct_message(envelope, source_noderef, flow, routing_domain, body)?;
|
||||||
|
|
||||||
|
@ -1056,6 +1056,11 @@ impl RouteSpecStore {
|
|||||||
// Set sequencing requirement
|
// Set sequencing requirement
|
||||||
first_hop.set_sequencing(sequencing);
|
first_hop.set_sequencing(sequencing);
|
||||||
|
|
||||||
|
// Enforce the routing domain
|
||||||
|
first_hop.merge_filter(
|
||||||
|
NodeRefFilter::new().with_routing_domain(RoutingDomain::PublicInternet),
|
||||||
|
);
|
||||||
|
|
||||||
// Return the compiled safety route
|
// Return the compiled safety route
|
||||||
//info!("compile_safety_route profile (stub): {} us", (get_timestamp() - profile_start_ts));
|
//info!("compile_safety_route profile (stub): {} us", (get_timestamp() - profile_start_ts));
|
||||||
return Ok(CompiledRoute {
|
return Ok(CompiledRoute {
|
||||||
@ -1113,6 +1118,10 @@ impl RouteSpecStore {
|
|||||||
// Ensure sequencing requirement is set on first hop
|
// Ensure sequencing requirement is set on first hop
|
||||||
first_hop.set_sequencing(safety_spec.sequencing);
|
first_hop.set_sequencing(safety_spec.sequencing);
|
||||||
|
|
||||||
|
// Enforce the routing domain
|
||||||
|
first_hop
|
||||||
|
.merge_filter(NodeRefFilter::new().with_routing_domain(RoutingDomain::PublicInternet));
|
||||||
|
|
||||||
// Get the safety route secret key
|
// Get the safety route secret key
|
||||||
let secret = safety_rsd.secret_key;
|
let secret = safety_rsd.secret_key;
|
||||||
|
|
||||||
|
@ -28,6 +28,14 @@ pub(crate) enum Destination {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Routing configuration for destination
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct UnsafeRoutingInfo {
|
||||||
|
pub opt_node: Option<NodeRef>,
|
||||||
|
pub opt_relay: Option<NodeRef>,
|
||||||
|
pub opt_routing_domain: Option<RoutingDomain>,
|
||||||
|
}
|
||||||
|
|
||||||
impl Destination {
|
impl Destination {
|
||||||
pub fn node(&self) -> Option<NodeRef> {
|
pub fn node(&self) -> Option<NodeRef> {
|
||||||
match self {
|
match self {
|
||||||
@ -138,6 +146,81 @@ impl Destination {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn get_unsafe_routing_info(
|
||||||
|
&self,
|
||||||
|
routing_table: RoutingTable,
|
||||||
|
) -> Option<UnsafeRoutingInfo> {
|
||||||
|
// If there's a safety route in use, the safety route will be responsible for the routing
|
||||||
|
match self.get_safety_selection() {
|
||||||
|
SafetySelection::Unsafe(_) => {}
|
||||||
|
SafetySelection::Safe(_) => {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get:
|
||||||
|
// * The target node (possibly relayed)
|
||||||
|
// * The routing domain we are sending to if we can determine it
|
||||||
|
let (opt_node, opt_relay, opt_routing_domain) = match self {
|
||||||
|
Destination::Direct {
|
||||||
|
node,
|
||||||
|
safety_selection: _,
|
||||||
|
} => {
|
||||||
|
let opt_routing_domain = node.best_routing_domain();
|
||||||
|
if opt_routing_domain.is_none() {
|
||||||
|
// No routing domain for target, no node info
|
||||||
|
// Only a stale connection or no connection exists
|
||||||
|
log_rpc!(debug "No routing domain for node: node={}", node);
|
||||||
|
};
|
||||||
|
(Some(node.clone()), None, opt_routing_domain)
|
||||||
|
}
|
||||||
|
Destination::Relay {
|
||||||
|
relay,
|
||||||
|
node,
|
||||||
|
safety_selection: _,
|
||||||
|
} => {
|
||||||
|
// Outbound relays are defined as routing to and from PublicInternet only right now
|
||||||
|
|
||||||
|
// Resolve the relay for this target's routing domain and see if it matches this relay
|
||||||
|
let mut opt_routing_domain = None;
|
||||||
|
for target_rd in node.routing_domain_set() {
|
||||||
|
// Check out inbound/outbound relay to match routing domain
|
||||||
|
if let Some(relay_node) = routing_table.relay_node(target_rd) {
|
||||||
|
if relay.same_entry(&relay_node) {
|
||||||
|
// Relay for this destination is one of our routing domain relays (our inbound or outbound)
|
||||||
|
opt_routing_domain = Some(target_rd);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Check remote node's published relay to see if that who is relaying
|
||||||
|
if let Some(target_relay) = node.relay(target_rd).ok().flatten() {
|
||||||
|
if relay.same_entry(&target_relay) {
|
||||||
|
// Relay for this destination is one of its published relays
|
||||||
|
opt_routing_domain = Some(target_rd);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if opt_routing_domain.is_none() {
|
||||||
|
// In the case of an unexpected relay, log it and don't pass any sender peer info into an unexpected relay
|
||||||
|
log_rpc!(debug "Unexpected relay used for node: relay={}, node={}", relay, node);
|
||||||
|
};
|
||||||
|
|
||||||
|
(Some(node.clone()), Some(relay.clone()), opt_routing_domain)
|
||||||
|
}
|
||||||
|
Destination::PrivateRoute {
|
||||||
|
private_route: _,
|
||||||
|
safety_selection: _,
|
||||||
|
} => (None, None, Some(RoutingDomain::PublicInternet)),
|
||||||
|
};
|
||||||
|
|
||||||
|
Some(UnsafeRoutingInfo {
|
||||||
|
opt_node,
|
||||||
|
opt_relay,
|
||||||
|
opt_routing_domain,
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl fmt::Display for Destination {
|
impl fmt::Display for Destination {
|
||||||
|
@ -53,11 +53,13 @@ use storage_manager::*;
|
|||||||
struct RPCMessageHeaderDetailDirect {
|
struct RPCMessageHeaderDetailDirect {
|
||||||
/// The decoded header of the envelope
|
/// The decoded header of the envelope
|
||||||
envelope: Envelope,
|
envelope: Envelope,
|
||||||
/// The noderef of the peer that sent the message (not the original sender). Ensures node doesn't get evicted from routing table until we're done with it
|
/// The noderef of the peer that sent the message (not the original sender).
|
||||||
|
/// Ensures node doesn't get evicted from routing table until we're done with it
|
||||||
|
/// Should be filted to the routing domain of the peer that we received from
|
||||||
peer_noderef: NodeRef,
|
peer_noderef: NodeRef,
|
||||||
/// The flow from the peer sent the message (not the original sender)
|
/// The flow from the peer sent the message (not the original sender)
|
||||||
flow: Flow,
|
flow: Flow,
|
||||||
/// The routing domain the message was sent through
|
/// The routing domain of the peer that we received from
|
||||||
routing_domain: RoutingDomain,
|
routing_domain: RoutingDomain,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -869,51 +871,36 @@ impl RPCProcessor {
|
|||||||
// Don't do this if the sender is to remain private
|
// Don't do this if the sender is to remain private
|
||||||
// Otherwise we would be attaching the original sender's identity to the final destination,
|
// Otherwise we would be attaching the original sender's identity to the final destination,
|
||||||
// thus defeating the purpose of the safety route entirely :P
|
// thus defeating the purpose of the safety route entirely :P
|
||||||
match dest.get_safety_selection() {
|
let Some(UnsafeRoutingInfo {
|
||||||
SafetySelection::Unsafe(_) => {}
|
opt_node, opt_relay: _, opt_routing_domain
|
||||||
SafetySelection::Safe(_) => {
|
}) = dest.get_unsafe_routing_info(self.routing_table.clone()) else {
|
||||||
return SenderPeerInfo::default();
|
return SenderPeerInfo::default();
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get the target we're sending to
|
|
||||||
let routing_table = self.routing_table();
|
|
||||||
let target = match dest {
|
|
||||||
Destination::Direct {
|
|
||||||
node: target,
|
|
||||||
safety_selection: _,
|
|
||||||
} => target.clone(),
|
|
||||||
Destination::Relay {
|
|
||||||
relay: _,
|
|
||||||
node: target,
|
|
||||||
safety_selection: _,
|
|
||||||
} => target.clone(),
|
|
||||||
Destination::PrivateRoute {
|
|
||||||
private_route: _,
|
|
||||||
safety_selection: _,
|
|
||||||
} => {
|
|
||||||
return SenderPeerInfo::default();
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
let Some(node) = opt_node else {
|
||||||
let Some(routing_domain) = target.best_routing_domain() else {
|
// If this is going over a private route, don't bother sending any sender peer info
|
||||||
|
// The other side won't accept it because peer info sent over a private route
|
||||||
|
// could be used to deanonymize the private route's endpoint
|
||||||
|
return SenderPeerInfo::default();
|
||||||
|
};
|
||||||
|
let Some(routing_domain) = opt_routing_domain else {
|
||||||
// No routing domain for target, no node info
|
// No routing domain for target, no node info
|
||||||
// Only a stale connection or no connection exists
|
// Only a stale connection or no connection exists
|
||||||
return SenderPeerInfo::default();
|
return SenderPeerInfo::default();
|
||||||
};
|
};
|
||||||
|
|
||||||
// Get the target's node info timestamp
|
// Get the target's node info timestamp
|
||||||
let target_node_info_ts = target.node_info_ts(routing_domain);
|
let target_node_info_ts = node.node_info_ts(routing_domain);
|
||||||
|
|
||||||
// Return whatever peer info we have even if the network class is not yet valid
|
// Return whatever peer info we have even if the network class is not yet valid
|
||||||
// That away we overwrite any prior existing valid-network-class nodeinfo in the remote routing table
|
// That away we overwrite any prior existing valid-network-class nodeinfo in the remote routing table
|
||||||
|
let routing_table = self.routing_table();
|
||||||
let own_peer_info = routing_table.get_own_peer_info(routing_domain);
|
let own_peer_info = routing_table.get_own_peer_info(routing_domain);
|
||||||
|
|
||||||
// Get our node info timestamp
|
// Get our node info timestamp
|
||||||
let our_node_info_ts = own_peer_info.signed_node_info().timestamp();
|
let our_node_info_ts = own_peer_info.signed_node_info().timestamp();
|
||||||
|
|
||||||
// If the target has seen our node info already don't send it again
|
// If the target has seen our node info already don't send it again
|
||||||
if target.has_seen_our_node_info_ts(routing_domain, our_node_info_ts) {
|
if node.has_seen_our_node_info_ts(routing_domain, our_node_info_ts) {
|
||||||
return SenderPeerInfo::new_no_peer_info(target_node_info_ts);
|
return SenderPeerInfo::new_no_peer_info(target_node_info_ts);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1358,6 +1345,7 @@ impl RPCProcessor {
|
|||||||
request: RPCMessage,
|
request: RPCMessage,
|
||||||
answer: RPCAnswer,
|
answer: RPCAnswer,
|
||||||
) ->RPCNetworkResult<()> {
|
) ->RPCNetworkResult<()> {
|
||||||
|
|
||||||
// Extract destination from respond_to
|
// Extract destination from respond_to
|
||||||
let dest = network_result_try!(self.get_respond_to_destination(&request));
|
let dest = network_result_try!(self.get_respond_to_destination(&request));
|
||||||
|
|
||||||
|
@ -23,81 +23,38 @@ impl RPCProcessor {
|
|||||||
self,
|
self,
|
||||||
dest: Destination,
|
dest: Destination,
|
||||||
) -> RPCNetworkResult<Answer<Option<SenderInfo>>> {
|
) -> RPCNetworkResult<Answer<Option<SenderInfo>>> {
|
||||||
let (opt_target_nr, routing_domain, node_status) = match dest.get_safety_selection() {
|
// Determine routing domain and node status to send
|
||||||
SafetySelection::Unsafe(_) => {
|
let (opt_target_nr, routing_domain, node_status) = if let Some(UnsafeRoutingInfo {
|
||||||
let (opt_target_nr, routing_domain) = match &dest {
|
opt_node,
|
||||||
Destination::Direct {
|
opt_relay,
|
||||||
node: target,
|
opt_routing_domain,
|
||||||
safety_selection: _,
|
}) =
|
||||||
} => {
|
dest.get_unsafe_routing_info(self.routing_table.clone())
|
||||||
let routing_domain = match target.best_routing_domain() {
|
{
|
||||||
Some(rd) => rd,
|
let Some(routing_domain) = opt_routing_domain else {
|
||||||
None => {
|
// Because this exits before calling 'question()',
|
||||||
// Because this exits before calling 'question()',
|
// a failure to find a routing domain constitutes a send failure
|
||||||
// a failure to find a routing domain constitutes a send failure
|
// Record the send failure on both the node and its relay
|
||||||
let send_ts = get_aligned_timestamp();
|
let send_ts = get_aligned_timestamp();
|
||||||
self.record_send_failure(
|
if let Some(node) = &opt_node {
|
||||||
RPCKind::Question,
|
self.record_send_failure(RPCKind::Question, send_ts, node.clone(), None, None);
|
||||||
send_ts,
|
}
|
||||||
target.clone(),
|
if let Some(relay) = &opt_relay {
|
||||||
None,
|
self.record_send_failure(RPCKind::Question, send_ts, relay.clone(), None, None);
|
||||||
None,
|
}
|
||||||
);
|
return Ok(NetworkResult::no_connection_other(
|
||||||
return Ok(NetworkResult::no_connection_other(
|
"no routing domain for target",
|
||||||
"no routing domain for target",
|
));
|
||||||
));
|
};
|
||||||
}
|
|
||||||
};
|
|
||||||
(Some(target.clone()), routing_domain)
|
|
||||||
}
|
|
||||||
Destination::Relay {
|
|
||||||
relay,
|
|
||||||
node: target,
|
|
||||||
safety_selection: _,
|
|
||||||
} => {
|
|
||||||
let routing_domain = match relay.best_routing_domain() {
|
|
||||||
Some(rd) => rd,
|
|
||||||
None => {
|
|
||||||
// Because this exits before calling 'question()',
|
|
||||||
// a failure to find a routing domain constitutes a send failure for both the target and its relay
|
|
||||||
let send_ts = get_aligned_timestamp();
|
|
||||||
self.record_send_failure(
|
|
||||||
RPCKind::Question,
|
|
||||||
send_ts,
|
|
||||||
relay.clone(),
|
|
||||||
None,
|
|
||||||
None,
|
|
||||||
);
|
|
||||||
self.record_send_failure(
|
|
||||||
RPCKind::Question,
|
|
||||||
send_ts,
|
|
||||||
target.clone(),
|
|
||||||
None,
|
|
||||||
None,
|
|
||||||
);
|
|
||||||
return Ok(NetworkResult::no_connection_other(
|
|
||||||
"no routing domain for peer",
|
|
||||||
));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
(Some(target.clone()), routing_domain)
|
|
||||||
}
|
|
||||||
Destination::PrivateRoute {
|
|
||||||
private_route: _,
|
|
||||||
safety_selection: _,
|
|
||||||
} => (None, RoutingDomain::PublicInternet),
|
|
||||||
};
|
|
||||||
|
|
||||||
let node_status = Some(self.network_manager().generate_node_status(routing_domain));
|
let node_status = Some(self.network_manager().generate_node_status(routing_domain));
|
||||||
(opt_target_nr, routing_domain, node_status)
|
(opt_node, routing_domain, node_status)
|
||||||
}
|
} else {
|
||||||
SafetySelection::Safe(_) => {
|
// Safety route means we don't exchange node status and things are all PublicInternet RoutingDomain
|
||||||
let routing_domain = RoutingDomain::PublicInternet;
|
(None, RoutingDomain::PublicInternet, None)
|
||||||
let node_status = None;
|
|
||||||
(None, routing_domain, node_status)
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Create status rpc question
|
||||||
let status_q = RPCOperationStatusQ::new(node_status);
|
let status_q = RPCOperationStatusQ::new(node_status);
|
||||||
let question = RPCQuestion::new(
|
let question = RPCQuestion::new(
|
||||||
network_result_try!(self.get_destination_respond_to(&dest)?),
|
network_result_try!(self.get_destination_respond_to(&dest)?),
|
||||||
|
@ -416,7 +416,7 @@ impl RoutingContext {
|
|||||||
/// This is useful for checking if you should push new subkeys to the network, or retrieve the current state of a record from the network
|
/// This is useful for checking if you should push new subkeys to the network, or retrieve the current state of a record from the network
|
||||||
/// to see what needs updating locally.
|
/// to see what needs updating locally.
|
||||||
///
|
///
|
||||||
/// * `key` is the record key to watch. it must first be opened for reading or writing.
|
/// * `key` is the record key to inspect. it must first be opened for reading or writing.
|
||||||
/// * `subkeys` is the the range of subkeys to inspect. The range must not exceed 512 discrete non-overlapping or adjacent subranges.
|
/// * `subkeys` is the the range of subkeys to inspect. The range must not exceed 512 discrete non-overlapping or adjacent subranges.
|
||||||
/// If no range is specified, this is equivalent to inspecting the entire range of subkeys. In total, the list of subkeys returned will be truncated at 512 elements.
|
/// If no range is specified, this is equivalent to inspecting the entire range of subkeys. In total, the list of subkeys returned will be truncated at 512 elements.
|
||||||
/// * `scope` is what kind of range the inspection has:
|
/// * `scope` is what kind of range the inspection has:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user