mirror of
https://gitlab.com/veilid/veilid.git
synced 2025-01-13 08:19:47 -05:00
filters
This commit is contained in:
parent
82dce24224
commit
9e506d23df
@ -107,28 +107,50 @@ impl DiscoveryContext {
|
|||||||
address_type: AddressType,
|
address_type: AddressType,
|
||||||
ignore_node: Option<DHTKey>,
|
ignore_node: Option<DHTKey>,
|
||||||
) -> Option<(SocketAddress, NodeRef)> {
|
) -> Option<(SocketAddress, NodeRef)> {
|
||||||
let filter = DialInfoFilter::global()
|
|
||||||
.with_protocol_type(protocol_type)
|
|
||||||
.with_address_type(address_type);
|
|
||||||
let node_count = {
|
let node_count = {
|
||||||
let config = self.routing_table.network_manager().config();
|
let config = self.routing_table.network_manager().config();
|
||||||
let c = config.get();
|
let c = config.get();
|
||||||
c.network.dht.max_find_node_count as usize
|
c.network.dht.max_find_node_count as usize
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Build an filter that matches our protocol and address type
|
||||||
|
// and excludes relays so we can get an accurate external address
|
||||||
|
let dial_info_filter = DialInfoFilter::global()
|
||||||
|
.with_protocol_type(protocol_type)
|
||||||
|
.with_address_type(address_type);
|
||||||
|
let inbound_dial_info_entry_filter =
|
||||||
|
RoutingTable::make_inbound_dial_info_entry_filter(dial_info_filter.clone());
|
||||||
|
let disallow_relays_filter = move |e: &BucketEntryInner| {
|
||||||
|
if let Some(n) = e.node_info() {
|
||||||
|
n.relay_peer_info.is_none()
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let filter =
|
||||||
|
RoutingTable::combine_filters(inbound_dial_info_entry_filter, disallow_relays_filter);
|
||||||
|
|
||||||
|
// Find public nodes matching this filter
|
||||||
let peers = self
|
let peers = self
|
||||||
.routing_table
|
.routing_table
|
||||||
.find_fast_public_nodes_filtered(node_count, &filter);
|
.find_fast_public_nodes_filtered(node_count, filter);
|
||||||
if peers.is_empty() {
|
if peers.is_empty() {
|
||||||
log_net!("no peers of type '{:?}'", filter);
|
log_net!(
|
||||||
|
"no external address detection peers of type {:?}:{:?}",
|
||||||
|
protocol_type,
|
||||||
|
address_type
|
||||||
|
);
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
for peer in peers {
|
|
||||||
|
// For each peer, if it's not our ignore-node, ask them for our public address, filtering on desired dial info
|
||||||
|
for mut peer in peers {
|
||||||
if let Some(ignore_node) = ignore_node {
|
if let Some(ignore_node) = ignore_node {
|
||||||
if peer.node_id() == ignore_node {
|
if peer.node_id() == ignore_node {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
peer.set_filter(Some(dial_info_filter.clone()));
|
||||||
if let Some(sa) = self.request_public_address(peer.clone()).await {
|
if let Some(sa) = self.request_public_address(peer.clone()).await {
|
||||||
return Some((sa, peer));
|
return Some((sa, peer));
|
||||||
}
|
}
|
||||||
@ -249,7 +271,10 @@ impl DiscoveryContext {
|
|||||||
inner.external_1_address = Some(external_1);
|
inner.external_1_address = Some(external_1);
|
||||||
inner.node_1 = Some(node_1);
|
inner.node_1 = Some(node_1);
|
||||||
|
|
||||||
log_net!(debug "external_1_dial_info: {:?}\nexternal_1_address: {:?}\nnode_1: {:?}", inner.external_1_dial_info, inner.external_1_address, inner.node_1);
|
info!(
|
||||||
|
"external_1_dial_info: {:?}\nexternal_1_address: {:?}\nnode_1: {:?}",
|
||||||
|
inner.external_1_dial_info, inner.external_1_address, inner.node_1
|
||||||
|
);
|
||||||
|
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
@ -339,6 +364,11 @@ impl DiscoveryContext {
|
|||||||
Some(v) => v,
|
Some(v) => v,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
info!(
|
||||||
|
"external_2_address: {:?}\nnode_2: {:?}",
|
||||||
|
external_2_address, node_2
|
||||||
|
);
|
||||||
|
|
||||||
// If we have two different external addresses, then this is a symmetric NAT
|
// If we have two different external addresses, then this is a symmetric NAT
|
||||||
if external_2_address != external_1_address {
|
if external_2_address != external_1_address {
|
||||||
// Symmetric NAT is outbound only, no public dial info will work
|
// Symmetric NAT is outbound only, no public dial info will work
|
||||||
|
@ -5,15 +5,64 @@ use crate::xx::*;
|
|||||||
use crate::*;
|
use crate::*;
|
||||||
|
|
||||||
impl RoutingTable {
|
impl RoutingTable {
|
||||||
// Retrieve the fastest nodes in the routing table with a particular kind of protocol and address type
|
// Makes a filter that finds nodes with a matching inbound dialinfo
|
||||||
// Returns noderefs are are scoped to that address type only
|
pub fn make_inbound_dial_info_entry_filter(
|
||||||
pub fn find_fast_public_nodes_filtered(
|
dial_info_filter: DialInfoFilter,
|
||||||
|
) -> impl FnMut(&BucketEntryInner) -> bool {
|
||||||
|
// does it have matching public dial info?
|
||||||
|
move |e| {
|
||||||
|
e.node_info()
|
||||||
|
.map(|n| {
|
||||||
|
n.first_filtered_dial_info_detail(|did| did.matches_filter(&dial_info_filter))
|
||||||
|
.is_some()
|
||||||
|
})
|
||||||
|
.unwrap_or(false)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Makes a filter that finds nodes capable of dialing a particular outbound dialinfo
|
||||||
|
pub fn make_outbound_dial_info_entry_filter(
|
||||||
|
dial_info: DialInfo,
|
||||||
|
) -> impl FnMut(&BucketEntryInner) -> bool {
|
||||||
|
// does the node's outbound capabilities match the dialinfo?
|
||||||
|
move |e| {
|
||||||
|
e.node_info()
|
||||||
|
.map(|n| {
|
||||||
|
let mut dif = DialInfoFilter::all();
|
||||||
|
dif = dif.with_protocol_type_set(n.outbound_protocols);
|
||||||
|
dif = dif.with_address_type_set(n.address_types);
|
||||||
|
dial_info.matches_filter(&dif)
|
||||||
|
})
|
||||||
|
.unwrap_or(false)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make a filter that wraps another filter
|
||||||
|
pub fn combine_filters<F, G>(mut f1: F, mut f2: G) -> impl FnMut(&BucketEntryInner) -> bool
|
||||||
|
where
|
||||||
|
F: FnMut(&BucketEntryInner) -> bool,
|
||||||
|
G: FnMut(&BucketEntryInner) -> bool,
|
||||||
|
{
|
||||||
|
move |e| {
|
||||||
|
if !f1(e) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if !f2(e) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Retrieve the fastest nodes in the routing table matching an entry filter
|
||||||
|
pub fn find_fast_public_nodes_filtered<F>(
|
||||||
&self,
|
&self,
|
||||||
node_count: usize,
|
node_count: usize,
|
||||||
dial_info_filter: &DialInfoFilter,
|
mut entry_filter: F,
|
||||||
) -> Vec<NodeRef> {
|
) -> Vec<NodeRef>
|
||||||
let dial_info_filter1 = dial_info_filter.clone();
|
where
|
||||||
|
F: FnMut(&BucketEntryInner) -> bool,
|
||||||
|
{
|
||||||
self.find_fastest_nodes(
|
self.find_fastest_nodes(
|
||||||
// count
|
// count
|
||||||
node_count,
|
node_count,
|
||||||
@ -26,25 +75,13 @@ impl RoutingTable {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// does it have matching public dial info?
|
// skip nodes that dont match entry filter
|
||||||
e.node_info()
|
entry_filter(e)
|
||||||
.map(|n| {
|
|
||||||
n.first_filtered_dial_info_detail(|did| {
|
|
||||||
did.matches_filter(&dial_info_filter1)
|
|
||||||
})
|
|
||||||
.is_some()
|
|
||||||
})
|
|
||||||
.unwrap_or(false)
|
|
||||||
})
|
})
|
||||||
}),
|
}),
|
||||||
// transform
|
// transform
|
||||||
|k: DHTKey, v: Option<Arc<BucketEntry>>| {
|
|k: DHTKey, v: Option<Arc<BucketEntry>>| {
|
||||||
NodeRef::new(
|
NodeRef::new(self.clone(), k, v.unwrap().clone(), None)
|
||||||
self.clone(),
|
|
||||||
k,
|
|
||||||
v.unwrap().clone(),
|
|
||||||
Some(dial_info_filter.clone()),
|
|
||||||
)
|
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
@ -80,57 +80,41 @@ impl RPCProcessor {
|
|||||||
// Use the address type though, to ensure we reach an ipv6 capable node if this is
|
// Use the address type though, to ensure we reach an ipv6 capable node if this is
|
||||||
// an ipv6 address
|
// an ipv6 address
|
||||||
let routing_table = self.routing_table();
|
let routing_table = self.routing_table();
|
||||||
let filter = DialInfoFilter::global().with_address_type(dial_info.address_type());
|
|
||||||
let sender_id = msg.header.envelope.get_sender_id();
|
let sender_id = msg.header.envelope.get_sender_id();
|
||||||
let node_count = {
|
let node_count = {
|
||||||
let c = self.config.get();
|
let c = self.config.get();
|
||||||
c.network.dht.max_find_node_count as usize
|
c.network.dht.max_find_node_count as usize
|
||||||
};
|
};
|
||||||
let peers = routing_table.find_fast_public_nodes_filtered(node_count, &filter);
|
|
||||||
|
// Filter on nodes that can validate dial info, and can reach a specific dial info
|
||||||
|
let outbound_dial_info_entry_filter =
|
||||||
|
RoutingTable::make_outbound_dial_info_entry_filter(dial_info.clone());
|
||||||
|
let will_validate_dial_info_filter = |e: &BucketEntryInner| {
|
||||||
|
if let Some(status) = &e.peer_stats().status {
|
||||||
|
status.will_validate_dial_info
|
||||||
|
} else {
|
||||||
|
true
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let filter = RoutingTable::combine_filters(
|
||||||
|
outbound_dial_info_entry_filter,
|
||||||
|
will_validate_dial_info_filter,
|
||||||
|
);
|
||||||
|
|
||||||
|
// Find nodes matching filter to redirect this to
|
||||||
|
let peers = routing_table.find_fast_public_nodes_filtered(node_count, filter);
|
||||||
if peers.is_empty() {
|
if peers.is_empty() {
|
||||||
return Err(RPCError::internal(format!(
|
return Err(RPCError::internal(format!(
|
||||||
"no peers matching filter '{:?}'",
|
"no peers able to reach dialinfo '{:?}'",
|
||||||
filter
|
dial_info
|
||||||
)));
|
)));
|
||||||
}
|
}
|
||||||
for mut peer in peers {
|
for peer in peers {
|
||||||
// Ensure the peer is not the one asking for the validation
|
// Ensure the peer is not the one asking for the validation
|
||||||
if peer.node_id() == sender_id {
|
if peer.node_id() == sender_id {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Release the filter on the peer because we don't need to send the redirect with the filter
|
|
||||||
// we just wanted to make sure we only selected nodes that were capable of
|
|
||||||
// using the correct protocol for the dial info being validated
|
|
||||||
peer.set_filter(None);
|
|
||||||
|
|
||||||
// Ensure the peer's status is known and that it is capable of
|
|
||||||
// making outbound connections for the dial info we want to verify
|
|
||||||
// and if this peer can validate dial info
|
|
||||||
let can_contact_dial_info = peer.operate(|e: &BucketEntryInner| {
|
|
||||||
if let Some(ni) = e.node_info() {
|
|
||||||
ni.outbound_protocols.contains(dial_info.protocol_type())
|
|
||||||
&& ni.can_validate_dial_info()
|
|
||||||
} else {
|
|
||||||
false
|
|
||||||
}
|
|
||||||
});
|
|
||||||
if !can_contact_dial_info {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// See if this peer will validate dial info
|
|
||||||
let will_validate_dial_info = peer.operate(|e: &BucketEntryInner| {
|
|
||||||
if let Some(status) = &e.peer_stats().status {
|
|
||||||
status.will_validate_dial_info
|
|
||||||
} else {
|
|
||||||
true
|
|
||||||
}
|
|
||||||
});
|
|
||||||
if !will_validate_dial_info {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Make a copy of the request, without the redirect flag
|
// Make a copy of the request, without the redirect flag
|
||||||
let validate_dial_info = RPCOperationValidateDialInfo {
|
let validate_dial_info = RPCOperationValidateDialInfo {
|
||||||
dial_info: dial_info.clone(),
|
dial_info: dial_info.clone(),
|
||||||
|
Loading…
Reference in New Issue
Block a user