checkpoint

This commit is contained in:
Christien Rioux 2024-07-24 13:52:47 -04:00
parent e759e50983
commit abc67f9606
33 changed files with 689 additions and 261 deletions

View File

@ -781,6 +781,7 @@ impl NetworkManager {
// Add the peer info to our routing table
let mut peer_nr = match routing_table.register_node_with_peer_info(
RoutingDomain::PublicInternet,
SafetyDomainSet::all(),
peer_info,
false,
) {
@ -810,6 +811,7 @@ impl NetworkManager {
// Add the peer info to our routing table
let mut peer_nr = match routing_table.register_node_with_peer_info(
RoutingDomain::PublicInternet,
SafetyDomainSet::all(),
peer_info,
false,
) {
@ -906,12 +908,15 @@ impl NetworkManager {
}
/// Called by the RPC handler when we want to issue an RPC request or response
/// safety_domain is used to determine if this is being sent in an unsafe context
/// and should reject attempts to send to safety-only nodes
/// node_ref is the direct destination to which the envelope will be sent
/// If 'destination_node_ref' is specified, it can be different than the node_ref being sent to
/// which will cause the envelope to be relayed
#[instrument(level = "trace", target = "net", skip_all)]
pub async fn send_envelope<B: AsRef<[u8]>>(
&self,
safety_domain: SafetyDomain,
node_ref: NodeRef,
destination_node_ref: Option<NodeRef>,
body: B,
@ -947,7 +952,7 @@ impl NetworkManager {
}
// Send the envelope via whatever means necessary
self.send_data(node_ref, out).await
self.send_data(safety_domain, node_ref, out).await
}
/// Called by the RPC handler when we want to issue an direct receipt
@ -1141,9 +1146,10 @@ impl NetworkManager {
};
// Relay the packet to the desired destination
// Relayed packets are never received over a safety route so they are implicitly
// in the SafetyDomain::Unsafe
log_net!("relaying {} bytes to {}", data.len(), relay_nr);
network_result_value_or_log!(match self.send_data(relay_nr, data.to_vec())
network_result_value_or_log!(match self.send_data(SafetyDomain::Unsafe, relay_nr, data.to_vec())
.await {
Ok(v) => v,
Err(e) => {

View File

@ -15,10 +15,14 @@ impl NetworkManager {
#[instrument(level="trace", target="net", skip_all, err)]
pub(crate) async fn send_data(
&self,
safety_domain: SafetyDomain,
destination_node_ref: NodeRef,
data: Vec<u8>,
) -> EyreResult<NetworkResult<SendDataMethod>> {
// First try to send data to the last flow we've seen this peer on
// If we have an existing flow, then we do not need to check the safety domain
// as a connection has already been established
let data = if let Some(flow) = destination_node_ref.last_flow() {
match self
.net()
@ -52,11 +56,12 @@ impl NetworkManager {
// Get the best way to contact this node
let possibly_relayed_contact_method = self.get_node_contact_method(destination_node_ref.clone())?;
self.try_possibly_relayed_contact_method(possibly_relayed_contact_method, destination_node_ref, data).await
self.try_possibly_relayed_contact_method(safety_domain, possibly_relayed_contact_method, destination_node_ref, data).await
}
#[instrument(level="trace", target="net", skip_all)]
pub(crate) fn try_possibly_relayed_contact_method(&self,
safety_domain: SafetyDomain,
possibly_relayed_contact_method: NodeContactMethod,
destination_node_ref: NodeRef,
data: Vec<u8>,
@ -70,6 +75,7 @@ impl NetworkManager {
NodeContactMethod::OutboundRelay(relay_nr)
| NodeContactMethod::InboundRelay(relay_nr) => {
let cm = this.get_node_contact_method(relay_nr.clone())?;
(cm, relay_nr, Some(possibly_relayed_contact_method))
}
cm => (cm, destination_node_ref.clone(), None),
@ -103,40 +109,61 @@ impl NetworkManager {
);
}
NodeContactMethod::Direct(dial_info) => {
// Ensure we're not sending in an unsafe context to a safety-only node
if !target_node_ref.safety_domains().contains(safety_domain) {
bail!("should not be sending direct to invalid safety domain: target={}", target_node_ref);
}
network_result_try!(
this.send_data_ncm_direct(target_node_ref, dial_info, data).await?
)
}
NodeContactMethod::SignalReverse(relay_nr, target_node_ref) => {
NodeContactMethod::SignalReverse(relay_node_ref, target_node_ref) => {
// Ensure we're not sending in an unsafe context to a safety-only node
if !target_node_ref.safety_domains().contains(safety_domain) || !relay_node_ref.safety_domains().contains(safety_domain) {
bail!("should not be sending signal reverse to invalid safety domain: target={}, relay={}", target_node_ref, relay_node_ref);
}
let nres =
this.send_data_ncm_signal_reverse(relay_nr.clone(), target_node_ref.clone(), data.clone())
this.send_data_ncm_signal_reverse(relay_node_ref.clone(), target_node_ref.clone(), data.clone())
.await?;
if matches!(nres, NetworkResult::Timeout) {
// Failed to holepunch, fallback to inbound relay
log_network_result!(debug "Reverse connection failed to {}, falling back to inbound relay via {}", target_node_ref, relay_nr);
network_result_try!(this.try_possibly_relayed_contact_method(NodeContactMethod::InboundRelay(relay_nr), destination_node_ref, data).await?)
log_network_result!(debug "Reverse connection failed to {}, falling back to inbound relay via {}", target_node_ref, relay_node_ref);
network_result_try!(this.try_possibly_relayed_contact_method(safety_domain, NodeContactMethod::InboundRelay(relay_node_ref), destination_node_ref, data).await?)
} else {
network_result_try!(nres)
}
}
NodeContactMethod::SignalHolePunch(relay_nr, target_node_ref) => {
NodeContactMethod::SignalHolePunch(relay_node_ref, target_node_ref) => {
// Ensure we're not sending in an unsafe context to a safety-only node
if !target_node_ref.safety_domains().contains(safety_domain) || !relay_node_ref.safety_domains().contains(safety_domain) {
bail!("should not be sending signal hole punch to invalid safety domain: target={}, relay={}", target_node_ref, relay_node_ref);
}
let nres =
this.send_data_ncm_signal_hole_punch(relay_nr.clone(), target_node_ref.clone(), data.clone())
this.send_data_ncm_signal_hole_punch(relay_node_ref.clone(), target_node_ref.clone(), data.clone())
.await?;
if matches!(nres, NetworkResult::Timeout) {
// Failed to holepunch, fallback to inbound relay
log_network_result!(debug "Hole punch failed to {}, falling back to inbound relay via {}", target_node_ref, relay_nr);
network_result_try!(this.try_possibly_relayed_contact_method(NodeContactMethod::InboundRelay(relay_nr), destination_node_ref, data).await?)
log_network_result!(debug "Hole punch failed to {}, falling back to inbound relay via {}", target_node_ref, relay_node_ref);
network_result_try!(this.try_possibly_relayed_contact_method(safety_domain, NodeContactMethod::InboundRelay(relay_node_ref), destination_node_ref, data).await?)
} else {
network_result_try!(nres)
}
}
NodeContactMethod::Existing => {
// If we have an existing flow, then we do not need to check the safety domain
// as a connection has already been established
network_result_try!(
this.send_data_ncm_existing(target_node_ref, data).await?
)
}
NodeContactMethod::Unreachable => {
// If we have no way of reaching this node, try a last ditch effort to reach if over an existing
// incoming connection
network_result_try!(
this.send_data_ncm_unreachable(target_node_ref, data)
.await?

View File

@ -149,6 +149,10 @@ pub(crate) struct BucketEntryInner {
/// If the entry is being punished and should be considered dead
#[serde(skip)]
punishment: Option<PunishmentReason>,
/// If this node is seen in an unsafe context it is safe to contact
/// unsafely and use in the construction of safety routes
/// This tracks what domains this peer is safe to be contacted in
safety_domains: SafetyDomainSet,
/// Tracking identifier for NodeRef debugging
#[cfg(feature = "tracking")]
#[serde(skip)]
@ -406,6 +410,14 @@ impl BucketEntryInner {
!last_flows.is_empty()
}
pub fn safety_domains(&self) -> SafetyDomainSet {
self.safety_domains
}
pub fn set_safety_domains<S: Into<SafetyDomainSet>>(&mut self, s: S) {
self.safety_domains = s.into();
}
pub fn node_info(&self, routing_domain: RoutingDomain) -> Option<&NodeInfo> {
let opt_current_sni = match routing_domain {
RoutingDomain::LocalNetwork => &self.local_network.signed_node_info,
@ -972,6 +984,7 @@ impl BucketEntry {
latency_stats_accounting: LatencyStatsAccounting::new(),
transfer_stats_accounting: TransferStatsAccounting::new(),
punishment: None,
safety_domains: SafetyDomainSet::empty(),
#[cfg(feature = "tracking")]
next_track_id: 0,
#[cfg(feature = "tracking")]

View File

@ -51,7 +51,7 @@ impl RoutingTable {
c.network.dht.max_find_node_count as usize
};
let closest_nodes = match self.find_preferred_closest_nodes(
let closest_nodes = match self.find_preferred_closest_unsafe_nodes(
node_count,
key,
filters,
@ -132,7 +132,7 @@ impl RoutingTable {
};
//
let closest_nodes = match self.find_preferred_closest_nodes(
let closest_nodes = match self.find_preferred_closest_unsafe_nodes(
node_count,
key,
filters,

View File

@ -653,27 +653,39 @@ impl RoutingTable {
}
/// Resolve an existing routing table entry using any crypto kind and return a reference to it
pub fn lookup_any_node_ref(&self, node_id_key: PublicKey) -> EyreResult<Option<NodeRef>> {
pub fn lookup_any_node_ref(
&self,
safety_domain: SafetyDomain,
node_id_key: PublicKey,
) -> EyreResult<Option<NodeRef>> {
self.inner
.read()
.lookup_any_node_ref(self.clone(), node_id_key)
.lookup_any_node_ref(self.clone(), safety_domain, node_id_key)
}
/// Resolve an existing routing table entry and return a reference to it
pub fn lookup_node_ref(&self, node_id: TypedKey) -> EyreResult<Option<NodeRef>> {
self.inner.read().lookup_node_ref(self.clone(), node_id)
pub fn lookup_node_ref(
&self,
safety_domain: SafetyDomain,
node_id: TypedKey,
) -> EyreResult<Option<NodeRef>> {
self.inner
.read()
.lookup_node_ref(self.clone(), safety_domain, node_id)
}
/// Resolve an existing routing table entry and return a filtered reference to it
#[instrument(level = "trace", skip_all)]
pub fn lookup_and_filter_noderef(
&self,
safety_domain: SafetyDomain,
node_id: TypedKey,
routing_domain_set: RoutingDomainSet,
dial_info_filter: DialInfoFilter,
) -> EyreResult<Option<NodeRef>> {
self.inner.read().lookup_and_filter_noderef(
self.clone(),
safety_domain,
node_id,
routing_domain_set,
dial_info_filter,
@ -687,12 +699,14 @@ impl RoutingTable {
pub fn register_node_with_peer_info(
&self,
routing_domain: RoutingDomain,
safety_domain: SafetyDomain,
peer_info: PeerInfo,
allow_invalid: bool,
) -> EyreResult<NodeRef> {
self.inner.write().register_node_with_peer_info(
self.clone(),
routing_domain,
safety_domain,
peer_info,
allow_invalid,
)
@ -700,6 +714,8 @@ impl RoutingTable {
/// Shortcut function to add a node to our routing table if it doesn't exist
/// and add the last peer address we have for it, since that's pretty common
/// This always gets added to the SafetyDomain::Unsafe because direct connections
/// are inherently Unsafe.
#[instrument(level = "trace", skip_all, err)]
pub fn register_node_with_existing_connection(
&self,
@ -764,12 +780,15 @@ impl RoutingTable {
pub fn clear_punishments(&self) {
let cur_ts = get_aligned_timestamp();
self.inner
.write()
.with_entries_mut(cur_ts, BucketEntryState::Punished, |rti, e| {
self.inner.write().with_entries_mut(
cur_ts,
SafetyDomainSet::all(),
BucketEntryState::Punished,
|rti, e| {
e.with_mut(rti, |_rti, ei| ei.set_punished(None));
Option::<()>::None
});
},
);
}
//////////////////////////////////////////////////////////////////////
@ -949,7 +968,7 @@ impl RoutingTable {
let filters = VecDeque::from([filter]);
self.find_preferred_fastest_nodes(
self.find_preferred_fastest_unsafe_nodes(
protocol_types_len * 2 * max_per_type,
filters,
|_rti, entry: Option<Arc<BucketEntry>>| {
@ -979,7 +998,7 @@ impl RoutingTable {
out
}
pub fn find_preferred_fastest_nodes<'a, T, O>(
pub fn find_preferred_fastest_unsafe_nodes<'a, T, O>(
&self,
node_count: usize,
filters: VecDeque<RoutingTableEntryFilter>,
@ -990,10 +1009,10 @@ impl RoutingTable {
{
self.inner
.read()
.find_preferred_fastest_nodes(node_count, filters, transform)
.find_preferred_fastest_unsafe_nodes(node_count, filters, transform)
}
pub fn find_preferred_closest_nodes<'a, T, O>(
pub fn find_preferred_closest_unsafe_nodes<'a, T, O>(
&self,
node_count: usize,
node_id: TypedKey,
@ -1005,7 +1024,7 @@ impl RoutingTable {
{
self.inner
.read()
.find_preferred_closest_nodes(node_count, node_id, filters, transform)
.find_preferred_closest_unsafe_nodes(node_count, node_id, filters, transform)
}
pub fn sort_and_clean_closest_noderefs(
@ -1022,11 +1041,11 @@ impl RoutingTable {
pub fn register_find_node_answer(
&self,
crypto_kind: CryptoKind,
peers: Vec<PeerInfo>,
peers: PeerInfoResponse,
) -> Vec<NodeRef> {
// Register nodes we'd found
let mut out = Vec::<NodeRef>::with_capacity(peers.len());
for p in peers {
let mut out = Vec::<NodeRef>::with_capacity(peers.peer_info_list.len());
for p in peers.peer_info_list {
// Ensure we're getting back nodes we asked for
if !p.node_ids().kinds().contains(&crypto_kind) {
continue;
@ -1038,7 +1057,12 @@ impl RoutingTable {
}
// Register the node if it's new
match self.register_node_with_peer_info(RoutingDomain::PublicInternet, p, false) {
match self.register_node_with_peer_info(
RoutingDomain::PublicInternet,
peers.safety_domain_set,
p,
false,
) {
Ok(nr) => out.push(nr),
Err(e) => {
log_rtab!(debug "failed to register node with peer info from find node answer: {}", e);

View File

@ -170,6 +170,15 @@ pub(crate) trait NodeRefBase: Sized {
fn set_seen_our_node_info_ts(&self, routing_domain: RoutingDomain, seen_ts: Timestamp) {
self.operate_mut(|_rti, e| e.set_seen_our_node_info_ts(routing_domain, seen_ts));
}
fn safety_domains(&self) -> SafetyDomainSet {
self.operate(|_rti, e| e.safety_domains())
}
fn set_safety_domains<S: Into<SafetyDomainSet>>(&mut self, s: S) {
self.operate_mut(|_rti, e| e.set_safety_domains(s))
}
// fn network_class(&self, routing_domain: RoutingDomain) -> Option<NetworkClass> {
// self.operate(|_rt, e| e.node_info(routing_domain).map(|n| n.network_class()))
// }
@ -204,8 +213,13 @@ pub(crate) trait NodeRefBase: Sized {
}
// Register relay node and return noderef
let nr =
rti.register_node_with_peer_info(self.routing_table(), routing_domain, rpi, false)?;
let nr = rti.register_node_with_peer_info(
self.routing_table(),
routing_domain,
e.safety_domains(),
rpi,
false,
)?;
Ok(Some(nr))
})
}

View File

@ -33,6 +33,7 @@ impl RouteNode {
&self,
routing_table: RoutingTable,
crypto_kind: CryptoKind,
safety_domain_set: SafetyDomainSet,
) -> Option<NodeRef> {
match self {
RouteNode::NodeId(id) => {
@ -49,6 +50,7 @@ impl RouteNode {
//
match routing_table.register_node_with_peer_info(
RoutingDomain::PublicInternet,
safety_domain_set,
*pi.clone(),
false,
) {
@ -116,7 +118,9 @@ impl PrivateRouteHops {
pub(crate) struct PrivateRoute {
/// The public key used for the entire route
pub public_key: TypedKey,
/// The number of hops in the 'hops' structure
pub hop_count: u8,
/// The encoded hops structure
pub hops: PrivateRouteHops,
}
@ -130,6 +134,7 @@ impl PrivateRoute {
node,
next_hop: None,
})),
safety_domain_set: SafetyDomainSet::all(),
}
}
@ -193,7 +198,7 @@ impl fmt::Display for PrivateRoute {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"PR({:?}+{}{})",
"PR({:?}+{}{}{})",
self.public_key,
self.hop_count,
match &self.hops {
@ -211,7 +216,8 @@ impl fmt::Display for PrivateRoute {
PrivateRouteHops::Empty => {
"".to_owned()
}
}
},
SafetyDomain::print_set(self.safety_domain_set),
)
}
}

View File

@ -421,9 +421,15 @@ impl RouteSpecStore {
NodeRef::new(routing_table.clone(), entry.unwrap(), None)
};
// Pull the whole routing table in sorted order
let nodes: Vec<NodeRef> =
rti.find_peers_with_sort_and_filter(usize::MAX, cur_ts, filters, compare, transform);
// Pull the unsafe domain peers in sorted order
let nodes: Vec<NodeRef> = rti.find_peers_with_sort_and_filter(
usize::MAX,
cur_ts,
SafetyDomain::Unsafe.into(),
filters,
compare,
transform,
);
// If we couldn't find enough nodes, wait until we have more nodes in the routing table
if nodes.len() < hop_count {
@ -749,10 +755,7 @@ impl RouteSpecStore {
let safety_selection = SafetySelection::Safe(safety_spec);
(
Destination::PrivateRoute {
private_route,
safety_selection,
},
Destination::private_route(private_route, safety_selection),
hops,
)
};
@ -799,10 +802,7 @@ impl RouteSpecStore {
let safety_selection = SafetySelection::Safe(safety_spec);
Destination::PrivateRoute {
private_route,
safety_selection,
}
Destination::private_route(private_route, safety_selection)
};
// Test with double-round trip ping to self
@ -1042,6 +1042,7 @@ impl RouteSpecStore {
rti.register_node_with_peer_info(
routing_table.clone(),
RoutingDomain::PublicInternet,
private_route.safety_domain_set,
*pi,
false,
)
@ -1459,6 +1460,8 @@ impl RouteSpecStore {
// add hop for 'FirstHop'
hop_count: (hop_count + 1).try_into().unwrap(),
hops: PrivateRouteHops::FirstHop(Box::new(route_hop)),
// routes we allocate ourselves are safe in all domains
safety_domain_set: SafetyDomainSet::all(),
};
Ok(private_route)
}

View File

@ -236,12 +236,17 @@ impl RoutingTableInner {
pub fn reset_all_updated_since_last_network_change(&mut self) {
let cur_ts = get_aligned_timestamp();
self.with_entries_mut(cur_ts, BucketEntryState::Dead, |rti, v| {
self.with_entries_mut(
cur_ts,
SafetyDomainSet::all(),
BucketEntryState::Dead,
|rti, v| {
v.with_mut(rti, |_rti, e| {
e.reset_updated_since_last_network_change();
});
Option::<()>::None
});
},
);
}
/// Return if this routing domain has a valid network class
@ -341,13 +346,18 @@ impl RoutingTableInner {
// If the local network topology has changed, nuke the existing local node info and let new local discovery happen
if changed {
let cur_ts = get_aligned_timestamp();
self.with_entries_mut(cur_ts, BucketEntryState::Dead, |rti, e| {
self.with_entries_mut(
cur_ts,
SafetyDomainSet::all(),
BucketEntryState::Dead,
|rti, e| {
e.with_mut(rti, |_rti, e| {
e.clear_signed_node_info(RoutingDomain::LocalNetwork);
e.reset_updated_since_last_network_change();
});
Option::<()>::None
});
},
);
}
}
@ -421,7 +431,11 @@ impl RoutingTableInner {
pub fn refresh_cached_entry_counts(&mut self) -> EntryCounts {
self.live_entry_count.clear();
let cur_ts = get_aligned_timestamp();
self.with_entries_mut(cur_ts, BucketEntryState::Unreliable, |rti, entry| {
self.with_entries_mut(
cur_ts,
SafetyDomainSet::all(),
BucketEntryState::Unreliable,
|rti, entry| {
entry.with_inner(|e| {
// Tally per routing domain and crypto kind
for rd in RoutingDomain::all() {
@ -440,7 +454,8 @@ impl RoutingTableInner {
}
});
Option::<()>::None
});
},
);
self.live_entry_count.clone()
}
@ -454,12 +469,13 @@ impl RoutingTableInner {
pub fn get_entry_count(
&self,
routing_domain_set: RoutingDomainSet,
safety_domain_set: SafetyDomainSet,
min_state: BucketEntryState,
crypto_kinds: &[CryptoKind],
) -> usize {
let mut count = 0usize;
let cur_ts = get_aligned_timestamp();
self.with_entries(cur_ts, min_state, |rti, e| {
self.with_entries(cur_ts, safety_domain_set, min_state, |rti, e| {
if e.with_inner(|e| {
e.best_routing_domain(rti, routing_domain_set).is_some()
&& !common_crypto_kinds(&e.crypto_kinds(), crypto_kinds).is_empty()
@ -475,11 +491,14 @@ impl RoutingTableInner {
pub fn with_entries<T, F: FnMut(&RoutingTableInner, Arc<BucketEntry>) -> Option<T>>(
&self,
cur_ts: Timestamp,
safety_domain_set: SafetyDomainSet,
min_state: BucketEntryState,
mut f: F,
) -> Option<T> {
for entry in &self.all_entries {
if entry.with_inner(|e| e.state(cur_ts) >= min_state) {
if entry.with_inner(|e| {
e.state(cur_ts) >= min_state && !e.safety_domains().is_disjoint(safety_domain_set)
}) {
if let Some(out) = f(self, entry) {
return Some(out);
}
@ -493,12 +512,15 @@ impl RoutingTableInner {
pub fn with_entries_mut<T, F: FnMut(&mut RoutingTableInner, Arc<BucketEntry>) -> Option<T>>(
&mut self,
cur_ts: Timestamp,
safety_domain_set: SafetyDomainSet,
min_state: BucketEntryState,
mut f: F,
) -> Option<T> {
let mut entries = Vec::with_capacity(self.all_entries.len());
for entry in self.all_entries.iter() {
if entry.with_inner(|e| e.state(cur_ts) >= min_state) {
if entry.with_inner(|e| {
e.state(cur_ts) >= min_state && !e.safety_domains().is_disjoint(safety_domain_set)
}) {
entries.push(entry);
}
}
@ -520,7 +542,11 @@ impl RoutingTableInner {
// Collect all entries that are 'needs_ping' and have some node info making them reachable somehow
let mut node_refs = Vec::<NodeRef>::with_capacity(self.bucket_entry_count());
self.with_entries(cur_ts, BucketEntryState::Unreliable, |rti, entry| {
self.with_entries(
cur_ts,
SafetyDomain::Unsafe.into(),
BucketEntryState::Unreliable,
|rti, entry| {
let entry_needs_ping = |e: &BucketEntryInner| {
// If this entry isn't in the routing domain we are checking, don't include it
if !e.exists_in_routing_domain(rti, routing_domain) {
@ -528,7 +554,8 @@ impl RoutingTableInner {
}
// If we don't have node status for this node, then we should ping it to get some node status
if e.has_node_info(routing_domain.into()) && e.node_status(routing_domain).is_none()
if e.has_node_info(routing_domain.into())
&& e.node_status(routing_domain).is_none()
{
return true;
}
@ -554,17 +581,8 @@ impl RoutingTableInner {
));
}
Option::<()>::None
});
node_refs
}
#[allow(dead_code)]
pub fn get_all_alive_nodes(&self, outer_self: RoutingTable, cur_ts: Timestamp) -> Vec<NodeRef> {
let mut node_refs = Vec::<NodeRef>::with_capacity(self.bucket_entry_count());
self.with_entries(cur_ts, BucketEntryState::Unreliable, |_rti, entry| {
node_refs.push(NodeRef::new(outer_self.clone(), entry, None));
Option::<()>::None
});
},
);
node_refs
}
@ -702,7 +720,7 @@ impl RoutingTableInner {
new_entry.with_mut_inner(|e| update_func(self, e));
// Kick the bucket
log_rtab!(debug "Routing table now has {} nodes, {} live", self.bucket_entry_count(), self.get_entry_count(RoutingDomainSet::all(), BucketEntryState::Unreliable, &VALID_CRYPTO_KINDS));
log_rtab!(debug "Routing table now has {} nodes, {} live", self.bucket_entry_count(), self.get_entry_count(RoutingDomainSet::all(), SafetyDomainSet::all(), BucketEntryState::Unreliable, &VALID_CRYPTO_KINDS));
Ok(nr)
}
@ -729,6 +747,7 @@ impl RoutingTableInner {
pub fn lookup_node_ref(
&self,
outer_self: RoutingTable,
safety_domain: SafetyDomain,
node_id: TypedKey,
) -> EyreResult<Option<NodeRef>> {
if self.unlocked_inner.matches_own_node_id(&[node_id]) {
@ -750,11 +769,12 @@ impl RoutingTableInner {
pub fn lookup_and_filter_noderef(
&self,
outer_self: RoutingTable,
safety_domain: SafetyDomain,
node_id: TypedKey,
routing_domain_set: RoutingDomainSet,
dial_info_filter: DialInfoFilter,
) -> EyreResult<Option<NodeRef>> {
let nr = self.lookup_node_ref(outer_self, node_id)?;
let nr = self.lookup_node_ref(outer_self, safety_domain, node_id)?;
Ok(nr.map(|nr| {
nr.filtered_clone(
NodeRefFilter::new()
@ -790,6 +810,7 @@ impl RoutingTableInner {
&mut self,
outer_self: RoutingTable,
routing_domain: RoutingDomain,
safety_domain: SafetyDomain,
peer_info: PeerInfo,
allow_invalid: bool,
) -> EyreResult<NodeRef> {
@ -838,6 +859,7 @@ impl RoutingTableInner {
self.register_node_with_peer_info(
outer_self.clone(),
routing_domain,
safety_domain,
relay_peer_info,
false,
)?;
@ -846,7 +868,7 @@ impl RoutingTableInner {
let (node_ids, signed_node_info) = peer_info.destructure();
let mut nr = self.create_node_ref(outer_self, &node_ids, |_rti, e| {
e.update_signed_node_info(routing_domain, signed_node_info);
e.update_signed_node_info(routing_domain, safety_domain_set, signed_node_info);
})?;
nr.set_filter(Some(
@ -866,10 +888,15 @@ impl RoutingTableInner {
flow: Flow,
timestamp: Timestamp,
) -> EyreResult<NodeRef> {
let nr = self.create_node_ref(outer_self, &TypedKeyGroup::from(node_id), |_rti, e| {
let nr = self.create_node_ref(
outer_self,
SafetyDomain::Unsafe,
&TypedKeyGroup::from(node_id),
|_rti, e| {
//e.make_not_dead(timestamp);
e.touch_last_seen(timestamp);
})?;
},
)?;
// set the most recent node address for connection finding and udp replies
nr.locked_mut(self).set_last_flow(flow, timestamp);
Ok(nr)
@ -963,7 +990,7 @@ impl RoutingTableInner {
}) as RoutingTableEntryFilter;
filters.push_front(public_node_filter);
self.find_preferred_fastest_nodes(
self.find_preferred_fastest_unsafe_nodes(
node_count,
filters,
|_rti: &RoutingTableInner, v: Option<Arc<BucketEntry>>| {
@ -1010,6 +1037,7 @@ impl RoutingTableInner {
&self,
node_count: usize,
cur_ts: Timestamp,
safety_domain_set: SafetyDomainSet,
mut filters: VecDeque<RoutingTableEntryFilter>,
mut compare: C,
mut transform: T,
@ -1039,7 +1067,11 @@ impl RoutingTableInner {
}
// add all nodes that match filter
self.with_entries(cur_ts, BucketEntryState::Unreliable, |rti, v| {
self.with_entries(
cur_ts,
safety_domain_set,
BucketEntryState::Unreliable,
|rti, v| {
// Apply filter
let mut filtered = false;
for filter in &mut filters {
@ -1052,7 +1084,8 @@ impl RoutingTableInner {
nodes.push(Some(v.clone()));
}
Option::<()>::None
});
},
);
// sort by preference for returning nodes
nodes.sort_by(|a, b| compare(self, a, b));
@ -1069,7 +1102,7 @@ impl RoutingTableInner {
}
#[instrument(level = "trace", skip_all)]
pub fn find_preferred_fastest_nodes<T, O>(
pub fn find_preferred_fastest_unsafe_nodes<T, O>(
&self,
node_count: usize,
mut filters: VecDeque<RoutingTableEntryFilter>,
@ -1148,7 +1181,7 @@ impl RoutingTableInner {
}
#[instrument(level = "trace", skip_all)]
pub fn find_preferred_closest_nodes<T, O>(
pub fn find_preferred_closest_unsafe_nodes<T, O>(
&self,
node_count: usize,
node_id: TypedKey,

View File

@ -263,7 +263,7 @@ impl RoutingTable {
);
let nr =
match self.register_node_with_peer_info(RoutingDomain::PublicInternet, pi, true) {
match self.register_node_with_peer_info(RoutingDomain::PublicInternet, SafetyDomainSet::all(), pi, true) {
Ok(nr) => nr,
Err(e) => {
log_rtab!(error "failed to register bootstrap peer info: {}", e);

View File

@ -51,7 +51,7 @@ impl RoutingTable {
filters.push_front(filter);
let noderefs = routing_table
.find_preferred_closest_nodes(
.find_preferred_closest_unsafe_nodes(
CLOSEST_PEERS_REQUEST_COUNT,
self_node_id,
filters,

View File

@ -67,7 +67,7 @@ impl RoutingTable {
) as RoutingTableEntryFilter;
filters.push_front(filter);
let noderefs = routing_table.find_preferred_fastest_nodes(
let noderefs = routing_table.find_preferred_fastest_unsafe_nodes(
min_peer_count,
filters,
|_rti, entry: Option<Arc<BucketEntry>>| {

View File

@ -111,6 +111,7 @@ impl RoutingTable {
// Register new outbound relay
match self.register_node_with_peer_info(
RoutingDomain::PublicInternet,
SafetyDomainSet::all(),
outbound_relay_peerinfo,
false,
) {
@ -247,7 +248,12 @@ impl RoutingTable {
let mut best_inbound_relay: Option<Arc<BucketEntry>> = None;
// Iterate all known nodes for candidates
inner.with_entries(cur_ts, BucketEntryState::Unreliable, |rti, entry| {
// Only consider nodes that are in the SafetyDomain::Unsafe
inner.with_entries(
cur_ts,
SafetyDomain::Unsafe.into(),
BucketEntryState::Unreliable,
|rti, entry| {
let entry2 = entry.clone();
entry.with(rti, |rti, e| {
// Filter this node
@ -272,7 +278,8 @@ impl RoutingTable {
});
// Don't end early, iterate through all entries
Option::<()>::None
});
},
);
// Return the best inbound relay noderef
best_inbound_relay.map(|e| NodeRef::new(self.clone(), e, None))
}

View File

@ -4,6 +4,7 @@ mod node_info;
mod node_status;
mod peer_info;
mod routing_domain;
mod safety_domain;
mod signed_direct_node_info;
mod signed_node_info;
mod signed_relayed_node_info;
@ -16,6 +17,7 @@ pub use node_info::*;
pub use node_status::*;
pub use peer_info::*;
pub use routing_domain::*;
pub use safety_domain::*;
pub use signed_direct_node_info::*;
pub use signed_node_info::*;
pub use signed_relayed_node_info::*;

View File

@ -46,3 +46,9 @@ impl PeerInfo {
}
}
}
#[derive(Clone, Debug)]
pub struct PeerInfoResponse {
pub safety_domain_set: SafetyDomainSet,
pub peer_info_list: Vec<PeerInfo>,
}

View File

@ -0,0 +1,40 @@
use super::*;
#[allow(clippy::derived_hash_with_manual_eq)]
#[derive(Debug, PartialOrd, Ord, Hash, Serialize, Deserialize, EnumSetType)]
#[enumset(repr = "u8")]
pub enum SafetyDomain {
Unsafe = 0,
Safe = 1,
}
//pub type SafetyDomainSet = EnumSet<SafetyDomain>;
impl From<SafetySelection> for SafetyDomain {
fn from(value: SafetySelection) -> Self {
match value {
SafetySelection::Unsafe(_) => SafetyDomain::Unsafe,
SafetySelection::Safe(_) => SafetyDomain::Safe,
}
}
}
impl SafetyDomain {
pub fn print(&self) -> String {
if *self == SafetyDomain::Unsafe.into() {
"*UNSAFE".to_string()
} else {
"*SAFE".to_string()
}
}
// pub fn print_set(set: SafetyDomainSet) -> String {
// if *set == SafetyDomainSet::all() {
// "*ALL".to_string()
// } else if *set == SafetyDomain::Unsafe.into() {
// "*UNSAFE".to_string()
// } else if *set == SafetyDomain::Safe.into() {
// "*SAFE".to_string()
// } else {
// "*NONE".to_string()
// }
// }
}

View File

@ -122,11 +122,15 @@ pub(crate) fn encode_private_route(
h_builder.set_empty(());
}
};
// We don't encode safety domain set, it will be set by the decoder based on how it was received
Ok(())
}
pub(crate) fn decode_private_route(
reader: &veilid_capnp::private_route::Reader,
safety_domain_set: SafetyDomainSet,
) -> Result<PrivateRoute, RPCError> {
let public_key = decode_typed_key(&reader.get_public_key().map_err(
RPCError::map_protocol("invalid public key in private route"),
@ -149,6 +153,7 @@ pub(crate) fn decode_private_route(
public_key,
hop_count,
hops,
safety_domain_set,
})
}

View File

@ -9,6 +9,8 @@ pub(crate) enum Destination {
node: NodeRef,
/// Require safety route or not
safety_selection: SafetySelection,
/// Override safety domain
opt_override_safety_domain: Option<SafetyDomain>,
},
/// Send to node for relay purposes
Relay {
@ -18,6 +20,8 @@ pub(crate) enum Destination {
node: NodeRef,
/// Require safety route or not
safety_selection: SafetySelection,
/// Override safety domain
opt_override_safety_domain: Option<SafetyDomain>,
},
/// Send to private route
PrivateRoute {
@ -25,6 +29,8 @@ pub(crate) enum Destination {
private_route: PrivateRoute,
/// Require safety route or not
safety_selection: SafetySelection,
/// Override safety domain
opt_override_safety_domain: Option<SafetyDomain>,
},
}
@ -34,6 +40,7 @@ pub struct UnsafeRoutingInfo {
pub opt_node: Option<NodeRef>,
pub opt_relay: Option<NodeRef>,
pub opt_routing_domain: Option<RoutingDomain>,
pub opt_override_safety_domain: Option<SafetyDomain>,
}
impl Destination {
@ -42,15 +49,18 @@ impl Destination {
Destination::Direct {
node: target,
safety_selection: _,
opt_override_safety_domain: _,
} => Some(target.clone()),
Destination::Relay {
relay: _,
node: target,
safety_selection: _,
opt_override_safety_domain: _,
} => Some(target.clone()),
Destination::PrivateRoute {
private_route: _,
safety_selection: _,
opt_override_safety_domain: _,
} => None,
}
}
@ -59,6 +69,7 @@ impl Destination {
Self::Direct {
node,
safety_selection: SafetySelection::Unsafe(sequencing),
opt_override_safety_domain: None,
}
}
pub fn relay(relay: NodeRef, node: NodeRef) -> Self {
@ -67,12 +78,14 @@ impl Destination {
relay,
node,
safety_selection: SafetySelection::Unsafe(sequencing),
opt_override_safety_domain: None,
}
}
pub fn private_route(private_route: PrivateRoute, safety_selection: SafetySelection) -> Self {
Self::PrivateRoute {
private_route,
safety_selection,
opt_override_safety_domain: None,
}
}
@ -81,43 +94,90 @@ impl Destination {
Destination::Direct {
node,
safety_selection: _,
opt_override_safety_domain,
} => Self::Direct {
node,
safety_selection,
opt_override_safety_domain,
},
Destination::Relay {
relay,
node,
safety_selection: _,
opt_override_safety_domain,
} => Self::Relay {
relay,
node,
safety_selection,
opt_override_safety_domain,
},
Destination::PrivateRoute {
private_route,
safety_selection: _,
opt_override_safety_domain,
} => Self::PrivateRoute {
private_route,
safety_selection,
opt_override_safety_domain,
},
}
}
pub fn is_direct(&self) -> bool {
matches!(
self,
Destination::Direct {
node: _,
safety_selection: _,
opt_override_safety_domain: _
}
)
}
pub fn is_relay(&self) -> bool {
matches!(
self,
Destination::Relay {
relay: _,
node: _,
safety_selection: _,
opt_override_safety_domain: _
}
)
}
pub fn is_private_route(&self) -> bool {
matches!(
self,
Destination::PrivateRoute {
private_route: _,
safety_selection: _,
opt_override_safety_domain: _,
}
)
}
pub fn has_safety_route(&self) -> bool {
matches!(self.get_safety_selection(), SafetySelection::Safe(_))
}
pub fn get_safety_selection(&self) -> &SafetySelection {
match self {
Destination::Direct {
node: _,
safety_selection,
opt_override_safety_domain: _,
} => safety_selection,
Destination::Relay {
relay: _,
node: _,
safety_selection,
opt_override_safety_domain: _,
} => safety_selection,
Destination::PrivateRoute {
private_route: _,
safety_selection,
opt_override_safety_domain: _,
} => safety_selection,
}
}
@ -127,15 +187,18 @@ impl Destination {
Destination::Direct {
node,
safety_selection: _,
opt_override_safety_domain: _,
}
| Destination::Relay {
relay: _,
node,
safety_selection: _,
opt_override_safety_domain: _,
} => Ok(Target::NodeId(node.best_node_id())),
Destination::PrivateRoute {
private_route,
safety_selection: _,
opt_override_safety_domain: _,
} => {
// Add the remote private route if we're going to keep the id
let route_id = rss
@ -147,6 +210,40 @@ impl Destination {
}
}
pub fn with_override_safety_domain(self, safety_domain: SafetyDomain) -> Self {
match self {
Destination::Direct {
node,
safety_selection,
opt_override_safety_domain: _,
} => Self::Direct {
node,
safety_selection,
opt_override_safety_domain: Some(safety_domain),
},
Destination::Relay {
relay,
node,
safety_selection,
opt_override_safety_domain: _,
} => Self::Relay {
relay,
node,
safety_selection,
opt_override_safety_domain: Some(safety_domain),
},
Destination::PrivateRoute {
private_route,
safety_selection,
opt_override_safety_domain: _,
} => Self::PrivateRoute {
private_route,
safety_selection,
opt_override_safety_domain: Some(safety_domain),
},
}
}
pub fn get_unsafe_routing_info(
&self,
routing_table: RoutingTable,
@ -162,10 +259,12 @@ impl Destination {
// Get:
// * The target node (possibly relayed)
// * The routing domain we are sending to if we can determine it
let (opt_node, opt_relay, opt_routing_domain) = match self {
// * The safety domain override if one was specified
let (opt_node, opt_relay, opt_routing_domain, opt_override_safety_domain) = match self {
Destination::Direct {
node,
safety_selection: _,
opt_override_safety_domain: override_safety_domain,
} => {
let opt_routing_domain = node.best_routing_domain();
if opt_routing_domain.is_none() {
@ -173,12 +272,18 @@ impl Destination {
// Only a stale connection or no connection exists
log_rpc!(debug "No routing domain for node: node={}", node);
};
(Some(node.clone()), None, opt_routing_domain)
(
Some(node.clone()),
None,
opt_routing_domain,
*override_safety_domain,
)
}
Destination::Relay {
relay,
node,
safety_selection: _,
opt_override_safety_domain: override_safety_domain,
} => {
// Outbound relays are defined as routing to and from PublicInternet only right now
@ -207,18 +312,30 @@ impl Destination {
log_rpc!(debug "Unexpected relay used for node: relay={}, node={}", relay, node);
};
(Some(node.clone()), Some(relay.clone()), opt_routing_domain)
(
Some(node.clone()),
Some(relay.clone()),
opt_routing_domain,
*override_safety_domain,
)
}
Destination::PrivateRoute {
private_route: _,
safety_selection: _,
} => (None, None, Some(RoutingDomain::PublicInternet)),
opt_override_safety_domain: override_safety_domain,
} => (
None,
None,
Some(RoutingDomain::PublicInternet),
*override_safety_domain,
),
};
Some(UnsafeRoutingInfo {
opt_node,
opt_relay,
opt_routing_domain,
opt_override_safety_domain,
})
}
}
@ -229,6 +346,7 @@ impl fmt::Display for Destination {
Destination::Direct {
node,
safety_selection,
opt_override_safety_domain,
} => {
let sr = if matches!(safety_selection, SafetySelection::Safe(_)) {
"+SR"
@ -236,12 +354,19 @@ impl fmt::Display for Destination {
""
};
write!(f, "{}{}", node, sr)
let osd = if let Some(override_safety_domain) = opt_override_safety_domain {
format!("*{:?}", override_safety_domain)
} else {
"".to_string()
};
write!(f, "{}{}{}", node, sr, osd)
}
Destination::Relay {
relay,
node,
safety_selection,
opt_override_safety_domain,
} => {
let sr = if matches!(safety_selection, SafetySelection::Safe(_)) {
"+SR"
@ -249,11 +374,18 @@ impl fmt::Display for Destination {
""
};
write!(f, "{}@{}{}", node, relay, sr)
let osd = if let Some(override_safety_domain) = opt_override_safety_domain {
format!("*{:?}", override_safety_domain)
} else {
"".to_string()
};
write!(f, "{}@{}{}{}", node, relay, sr, osd)
}
Destination::PrivateRoute {
private_route,
safety_selection,
opt_override_safety_domain,
} => {
let sr = if matches!(safety_selection, SafetySelection::Safe(_)) {
"+SR"
@ -261,7 +393,13 @@ impl fmt::Display for Destination {
""
};
write!(f, "{}{}", private_route.public_key, sr)
let osd = if let Some(override_safety_domain) = opt_override_safety_domain {
format!("*{:?}", override_safety_domain)
} else {
"".to_string()
};
write!(f, "{}{}{}", private_route.public_key, sr, osd)
}
}
}
@ -289,6 +427,7 @@ impl RPCProcessor {
Ok(rpc_processor::Destination::Direct {
node: nr,
safety_selection,
opt_override_safety_domain: None,
})
}
Target::PrivateRoute(rsid) => {
@ -302,6 +441,7 @@ impl RPCProcessor {
Ok(rpc_processor::Destination::PrivateRoute {
private_route,
safety_selection,
opt_override_safety_domain: None,
})
}
}
@ -319,6 +459,7 @@ impl RPCProcessor {
Destination::Direct {
node: target,
safety_selection,
opt_override_safety_domain: _,
} => match safety_selection {
SafetySelection::Unsafe(_) => {
// Sent directly with no safety route, can respond directly
@ -347,6 +488,7 @@ impl RPCProcessor {
relay,
node: target,
safety_selection,
opt_override_safety_domain: _,
} => match safety_selection {
SafetySelection::Unsafe(_) => {
// Sent via a relay with no safety route, can respond directly
@ -373,6 +515,7 @@ impl RPCProcessor {
Destination::PrivateRoute {
private_route,
safety_selection,
opt_override_safety_domain: _,
} => {
let Some(avoid_node_id) = private_route.first_hop_node_id() else {
return Err(RPCError::internal(

View File

@ -58,7 +58,7 @@ pub(crate) fn debug_fanout_results(results: &[FanoutResult]) -> String {
out
}
pub(crate) type FanoutCallReturnType = RPCNetworkResult<Vec<PeerInfo>>;
pub(crate) type FanoutCallReturnType = RPCNetworkResult<PeerInfoResponse>;
pub(crate) type FanoutNodeInfoFilter = Arc<dyn Fn(&[TypedKey], &NodeInfo) -> bool + Send + Sync>;
pub(crate) fn empty_fanout_node_info_filter() -> FanoutNodeInfoFilter {
@ -196,7 +196,8 @@ where
match (self.call_routine)(next_node.clone()).await {
Ok(NetworkResult::Value(v)) => {
// Filter returned nodes
let filtered_v: Vec<PeerInfo> = v
let filtered_peer_info_list: Vec<PeerInfo> = v
.peer_info_list
.into_iter()
.filter(|pi| {
let node_ids = pi.node_ids().to_vec();
@ -212,9 +213,13 @@ where
// Call succeeded
// Register the returned nodes and add them to the fanout queue in sorted order
let new_nodes = self
.routing_table
.register_find_node_answer(self.crypto_kind, filtered_v);
let new_nodes = self.routing_table.register_find_node_answer(
self.crypto_kind,
PeerInfoResponse {
safety_domain_set: v.safety_domain_set,
peer_info_list: filtered_peer_info_list,
},
);
self.clone().add_to_fanout_queue(&new_nodes);
}
#[allow(unused_variables)]
@ -275,7 +280,12 @@ where
};
routing_table
.find_preferred_closest_nodes(self.node_count, self.node_id, filters, transform)
.find_preferred_closest_unsafe_nodes(
self.node_count,
self.node_id,
filters,
transform,
)
.map_err(RPCError::invalid_format)?
};
self.clone().add_to_fanout_queue(&closest_nodes);

View File

@ -228,6 +228,8 @@ struct RenderedOperation {
remote_private_route: Option<PublicKey>,
/// The private route requested to receive the reply
reply_private_route: Option<PublicKey>,
/// The safety domain we are sending in
safety_domain: SafetyDomain,
}
impl fmt::Debug for RenderedOperation {
@ -240,6 +242,7 @@ impl fmt::Debug for RenderedOperation {
.field("safety_route", &self.safety_route)
.field("remote_private_route", &self.remote_private_route)
.field("reply_private_route", &self.reply_private_route)
.field("safety_domain", &self.safety_domain)
.finish()
}
}
@ -732,6 +735,10 @@ impl RPCProcessor {
safety_route: if sr_is_stub { None } else { Some(sr_pubkey) },
remote_private_route: if pr_is_stub { None } else { Some(pr_pubkey) },
reply_private_route,
// If we are choosing to send without a safety route, then we are in the unsafe domain
// If we are sending with a safety route, then our first hop should always be
// to a node in the unsafe domain since we allocated the safety route ourselves
safety_domain: SafetyDomain::Unsafe,
};
Ok(NetworkResult::value(out))
@ -745,8 +752,7 @@ impl RPCProcessor {
&self,
dest: Destination,
operation: &RPCOperation,
) ->RPCNetworkResult<RenderedOperation> {
let out: NetworkResult<RenderedOperation>;
) -> RPCNetworkResult<RenderedOperation> {
// Encode message to a builder and make a message reader for it
// Then produce the message as an unencrypted byte buffer
@ -771,11 +777,13 @@ impl RPCProcessor {
Destination::Direct {
node: ref node_ref,
safety_selection,
opt_override_safety_domain,
}
| Destination::Relay {
relay: ref node_ref,
node: _,
safety_selection,
opt_override_safety_domain,
} => {
// Send to a node without a private route
// --------------------------------------
@ -785,6 +793,7 @@ impl RPCProcessor {
relay: _,
node: ref target,
safety_selection: _,
opt_override_safety_domain: _,
} = dest
{
(node_ref.clone(), target.clone())
@ -810,7 +819,7 @@ impl RPCProcessor {
// If no safety route is being used, and we're not sending to a private
// route, we can use a direct envelope instead of routing
out = NetworkResult::value(RenderedOperation {
Ok(NetworkResult::value(RenderedOperation {
message,
destination_node_ref,
node_ref,
@ -818,7 +827,8 @@ impl RPCProcessor {
safety_route: None,
remote_private_route: None,
reply_private_route: None,
});
safety_domain: opt_override_safety_domain.unwrap_or(SafetyDomain::Unsafe),
}))
}
SafetySelection::Safe(_) => {
// No private route was specified for the request
@ -840,32 +850,45 @@ impl RPCProcessor {
);
// Wrap with safety route
out = self.wrap_with_route(
let mut rendered_operation = network_result_try!(self.wrap_with_route(
safety_selection,
private_route,
reply_private_route,
message,
)?;
)?);
// Override safety domain if we requested it
if let Some(override_safety_domain) = opt_override_safety_domain {
rendered_operation.safety_domain = override_safety_domain;
}
Ok(NetworkResult::value(rendered_operation))
}
}
};
}
Destination::PrivateRoute {
private_route,
safety_selection,
opt_override_safety_domain,
} => {
// Send to private route
// ---------------------
// Reply with 'route' operation
out = self.wrap_with_route(
let mut rendered_operation = network_result_try!(self.wrap_with_route(
safety_selection,
private_route,
reply_private_route,
message,
)?;
}
)?);
// Override safety domain if we requested it
if let Some(override_safety_domain) = opt_override_safety_domain {
rendered_operation.safety_domain = override_safety_domain;
}
Ok(out)
Ok(NetworkResult::value(rendered_operation))
}
}
}
/// Get signed node info to package with RPC messages to improve
@ -878,7 +901,7 @@ impl RPCProcessor {
// Otherwise we would be attaching the original sender's identity to the final destination,
// thus defeating the purpose of the safety route entirely :P
let Some(UnsafeRoutingInfo {
opt_node, opt_relay: _, opt_routing_domain
opt_node, opt_relay: _, opt_routing_domain, opt_override_safety_domain:_
}) = dest.get_unsafe_routing_info(self.routing_table.clone()) else {
return SenderPeerInfo::default();
};
@ -1194,6 +1217,7 @@ impl RPCProcessor {
safety_route,
remote_private_route,
reply_private_route,
safety_domain,
} = network_result_try!(self.render_operation(dest.clone(), &operation)?);
// Calculate answer timeout
@ -1214,6 +1238,7 @@ impl RPCProcessor {
let res = self
.network_manager()
.send_envelope(
safety_domain,
node_ref.clone(),
Some(destination_node_ref.clone()),
message,
@ -1283,7 +1308,7 @@ impl RPCProcessor {
// Log rpc send
#[cfg(feature = "verbose-tracing")]
debug!(target: "rpc_message", dir = "send", kind = "statement", op_id = operation.op_id().as_u64(), desc = operation.kind().desc(), ?dest);
debug!(target: "rpc_message", dir = "send", kind = "statement", op_id = operation.op_id().as_u64(), desc = operation.kind().desc(), ?dest, override_safety_domain = override_safety_domain);
// Produce rendered operation
let RenderedOperation {
@ -1294,6 +1319,7 @@ impl RPCProcessor {
safety_route,
remote_private_route,
reply_private_route: _,
safety_domain,
} = network_result_try!(self.render_operation(dest, &operation)?);
// Send statement
@ -1304,6 +1330,7 @@ impl RPCProcessor {
let res = self
.network_manager()
.send_envelope(
safety_domain,
node_ref.clone(),
Some(destination_node_ref.clone()),
message,
@ -1370,6 +1397,7 @@ impl RPCProcessor {
safety_route,
remote_private_route,
reply_private_route: _,
safety_domain,
} = network_result_try!(self.render_operation(dest, &operation)?);
// Send the reply
@ -1380,6 +1408,7 @@ impl RPCProcessor {
let res = self
.network_manager()
.send_envelope(
safety_domain,
node_ref.clone(),
Some(destination_node_ref.clone()),
message,
@ -1539,6 +1568,7 @@ impl RPCProcessor {
}
opt_sender_nr = match self.routing_table().register_node_with_peer_info(
routing_domain,
SafetyDomainSet::all(),
sender_peer_info.clone(),
false,
) {

View File

@ -13,7 +13,7 @@ impl RPCProcessor {
dest: Destination,
node_id: TypedKey,
capabilities: Vec<Capability>,
) -> RPCNetworkResult<Answer<Vec<PeerInfo>>> {
) -> RPCNetworkResult<Answer<PeerInfoResponse>> {
let _guard = self
.unlocked_inner
.startup_lock
@ -21,13 +21,7 @@ impl RPCProcessor {
.map_err(RPCError::map_try_again("not started up"))?;
// Ensure destination never has a private route
if matches!(
dest,
Destination::PrivateRoute {
private_route: _,
safety_selection: _
}
) {
if dest.is_private_route() {
return Err(RPCError::internal(
"Never send find node requests over private routes",
));
@ -43,6 +37,12 @@ impl RPCProcessor {
let debug_string = format!("FindNode(node_id={}) => {}", node_id, dest);
let safety_domain_set = if dest.has_safety_route() {
SafetyDomain::Safe.into()
} else {
SafetyDomainSet::all()
};
// Send the find_node request
let waitable_reply = network_result_try!(self.question(dest, find_node_q, None).await?);
@ -66,9 +66,9 @@ impl RPCProcessor {
};
// Verify peers are in the correct peer scope
let peers = find_node_a.destructure();
let peer_info_list = find_node_a.destructure();
for peer_info in &peers {
for peer_info in &peer_info_list {
if !self.verify_node_info(
RoutingDomain::PublicInternet,
peer_info.signed_node_info(),
@ -80,10 +80,15 @@ impl RPCProcessor {
}
}
let peer_info_response = PeerInfoResponse {
safety_domain_set,
peer_info_list,
};
Ok(NetworkResult::value(Answer::new(
latency,
reply_private_route,
peers,
peer_info_response,
)))
}

View File

@ -4,7 +4,7 @@ use crate::storage_manager::{SignedValueData, SignedValueDescriptor};
#[derive(Clone, Debug)]
pub struct GetValueAnswer {
pub value: Option<SignedValueData>,
pub peers: Vec<PeerInfo>,
pub peers: PeerInfoResponse,
pub descriptor: Option<SignedValueDescriptor>,
}
@ -77,6 +77,12 @@ impl RPCProcessor {
vcrypto: vcrypto.clone(),
});
let safety_domain_set = if dest.has_safety_route() {
SafetyDomain::Safe.into()
} else {
SafetyDomainSet::all()
};
log_dht!(debug "{}", debug_string);
let waitable_reply = network_result_try!(
@ -103,7 +109,7 @@ impl RPCProcessor {
_ => return Ok(NetworkResult::invalid_message("not an answer")),
};
let (value, peers, descriptor) = get_value_a.destructure();
let (value, peer_info_list, descriptor) = get_value_a.destructure();
if debug_target_enabled!("dht") {
let debug_string_value = value.as_ref().map(|v| {
format!(" len={} seq={} writer={}",
@ -123,18 +129,18 @@ impl RPCProcessor {
} else {
""
},
peers.len(),
peer_info_list.len(),
dest
);
log_dht!(debug "{}", debug_string_answer);
let peer_ids:Vec<String> = peers.iter().filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string())).collect();
let peer_ids:Vec<String> = peer_info_list.iter().filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string())).collect();
log_dht!(debug "Peers: {:#?}", peer_ids);
}
// Validate peers returned are, in fact, closer to the key than the node we sent this to
let valid = match RoutingTable::verify_peers_closer(vcrypto, target_node_id, key, &peers) {
let valid = match RoutingTable::verify_peers_closer(vcrypto, target_node_id, key, &peer_info_list) {
Ok(v) => v,
Err(e) => {
return Ok(NetworkResult::invalid_message(format!(
@ -156,14 +162,14 @@ impl RPCProcessor {
tracing::Span::current().record("ret.value.data.writer", value.value_data().writer().to_string());
}
#[cfg(feature = "verbose-tracing")]
tracing::Span::current().record("ret.peers.len", peers.len());
tracing::Span::current().record("ret.peers.len", peer_info_list.len());
Ok(NetworkResult::value(Answer::new(
latency,
reply_private_route,
GetValueAnswer {
value,
peers,
peers: PeerInfoResponse{ safety_domain_set, peer_info_list },
descriptor,
},
)))

View File

@ -4,7 +4,7 @@ use crate::storage_manager::SignedValueDescriptor;
#[derive(Clone, Debug)]
pub struct InspectValueAnswer {
pub seqs: Vec<ValueSeqNum>,
pub peers: Vec<PeerInfo>,
pub peers: PeerInfoResponse,
pub descriptor: Option<SignedValueDescriptor>,
}
@ -80,6 +80,12 @@ impl RPCProcessor {
vcrypto: vcrypto.clone(),
});
let safety_domain_set = if dest.has_safety_route() {
SafetyDomain::Safe.into()
} else {
SafetyDomainSet::all()
};
log_dht!(debug "{}", debug_string);
let waitable_reply = network_result_try!(
@ -106,20 +112,20 @@ impl RPCProcessor {
_ => return Ok(NetworkResult::invalid_message("not an answer")),
};
let (seqs, peers, descriptor) = inspect_value_a.destructure();
let (seqs, peer_info_list, descriptor) = inspect_value_a.destructure();
if debug_target_enabled!("dht") {
let debug_string_answer = format!(
"OUT <== InspectValueA({} {} peers={}) <= {} seqs:\n{}",
key,
if descriptor.is_some() { " +desc" } else { "" },
peers.len(),
peer_info_list.len(),
dest,
debug_seqs(&seqs)
);
log_dht!(debug "{}", debug_string_answer);
let peer_ids: Vec<String> = peers
let peer_ids: Vec<String> = peer_info_list
.iter()
.filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string()))
.collect();
@ -127,7 +133,7 @@ impl RPCProcessor {
}
// Validate peers returned are, in fact, closer to the key than the node we sent this to
let valid = match RoutingTable::verify_peers_closer(vcrypto, target_node_id, key, &peers) {
let valid = match RoutingTable::verify_peers_closer(vcrypto, target_node_id, key, &peer_info_list) {
Ok(v) => v,
Err(e) => {
return Ok(NetworkResult::invalid_message(format!(
@ -150,7 +156,7 @@ impl RPCProcessor {
reply_private_route,
InspectValueAnswer {
seqs,
peers,
peers: PeerInfoResponse{ safety_domain_set, peer_info_list },
descriptor,
},
)))

View File

@ -4,7 +4,7 @@ use super::*;
pub struct SetValueAnswer {
pub set: bool,
pub value: Option<SignedValueData>,
pub peers: Vec<PeerInfo>,
pub peers: PeerInfoResponse,
}
impl RPCProcessor {
@ -95,6 +95,12 @@ impl RPCProcessor {
log_dht!(debug "{}", debug_string);
}
let safety_domain_set = if dest.has_safety_route() {
SafetyDomain::Safe.into()
} else {
SafetyDomainSet::all()
};
let waitable_reply = network_result_try!(
self.question(dest.clone(), question, Some(question_context))
.await?
@ -119,7 +125,7 @@ impl RPCProcessor {
_ => return Ok(NetworkResult::invalid_message("not an answer")),
};
let (set, value, peers) = set_value_a.destructure();
let (set, value, peer_info_list) = set_value_a.destructure();
if debug_target_enabled!("dht") {
let debug_string_value = value.as_ref().map(|v| {
@ -140,18 +146,18 @@ impl RPCProcessor {
""
},
debug_string_value,
peers.len(),
peer_info_list.len(),
dest,
);
log_dht!(debug "{}", debug_string_answer);
let peer_ids:Vec<String> = peers.iter().filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string())).collect();
let peer_ids:Vec<String> = peer_info_list.iter().filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string())).collect();
log_dht!(debug "Peers: {:#?}", peer_ids);
}
// Validate peers returned are, in fact, closer to the key than the node we sent this to
let valid = match RoutingTable::verify_peers_closer(vcrypto, target_node_id, key, &peers) {
let valid = match RoutingTable::verify_peers_closer(vcrypto, target_node_id, key, &peer_info_list) {
Ok(v) => v,
Err(e) => {
return Ok(NetworkResult::invalid_message(format!(
@ -180,7 +186,7 @@ impl RPCProcessor {
Ok(NetworkResult::value(Answer::new(
latency,
reply_private_route,
SetValueAnswer { set, value, peers },
SetValueAnswer { set, value, peers: PeerInfoResponse { safety_domain_set, peer_info_list } }
)))
}

View File

@ -15,14 +15,13 @@ impl RPCProcessor {
.enter()
.map_err(RPCError::map_try_again("not started up"))?;
// Ensure destination never has a private route
if matches!(
dest,
Destination::PrivateRoute {
private_route: _,
safety_selection: _
// Ensure destination is direct
if dest.has_safety_route() {
return Err(RPCError::internal(
"Never send signal requests over safety routes",
));
}
) {
if dest.is_private_route() {
return Err(RPCError::internal(
"Never send signal requests over private routes",
));

View File

@ -31,6 +31,7 @@ impl RPCProcessor {
opt_node,
opt_relay,
opt_routing_domain,
opt_override_safety_domain: _,
}) =
dest.get_unsafe_routing_info(self.routing_table.clone())
{
@ -109,6 +110,7 @@ impl RPCProcessor {
Destination::Direct {
node: target,
safety_selection,
opt_override_safety_domain: _,
} => {
if matches!(safety_selection, SafetySelection::Unsafe(_)) {
if let Some(sender_info) = sender_info {
@ -145,10 +147,12 @@ impl RPCProcessor {
relay: _,
node: _,
safety_selection: _,
opt_override_safety_domain: _,
}
| Destination::PrivateRoute {
private_route: _,
safety_selection: _,
opt_override_safety_domain: _,
} => {
// sender info is irrelevant over relays and routes
}

View File

@ -20,7 +20,7 @@ impl RPCProcessor {
.map_err(RPCError::map_try_again("not started up"))?;
// Ensure destination is never using a safety route
if matches!(dest.get_safety_selection(), SafetySelection::Safe(_)) {
if dest.has_safety_route() {
return Err(RPCError::internal(
"Never send value changes over safety routes",
));

View File

@ -4,7 +4,7 @@ use super::*;
pub struct WatchValueAnswer {
pub accepted: bool,
pub expiration_ts: Timestamp,
pub peers: Vec<PeerInfo>,
pub peers: PeerInfoResponse,
pub watch_id: u64,
}
@ -86,6 +86,12 @@ impl RPCProcessor {
log_dht!(debug "{}", debug_string);
let safety_domain_set = if dest.has_safety_route() {
SafetyDomain::Safe.into()
} else {
SafetyDomainSet::all()
};
let waitable_reply =
network_result_try!(self.question(dest.clone(), question, None).await?);
@ -108,7 +114,7 @@ impl RPCProcessor {
_ => return Ok(NetworkResult::invalid_message("not an answer")),
};
let question_watch_id = watch_id;
let (accepted, expiration, peers, watch_id) = watch_value_a.destructure();
let (accepted, expiration, peer_info_list, watch_id) = watch_value_a.destructure();
if debug_target_enabled!("dht") {
let debug_string_answer = format!(
"OUT <== WatchValueA({}id={} {} #{:?}@{} peers={}) <= {}",
@ -117,13 +123,13 @@ impl RPCProcessor {
key,
subkeys,
expiration,
peers.len(),
peer_info_list.len(),
dest
);
log_dht!(debug "{}", debug_string_answer);
let peer_ids: Vec<String> = peers
let peer_ids: Vec<String> = peer_info_list
.iter()
.filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string()))
.collect();
@ -150,7 +156,12 @@ impl RPCProcessor {
}
// Validate peers returned are, in fact, closer to the key than the node we sent this to
let valid = match RoutingTable::verify_peers_closer(vcrypto, target_node_id, key, &peers) {
let valid = match RoutingTable::verify_peers_closer(
vcrypto,
target_node_id,
key,
&peer_info_list,
) {
Ok(v) => v,
Err(e) => {
return Ok(NetworkResult::invalid_message(format!(
@ -168,7 +179,7 @@ impl RPCProcessor {
#[cfg(feature = "verbose-tracing")]
tracing::Span::current().record("ret.expiration", latency.as_u64());
#[cfg(feature = "verbose-tracing")]
tracing::Span::current().record("ret.peers.len", peers.len());
tracing::Span::current().record("ret.peers.len", peer_info_list.len());
Ok(NetworkResult::value(Answer::new(
latency,
@ -176,7 +187,10 @@ impl RPCProcessor {
WatchValueAnswer {
accepted,
expiration_ts: Timestamp::new(expiration),
peers,
peers: PeerInfoResponse {
safety_domain_set,
peer_info_list,
},
watch_id,
},
)))

View File

@ -113,7 +113,7 @@ impl StorageManager {
// Keep the value if we got one and it is newer and it passes schema validation
let Some(value) = gva.answer.value else {
// Return peers if we have some
log_network_result!(debug "GetValue returned no value, fanout call returned peers {}", gva.answer.peers.len());
log_network_result!(debug "GetValue returned no value, fanout call returned peers {}", gva.answer.peers.peer_info_list.len());
return Ok(NetworkResult::value(gva.answer.peers))
};
@ -179,7 +179,7 @@ impl StorageManager {
}
// Return peers if we have some
log_network_result!(debug "GetValue fanout call returned peers {}", gva.answer.peers.len());
log_network_result!(debug "GetValue fanout call returned peers {}", gva.answer.peers.peer_info_list.len());
Ok(NetworkResult::value(gva.answer.peers))
}.instrument(tracing::trace_span!("outbound_get_value fanout routine"))

View File

@ -225,7 +225,7 @@ impl StorageManager {
}
// Return peers if we have some
log_network_result!(debug "InspectValue fanout call returned peers {}", answer.peers.len());
log_network_result!(debug "InspectValue fanout call returned peers {}", answer.peers.peer_info_list.len());
Ok(NetworkResult::value(answer.peers))
}.instrument(tracing::trace_span!("outbound_inspect_value fanout call"))

View File

@ -107,7 +107,7 @@ impl StorageManager {
ctx.missed_since_last_set += 1;
// Return peers if we have some
log_network_result!(debug "SetValue missed: {}, fanout call returned peers {}", ctx.missed_since_last_set, sva.answer.peers.len());
log_network_result!(debug "SetValue missed: {}, fanout call returned peers {}", ctx.missed_since_last_set, sva.answer.peers.peer_info_list.len());
return Ok(NetworkResult::value(sva.answer.peers));
}
@ -122,7 +122,7 @@ impl StorageManager {
}
// Return peers if we have some
log_network_result!(debug "SetValue returned no value, fanout call returned peers {}", sva.answer.peers.len());
log_network_result!(debug "SetValue returned no value, fanout call returned peers {}", sva.answer.peers.peer_info_list.len());
return Ok(NetworkResult::value(sva.answer.peers));
};

View File

@ -291,7 +291,7 @@ impl StorageManager {
}
// Return peers if we have some
log_network_result!(debug "WatchValue fanout call returned peers {} ({})", wva.answer.peers.len(), next_node);
log_network_result!(debug "WatchValue fanout call returned peers {} ({})", wva.answer.peers.peer_info_list.len(), next_node);
Ok(NetworkResult::value(wva.answer.peers))
}.instrument(tracing::trace_span!("outbound_watch_value call routine"))

View File

@ -224,7 +224,7 @@ fn get_destination(
let text = text.to_owned();
Box::pin(async move {
// Safety selection
let (text, ss) = if let Some((first, second)) = text.split_once('+') {
let (text, oss) = if let Some((first, second)) = text.split_once('+') {
let ss = get_safety_selection(routing_table.clone())(second)?;
(first, Some(ss))
} else {
@ -233,6 +233,15 @@ fn get_destination(
if text.is_empty() {
return None;
}
// Override safety domain
let (text, osd) = if let Some(first) = text.strip_suffix("*SAFE") {
(first, Some(SafetyDomain::Safe))
} else {
(text, None)
};
if text.is_empty() {
return None;
}
if &text[0..1] == "#" {
let rss = routing_table.route_spec_store();
@ -254,11 +263,14 @@ fn get_destination(
};
private_route
};
Some(Destination::private_route(
let mut d = Destination::private_route(
private_route,
ss.unwrap_or(SafetySelection::Unsafe(Sequencing::default())),
))
oss.unwrap_or(SafetySelection::Unsafe(Sequencing::default())),
);
if let Some(sd) = osd {
d = d.with_override_safety_domain(sd);
}
Some(d)
} else {
let (text, mods) = text
.split_once('/')
@ -274,25 +286,29 @@ fn get_destination(
}
let mut d = Destination::relay(relay_nr, target_nr);
if let Some(ss) = ss {
if let Some(ss) = oss {
d = d.with_safety(ss)
}
if let Some(sd) = osd {
d = d.with_override_safety_domain(sd);
}
Some(d)
} else {
// Direct
let mut target_nr =
resolve_node_ref(routing_table, ss.unwrap_or_default())(text).await?;
resolve_node_ref(routing_table, oss.unwrap_or_default())(text).await?;
if let Some(mods) = mods {
target_nr = get_node_ref_modifiers(target_nr)(mods)?;
}
let mut d = Destination::direct(target_nr);
if let Some(ss) = ss {
if let Some(ss) = oss {
d = d.with_safety(ss)
}
if let Some(sd) = osd {
d = d.with_override_safety_domain(sd);
}
Some(d)
}
}
@ -968,6 +984,7 @@ impl VeilidAPI {
Destination::Direct {
node: target,
safety_selection: _,
opt_override_safety_domain: _,
} => Ok(format!(
"Destination: {:#?}\nTarget Entry:\n{}\n",
&dest,
@ -977,6 +994,7 @@ impl VeilidAPI {
relay,
node: target,
safety_selection: _,
opt_override_safety_domain: _,
} => Ok(format!(
"Destination: {:#?}\nTarget Entry:\n{}\nRelay Entry:\n{}\n",
&dest,
@ -986,6 +1004,7 @@ impl VeilidAPI {
Destination::PrivateRoute {
private_route: _,
safety_selection: _,
opt_override_safety_domain: _,
} => Ok(format!("Destination: {:#?}", &dest)),
}
}