add support for maintaining AddressType-translation relays

This commit is contained in:
Christien Rioux 2024-04-27 16:41:26 -04:00
parent 19a1e0cf1b
commit b3c7c93f97
5 changed files with 218 additions and 126 deletions

View File

@ -8,6 +8,7 @@ mod low_level_protocol_type;
mod network_class;
mod peer_address;
mod protocol_type;
mod relay_kind;
mod signal_info;
mod socket_address;
@ -23,5 +24,6 @@ pub use low_level_protocol_type::*;
pub use network_class::*;
pub use peer_address::*;
pub use protocol_type::*;
pub use relay_kind::*;
pub use signal_info::*;
pub use socket_address::*;

View File

@ -0,0 +1,13 @@
use super::*;
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
pub enum RelayKind {
Inbound = 0,
Outbound = 1,
}
impl Default for RelayKind {
fn default() -> Self {
Self::Inbound
}
}

View File

@ -103,7 +103,11 @@ impl RoutingDomainDetailCommon {
pub fn dial_info_details(&self) -> &Vec<DialInfoDetail> {
&self.dial_info_details
}
pub(super) fn clear_dial_info_details(&mut self, address_type: Option<AddressType>, protocol_type: Option<ProtocolType>) {
pub(super) fn clear_dial_info_details(
&mut self,
address_type: Option<AddressType>,
protocol_type: Option<ProtocolType>,
) {
self.dial_info_details.retain_mut(|e| {
let mut remove = true;
if let Some(pt) = protocol_type {
@ -138,41 +142,23 @@ impl RoutingDomainDetailCommon {
VALID_ENVELOPE_VERSIONS.to_vec(),
VALID_CRYPTO_KINDS.to_vec(),
self.capabilities.clone(),
self.dial_info_details.clone()
self.dial_info_details.clone(),
);
// Check if any of our dialinfo require a relay for signaling
// FullConeNAT requires a relay but it does not have to be published because it does not require signaling
let mut publish_relay = node_info.network_class().inbound_wants_relay() || node_info.network_class().outbound_wants_relay();
if !publish_relay {
// Check the dialinfo to see if they might want to publish a relay for signalling specifically
for did in self.dial_info_details() {
if did.class.requires_signal() {
publish_relay = true;
break;
}
}
}
let relay_info = if publish_relay {
self
.relay_node
.as_ref()
.and_then(|rn| {
let opt_relay_pi = rn.locked(rti).make_peer_info(self.routing_domain);
if let Some(relay_pi) = opt_relay_pi {
let (relay_ids, relay_sni) = relay_pi.destructure();
match relay_sni {
SignedNodeInfo::Direct(d) => Some((relay_ids, d)),
SignedNodeInfo::Relayed(_) => {
warn!("relay node should not have a relay itself! if this happens, a relay updated its signed node info and became a relay, which should cause the relay to be dropped");
None
},
let relay_info = if let Some(rn) = &self.relay_node {
let opt_relay_pi = rn.locked(rti).make_peer_info(self.routing_domain);
if let Some(relay_pi) = opt_relay_pi {
let (relay_ids, relay_sni) = relay_pi.destructure();
match relay_sni {
SignedNodeInfo::Direct(d) => Some((relay_ids, d)),
SignedNodeInfo::Relayed(_) => {
warn!("relay node should not have a relay itself! if this happens, a relay updated its signed node info and became a relay, which should cause the relay to be dropped");
None
}
} else {
None
}
})
} else {
None
}
} else {
None
};
@ -184,7 +170,7 @@ impl RoutingDomainDetailCommon {
rti.unlocked_inner.node_id_typed_key_pairs(),
node_info,
relay_ids,
relay_sdni,
relay_sdni,
)
.unwrap(),
),
@ -194,7 +180,7 @@ impl RoutingDomainDetailCommon {
rti.unlocked_inner.node_id_typed_key_pairs(),
node_info,
)
.unwrap()
.unwrap(),
),
};
@ -277,9 +263,8 @@ fn first_filtered_dial_info_detail_between_nodes(
to_node: &NodeInfo,
dial_info_filter: &DialInfoFilter,
sequencing: Sequencing,
dif_sort: Option<Arc<DialInfoDetailSort>>
dif_sort: Option<Arc<DialInfoDetailSort>>,
) -> Option<DialInfoDetail> {
// Consider outbound capabilities
let dial_info_filter = (*dial_info_filter).filtered(
&DialInfoFilter::all()
@ -289,23 +274,25 @@ fn first_filtered_dial_info_detail_between_nodes(
// Apply sequencing and get sort
// Include sorting by external dial info sort for rotating through dialinfo
// based on an external preference table, for example the one kept by
// based on an external preference table, for example the one kept by
// AddressFilter to deprioritize dialinfo that have recently failed to connect
let (ordered, dial_info_filter) = dial_info_filter.with_sequencing(sequencing);
let sort: Option<Box<DialInfoDetailSort>> = if ordered {
if let Some(dif_sort) = dif_sort {
Some(Box::new(move |a, b| {
let mut ord = dif_sort(a,b);
let mut ord = dif_sort(a, b);
if ord == core::cmp::Ordering::Equal {
ord = DialInfoDetail::ordered_sequencing_sort(a,b);
ord = DialInfoDetail::ordered_sequencing_sort(a, b);
}
ord
}))
} else {
Some(Box::new(move |a,b| { DialInfoDetail::ordered_sequencing_sort(a,b) }))
Some(Box::new(move |a, b| {
DialInfoDetail::ordered_sequencing_sort(a, b)
}))
}
} else if let Some(dif_sort) = dif_sort {
Some(Box::new(move |a,b| { dif_sort(a,b) }))
Some(Box::new(move |a, b| dif_sort(a, b)))
} else {
None
};
@ -354,9 +341,13 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
let node_b_id = peer_b.node_ids().get(best_ck).unwrap();
// Get the best match dial info for node B if we have it
if let Some(target_did) =
first_filtered_dial_info_detail_between_nodes(node_a, node_b, &dial_info_filter, sequencing, dif_sort.clone())
{
if let Some(target_did) = first_filtered_dial_info_detail_between_nodes(
node_a,
node_b,
&dial_info_filter,
sequencing,
dif_sort.clone(),
) {
// Do we need to signal before going inbound?
if !target_did.class.requires_signal() {
// Go direct without signaling
@ -365,16 +356,20 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
// Get the target's inbound relay, it must have one or it is not reachable
if let Some(node_b_relay) = peer_b.signed_node_info().relay_info() {
// Note that relay_peer_info could be node_a, in which case a connection already exists
// and we only get here if the connection had dropped, in which case node_a is unreachable until
// it gets a new relay connection up
if peer_b.signed_node_info().relay_ids().contains_any(peer_a.node_ids()) {
if peer_b
.signed_node_info()
.relay_ids()
.contains_any(peer_a.node_ids())
{
return ContactMethod::Existing;
}
// Get best node id to contact relay with
let Some(node_b_relay_id) = peer_b.signed_node_info().relay_ids().get(best_ck) else {
let Some(node_b_relay_id) = peer_b.signed_node_info().relay_ids().get(best_ck)
else {
// No best relay id
return ContactMethod::Unreachable;
};
@ -399,12 +394,10 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
node_a,
&dial_info_filter,
sequencing,
dif_sort.clone()
dif_sort.clone(),
) {
// Ensure we aren't on the same public IP address (no hairpin nat)
if reverse_did.dial_info.ip_addr()
!= target_did.dial_info.ip_addr()
{
if reverse_did.dial_info.ip_addr() != target_did.dial_info.ip_addr() {
// Can we receive a direct reverse connection?
if !reverse_did.class.requires_signal() {
return ContactMethod::SignalReverse(
@ -425,16 +418,18 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
node_b,
&udp_dial_info_filter,
sequencing,
dif_sort.clone()
dif_sort.clone(),
) {
// Does node A have a direct udp dialinfo that node B can reach?
if let Some(reverse_udp_did) = first_filtered_dial_info_detail_between_nodes(
node_b,
node_a,
&udp_dial_info_filter,
sequencing,
dif_sort.clone(),
) {
if let Some(reverse_udp_did) =
first_filtered_dial_info_detail_between_nodes(
node_b,
node_a,
&udp_dial_info_filter,
sequencing,
dif_sort.clone(),
)
{
// Ensure we aren't on the same public IP address (no hairpin nat)
if reverse_udp_did.dial_info.ip_addr()
!= target_udp_did.dial_info.ip_addr()
@ -456,11 +451,14 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
}
// If the node B has no direct dial info, it needs to have an inbound relay
else if let Some(node_b_relay) = peer_b.signed_node_info().relay_info() {
// Note that relay_peer_info could be node_a, in which case a connection already exists
// and we only get here if the connection had dropped, in which case node_b is unreachable until
// it gets a new relay connection up
if peer_b.signed_node_info().relay_ids().contains_any(peer_a.node_ids()) {
if peer_b
.signed_node_info()
.relay_ids()
.contains_any(peer_a.node_ids())
{
return ContactMethod::Existing;
}
@ -476,7 +474,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
node_b_relay,
&dial_info_filter,
sequencing,
dif_sort.clone()
dif_sort.clone(),
)
.is_some()
{
@ -488,14 +486,11 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
node_a,
&dial_info_filter,
sequencing,
dif_sort.clone()
dif_sort.clone(),
) {
// Can we receive a direct reverse connection?
if !reverse_did.class.requires_signal() {
return ContactMethod::SignalReverse(
node_b_relay_id,
node_b_id,
);
return ContactMethod::SignalReverse(node_b_relay_id, node_b_id);
}
}
@ -504,7 +499,12 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
}
// If node A can't reach the node by other means, it may need to use its outbound relay
if peer_a.signed_node_info().node_info().network_class().outbound_wants_relay() {
if peer_a
.signed_node_info()
.node_info()
.network_class()
.outbound_wants_relay()
{
if let Some(node_a_relay_id) = peer_a.signed_node_info().relay_ids().get(best_ck) {
// Ensure it's not our relay we're trying to reach
if node_a_relay_id != node_b_id {
@ -573,7 +573,6 @@ impl RoutingDomainDetail for LocalNetworkRoutingDomainDetail {
sequencing: Sequencing,
dif_sort: Option<Arc<DialInfoDetailSort>>,
) -> ContactMethod {
// Get the nodeinfos for convenience
let node_a = peer_a.signed_node_info().node_info();
let node_b = peer_b.signed_node_info().node_info();
@ -585,10 +584,16 @@ impl RoutingDomainDetail for LocalNetworkRoutingDomainDetail {
return ContactMethod::Unreachable;
};
if let Some(target_did) = first_filtered_dial_info_detail_between_nodes(node_a, node_b, &dial_info_filter, sequencing, dif_sort) {
if let Some(target_did) = first_filtered_dial_info_detail_between_nodes(
node_a,
node_b,
&dial_info_filter,
sequencing,
dif_sort,
) {
return ContactMethod::Direct(target_did.dial_info);
}
ContactMethod::Unreachable
}
}

View File

@ -1,6 +1,51 @@
use super::*;
impl RoutingTable {
// Check if a relay is desired or not
fn public_internet_wants_relay(&self) -> Option<RelayKind> {
let own_peer_info = self.get_own_peer_info(RoutingDomain::PublicInternet);
let own_node_info = own_peer_info.signed_node_info().node_info();
let network_class = own_node_info.network_class();
// Never give a relay to something with an invalid network class
if matches!(network_class, NetworkClass::Invalid) {
return None;
}
// If we -need- a relay always request one
if let Some(rk) = own_node_info.requires_relay() {
return Some(rk);
}
// If we don't always need a relay, but we don't have support for
// all the address types then we should request one anyway
let mut address_types = AddressTypeSet::empty();
for did in own_node_info.dial_info_detail_list() {
address_types |= did.dial_info.address_type();
}
if address_types != AddressTypeSet::all() {
return Some(RelayKind::Inbound);
}
// If we are behind some NAT, then we should get ourselves a relay just
// in case we need to navigate hairpin NAT to our own network
let mut inbound_addresses = HashSet::<SocketAddr>::new();
for did in own_node_info.dial_info_detail_list() {
inbound_addresses.insert(did.dial_info.to_socket_addr());
}
let own_local_peer_info = self.get_own_peer_info(RoutingDomain::LocalNetwork);
let own_local_node_info = own_local_peer_info.signed_node_info().node_info();
for ldid in own_local_node_info.dial_info_detail_list() {
inbound_addresses.remove(&ldid.dial_info.to_socket_addr());
}
if !inbound_addresses.is_empty() {
return Some(RelayKind::Inbound);
}
// No relay is desired
None
}
// Keep relays assigned and accessible
#[instrument(level = "trace", skip(self), err)]
pub(crate) async fn relay_management_task_routine(
@ -9,10 +54,8 @@ impl RoutingTable {
_last_ts: Timestamp,
cur_ts: Timestamp,
) -> EyreResult<()> {
let own_peer_info = self.get_own_peer_info(RoutingDomain::PublicInternet);
let own_node_info = own_peer_info.signed_node_info().node_info();
let network_class = own_node_info.network_class();
let relay_node_filter = self.make_public_internet_relay_node_filter();
let relay_desired = self.public_internet_wants_relay();
// Get routing domain editor
let mut editor = self.edit_routing_domain(RoutingDomain::PublicInternet);
@ -36,19 +79,10 @@ impl RoutingTable {
editor.clear_relay_node();
false
}
// Relay node is no longer required
else if !own_node_info.requires_relay() {
// Relay node is no longer wanted
else if relay_desired.is_none() {
log_rtab!(debug
"Relay node no longer required, dropping relay {}",
relay_node
);
editor.clear_relay_node();
false
}
// Should not have relay for invalid network class
else if !self.has_valid_network_class(RoutingDomain::PublicInternet) {
log_rtab!(debug
"Invalid network class does not get a relay, dropping relay {}",
"Relay node no longer desired, dropping relay {}",
relay_node
);
editor.clear_relay_node();
@ -61,11 +95,13 @@ impl RoutingTable {
}
};
// Do we need a relay?
if !has_relay && own_node_info.requires_relay() {
// Do we want a relay?
if !has_relay && relay_desired.is_some() {
let relay_desired = relay_desired.unwrap();
// Do we want an outbound relay?
let mut got_outbound_relay = false;
if network_class.outbound_wants_relay() {
if matches!(relay_desired, RelayKind::Outbound) {
// The outbound relay is the host of the PWA
if let Some(outbound_relay_peerinfo) = intf::get_outbound_relay_peer().await {
// Register new outbound relay
@ -89,7 +125,11 @@ impl RoutingTable {
}
if !got_outbound_relay {
// Find a node in our routing table that is an acceptable inbound relay
if let Some(nr) = self.find_inbound_relay(RoutingDomain::PublicInternet, cur_ts) {
if let Some(nr) = self.find_inbound_relay(
RoutingDomain::PublicInternet,
cur_ts,
relay_node_filter,
) {
log_rtab!(debug "Inbound relay node selected: {}", nr);
editor.set_relay_node(nr);
}
@ -107,60 +147,92 @@ impl RoutingTable {
let outbound_dif = self.get_outbound_dial_info_filter(RoutingDomain::PublicInternet);
let mapped_port_info = self.get_low_level_port_info();
let ip6_prefix_size = self
.unlocked_inner
.config
.get()
.network
.max_connections_per_ip6_prefix_size as usize;
let our_ip_blocks = self
.get_own_peer_info(RoutingDomain::PublicInternet)
.signed_node_info()
.node_info()
.dial_info_detail_list()
.iter()
.map(|did| ip_to_ipblock(ip6_prefix_size, did.dial_info.to_socket_addr().ip()))
.collect::<HashSet<_>>();
move |e: &BucketEntryInner| {
// Ensure this node is not on the local network
// Ensure this node is not on the local network and is on the public internet
if e.has_node_info(RoutingDomain::LocalNetwork.into()) {
return false;
}
let Some(node_info) = e.node_info(RoutingDomain::PublicInternet) else {
return false;
};
// Disqualify nodes that don't have relay capability or require a relay themselves
if !(node_info.has_capability(CAP_RELAY) && node_info.is_fully_direct_inbound()) {
// Needs to be able to accept packets to relay directly
return false;
}
// Disqualify nodes that don't cover all our inbound ports for tcp and udp
// as we need to be able to use the relay for keepalives for all nat mappings
let mut low_level_protocol_ports = mapped_port_info.low_level_protocol_ports.clone();
let dids = node_info.all_filtered_dial_info_details(DialInfoDetail::NO_SORT, |did| {
did.matches_filter(&outbound_dif)
});
for did in &dids {
let pt = did.dial_info.protocol_type();
let at = did.dial_info.address_type();
if let Some((llpt, port)) = mapped_port_info.protocol_to_port.get(&(pt, at)) {
low_level_protocol_ports.remove(&(*llpt, at, *port));
}
}
if !low_level_protocol_ports.is_empty() {
return false;
}
let can_serve_as_relay = e
.node_info(RoutingDomain::PublicInternet)
.map(|n| {
if !(n.has_capability(CAP_RELAY) && n.is_fully_direct_inbound()) {
// Needs to be able to accept packets to relay directly
// For all protocol types we could connect to the relay by, ensure the relay supports all address types
let mut address_type_mappings = HashMap::<ProtocolType, AddressTypeSet>::new();
let dids = node_info.dial_info_detail_list();
for did in dids {
address_type_mappings
.entry(did.dial_info.protocol_type())
.and_modify(|x| {
x.insert(did.dial_info.address_type());
})
.or_insert_with(|| did.dial_info.address_type().into());
}
for pt in outbound_dif.protocol_type_set.iter() {
if let Some(ats) = address_type_mappings.get(&pt) {
if *ats != AddressTypeSet::all() {
return false;
}
}
}
let dids = n.all_filtered_dial_info_details(DialInfoDetail::NO_SORT, |did| {
did.matches_filter(&outbound_dif)
});
for did in &dids {
let pt = did.dial_info.protocol_type();
let at = did.dial_info.address_type();
if let Some((llpt, port)) = mapped_port_info.protocol_to_port.get(&(pt, at))
{
low_level_protocol_ports.remove(&(*llpt, at, *port));
}
}
low_level_protocol_ports.is_empty()
})
.unwrap_or(false);
if !can_serve_as_relay {
return false;
// Exclude any nodes that have our same network block
for did in dids {
let ipblock = ip_to_ipblock(ip6_prefix_size, did.dial_info.to_socket_addr().ip());
if our_ip_blocks.contains(&ipblock) {
return false;
}
}
true
}
}
#[instrument(level = "trace", skip(self), ret)]
#[instrument(level = "trace", skip(self, relay_node_filter), ret)]
pub fn find_inbound_relay(
&self,
routing_domain: RoutingDomain,
cur_ts: Timestamp,
relay_node_filter: impl Fn(&BucketEntryInner) -> bool,
) -> Option<NodeRef> {
// Get relay filter function
let relay_node_filter = match routing_domain {
RoutingDomain::PublicInternet => self.make_public_internet_relay_node_filter(),
RoutingDomain::LocalNetwork => {
unimplemented!();
}
};
// Go through all entries and find fastest entry that matches filter function
let inner = self.inner.read();
let inner = &*inner;

View File

@ -129,24 +129,24 @@ impl NodeInfo {
}
/// Is some relay required either for signal or inbound relay or outbound relay?
pub fn requires_relay(&self) -> bool {
pub fn requires_relay(&self) -> Option<RelayKind> {
match self.network_class {
NetworkClass::InboundCapable => {
for did in &self.dial_info_detail_list {
if did.class.requires_relay() {
return true;
return Some(RelayKind::Inbound);
}
}
}
NetworkClass::OutboundOnly => {
return true;
return Some(RelayKind::Inbound);
}
NetworkClass::WebApp => {
return true;
return Some(RelayKind::Outbound);
}
NetworkClass::Invalid => {}
}
false
None
}
pub fn has_capability(&self, cap: Capability) -> bool {