mirror of
https://gitlab.com/veilid/veilid.git
synced 2024-12-25 15:29:32 -05:00
Merge branch 'improved-relaying' into 'main'
Native IPV4-IPV6 bridging support Closes #371 See merge request veilid/veilid!275
This commit is contained in:
commit
612921a34d
@ -8,6 +8,7 @@ mod low_level_protocol_type;
|
|||||||
mod network_class;
|
mod network_class;
|
||||||
mod peer_address;
|
mod peer_address;
|
||||||
mod protocol_type;
|
mod protocol_type;
|
||||||
|
mod relay_kind;
|
||||||
mod signal_info;
|
mod signal_info;
|
||||||
mod socket_address;
|
mod socket_address;
|
||||||
|
|
||||||
@ -23,5 +24,6 @@ pub use low_level_protocol_type::*;
|
|||||||
pub use network_class::*;
|
pub use network_class::*;
|
||||||
pub use peer_address::*;
|
pub use peer_address::*;
|
||||||
pub use protocol_type::*;
|
pub use protocol_type::*;
|
||||||
|
pub use relay_kind::*;
|
||||||
pub use signal_info::*;
|
pub use signal_info::*;
|
||||||
pub use socket_address::*;
|
pub use socket_address::*;
|
||||||
|
@ -15,11 +15,6 @@ impl Default for NetworkClass {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl NetworkClass {
|
impl NetworkClass {
|
||||||
// Must an inbound relay be kept available?
|
|
||||||
// In the case of InboundCapable, it is left up to the class of each DialInfo to determine if an inbound relay is required
|
|
||||||
pub fn inbound_wants_relay(&self) -> bool {
|
|
||||||
matches!(self, Self::OutboundOnly | Self::WebApp)
|
|
||||||
}
|
|
||||||
// Should an outbound relay be kept available?
|
// Should an outbound relay be kept available?
|
||||||
pub fn outbound_wants_relay(&self) -> bool {
|
pub fn outbound_wants_relay(&self) -> bool {
|
||||||
matches!(self, Self::WebApp)
|
matches!(self, Self::WebApp)
|
||||||
|
13
veilid-core/src/network_manager/types/relay_kind.rs
Normal file
13
veilid-core/src/network_manager/types/relay_kind.rs
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
|
||||||
|
pub enum RelayKind {
|
||||||
|
Inbound = 0,
|
||||||
|
Outbound = 1,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for RelayKind {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::Inbound
|
||||||
|
}
|
||||||
|
}
|
@ -215,6 +215,13 @@ impl RouteSpecStore {
|
|||||||
) -> VeilidAPIResult<RouteId> {
|
) -> VeilidAPIResult<RouteId> {
|
||||||
use core::cmp::Ordering;
|
use core::cmp::Ordering;
|
||||||
|
|
||||||
|
let ip6_prefix_size = rti
|
||||||
|
.unlocked_inner
|
||||||
|
.config
|
||||||
|
.get()
|
||||||
|
.network
|
||||||
|
.max_connections_per_ip6_prefix_size as usize;
|
||||||
|
|
||||||
if hop_count < 1 {
|
if hop_count < 1 {
|
||||||
apibail_invalid_argument!(
|
apibail_invalid_argument!(
|
||||||
"Not allocating route less than one hop in length",
|
"Not allocating route less than one hop in length",
|
||||||
@ -286,6 +293,39 @@ impl RouteSpecStore {
|
|||||||
return false;
|
return false;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Exclude nodes on our same ipblock, or their relay is on our same ipblock
|
||||||
|
// or our relay is on their ipblock, or their relay is on our relays same ipblock
|
||||||
|
|
||||||
|
// our node vs their node
|
||||||
|
if our_peer_info
|
||||||
|
.signed_node_info()
|
||||||
|
.node_info()
|
||||||
|
.node_is_on_same_ipblock(sni.node_info(), ip6_prefix_size)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if let Some(rni) = sni.relay_info() {
|
||||||
|
// our node vs their relay
|
||||||
|
if our_peer_info
|
||||||
|
.signed_node_info()
|
||||||
|
.node_info()
|
||||||
|
.node_is_on_same_ipblock(rni, ip6_prefix_size)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if let Some(our_rni) = our_peer_info.signed_node_info().relay_info() {
|
||||||
|
// our relay vs their relay
|
||||||
|
if our_rni.node_is_on_same_ipblock(rni, ip6_prefix_size) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if let Some(our_rni) = our_peer_info.signed_node_info().relay_info() {
|
||||||
|
// our relay vs their node
|
||||||
|
if our_rni.node_is_on_same_ipblock(sni.node_info(), ip6_prefix_size) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Relay check
|
// Relay check
|
||||||
let relay_ids = sni.relay_ids();
|
let relay_ids = sni.relay_ids();
|
||||||
if !relay_ids.is_empty() {
|
if !relay_ids.is_empty() {
|
||||||
@ -1527,7 +1567,7 @@ impl RouteSpecStore {
|
|||||||
/// Returns a route set id
|
/// Returns a route set id
|
||||||
#[cfg_attr(
|
#[cfg_attr(
|
||||||
feature = "verbose-tracing",
|
feature = "verbose-tracing",
|
||||||
instrument(level = "trace", skip(self, blob), ret, err)
|
instrument(level = "trace", skip(self), ret, err)
|
||||||
)]
|
)]
|
||||||
pub fn add_remote_private_route(
|
pub fn add_remote_private_route(
|
||||||
&self,
|
&self,
|
||||||
|
@ -103,7 +103,11 @@ impl RoutingDomainDetailCommon {
|
|||||||
pub fn dial_info_details(&self) -> &Vec<DialInfoDetail> {
|
pub fn dial_info_details(&self) -> &Vec<DialInfoDetail> {
|
||||||
&self.dial_info_details
|
&self.dial_info_details
|
||||||
}
|
}
|
||||||
pub(super) fn clear_dial_info_details(&mut self, address_type: Option<AddressType>, protocol_type: Option<ProtocolType>) {
|
pub(super) fn clear_dial_info_details(
|
||||||
|
&mut self,
|
||||||
|
address_type: Option<AddressType>,
|
||||||
|
protocol_type: Option<ProtocolType>,
|
||||||
|
) {
|
||||||
self.dial_info_details.retain_mut(|e| {
|
self.dial_info_details.retain_mut(|e| {
|
||||||
let mut remove = true;
|
let mut remove = true;
|
||||||
if let Some(pt) = protocol_type {
|
if let Some(pt) = protocol_type {
|
||||||
@ -138,41 +142,23 @@ impl RoutingDomainDetailCommon {
|
|||||||
VALID_ENVELOPE_VERSIONS.to_vec(),
|
VALID_ENVELOPE_VERSIONS.to_vec(),
|
||||||
VALID_CRYPTO_KINDS.to_vec(),
|
VALID_CRYPTO_KINDS.to_vec(),
|
||||||
self.capabilities.clone(),
|
self.capabilities.clone(),
|
||||||
self.dial_info_details.clone()
|
self.dial_info_details.clone(),
|
||||||
);
|
);
|
||||||
|
|
||||||
// Check if any of our dialinfo require a relay for signaling
|
let relay_info = if let Some(rn) = &self.relay_node {
|
||||||
// FullConeNAT requires a relay but it does not have to be published because it does not require signaling
|
let opt_relay_pi = rn.locked(rti).make_peer_info(self.routing_domain);
|
||||||
let mut publish_relay = node_info.network_class().inbound_wants_relay() || node_info.network_class().outbound_wants_relay();
|
if let Some(relay_pi) = opt_relay_pi {
|
||||||
if !publish_relay {
|
let (relay_ids, relay_sni) = relay_pi.destructure();
|
||||||
// Check the dialinfo to see if they might want to publish a relay for signalling specifically
|
match relay_sni {
|
||||||
for did in self.dial_info_details() {
|
SignedNodeInfo::Direct(d) => Some((relay_ids, d)),
|
||||||
if did.class.requires_signal() {
|
SignedNodeInfo::Relayed(_) => {
|
||||||
publish_relay = true;
|
warn!("relay node should not have a relay itself! if this happens, a relay updated its signed node info and became a relay, which should cause the relay to be dropped");
|
||||||
break;
|
None
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let relay_info = if publish_relay {
|
|
||||||
self
|
|
||||||
.relay_node
|
|
||||||
.as_ref()
|
|
||||||
.and_then(|rn| {
|
|
||||||
let opt_relay_pi = rn.locked(rti).make_peer_info(self.routing_domain);
|
|
||||||
if let Some(relay_pi) = opt_relay_pi {
|
|
||||||
let (relay_ids, relay_sni) = relay_pi.destructure();
|
|
||||||
match relay_sni {
|
|
||||||
SignedNodeInfo::Direct(d) => Some((relay_ids, d)),
|
|
||||||
SignedNodeInfo::Relayed(_) => {
|
|
||||||
warn!("relay node should not have a relay itself! if this happens, a relay updated its signed node info and became a relay, which should cause the relay to be dropped");
|
|
||||||
None
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
}
|
||||||
})
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
};
|
};
|
||||||
@ -184,7 +170,7 @@ impl RoutingDomainDetailCommon {
|
|||||||
rti.unlocked_inner.node_id_typed_key_pairs(),
|
rti.unlocked_inner.node_id_typed_key_pairs(),
|
||||||
node_info,
|
node_info,
|
||||||
relay_ids,
|
relay_ids,
|
||||||
relay_sdni,
|
relay_sdni,
|
||||||
)
|
)
|
||||||
.unwrap(),
|
.unwrap(),
|
||||||
),
|
),
|
||||||
@ -194,7 +180,7 @@ impl RoutingDomainDetailCommon {
|
|||||||
rti.unlocked_inner.node_id_typed_key_pairs(),
|
rti.unlocked_inner.node_id_typed_key_pairs(),
|
||||||
node_info,
|
node_info,
|
||||||
)
|
)
|
||||||
.unwrap()
|
.unwrap(),
|
||||||
),
|
),
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -277,9 +263,8 @@ fn first_filtered_dial_info_detail_between_nodes(
|
|||||||
to_node: &NodeInfo,
|
to_node: &NodeInfo,
|
||||||
dial_info_filter: &DialInfoFilter,
|
dial_info_filter: &DialInfoFilter,
|
||||||
sequencing: Sequencing,
|
sequencing: Sequencing,
|
||||||
dif_sort: Option<Arc<DialInfoDetailSort>>
|
dif_sort: Option<Arc<DialInfoDetailSort>>,
|
||||||
) -> Option<DialInfoDetail> {
|
) -> Option<DialInfoDetail> {
|
||||||
|
|
||||||
// Consider outbound capabilities
|
// Consider outbound capabilities
|
||||||
let dial_info_filter = (*dial_info_filter).filtered(
|
let dial_info_filter = (*dial_info_filter).filtered(
|
||||||
&DialInfoFilter::all()
|
&DialInfoFilter::all()
|
||||||
@ -289,23 +274,25 @@ fn first_filtered_dial_info_detail_between_nodes(
|
|||||||
|
|
||||||
// Apply sequencing and get sort
|
// Apply sequencing and get sort
|
||||||
// Include sorting by external dial info sort for rotating through dialinfo
|
// Include sorting by external dial info sort for rotating through dialinfo
|
||||||
// based on an external preference table, for example the one kept by
|
// based on an external preference table, for example the one kept by
|
||||||
// AddressFilter to deprioritize dialinfo that have recently failed to connect
|
// AddressFilter to deprioritize dialinfo that have recently failed to connect
|
||||||
let (ordered, dial_info_filter) = dial_info_filter.with_sequencing(sequencing);
|
let (ordered, dial_info_filter) = dial_info_filter.with_sequencing(sequencing);
|
||||||
let sort: Option<Box<DialInfoDetailSort>> = if ordered {
|
let sort: Option<Box<DialInfoDetailSort>> = if ordered {
|
||||||
if let Some(dif_sort) = dif_sort {
|
if let Some(dif_sort) = dif_sort {
|
||||||
Some(Box::new(move |a, b| {
|
Some(Box::new(move |a, b| {
|
||||||
let mut ord = dif_sort(a,b);
|
let mut ord = dif_sort(a, b);
|
||||||
if ord == core::cmp::Ordering::Equal {
|
if ord == core::cmp::Ordering::Equal {
|
||||||
ord = DialInfoDetail::ordered_sequencing_sort(a,b);
|
ord = DialInfoDetail::ordered_sequencing_sort(a, b);
|
||||||
}
|
}
|
||||||
ord
|
ord
|
||||||
}))
|
}))
|
||||||
} else {
|
} else {
|
||||||
Some(Box::new(move |a,b| { DialInfoDetail::ordered_sequencing_sort(a,b) }))
|
Some(Box::new(move |a, b| {
|
||||||
|
DialInfoDetail::ordered_sequencing_sort(a, b)
|
||||||
|
}))
|
||||||
}
|
}
|
||||||
} else if let Some(dif_sort) = dif_sort {
|
} else if let Some(dif_sort) = dif_sort {
|
||||||
Some(Box::new(move |a,b| { dif_sort(a,b) }))
|
Some(Box::new(move |a, b| dif_sort(a, b)))
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
};
|
};
|
||||||
@ -332,17 +319,27 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
|
|||||||
}
|
}
|
||||||
fn get_contact_method(
|
fn get_contact_method(
|
||||||
&self,
|
&self,
|
||||||
_rti: &RoutingTableInner,
|
rti: &RoutingTableInner,
|
||||||
peer_a: &PeerInfo,
|
peer_a: &PeerInfo,
|
||||||
peer_b: &PeerInfo,
|
peer_b: &PeerInfo,
|
||||||
dial_info_filter: DialInfoFilter,
|
dial_info_filter: DialInfoFilter,
|
||||||
sequencing: Sequencing,
|
sequencing: Sequencing,
|
||||||
dif_sort: Option<Arc<DialInfoDetailSort>>,
|
dif_sort: Option<Arc<DialInfoDetailSort>>,
|
||||||
) -> ContactMethod {
|
) -> ContactMethod {
|
||||||
|
let ip6_prefix_size = rti
|
||||||
|
.unlocked_inner
|
||||||
|
.config
|
||||||
|
.get()
|
||||||
|
.network
|
||||||
|
.max_connections_per_ip6_prefix_size as usize;
|
||||||
|
|
||||||
// Get the nodeinfos for convenience
|
// Get the nodeinfos for convenience
|
||||||
let node_a = peer_a.signed_node_info().node_info();
|
let node_a = peer_a.signed_node_info().node_info();
|
||||||
let node_b = peer_b.signed_node_info().node_info();
|
let node_b = peer_b.signed_node_info().node_info();
|
||||||
|
|
||||||
|
// Check to see if these nodes are on the same network
|
||||||
|
let same_ipblock = node_a.node_is_on_same_ipblock(node_b, ip6_prefix_size);
|
||||||
|
|
||||||
// Get the node ids that would be used between these peers
|
// Get the node ids that would be used between these peers
|
||||||
let cck = common_crypto_kinds(&peer_a.node_ids().kinds(), &peer_b.node_ids().kinds());
|
let cck = common_crypto_kinds(&peer_a.node_ids().kinds(), &peer_b.node_ids().kinds());
|
||||||
let Some(best_ck) = cck.first().copied() else {
|
let Some(best_ck) = cck.first().copied() else {
|
||||||
@ -354,8 +351,19 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
|
|||||||
let node_b_id = peer_b.node_ids().get(best_ck).unwrap();
|
let node_b_id = peer_b.node_ids().get(best_ck).unwrap();
|
||||||
|
|
||||||
// Get the best match dial info for node B if we have it
|
// Get the best match dial info for node B if we have it
|
||||||
if let Some(target_did) =
|
// Don't try direct inbound at all if the two nodes are on the same ipblock to avoid hairpin NAT issues
|
||||||
first_filtered_dial_info_detail_between_nodes(node_a, node_b, &dial_info_filter, sequencing, dif_sort.clone())
|
// as well avoiding direct traffic between same-network nodes. This would be done in the LocalNetwork RoutingDomain.
|
||||||
|
if let Some(target_did) = (!same_ipblock)
|
||||||
|
.then(|| {
|
||||||
|
first_filtered_dial_info_detail_between_nodes(
|
||||||
|
node_a,
|
||||||
|
node_b,
|
||||||
|
&dial_info_filter,
|
||||||
|
sequencing,
|
||||||
|
dif_sort.clone(),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.flatten()
|
||||||
{
|
{
|
||||||
// Do we need to signal before going inbound?
|
// Do we need to signal before going inbound?
|
||||||
if !target_did.class.requires_signal() {
|
if !target_did.class.requires_signal() {
|
||||||
@ -365,16 +373,20 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
|
|||||||
|
|
||||||
// Get the target's inbound relay, it must have one or it is not reachable
|
// Get the target's inbound relay, it must have one or it is not reachable
|
||||||
if let Some(node_b_relay) = peer_b.signed_node_info().relay_info() {
|
if let Some(node_b_relay) = peer_b.signed_node_info().relay_info() {
|
||||||
|
|
||||||
// Note that relay_peer_info could be node_a, in which case a connection already exists
|
// Note that relay_peer_info could be node_a, in which case a connection already exists
|
||||||
// and we only get here if the connection had dropped, in which case node_a is unreachable until
|
// and we only get here if the connection had dropped, in which case node_a is unreachable until
|
||||||
// it gets a new relay connection up
|
// it gets a new relay connection up
|
||||||
if peer_b.signed_node_info().relay_ids().contains_any(peer_a.node_ids()) {
|
if peer_b
|
||||||
|
.signed_node_info()
|
||||||
|
.relay_ids()
|
||||||
|
.contains_any(peer_a.node_ids())
|
||||||
|
{
|
||||||
return ContactMethod::Existing;
|
return ContactMethod::Existing;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get best node id to contact relay with
|
// Get best node id to contact relay with
|
||||||
let Some(node_b_relay_id) = peer_b.signed_node_info().relay_ids().get(best_ck) else {
|
let Some(node_b_relay_id) = peer_b.signed_node_info().relay_ids().get(best_ck)
|
||||||
|
else {
|
||||||
// No best relay id
|
// No best relay id
|
||||||
return ContactMethod::Unreachable;
|
return ContactMethod::Unreachable;
|
||||||
};
|
};
|
||||||
@ -399,12 +411,10 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
|
|||||||
node_a,
|
node_a,
|
||||||
&dial_info_filter,
|
&dial_info_filter,
|
||||||
sequencing,
|
sequencing,
|
||||||
dif_sort.clone()
|
dif_sort.clone(),
|
||||||
) {
|
) {
|
||||||
// Ensure we aren't on the same public IP address (no hairpin nat)
|
// Ensure we aren't on the same public IP address (no hairpin nat)
|
||||||
if reverse_did.dial_info.ip_addr()
|
if reverse_did.dial_info.ip_addr() != target_did.dial_info.ip_addr() {
|
||||||
!= target_did.dial_info.ip_addr()
|
|
||||||
{
|
|
||||||
// Can we receive a direct reverse connection?
|
// Can we receive a direct reverse connection?
|
||||||
if !reverse_did.class.requires_signal() {
|
if !reverse_did.class.requires_signal() {
|
||||||
return ContactMethod::SignalReverse(
|
return ContactMethod::SignalReverse(
|
||||||
@ -425,16 +435,18 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
|
|||||||
node_b,
|
node_b,
|
||||||
&udp_dial_info_filter,
|
&udp_dial_info_filter,
|
||||||
sequencing,
|
sequencing,
|
||||||
dif_sort.clone()
|
dif_sort.clone(),
|
||||||
) {
|
) {
|
||||||
// Does node A have a direct udp dialinfo that node B can reach?
|
// Does node A have a direct udp dialinfo that node B can reach?
|
||||||
if let Some(reverse_udp_did) = first_filtered_dial_info_detail_between_nodes(
|
if let Some(reverse_udp_did) =
|
||||||
node_b,
|
first_filtered_dial_info_detail_between_nodes(
|
||||||
node_a,
|
node_b,
|
||||||
&udp_dial_info_filter,
|
node_a,
|
||||||
sequencing,
|
&udp_dial_info_filter,
|
||||||
dif_sort.clone(),
|
sequencing,
|
||||||
) {
|
dif_sort.clone(),
|
||||||
|
)
|
||||||
|
{
|
||||||
// Ensure we aren't on the same public IP address (no hairpin nat)
|
// Ensure we aren't on the same public IP address (no hairpin nat)
|
||||||
if reverse_udp_did.dial_info.ip_addr()
|
if reverse_udp_did.dial_info.ip_addr()
|
||||||
!= target_udp_did.dial_info.ip_addr()
|
!= target_udp_did.dial_info.ip_addr()
|
||||||
@ -454,13 +466,16 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// If the node B has no direct dial info, it needs to have an inbound relay
|
// If the node B has no direct dial info or is on the same ipblock, it needs to have an inbound relay
|
||||||
else if let Some(node_b_relay) = peer_b.signed_node_info().relay_info() {
|
else if let Some(node_b_relay) = peer_b.signed_node_info().relay_info() {
|
||||||
|
|
||||||
// Note that relay_peer_info could be node_a, in which case a connection already exists
|
// Note that relay_peer_info could be node_a, in which case a connection already exists
|
||||||
// and we only get here if the connection had dropped, in which case node_b is unreachable until
|
// and we only get here if the connection had dropped, in which case node_b is unreachable until
|
||||||
// it gets a new relay connection up
|
// it gets a new relay connection up
|
||||||
if peer_b.signed_node_info().relay_ids().contains_any(peer_a.node_ids()) {
|
if peer_b
|
||||||
|
.signed_node_info()
|
||||||
|
.relay_ids()
|
||||||
|
.contains_any(peer_a.node_ids())
|
||||||
|
{
|
||||||
return ContactMethod::Existing;
|
return ContactMethod::Existing;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -476,26 +491,29 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
|
|||||||
node_b_relay,
|
node_b_relay,
|
||||||
&dial_info_filter,
|
&dial_info_filter,
|
||||||
sequencing,
|
sequencing,
|
||||||
dif_sort.clone()
|
dif_sort.clone(),
|
||||||
)
|
)
|
||||||
.is_some()
|
.is_some()
|
||||||
{
|
{
|
||||||
///////// Reverse connection
|
///////// Reverse connection
|
||||||
|
|
||||||
// Get the best match dial info for an reverse inbound connection from node B to node A
|
// Get the best match dial info for an reverse inbound connection from node B to node A
|
||||||
if let Some(reverse_did) = first_filtered_dial_info_detail_between_nodes(
|
// unless both nodes are on the same ipblock
|
||||||
node_b,
|
if let Some(reverse_did) = (!same_ipblock)
|
||||||
node_a,
|
.then(|| {
|
||||||
&dial_info_filter,
|
first_filtered_dial_info_detail_between_nodes(
|
||||||
sequencing,
|
node_b,
|
||||||
dif_sort.clone()
|
node_a,
|
||||||
) {
|
&dial_info_filter,
|
||||||
|
sequencing,
|
||||||
|
dif_sort.clone(),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.flatten()
|
||||||
|
{
|
||||||
// Can we receive a direct reverse connection?
|
// Can we receive a direct reverse connection?
|
||||||
if !reverse_did.class.requires_signal() {
|
if !reverse_did.class.requires_signal() {
|
||||||
return ContactMethod::SignalReverse(
|
return ContactMethod::SignalReverse(node_b_relay_id, node_b_id);
|
||||||
node_b_relay_id,
|
|
||||||
node_b_id,
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -504,7 +522,12 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// If node A can't reach the node by other means, it may need to use its outbound relay
|
// If node A can't reach the node by other means, it may need to use its outbound relay
|
||||||
if peer_a.signed_node_info().node_info().network_class().outbound_wants_relay() {
|
if peer_a
|
||||||
|
.signed_node_info()
|
||||||
|
.node_info()
|
||||||
|
.network_class()
|
||||||
|
.outbound_wants_relay()
|
||||||
|
{
|
||||||
if let Some(node_a_relay_id) = peer_a.signed_node_info().relay_ids().get(best_ck) {
|
if let Some(node_a_relay_id) = peer_a.signed_node_info().relay_ids().get(best_ck) {
|
||||||
// Ensure it's not our relay we're trying to reach
|
// Ensure it's not our relay we're trying to reach
|
||||||
if node_a_relay_id != node_b_id {
|
if node_a_relay_id != node_b_id {
|
||||||
@ -573,7 +596,6 @@ impl RoutingDomainDetail for LocalNetworkRoutingDomainDetail {
|
|||||||
sequencing: Sequencing,
|
sequencing: Sequencing,
|
||||||
dif_sort: Option<Arc<DialInfoDetailSort>>,
|
dif_sort: Option<Arc<DialInfoDetailSort>>,
|
||||||
) -> ContactMethod {
|
) -> ContactMethod {
|
||||||
|
|
||||||
// Get the nodeinfos for convenience
|
// Get the nodeinfos for convenience
|
||||||
let node_a = peer_a.signed_node_info().node_info();
|
let node_a = peer_a.signed_node_info().node_info();
|
||||||
let node_b = peer_b.signed_node_info().node_info();
|
let node_b = peer_b.signed_node_info().node_info();
|
||||||
@ -585,10 +607,16 @@ impl RoutingDomainDetail for LocalNetworkRoutingDomainDetail {
|
|||||||
return ContactMethod::Unreachable;
|
return ContactMethod::Unreachable;
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Some(target_did) = first_filtered_dial_info_detail_between_nodes(node_a, node_b, &dial_info_filter, sequencing, dif_sort) {
|
if let Some(target_did) = first_filtered_dial_info_detail_between_nodes(
|
||||||
|
node_a,
|
||||||
|
node_b,
|
||||||
|
&dial_info_filter,
|
||||||
|
sequencing,
|
||||||
|
dif_sort,
|
||||||
|
) {
|
||||||
return ContactMethod::Direct(target_did.dial_info);
|
return ContactMethod::Direct(target_did.dial_info);
|
||||||
}
|
}
|
||||||
|
|
||||||
ContactMethod::Unreachable
|
ContactMethod::Unreachable
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,51 @@
|
|||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
impl RoutingTable {
|
impl RoutingTable {
|
||||||
|
// Check if a relay is desired or not
|
||||||
|
fn public_internet_wants_relay(&self) -> Option<RelayKind> {
|
||||||
|
let own_peer_info = self.get_own_peer_info(RoutingDomain::PublicInternet);
|
||||||
|
let own_node_info = own_peer_info.signed_node_info().node_info();
|
||||||
|
let network_class = own_node_info.network_class();
|
||||||
|
|
||||||
|
// Never give a relay to something with an invalid network class
|
||||||
|
if matches!(network_class, NetworkClass::Invalid) {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we -need- a relay always request one
|
||||||
|
if let Some(rk) = own_node_info.requires_relay() {
|
||||||
|
return Some(rk);
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we don't always need a relay, but we don't have support for
|
||||||
|
// all the address types then we should request one anyway
|
||||||
|
let mut address_types = AddressTypeSet::empty();
|
||||||
|
for did in own_node_info.dial_info_detail_list() {
|
||||||
|
address_types |= did.dial_info.address_type();
|
||||||
|
}
|
||||||
|
if address_types != AddressTypeSet::all() {
|
||||||
|
return Some(RelayKind::Inbound);
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we are behind some NAT, then we should get ourselves a relay just
|
||||||
|
// in case we need to navigate hairpin NAT to our own network
|
||||||
|
let mut inbound_addresses = HashSet::<SocketAddr>::new();
|
||||||
|
for did in own_node_info.dial_info_detail_list() {
|
||||||
|
inbound_addresses.insert(did.dial_info.to_socket_addr());
|
||||||
|
}
|
||||||
|
let own_local_peer_info = self.get_own_peer_info(RoutingDomain::LocalNetwork);
|
||||||
|
let own_local_node_info = own_local_peer_info.signed_node_info().node_info();
|
||||||
|
for ldid in own_local_node_info.dial_info_detail_list() {
|
||||||
|
inbound_addresses.remove(&ldid.dial_info.to_socket_addr());
|
||||||
|
}
|
||||||
|
if !inbound_addresses.is_empty() {
|
||||||
|
return Some(RelayKind::Inbound);
|
||||||
|
}
|
||||||
|
|
||||||
|
// No relay is desired
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
// Keep relays assigned and accessible
|
// Keep relays assigned and accessible
|
||||||
#[instrument(level = "trace", skip(self), err)]
|
#[instrument(level = "trace", skip(self), err)]
|
||||||
pub(crate) async fn relay_management_task_routine(
|
pub(crate) async fn relay_management_task_routine(
|
||||||
@ -9,10 +54,8 @@ impl RoutingTable {
|
|||||||
_last_ts: Timestamp,
|
_last_ts: Timestamp,
|
||||||
cur_ts: Timestamp,
|
cur_ts: Timestamp,
|
||||||
) -> EyreResult<()> {
|
) -> EyreResult<()> {
|
||||||
let own_peer_info = self.get_own_peer_info(RoutingDomain::PublicInternet);
|
|
||||||
let own_node_info = own_peer_info.signed_node_info().node_info();
|
|
||||||
let network_class = own_node_info.network_class();
|
|
||||||
let relay_node_filter = self.make_public_internet_relay_node_filter();
|
let relay_node_filter = self.make_public_internet_relay_node_filter();
|
||||||
|
let relay_desired = self.public_internet_wants_relay();
|
||||||
|
|
||||||
// Get routing domain editor
|
// Get routing domain editor
|
||||||
let mut editor = self.edit_routing_domain(RoutingDomain::PublicInternet);
|
let mut editor = self.edit_routing_domain(RoutingDomain::PublicInternet);
|
||||||
@ -36,19 +79,10 @@ impl RoutingTable {
|
|||||||
editor.clear_relay_node();
|
editor.clear_relay_node();
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
// Relay node is no longer required
|
// Relay node is no longer wanted
|
||||||
else if !own_node_info.requires_relay() {
|
else if relay_desired.is_none() {
|
||||||
log_rtab!(debug
|
log_rtab!(debug
|
||||||
"Relay node no longer required, dropping relay {}",
|
"Relay node no longer desired, dropping relay {}",
|
||||||
relay_node
|
|
||||||
);
|
|
||||||
editor.clear_relay_node();
|
|
||||||
false
|
|
||||||
}
|
|
||||||
// Should not have relay for invalid network class
|
|
||||||
else if !self.has_valid_network_class(RoutingDomain::PublicInternet) {
|
|
||||||
log_rtab!(debug
|
|
||||||
"Invalid network class does not get a relay, dropping relay {}",
|
|
||||||
relay_node
|
relay_node
|
||||||
);
|
);
|
||||||
editor.clear_relay_node();
|
editor.clear_relay_node();
|
||||||
@ -61,11 +95,13 @@ impl RoutingTable {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// Do we need a relay?
|
// Do we want a relay?
|
||||||
if !has_relay && own_node_info.requires_relay() {
|
if !has_relay && relay_desired.is_some() {
|
||||||
|
let relay_desired = relay_desired.unwrap();
|
||||||
|
|
||||||
// Do we want an outbound relay?
|
// Do we want an outbound relay?
|
||||||
let mut got_outbound_relay = false;
|
let mut got_outbound_relay = false;
|
||||||
if network_class.outbound_wants_relay() {
|
if matches!(relay_desired, RelayKind::Outbound) {
|
||||||
// The outbound relay is the host of the PWA
|
// The outbound relay is the host of the PWA
|
||||||
if let Some(outbound_relay_peerinfo) = intf::get_outbound_relay_peer().await {
|
if let Some(outbound_relay_peerinfo) = intf::get_outbound_relay_peer().await {
|
||||||
// Register new outbound relay
|
// Register new outbound relay
|
||||||
@ -89,7 +125,11 @@ impl RoutingTable {
|
|||||||
}
|
}
|
||||||
if !got_outbound_relay {
|
if !got_outbound_relay {
|
||||||
// Find a node in our routing table that is an acceptable inbound relay
|
// Find a node in our routing table that is an acceptable inbound relay
|
||||||
if let Some(nr) = self.find_inbound_relay(RoutingDomain::PublicInternet, cur_ts) {
|
if let Some(nr) = self.find_inbound_relay(
|
||||||
|
RoutingDomain::PublicInternet,
|
||||||
|
cur_ts,
|
||||||
|
relay_node_filter,
|
||||||
|
) {
|
||||||
log_rtab!(debug "Inbound relay node selected: {}", nr);
|
log_rtab!(debug "Inbound relay node selected: {}", nr);
|
||||||
editor.set_relay_node(nr);
|
editor.set_relay_node(nr);
|
||||||
}
|
}
|
||||||
@ -106,40 +146,71 @@ impl RoutingTable {
|
|||||||
// Get all our outbound protocol/address types
|
// Get all our outbound protocol/address types
|
||||||
let outbound_dif = self.get_outbound_dial_info_filter(RoutingDomain::PublicInternet);
|
let outbound_dif = self.get_outbound_dial_info_filter(RoutingDomain::PublicInternet);
|
||||||
let mapped_port_info = self.get_low_level_port_info();
|
let mapped_port_info = self.get_low_level_port_info();
|
||||||
|
let own_node_info = self
|
||||||
|
.get_own_peer_info(RoutingDomain::PublicInternet)
|
||||||
|
.signed_node_info()
|
||||||
|
.node_info()
|
||||||
|
.clone();
|
||||||
|
let ip6_prefix_size = self
|
||||||
|
.unlocked_inner
|
||||||
|
.config
|
||||||
|
.get()
|
||||||
|
.network
|
||||||
|
.max_connections_per_ip6_prefix_size as usize;
|
||||||
|
|
||||||
move |e: &BucketEntryInner| {
|
move |e: &BucketEntryInner| {
|
||||||
// Ensure this node is not on the local network
|
// Ensure this node is not on the local network and is on the public internet
|
||||||
if e.has_node_info(RoutingDomain::LocalNetwork.into()) {
|
if e.has_node_info(RoutingDomain::LocalNetwork.into()) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
let Some(node_info) = e.node_info(RoutingDomain::PublicInternet) else {
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
|
||||||
|
// Disqualify nodes that don't have relay capability or require a relay themselves
|
||||||
|
if !(node_info.has_capability(CAP_RELAY) && node_info.is_fully_direct_inbound()) {
|
||||||
|
// Needs to be able to accept packets to relay directly
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
// Disqualify nodes that don't cover all our inbound ports for tcp and udp
|
// Disqualify nodes that don't cover all our inbound ports for tcp and udp
|
||||||
// as we need to be able to use the relay for keepalives for all nat mappings
|
// as we need to be able to use the relay for keepalives for all nat mappings
|
||||||
let mut low_level_protocol_ports = mapped_port_info.low_level_protocol_ports.clone();
|
let mut low_level_protocol_ports = mapped_port_info.low_level_protocol_ports.clone();
|
||||||
|
let dids = node_info.all_filtered_dial_info_details(DialInfoDetail::NO_SORT, |did| {
|
||||||
|
did.matches_filter(&outbound_dif)
|
||||||
|
});
|
||||||
|
for did in &dids {
|
||||||
|
let pt = did.dial_info.protocol_type();
|
||||||
|
let at = did.dial_info.address_type();
|
||||||
|
if let Some((llpt, port)) = mapped_port_info.protocol_to_port.get(&(pt, at)) {
|
||||||
|
low_level_protocol_ports.remove(&(*llpt, at, *port));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !low_level_protocol_ports.is_empty() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
let can_serve_as_relay = e
|
// For all protocol types we could connect to the relay by, ensure the relay supports all address types
|
||||||
.node_info(RoutingDomain::PublicInternet)
|
let mut address_type_mappings = HashMap::<ProtocolType, AddressTypeSet>::new();
|
||||||
.map(|n| {
|
let dids = node_info.dial_info_detail_list();
|
||||||
if !(n.has_capability(CAP_RELAY) && n.is_fully_direct_inbound()) {
|
for did in dids {
|
||||||
// Needs to be able to accept packets to relay directly
|
address_type_mappings
|
||||||
|
.entry(did.dial_info.protocol_type())
|
||||||
|
.and_modify(|x| {
|
||||||
|
x.insert(did.dial_info.address_type());
|
||||||
|
})
|
||||||
|
.or_insert_with(|| did.dial_info.address_type().into());
|
||||||
|
}
|
||||||
|
for pt in outbound_dif.protocol_type_set.iter() {
|
||||||
|
if let Some(ats) = address_type_mappings.get(&pt) {
|
||||||
|
if *ats != AddressTypeSet::all() {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let dids = n.all_filtered_dial_info_details(DialInfoDetail::NO_SORT, |did| {
|
// Exclude any nodes that have our same network block
|
||||||
did.matches_filter(&outbound_dif)
|
if own_node_info.node_is_on_same_ipblock(node_info, ip6_prefix_size) {
|
||||||
});
|
|
||||||
for did in &dids {
|
|
||||||
let pt = did.dial_info.protocol_type();
|
|
||||||
let at = did.dial_info.address_type();
|
|
||||||
if let Some((llpt, port)) = mapped_port_info.protocol_to_port.get(&(pt, at))
|
|
||||||
{
|
|
||||||
low_level_protocol_ports.remove(&(*llpt, at, *port));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
low_level_protocol_ports.is_empty()
|
|
||||||
})
|
|
||||||
.unwrap_or(false);
|
|
||||||
if !can_serve_as_relay {
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -147,20 +218,13 @@ impl RoutingTable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[instrument(level = "trace", skip(self), ret)]
|
#[instrument(level = "trace", skip(self, relay_node_filter), ret)]
|
||||||
pub fn find_inbound_relay(
|
pub fn find_inbound_relay(
|
||||||
&self,
|
&self,
|
||||||
routing_domain: RoutingDomain,
|
routing_domain: RoutingDomain,
|
||||||
cur_ts: Timestamp,
|
cur_ts: Timestamp,
|
||||||
|
relay_node_filter: impl Fn(&BucketEntryInner) -> bool,
|
||||||
) -> Option<NodeRef> {
|
) -> Option<NodeRef> {
|
||||||
// Get relay filter function
|
|
||||||
let relay_node_filter = match routing_domain {
|
|
||||||
RoutingDomain::PublicInternet => self.make_public_internet_relay_node_filter(),
|
|
||||||
RoutingDomain::LocalNetwork => {
|
|
||||||
unimplemented!();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// Go through all entries and find fastest entry that matches filter function
|
// Go through all entries and find fastest entry that matches filter function
|
||||||
let inner = self.inner.read();
|
let inner = self.inner.read();
|
||||||
let inner = &*inner;
|
let inner = &*inner;
|
||||||
|
@ -129,24 +129,24 @@ impl NodeInfo {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Is some relay required either for signal or inbound relay or outbound relay?
|
/// Is some relay required either for signal or inbound relay or outbound relay?
|
||||||
pub fn requires_relay(&self) -> bool {
|
pub fn requires_relay(&self) -> Option<RelayKind> {
|
||||||
match self.network_class {
|
match self.network_class {
|
||||||
NetworkClass::InboundCapable => {
|
NetworkClass::InboundCapable => {
|
||||||
for did in &self.dial_info_detail_list {
|
for did in &self.dial_info_detail_list {
|
||||||
if did.class.requires_relay() {
|
if did.class.requires_relay() {
|
||||||
return true;
|
return Some(RelayKind::Inbound);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
NetworkClass::OutboundOnly => {
|
NetworkClass::OutboundOnly => {
|
||||||
return true;
|
return Some(RelayKind::Inbound);
|
||||||
}
|
}
|
||||||
NetworkClass::WebApp => {
|
NetworkClass::WebApp => {
|
||||||
return true;
|
return Some(RelayKind::Outbound);
|
||||||
}
|
}
|
||||||
NetworkClass::Invalid => {}
|
NetworkClass::Invalid => {}
|
||||||
}
|
}
|
||||||
false
|
None
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn has_capability(&self, cap: Capability) -> bool {
|
pub fn has_capability(&self, cap: Capability) -> bool {
|
||||||
@ -175,4 +175,21 @@ impl NodeInfo {
|
|||||||
}
|
}
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Does this appear on the same network within the routing domain
|
||||||
|
pub fn node_is_on_same_ipblock(&self, node_b: &NodeInfo, ip6_prefix_size: usize) -> bool {
|
||||||
|
let our_ip_blocks = self
|
||||||
|
.dial_info_detail_list()
|
||||||
|
.iter()
|
||||||
|
.map(|did| ip_to_ipblock(ip6_prefix_size, did.dial_info.to_socket_addr().ip()))
|
||||||
|
.collect::<HashSet<_>>();
|
||||||
|
|
||||||
|
for did in node_b.dial_info_detail_list() {
|
||||||
|
let ipblock = ip_to_ipblock(ip6_prefix_size, did.dial_info.to_socket_addr().ip());
|
||||||
|
if our_ip_blocks.contains(&ipblock) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -619,9 +619,15 @@ impl RPCProcessor {
|
|||||||
// Ensure the reply comes over the private route that was requested
|
// Ensure the reply comes over the private route that was requested
|
||||||
if let Some(reply_private_route) = waitable_reply.reply_private_route {
|
if let Some(reply_private_route) = waitable_reply.reply_private_route {
|
||||||
match &rpcreader.header.detail {
|
match &rpcreader.header.detail {
|
||||||
RPCMessageHeaderDetail::Direct(_) | RPCMessageHeaderDetail::SafetyRouted(_) => {
|
RPCMessageHeaderDetail::Direct(_) => {
|
||||||
return Err(RPCError::protocol("should have received reply over private route"));
|
return Err(RPCError::protocol("should have received reply over private route or stub"));
|
||||||
}
|
},
|
||||||
|
RPCMessageHeaderDetail::SafetyRouted(sr) => {
|
||||||
|
let node_id = self.routing_table.node_id(sr.direct.envelope.get_crypto_kind());
|
||||||
|
if node_id.value != reply_private_route {
|
||||||
|
return Err(RPCError::protocol("should have received reply from safety route to a stub"));
|
||||||
|
}
|
||||||
|
},
|
||||||
RPCMessageHeaderDetail::PrivateRouted(pr) => {
|
RPCMessageHeaderDetail::PrivateRouted(pr) => {
|
||||||
if pr.private_route != reply_private_route {
|
if pr.private_route != reply_private_route {
|
||||||
return Err(RPCError::protocol("received reply over the wrong private route"));
|
return Err(RPCError::protocol("received reply over the wrong private route"));
|
||||||
|
@ -22,9 +22,7 @@ impl RPCProcessor {
|
|||||||
#[cfg_attr(
|
#[cfg_attr(
|
||||||
feature = "verbose-tracing",
|
feature = "verbose-tracing",
|
||||||
instrument(level = "trace", skip(self, last_descriptor),
|
instrument(level = "trace", skip(self, last_descriptor),
|
||||||
fields(ret.value.data.len,
|
fields(ret.peers.len,
|
||||||
ret.seqs,
|
|
||||||
ret.peers.len,
|
|
||||||
ret.latency
|
ret.latency
|
||||||
),err)
|
),err)
|
||||||
)]
|
)]
|
||||||
@ -64,7 +62,8 @@ impl RPCProcessor {
|
|||||||
);
|
);
|
||||||
|
|
||||||
// Send the inspectvalue question
|
// Send the inspectvalue question
|
||||||
let inspect_value_q = RPCOperationInspectValueQ::new(key, subkeys.clone(), last_descriptor.is_none())?;
|
let inspect_value_q =
|
||||||
|
RPCOperationInspectValueQ::new(key, subkeys.clone(), last_descriptor.is_none())?;
|
||||||
let question = RPCQuestion::new(
|
let question = RPCQuestion::new(
|
||||||
network_result_try!(self.get_destination_respond_to(&dest)?),
|
network_result_try!(self.get_destination_respond_to(&dest)?),
|
||||||
RPCQuestionDetail::InspectValueQ(Box::new(inspect_value_q)),
|
RPCQuestionDetail::InspectValueQ(Box::new(inspect_value_q)),
|
||||||
@ -107,19 +106,18 @@ impl RPCProcessor {
|
|||||||
let debug_string_answer = format!(
|
let debug_string_answer = format!(
|
||||||
"OUT <== InspectValueA({} {} peers={}) <= {} seqs:\n{}",
|
"OUT <== InspectValueA({} {} peers={}) <= {} seqs:\n{}",
|
||||||
key,
|
key,
|
||||||
if descriptor.is_some() {
|
if descriptor.is_some() { " +desc" } else { "" },
|
||||||
" +desc"
|
|
||||||
} else {
|
|
||||||
""
|
|
||||||
},
|
|
||||||
peers.len(),
|
peers.len(),
|
||||||
dest,
|
dest,
|
||||||
debug_seqs(&seqs)
|
debug_seqs(&seqs)
|
||||||
);
|
);
|
||||||
|
|
||||||
log_dht!(debug "{}", debug_string_answer);
|
log_dht!(debug "{}", debug_string_answer);
|
||||||
|
|
||||||
let peer_ids:Vec<String> = peers.iter().filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string())).collect();
|
let peer_ids: Vec<String> = peers
|
||||||
|
.iter()
|
||||||
|
.filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string()))
|
||||||
|
.collect();
|
||||||
log_dht!(debug "Peers: {:#?}", peer_ids);
|
log_dht!(debug "Peers: {:#?}", peer_ids);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -140,8 +138,6 @@ impl RPCProcessor {
|
|||||||
#[cfg(feature = "verbose-tracing")]
|
#[cfg(feature = "verbose-tracing")]
|
||||||
tracing::Span::current().record("ret.latency", latency.as_u64());
|
tracing::Span::current().record("ret.latency", latency.as_u64());
|
||||||
#[cfg(feature = "verbose-tracing")]
|
#[cfg(feature = "verbose-tracing")]
|
||||||
tracing::Span::current().record("ret.seqs", seqs);
|
|
||||||
#[cfg(feature = "verbose-tracing")]
|
|
||||||
tracing::Span::current().record("ret.peers.len", peers.len());
|
tracing::Span::current().record("ret.peers.len", peers.len());
|
||||||
|
|
||||||
Ok(NetworkResult::value(Answer::new(
|
Ok(NetworkResult::value(Answer::new(
|
||||||
@ -158,11 +154,7 @@ impl RPCProcessor {
|
|||||||
////////////////////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
|
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
|
||||||
pub(crate) async fn process_inspect_value_q(
|
pub(crate) async fn process_inspect_value_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
|
||||||
&self,
|
|
||||||
msg: RPCMessage,
|
|
||||||
) -> RPCNetworkResult<()> {
|
|
||||||
|
|
||||||
// Ensure this never came over a private route, safety route is okay though
|
// Ensure this never came over a private route, safety route is okay though
|
||||||
match &msg.header.detail {
|
match &msg.header.detail {
|
||||||
RPCMessageHeaderDetail::Direct(_) | RPCMessageHeaderDetail::SafetyRouted(_) => {}
|
RPCMessageHeaderDetail::Direct(_) | RPCMessageHeaderDetail::SafetyRouted(_) => {}
|
||||||
@ -175,14 +167,8 @@ impl RPCProcessor {
|
|||||||
// Ignore if disabled
|
// Ignore if disabled
|
||||||
let routing_table = self.routing_table();
|
let routing_table = self.routing_table();
|
||||||
let opi = routing_table.get_own_peer_info(msg.header.routing_domain());
|
let opi = routing_table.get_own_peer_info(msg.header.routing_domain());
|
||||||
if !opi
|
if !opi.signed_node_info().node_info().has_capability(CAP_DHT) {
|
||||||
.signed_node_info()
|
return Ok(NetworkResult::service_unavailable("dht is not available"));
|
||||||
.node_info()
|
|
||||||
.has_capability(CAP_DHT)
|
|
||||||
{
|
|
||||||
return Ok(NetworkResult::service_unavailable(
|
|
||||||
"dht is not available",
|
|
||||||
));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get the question
|
// Get the question
|
||||||
@ -200,18 +186,16 @@ impl RPCProcessor {
|
|||||||
|
|
||||||
// Get the nodes that we know about that are closer to the the key than our own node
|
// Get the nodes that we know about that are closer to the the key than our own node
|
||||||
let routing_table = self.routing_table();
|
let routing_table = self.routing_table();
|
||||||
let closer_to_key_peers = network_result_try!(routing_table.find_preferred_peers_closer_to_key(key, vec![CAP_DHT]));
|
let closer_to_key_peers = network_result_try!(
|
||||||
|
routing_table.find_preferred_peers_closer_to_key(key, vec![CAP_DHT])
|
||||||
|
);
|
||||||
|
|
||||||
if debug_target_enabled!("dht") {
|
if debug_target_enabled!("dht") {
|
||||||
let debug_string = format!(
|
let debug_string = format!(
|
||||||
"IN <=== InspectValueQ({} {}{}) <== {}",
|
"IN <=== InspectValueQ({} {}{}) <== {}",
|
||||||
key,
|
key,
|
||||||
subkeys,
|
subkeys,
|
||||||
if want_descriptor {
|
if want_descriptor { " +wantdesc" } else { "" },
|
||||||
" +wantdesc"
|
|
||||||
} else {
|
|
||||||
""
|
|
||||||
},
|
|
||||||
msg.header.direct_sender_node_id()
|
msg.header.direct_sender_node_id()
|
||||||
);
|
);
|
||||||
|
|
||||||
@ -223,20 +207,21 @@ impl RPCProcessor {
|
|||||||
let c = self.config.get();
|
let c = self.config.get();
|
||||||
c.network.dht.set_value_count as usize
|
c.network.dht.set_value_count as usize
|
||||||
};
|
};
|
||||||
let (inspect_result_seqs, inspect_result_descriptor) = if closer_to_key_peers.len() >= set_value_count {
|
let (inspect_result_seqs, inspect_result_descriptor) =
|
||||||
// Not close enough
|
if closer_to_key_peers.len() >= set_value_count {
|
||||||
(Vec::new(), None)
|
// Not close enough
|
||||||
} else {
|
(Vec::new(), None)
|
||||||
// Close enough, lets get it
|
} else {
|
||||||
|
// Close enough, lets get it
|
||||||
|
|
||||||
// See if we have this record ourselves
|
// See if we have this record ourselves
|
||||||
let storage_manager = self.storage_manager();
|
let storage_manager = self.storage_manager();
|
||||||
let inspect_result = network_result_try!(storage_manager
|
let inspect_result = network_result_try!(storage_manager
|
||||||
.inbound_inspect_value(key, subkeys, want_descriptor)
|
.inbound_inspect_value(key, subkeys, want_descriptor)
|
||||||
.await
|
.await
|
||||||
.map_err(RPCError::internal)?);
|
.map_err(RPCError::internal)?);
|
||||||
(inspect_result.seqs, inspect_result.opt_descriptor)
|
(inspect_result.seqs, inspect_result.opt_descriptor)
|
||||||
};
|
};
|
||||||
|
|
||||||
if debug_target_enabled!("dht") {
|
if debug_target_enabled!("dht") {
|
||||||
let debug_string_answer = format!(
|
let debug_string_answer = format!(
|
||||||
@ -251,10 +236,10 @@ impl RPCProcessor {
|
|||||||
closer_to_key_peers.len(),
|
closer_to_key_peers.len(),
|
||||||
msg.header.direct_sender_node_id()
|
msg.header.direct_sender_node_id()
|
||||||
);
|
);
|
||||||
|
|
||||||
log_dht!(debug "{}", debug_string_answer);
|
log_dht!(debug "{}", debug_string_answer);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make InspectValue answer
|
// Make InspectValue answer
|
||||||
let inspect_value_a = RPCOperationInspectValueA::new(
|
let inspect_value_a = RPCOperationInspectValueA::new(
|
||||||
inspect_result_seqs,
|
inspect_result_seqs,
|
||||||
@ -263,7 +248,10 @@ impl RPCProcessor {
|
|||||||
)?;
|
)?;
|
||||||
|
|
||||||
// Send InspectValue answer
|
// Send InspectValue answer
|
||||||
self.answer(msg, RPCAnswer::new(RPCAnswerDetail::InspectValueA(Box::new(inspect_value_a))))
|
self.answer(
|
||||||
.await
|
msg,
|
||||||
|
RPCAnswer::new(RPCAnswerDetail::InspectValueA(Box::new(inspect_value_a))),
|
||||||
|
)
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,12 +1,11 @@
|
|||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
impl RPCProcessor {
|
impl RPCProcessor {
|
||||||
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err))]
|
// Sends a dht value change notification
|
||||||
// Sends a high level app message
|
// Can be sent via all methods including relays and routes but never over a safety route
|
||||||
// Can be sent via all methods including relays and routes
|
|
||||||
#[cfg_attr(
|
#[cfg_attr(
|
||||||
feature = "verbose-tracing",
|
feature = "verbose-tracing",
|
||||||
instrument(level = "trace", skip(self, message), fields(message.len = message.len()), err)
|
instrument(level = "trace", skip(self, value), err)
|
||||||
)]
|
)]
|
||||||
pub async fn rpc_call_value_changed(
|
pub async fn rpc_call_value_changed(
|
||||||
self,
|
self,
|
||||||
|
Loading…
Reference in New Issue
Block a user