mirror of
https://gitlab.com/veilid/veilid.git
synced 2025-02-22 07:29:57 -05:00
Merge branch 'relaying-cleanup' into 'main'
Bugfixes See merge request veilid/veilid!351
This commit is contained in:
commit
18a846910c
@ -334,3 +334,18 @@ macro_rules! impl_subscribe_event_bus {
|
||||
}
|
||||
|
||||
pub(crate) use impl_subscribe_event_bus;
|
||||
|
||||
// macro_rules! impl_subscribe_event_bus_async {
|
||||
// ($this:expr, $this_type:ty, $event_handler:ident ) => {{
|
||||
// let registry = $this.registry();
|
||||
// $this.event_bus().subscribe(move |evt| {
|
||||
// let registry = registry.clone();
|
||||
// Box::pin(async move {
|
||||
// let this = registry.lookup::<$this_type>().unwrap();
|
||||
// this.$event_handler(evt).await;
|
||||
// })
|
||||
// })
|
||||
// }};
|
||||
// }
|
||||
|
||||
// pub(crate) use impl_subscribe_event_bus_async;
|
||||
|
@ -37,7 +37,7 @@ struct AddressCheckCacheKey(RoutingDomain, ProtocolType, AddressType);
|
||||
pub struct AddressCheck {
|
||||
config: AddressCheckConfig,
|
||||
net: Network,
|
||||
current_network_class: BTreeMap<RoutingDomain, NetworkClass>,
|
||||
published_peer_info: BTreeMap<RoutingDomain, Arc<PeerInfo>>,
|
||||
current_addresses: BTreeMap<AddressCheckCacheKey, HashSet<SocketAddress>>,
|
||||
// Used by InboundCapable to determine if we have changed our address or re-do our network class
|
||||
address_inconsistency_table: BTreeMap<AddressCheckCacheKey, usize>,
|
||||
@ -50,7 +50,7 @@ impl fmt::Debug for AddressCheck {
|
||||
f.debug_struct("AddressCheck")
|
||||
.field("config", &self.config)
|
||||
//.field("net", &self.net)
|
||||
.field("current_network_class", &self.current_network_class)
|
||||
.field("current_peer_info", &self.published_peer_info)
|
||||
.field("current_addresses", &self.current_addresses)
|
||||
.field(
|
||||
"address_inconsistency_table",
|
||||
@ -66,20 +66,19 @@ impl AddressCheck {
|
||||
Self {
|
||||
config,
|
||||
net,
|
||||
current_network_class: BTreeMap::new(),
|
||||
published_peer_info: BTreeMap::new(),
|
||||
current_addresses: BTreeMap::new(),
|
||||
address_inconsistency_table: BTreeMap::new(),
|
||||
address_consistency_table: BTreeMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Accept a report of any peerinfo that has changed
|
||||
pub fn report_peer_info_change(&mut self, peer_info: Arc<PeerInfo>) {
|
||||
let routing_domain = peer_info.routing_domain();
|
||||
let network_class = peer_info.signed_node_info().node_info().network_class();
|
||||
|
||||
self.current_network_class
|
||||
.insert(routing_domain, network_class);
|
||||
/// Accept a report of any peerinfo that has been -published-
|
||||
pub fn report_peer_info_change(
|
||||
&mut self,
|
||||
routing_domain: RoutingDomain,
|
||||
opt_peer_info: Option<Arc<PeerInfo>>,
|
||||
) {
|
||||
for protocol_type in ProtocolTypeSet::all() {
|
||||
for address_type in AddressTypeSet::all() {
|
||||
let acck = AddressCheckCacheKey(routing_domain, protocol_type, address_type);
|
||||
@ -93,29 +92,36 @@ impl AddressCheck {
|
||||
}
|
||||
}
|
||||
|
||||
for did in peer_info
|
||||
.signed_node_info()
|
||||
.node_info()
|
||||
.dial_info_detail_list()
|
||||
{
|
||||
// Strip port from direct and mapped addresses
|
||||
// as the incoming dialinfo may not match the outbound
|
||||
// connections' NAT mapping. In this case we only check for IP address changes.
|
||||
let socket_address =
|
||||
if did.class == DialInfoClass::Direct || did.class == DialInfoClass::Mapped {
|
||||
did.dial_info.socket_address().with_port(0)
|
||||
} else {
|
||||
did.dial_info.socket_address()
|
||||
};
|
||||
if let Some(peer_info) = opt_peer_info {
|
||||
self.published_peer_info
|
||||
.insert(routing_domain, peer_info.clone());
|
||||
|
||||
let address_type = did.dial_info.address_type();
|
||||
let protocol_type = did.dial_info.protocol_type();
|
||||
let acck = AddressCheckCacheKey(routing_domain, protocol_type, address_type);
|
||||
for did in peer_info
|
||||
.signed_node_info()
|
||||
.node_info()
|
||||
.dial_info_detail_list()
|
||||
{
|
||||
// Strip port from direct and mapped addresses
|
||||
// as the incoming dialinfo may not match the outbound
|
||||
// connections' NAT mapping. In this case we only check for IP address changes.
|
||||
let socket_address =
|
||||
if did.class == DialInfoClass::Direct || did.class == DialInfoClass::Mapped {
|
||||
did.dial_info.socket_address().with_port(0)
|
||||
} else {
|
||||
did.dial_info.socket_address()
|
||||
};
|
||||
|
||||
self.current_addresses
|
||||
.entry(acck)
|
||||
.or_default()
|
||||
.insert(socket_address);
|
||||
let address_type = did.dial_info.address_type();
|
||||
let protocol_type = did.dial_info.protocol_type();
|
||||
let acck = AddressCheckCacheKey(routing_domain, protocol_type, address_type);
|
||||
|
||||
self.current_addresses
|
||||
.entry(acck)
|
||||
.or_default()
|
||||
.insert(socket_address);
|
||||
}
|
||||
} else {
|
||||
self.published_peer_info.remove(&routing_domain);
|
||||
}
|
||||
}
|
||||
|
||||
@ -129,16 +135,16 @@ impl AddressCheck {
|
||||
flow: Flow, // the flow used
|
||||
reporting_peer: NodeRef, // the peer's noderef reporting the socket address
|
||||
) {
|
||||
// Don't accept any reports if we're already in the middle of a public dial info check
|
||||
if self.net.needs_public_dial_info_check() {
|
||||
// Only process the PublicInternet RoutingDomain for now
|
||||
if !matches!(routing_domain, RoutingDomain::PublicInternet) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Ignore the LocalNetwork routing domain because we know if our local addresses change
|
||||
// from our interfaces
|
||||
if matches!(routing_domain, RoutingDomain::LocalNetwork) {
|
||||
// Get the routing table and published peer info
|
||||
// If the peer info has invalid network class or is unconfirmed or unpublished this will return
|
||||
let Some(peer_info) = self.published_peer_info.get(&routing_domain).cloned() else {
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Ignore flows that do not start from our listening port (unbound connections etc),
|
||||
// because a router is going to map these differently
|
||||
@ -177,15 +183,10 @@ impl AddressCheck {
|
||||
return;
|
||||
}
|
||||
|
||||
// Get current network class / dial info
|
||||
// If we haven't gotten our own network class yet we're done for now
|
||||
let Some(network_class) = self.current_network_class.get(&routing_domain) else {
|
||||
return;
|
||||
};
|
||||
|
||||
// Process the state of the address checker and see if we need to
|
||||
// perform a full address check for this routing domain
|
||||
let needs_address_detection = match network_class {
|
||||
let needs_address_detection = match peer_info.signed_node_info().node_info().network_class()
|
||||
{
|
||||
NetworkClass::InboundCapable => self.detect_for_inbound_capable(
|
||||
routing_domain,
|
||||
socket_address,
|
||||
@ -213,7 +214,7 @@ impl AddressCheck {
|
||||
);
|
||||
|
||||
// Re-detect the public dialinfo
|
||||
self.net.set_needs_public_dial_info_check(None);
|
||||
self.net.trigger_update_network_class(routing_domain);
|
||||
} else {
|
||||
warn!(
|
||||
"{:?} address may have changed. Restarting the server may be required.",
|
||||
|
@ -268,6 +268,7 @@ impl AddressFilter {
|
||||
inner.punishments_by_ip4.clear();
|
||||
inner.punishments_by_ip6_prefix.clear();
|
||||
inner.punishments_by_node_id.clear();
|
||||
inner.dial_info_failures.clear();
|
||||
|
||||
self.routing_table().clear_punishments();
|
||||
}
|
||||
|
@ -1184,7 +1184,7 @@ impl NetworkManager {
|
||||
fn peer_info_change_event_handler(&self, evt: Arc<PeerInfoChangeEvent>) {
|
||||
let mut inner = self.inner.lock();
|
||||
if let Some(address_check) = inner.address_check.as_mut() {
|
||||
address_check.report_peer_info_change(evt.peer_info.clone());
|
||||
address_check.report_peer_info_change(evt.routing_domain, evt.opt_peer_info.clone());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -262,7 +262,9 @@ impl DiscoveryContext {
|
||||
}
|
||||
}
|
||||
if external_address_infos.len() < EXTERNAL_INFO_VALIDATIONS {
|
||||
log_net!(debug "not enough peers responded with an external address for type {:?}:{:?}",
|
||||
log_net!(debug "not enough peers ({}<{}) responded with an external address for type {:?}:{:?}",
|
||||
external_address_infos.len(),
|
||||
EXTERNAL_INFO_VALIDATIONS,
|
||||
protocol_type,
|
||||
address_type);
|
||||
return false;
|
||||
|
@ -87,16 +87,14 @@ struct NetworkInner {
|
||||
/// the number of consecutive dial info failures per routing domain,
|
||||
/// which may indicate the network is down for that domain
|
||||
dial_info_failure_count: BTreeMap<RoutingDomain, usize>,
|
||||
/// if we need to redo the publicinternet network class
|
||||
needs_update_network_class: bool,
|
||||
/// the next time we are allowed to check for better dialinfo when we are OutboundOnly
|
||||
next_outbound_only_dial_info_check: Timestamp,
|
||||
/// join handles for all the low level network background tasks
|
||||
join_handles: Vec<MustJoinHandle<()>>,
|
||||
/// stop source for shutting down the low level network background tasks
|
||||
stop_source: Option<StopSource>,
|
||||
/// set if we need to calculate our public dial info again
|
||||
needs_public_dial_info_check: bool,
|
||||
/// the punishment closure to enax
|
||||
public_dial_info_check_punishment: Option<Box<dyn FnOnce() + Send + 'static>>,
|
||||
/// Actual bound addresses per protocol
|
||||
bound_address_per_protocol: BTreeMap<ProtocolType, Vec<SocketAddr>>,
|
||||
/// mapping of protocol handlers to accept messages from a set of bound socket addresses
|
||||
@ -156,9 +154,8 @@ impl Network {
|
||||
NetworkInner {
|
||||
network_needs_restart: false,
|
||||
dial_info_failure_count: BTreeMap::new(),
|
||||
needs_update_network_class: false,
|
||||
next_outbound_only_dial_info_check: Timestamp::default(),
|
||||
needs_public_dial_info_check: false,
|
||||
public_dial_info_check_punishment: None,
|
||||
join_handles: Vec::new(),
|
||||
stop_source: None,
|
||||
bound_address_per_protocol: BTreeMap::new(),
|
||||
@ -766,13 +763,16 @@ impl Network {
|
||||
network_state.protocol_config.inbound,
|
||||
network_state.protocol_config.family_local,
|
||||
network_state.protocol_config.local_network_capabilities,
|
||||
true,
|
||||
);
|
||||
|
||||
let confirmed_public_internet = !self.config().with(|c| c.network.detect_address_changes);
|
||||
editor_public_internet.setup_network(
|
||||
network_state.protocol_config.outbound,
|
||||
network_state.protocol_config.inbound,
|
||||
network_state.protocol_config.family_global,
|
||||
network_state.protocol_config.public_internet_capabilities,
|
||||
confirmed_public_internet,
|
||||
);
|
||||
|
||||
// Start listeners
|
||||
@ -829,9 +829,9 @@ impl Network {
|
||||
editor_local_network.publish();
|
||||
}
|
||||
|
||||
if self.config().with(|c| c.network.detect_address_changes) {
|
||||
// Say we need to detect the public dialinfo
|
||||
self.set_needs_public_dial_info_check(None);
|
||||
if !confirmed_public_internet {
|
||||
// Update public internet network class if we haven't confirmed it
|
||||
self.trigger_update_network_class(RoutingDomain::PublicInternet);
|
||||
} else {
|
||||
// Warn if we have no public dialinfo, because we're not going to magically find some
|
||||
// with detect address changes turned off
|
||||
@ -968,38 +968,25 @@ impl Network {
|
||||
}
|
||||
|
||||
//////////////////////////////////////////
|
||||
// pub fn set_needs_dial_info_check(&self, routing_domain: RoutingDomain) {
|
||||
// match routing_domain {
|
||||
// RoutingDomain::LocalNetwork => {
|
||||
// // nothing here yet
|
||||
// }
|
||||
// RoutingDomain::PublicInternet => {
|
||||
// self.set_needs_public_dial_info_check(None);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
pub fn set_needs_public_dial_info_check(
|
||||
&self,
|
||||
punishment: Option<Box<dyn FnOnce() + Send + 'static>>,
|
||||
) {
|
||||
let Ok(_guard) = self.startup_lock.enter() else {
|
||||
log_net!(debug "ignoring due to not started up");
|
||||
return;
|
||||
};
|
||||
let mut inner = self.inner.lock();
|
||||
inner.needs_public_dial_info_check = true;
|
||||
inner.public_dial_info_check_punishment = punishment;
|
||||
}
|
||||
|
||||
pub fn needs_public_dial_info_check(&self) -> bool {
|
||||
pub fn needs_update_network_class(&self) -> bool {
|
||||
let Ok(_guard) = self.startup_lock.enter() else {
|
||||
log_net!(debug "ignoring due to not started up");
|
||||
return false;
|
||||
};
|
||||
let inner = self.inner.lock();
|
||||
inner.needs_public_dial_info_check
|
||||
|
||||
self.inner.lock().needs_update_network_class
|
||||
}
|
||||
|
||||
//////////////////////////////////////////
|
||||
pub fn trigger_update_network_class(&self, routing_domain: RoutingDomain) {
|
||||
let Ok(_guard) = self.startup_lock.enter() else {
|
||||
log_net!(debug "ignoring due to not started up");
|
||||
return;
|
||||
};
|
||||
|
||||
if !matches!(routing_domain, RoutingDomain::PublicInternet) {
|
||||
return;
|
||||
}
|
||||
self.inner.lock().needs_update_network_class = true;
|
||||
}
|
||||
}
|
||||
|
@ -40,13 +40,13 @@ impl Network {
|
||||
}
|
||||
|
||||
// Determine if we need to check for public dialinfo
|
||||
fn needs_update_network_class_tick(&self) -> bool {
|
||||
fn wants_update_network_class_tick(&self) -> bool {
|
||||
let public_internet_network_class = self
|
||||
.routing_table()
|
||||
.get_network_class(RoutingDomain::PublicInternet);
|
||||
let needs_public_dial_info_check = self.needs_public_dial_info_check();
|
||||
|
||||
if needs_public_dial_info_check
|
||||
let needs_update_network_class = self.needs_update_network_class();
|
||||
if needs_update_network_class
|
||||
|| public_internet_network_class == NetworkClass::Invalid
|
||||
|| (public_internet_network_class == NetworkClass::OutboundOnly
|
||||
&& self.inner.lock().next_outbound_only_dial_info_check <= Timestamp::now())
|
||||
@ -98,7 +98,7 @@ impl Network {
|
||||
// Check our network interfaces to see if they have changed
|
||||
self.network_interfaces_task.tick().await?;
|
||||
|
||||
if self.needs_update_network_class_tick() {
|
||||
if self.wants_update_network_class_tick() {
|
||||
self.update_network_class_task.tick().await?;
|
||||
}
|
||||
}
|
||||
|
@ -69,7 +69,7 @@ impl Network {
|
||||
|
||||
// If any of the new addresses were PublicInternet addresses, re-run public dial info check
|
||||
if public_internet_changed {
|
||||
self.set_needs_public_dial_info_check(None);
|
||||
self.trigger_update_network_class(RoutingDomain::PublicInternet);
|
||||
}
|
||||
|
||||
Ok(local_network_changed || public_internet_changed)
|
||||
|
@ -25,9 +25,6 @@ impl Network {
|
||||
if finished {
|
||||
let mut inner = self.inner.lock();
|
||||
|
||||
// Note that we did the check successfully
|
||||
inner.needs_public_dial_info_check = false;
|
||||
|
||||
// Don't try to re-do OutboundOnly dialinfo for another 10 seconds
|
||||
inner.next_outbound_only_dial_info_check = Timestamp::now()
|
||||
+ TimestampDuration::new_secs(UPDATE_OUTBOUND_ONLY_NETWORK_CLASS_PERIOD_SECS)
|
||||
@ -133,14 +130,6 @@ impl Network {
|
||||
// Save off existing public dial info for change detection later
|
||||
let routing_table = self.routing_table();
|
||||
|
||||
let existing_public_dial_info: HashSet<DialInfoDetail> = routing_table
|
||||
.all_filtered_dial_info_details(
|
||||
RoutingDomain::PublicInternet.into(),
|
||||
&DialInfoFilter::all(),
|
||||
)
|
||||
.into_iter()
|
||||
.collect();
|
||||
|
||||
// Set most permissive network config and start from scratch
|
||||
let mut editor = routing_table.edit_public_internet_routing_domain();
|
||||
editor.setup_network(
|
||||
@ -148,6 +137,7 @@ impl Network {
|
||||
protocol_config.inbound,
|
||||
protocol_config.family_global,
|
||||
protocol_config.public_internet_capabilities.clone(),
|
||||
false,
|
||||
);
|
||||
editor.clear_dial_info_details(None, None);
|
||||
editor.commit(true).await;
|
||||
@ -233,6 +223,12 @@ impl Network {
|
||||
self.update_with_detection_result(&mut editor, &inbound_protocol_map, dr);
|
||||
}
|
||||
|
||||
// If we got no external address types, try again
|
||||
if external_address_types.is_empty() {
|
||||
log_net!(debug "Network class discovery failed, trying again, got no external address types");
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
// See if we have any discovery contexts that did not complete for a
|
||||
// particular protocol type if its external address type was supported.
|
||||
let mut success = true;
|
||||
@ -251,33 +247,20 @@ impl Network {
|
||||
// All done
|
||||
log_net!(debug "Network class discovery finished with address_types {:?}", external_address_types);
|
||||
|
||||
// Set the address types we've seen
|
||||
// Set the address types we've seen and confirm the network class
|
||||
editor.setup_network(
|
||||
protocol_config.outbound,
|
||||
protocol_config.inbound,
|
||||
external_address_types,
|
||||
protocol_config.public_internet_capabilities,
|
||||
true,
|
||||
);
|
||||
if editor.commit(true).await {
|
||||
editor.publish();
|
||||
}
|
||||
|
||||
// See if the dial info changed
|
||||
let new_public_dial_info: HashSet<DialInfoDetail> = self
|
||||
.routing_table()
|
||||
.all_filtered_dial_info_details(
|
||||
RoutingDomain::PublicInternet.into(),
|
||||
&DialInfoFilter::all(),
|
||||
)
|
||||
.into_iter()
|
||||
.collect();
|
||||
|
||||
// Punish nodes that told us our public address had changed when it didn't
|
||||
if new_public_dial_info == existing_public_dial_info {
|
||||
if let Some(punish) = self.inner.lock().public_dial_info_check_punishment.take() {
|
||||
punish();
|
||||
}
|
||||
}
|
||||
// Say we no longer need an update
|
||||
self.inner.lock().needs_update_network_class = false;
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
|
@ -589,8 +589,9 @@ impl NetworkManager {
|
||||
})
|
||||
.unwrap_or(false);
|
||||
|
||||
let mut target_node_ref =
|
||||
target_node_ref.filtered_clone(NodeRefFilter::from(dial_info_filter));
|
||||
let mut target_node_ref = target_node_ref.filtered_clone(
|
||||
NodeRefFilter::from(dial_info_filter).with_routing_domain(routing_domain),
|
||||
);
|
||||
if tighten {
|
||||
target_node_ref.set_sequencing(Sequencing::EnsureOrdered);
|
||||
}
|
||||
@ -616,6 +617,7 @@ impl NetworkManager {
|
||||
// but tcp hole punch is very very unreliable it seems
|
||||
let udp_target_node_ref = target_node_ref.filtered_clone(
|
||||
NodeRefFilter::new()
|
||||
.with_routing_domain(routing_domain)
|
||||
.with_dial_info_filter(dial_info_filter)
|
||||
.with_protocol_type(ProtocolType::UDP),
|
||||
);
|
||||
|
@ -1,7 +1,8 @@
|
||||
use super::*;
|
||||
|
||||
pub(crate) struct PeerInfoChangeEvent {
|
||||
pub peer_info: Arc<PeerInfo>,
|
||||
pub routing_domain: RoutingDomain,
|
||||
pub opt_peer_info: Option<Arc<PeerInfo>>,
|
||||
}
|
||||
|
||||
pub(crate) struct SocketAddressChangeEvent {
|
||||
|
@ -2,8 +2,8 @@ use super::*;
|
||||
|
||||
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
|
||||
pub(crate) enum NetworkClass {
|
||||
InboundCapable = 0, // I = Inbound capable without relay, may require signal
|
||||
OutboundOnly = 1, // O = Outbound only, inbound relay required except with reverse connect signal
|
||||
InboundCapable = 0, // I = Has inbound-capable dialinfo, including direct or holepunch-able NAT'd dialinfo
|
||||
OutboundOnly = 1, // O = Has no dialinfo but may be reachable via inbound relay or reverse connections.
|
||||
WebApp = 2, // W = PWA, outbound relay is required in most cases
|
||||
Invalid = 3, // X = Invalid network class, we don't know how to reach this node
|
||||
}
|
||||
|
@ -401,6 +401,7 @@ impl Network {
|
||||
protocol_config.inbound,
|
||||
protocol_config.family_global,
|
||||
protocol_config.public_internet_capabilities.clone(),
|
||||
true,
|
||||
);
|
||||
|
||||
// commit routing domain edits
|
||||
@ -481,17 +482,8 @@ impl Network {
|
||||
|
||||
//////////////////////////////////////////
|
||||
|
||||
pub fn set_needs_public_dial_info_check(
|
||||
&self,
|
||||
_punishment: Option<Box<dyn FnOnce() + Send + 'static>>,
|
||||
) {
|
||||
let Ok(_guard) = self.startup_lock.enter() else {
|
||||
log_net!(debug "ignoring due to not started up");
|
||||
return;
|
||||
};
|
||||
}
|
||||
|
||||
pub fn needs_public_dial_info_check(&self) -> bool {
|
||||
#[expect(dead_code)]
|
||||
pub fn needs_update_network_class(&self) -> bool {
|
||||
let Ok(_guard) = self.startup_lock.enter() else {
|
||||
log_net!(debug "ignoring due to not started up");
|
||||
return false;
|
||||
@ -500,6 +492,12 @@ impl Network {
|
||||
false
|
||||
}
|
||||
|
||||
pub fn trigger_update_network_class(&self, _routing_domain: RoutingDomain) {
|
||||
let Ok(_guard) = self.startup_lock.enter() else {
|
||||
log_net!(debug "ignoring due to not started up");
|
||||
return;
|
||||
};
|
||||
}
|
||||
//////////////////////////////////////////
|
||||
#[instrument(level = "trace", target = "net", name = "Network::tick", skip_all, err)]
|
||||
pub async fn tick(&self) -> EyreResult<()> {
|
||||
|
@ -15,6 +15,7 @@ pub trait RoutingDomainEditorCommonTrait {
|
||||
inbound_protocols: ProtocolTypeSet,
|
||||
address_types: AddressTypeSet,
|
||||
capabilities: Vec<Capability>,
|
||||
confirmed: bool,
|
||||
) -> &mut Self;
|
||||
fn commit(&mut self, pause_tasks: bool) -> SendPinBoxFutureLifetime<'_, bool>;
|
||||
fn shutdown(&mut self) -> SendPinBoxFutureLifetime<'_, ()>;
|
||||
@ -59,12 +60,14 @@ impl<T: RoutingDomainDetailCommonAccessors> RoutingDomainDetailApplyCommonChange
|
||||
inbound_protocols,
|
||||
address_types,
|
||||
capabilities,
|
||||
confirmed,
|
||||
} => {
|
||||
self.common_mut().setup_network(
|
||||
outbound_protocols,
|
||||
inbound_protocols,
|
||||
address_types,
|
||||
capabilities.clone(),
|
||||
confirmed,
|
||||
);
|
||||
}
|
||||
}
|
||||
@ -92,5 +95,6 @@ pub(super) enum RoutingDomainChangeCommon {
|
||||
inbound_protocols: ProtocolTypeSet,
|
||||
address_types: AddressTypeSet,
|
||||
capabilities: Vec<Capability>,
|
||||
confirmed: bool,
|
||||
},
|
||||
}
|
||||
|
@ -90,6 +90,7 @@ impl<'a> RoutingDomainEditorCommonTrait for RoutingDomainEditorLocalNetwork<'a>
|
||||
inbound_protocols: ProtocolTypeSet,
|
||||
address_types: AddressTypeSet,
|
||||
capabilities: Vec<Capability>,
|
||||
confirmed: bool,
|
||||
) -> &mut Self {
|
||||
self.changes.push(RoutingDomainChangeLocalNetwork::Common(
|
||||
RoutingDomainChangeCommon::SetupNetwork {
|
||||
@ -97,6 +98,7 @@ impl<'a> RoutingDomainEditorCommonTrait for RoutingDomainEditorLocalNetwork<'a>
|
||||
inbound_protocols,
|
||||
address_types,
|
||||
capabilities,
|
||||
confirmed,
|
||||
},
|
||||
));
|
||||
self
|
||||
|
@ -115,39 +115,54 @@ impl RoutingDomainDetail for LocalNetworkRoutingDomainDetail {
|
||||
}
|
||||
|
||||
fn publish_peer_info(&self, rti: &RoutingTableInner) -> bool {
|
||||
let peer_info = {
|
||||
let pi = self.get_peer_info(rti);
|
||||
let opt_peer_info = {
|
||||
let opt_new_peer_info = {
|
||||
let pi = self.get_peer_info(rti);
|
||||
|
||||
// If the network class is not yet determined, don't publish
|
||||
if pi.signed_node_info().node_info().network_class() == NetworkClass::Invalid {
|
||||
log_rtab!(debug "[LocalNetwork] Not publishing peer info with invalid network class");
|
||||
return false;
|
||||
}
|
||||
|
||||
// If we need a relay and we don't have one, don't publish yet
|
||||
if let Some(_relay_kind) = self.requires_relay() {
|
||||
if pi.signed_node_info().relay_ids().is_empty() {
|
||||
if pi.signed_node_info().node_info().network_class() == NetworkClass::Invalid {
|
||||
// If the network class is not yet determined, don't publish
|
||||
log_rtab!(debug "[LocalNetwork] Not publishing peer info with invalid network class");
|
||||
None
|
||||
} else if self.requires_relay().is_some()
|
||||
&& pi.signed_node_info().relay_ids().is_empty()
|
||||
{
|
||||
// If we need a relay and we don't have one, don't publish yet
|
||||
log_rtab!(debug "[LocalNetwork] Not publishing peer info that wants relay until we have a relay");
|
||||
return false;
|
||||
None
|
||||
} else {
|
||||
// This peerinfo is fit to publish
|
||||
Some(pi)
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Don't publish if the peer info hasnt changed from our previous publication
|
||||
let mut ppi_lock = self.published_peer_info.lock();
|
||||
if let Some(old_peer_info) = &*ppi_lock {
|
||||
if pi.equivalent(old_peer_info) {
|
||||
log_rtab!(debug "[LocalNetwork] Not publishing peer info because it is equivalent");
|
||||
return false;
|
||||
if let Some(new_peer_info) = &opt_new_peer_info {
|
||||
if new_peer_info.equivalent(old_peer_info) {
|
||||
log_rtab!(debug "[LocalNetwork] Not publishing peer info because it is equivalent");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
} else if opt_new_peer_info.is_none() {
|
||||
log_rtab!(debug "[LocalNetwork] Not publishing peer info because it is still None");
|
||||
return false;
|
||||
}
|
||||
|
||||
log_rtab!(debug "[LocalNetwork] Published new peer info: {}", pi);
|
||||
*ppi_lock = Some(pi.clone());
|
||||
if opt_new_peer_info.is_some() {
|
||||
log_rtab!(debug "[LocalNetwork] Published new peer info: {}", opt_new_peer_info.as_ref().unwrap());
|
||||
} else {
|
||||
log_rtab!(debug "[LocalNetwork] Unpublishing because current peer info is invalid");
|
||||
}
|
||||
*ppi_lock = opt_new_peer_info.clone();
|
||||
|
||||
pi
|
||||
opt_new_peer_info
|
||||
};
|
||||
|
||||
if let Err(e) = rti.event_bus().post(PeerInfoChangeEvent { peer_info }) {
|
||||
if let Err(e) = rti.event_bus().post(PeerInfoChangeEvent {
|
||||
routing_domain: RoutingDomain::LocalNetwork,
|
||||
opt_peer_info,
|
||||
}) {
|
||||
log_rtab!(debug "Failed to post event: {}", e);
|
||||
}
|
||||
|
||||
|
@ -28,19 +28,21 @@ pub trait RoutingDomainDetail {
|
||||
fn outbound_dial_info_filter(&self) -> DialInfoFilter;
|
||||
fn get_peer_info(&self, rti: &RoutingTableInner) -> Arc<PeerInfo>;
|
||||
|
||||
/// Can this routing domain contain a particular address
|
||||
// Can this routing domain contain a particular address
|
||||
fn can_contain_address(&self, address: Address) -> bool;
|
||||
fn ensure_dial_info_is_valid(&self, dial_info: &DialInfo) -> bool;
|
||||
|
||||
/// Refresh caches if external data changes
|
||||
// Refresh caches if external data changes
|
||||
fn refresh(&self);
|
||||
|
||||
/// Publish current peer info to the world
|
||||
// Confirm network class as valid
|
||||
|
||||
// Publish current peer info to the world
|
||||
fn publish_peer_info(&self, rti: &RoutingTableInner) -> bool;
|
||||
fn unpublish_peer_info(&self);
|
||||
|
||||
/// Get the contact method required for node A to reach node B in this routing domain
|
||||
/// Routing table must be locked for reading to use this function
|
||||
// Get the contact method required for node A to reach node B in this routing domain
|
||||
// Routing table must be locked for reading to use this function
|
||||
fn get_contact_method(
|
||||
&self,
|
||||
rti: &RoutingTableInner,
|
||||
@ -119,6 +121,7 @@ struct RoutingDomainDetailCommon {
|
||||
relay_node: Option<NodeRef>,
|
||||
capabilities: Vec<Capability>,
|
||||
dial_info_details: Vec<DialInfoDetail>,
|
||||
confirmed: bool,
|
||||
// caches
|
||||
cached_peer_info: Mutex<Option<Arc<PeerInfo>>>,
|
||||
relay_node_last_keepalive: Option<Timestamp>,
|
||||
@ -134,6 +137,7 @@ impl RoutingDomainDetailCommon {
|
||||
relay_node: Default::default(),
|
||||
capabilities: Default::default(),
|
||||
dial_info_details: Default::default(),
|
||||
confirmed: false,
|
||||
cached_peer_info: Mutex::new(Default::default()),
|
||||
relay_node_last_keepalive: Default::default(),
|
||||
}
|
||||
@ -147,7 +151,7 @@ impl RoutingDomainDetailCommon {
|
||||
if #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] {
|
||||
NetworkClass::WebApp
|
||||
} else {
|
||||
if self.address_types.is_empty() {
|
||||
if self.address_types.is_empty() || !self.confirmed {
|
||||
NetworkClass::Invalid
|
||||
}
|
||||
else if self.dial_info_details.is_empty() {
|
||||
@ -260,11 +264,13 @@ impl RoutingDomainDetailCommon {
|
||||
inbound_protocols: ProtocolTypeSet,
|
||||
address_types: AddressTypeSet,
|
||||
capabilities: Vec<Capability>,
|
||||
confirmed: bool,
|
||||
) {
|
||||
self.outbound_protocols = outbound_protocols;
|
||||
self.inbound_protocols = inbound_protocols;
|
||||
self.address_types = address_types;
|
||||
self.capabilities = capabilities;
|
||||
self.confirmed = confirmed;
|
||||
self.clear_cache();
|
||||
}
|
||||
|
||||
|
@ -101,6 +101,7 @@ impl<'a> RoutingDomainEditorCommonTrait for RoutingDomainEditorPublicInternet<'a
|
||||
inbound_protocols: ProtocolTypeSet,
|
||||
address_types: AddressTypeSet,
|
||||
capabilities: Vec<Capability>,
|
||||
confirmed: bool,
|
||||
) -> &mut Self {
|
||||
self.changes.push(RoutingDomainChangePublicInternet::Common(
|
||||
RoutingDomainChangeCommon::SetupNetwork {
|
||||
@ -108,6 +109,7 @@ impl<'a> RoutingDomainEditorCommonTrait for RoutingDomainEditorPublicInternet<'a
|
||||
inbound_protocols,
|
||||
address_types,
|
||||
capabilities,
|
||||
confirmed,
|
||||
},
|
||||
));
|
||||
self
|
||||
|
@ -93,39 +93,54 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
|
||||
}
|
||||
|
||||
fn publish_peer_info(&self, rti: &RoutingTableInner) -> bool {
|
||||
let peer_info = {
|
||||
let pi = self.get_peer_info(rti);
|
||||
let opt_peer_info = {
|
||||
let opt_new_peer_info = {
|
||||
let pi = self.get_peer_info(rti);
|
||||
|
||||
// If the network class is not yet determined, don't publish
|
||||
if pi.signed_node_info().node_info().network_class() == NetworkClass::Invalid {
|
||||
log_rtab!(debug "[PublicInternet] Not publishing peer info with invalid network class");
|
||||
return false;
|
||||
}
|
||||
|
||||
// If we need a relay and we don't have one, d on't publish yet
|
||||
if let Some(_relay_kind) = self.requires_relay() {
|
||||
if pi.signed_node_info().relay_ids().is_empty() {
|
||||
if pi.signed_node_info().node_info().network_class() == NetworkClass::Invalid {
|
||||
// If the network class is not yet determined, don't publish
|
||||
log_rtab!(debug "[PublicInternet] Not publishing peer info with invalid network class");
|
||||
None
|
||||
} else if self.requires_relay().is_some()
|
||||
&& pi.signed_node_info().relay_ids().is_empty()
|
||||
{
|
||||
// If we need a relay and we don't have one, don't publish yet
|
||||
log_rtab!(debug "[PublicInternet] Not publishing peer info that wants relay until we have a relay");
|
||||
return false;
|
||||
None
|
||||
} else {
|
||||
// This peerinfo is fit to publish
|
||||
Some(pi)
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Don't publish if the peer info hasnt changed from our previous publication
|
||||
let mut ppi_lock = self.published_peer_info.lock();
|
||||
if let Some(old_peer_info) = &*ppi_lock {
|
||||
if pi.equivalent(old_peer_info) {
|
||||
log_rtab!(debug "[PublicInternet] Not publishing peer info because it is equivalent");
|
||||
return false;
|
||||
if let Some(new_peer_info) = &opt_new_peer_info {
|
||||
if new_peer_info.equivalent(old_peer_info) {
|
||||
log_rtab!(debug "[PublicInternet] Not publishing peer info because it is equivalent");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
} else if opt_new_peer_info.is_none() {
|
||||
log_rtab!(debug "[PublicInternet] Not publishing peer info because it is still None");
|
||||
return false;
|
||||
}
|
||||
|
||||
log_rtab!(debug "[PublicInternet] Published new peer info: {}", pi);
|
||||
*ppi_lock = Some(pi.clone());
|
||||
if opt_new_peer_info.is_some() {
|
||||
log_rtab!(debug "[PublicInternet] Published new peer info: {}", opt_new_peer_info.as_ref().unwrap());
|
||||
} else {
|
||||
log_rtab!(debug "[PublicInternet] Unpublishing because current peer info is invalid");
|
||||
}
|
||||
*ppi_lock = opt_new_peer_info.clone();
|
||||
|
||||
pi
|
||||
opt_new_peer_info
|
||||
};
|
||||
|
||||
if let Err(e) = rti.event_bus().post(PeerInfoChangeEvent { peer_info }) {
|
||||
if let Err(e) = rti.event_bus().post(PeerInfoChangeEvent {
|
||||
routing_domain: RoutingDomain::PublicInternet,
|
||||
opt_peer_info,
|
||||
}) {
|
||||
log_rtab!(debug "Failed to post event: {}", e);
|
||||
}
|
||||
|
||||
|
@ -315,6 +315,8 @@ impl RoutingTable {
|
||||
// Try a different dialinfo next time
|
||||
network_manager.address_filter().set_dial_info_failed(bsdi);
|
||||
} else {
|
||||
info!("bootstrap of {} successful via {}", crypto_kind, nr);
|
||||
|
||||
// otherwise this bootstrap is valid, lets ask it to find ourselves now
|
||||
routing_table.reverse_find_node(crypto_kind, nr, true, vec![]).await
|
||||
}
|
||||
|
@ -6,8 +6,14 @@ impl RoutingTable {
|
||||
fn public_internet_wants_relay(&self) -> Option<RelayKind> {
|
||||
let own_peer_info = self.get_current_peer_info(RoutingDomain::PublicInternet);
|
||||
let own_node_info = own_peer_info.signed_node_info().node_info();
|
||||
let network_class = own_node_info.network_class();
|
||||
|
||||
// If we need a relay, always request one
|
||||
// Never give a relay to something with an invalid network class
|
||||
if matches!(network_class, NetworkClass::Invalid) {
|
||||
return None;
|
||||
}
|
||||
|
||||
// If we -need- a relay always request one
|
||||
let requires_relay = self
|
||||
.inner
|
||||
.read()
|
||||
|
@ -283,6 +283,36 @@ fn get_dht_key(
|
||||
}
|
||||
}
|
||||
|
||||
fn resolve_node_ref(
|
||||
registry: VeilidComponentRegistry,
|
||||
safety_selection: SafetySelection,
|
||||
) -> impl FnOnce(&str) -> SendPinBoxFuture<Option<NodeRef>> {
|
||||
move |text| {
|
||||
let text = text.to_owned();
|
||||
Box::pin(async move {
|
||||
let nr = if let Some(key) = get_public_key(&text) {
|
||||
let node_id = TypedKey::new(best_crypto_kind(), key);
|
||||
registry
|
||||
.rpc_processor()
|
||||
.resolve_node(node_id, safety_selection)
|
||||
.await
|
||||
.ok()
|
||||
.flatten()?
|
||||
} else if let Some(node_id) = get_typed_key(&text) {
|
||||
registry
|
||||
.rpc_processor()
|
||||
.resolve_node(node_id, safety_selection)
|
||||
.await
|
||||
.ok()
|
||||
.flatten()?
|
||||
} else {
|
||||
return None;
|
||||
};
|
||||
Some(nr)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn resolve_filtered_node_ref(
|
||||
registry: VeilidComponentRegistry,
|
||||
safety_selection: SafetySelection,
|
||||
@ -683,18 +713,19 @@ impl VeilidAPI {
|
||||
let args: Vec<String> = args.split_whitespace().map(|s| s.to_owned()).collect();
|
||||
let registry = self.core_context()?.registry();
|
||||
|
||||
let relay_node = get_debug_argument_at(
|
||||
let relay_node = async_get_debug_argument_at(
|
||||
&args,
|
||||
0,
|
||||
"debug_relay",
|
||||
"node_id",
|
||||
get_node_ref(registry.clone()),
|
||||
resolve_node_ref(registry.clone(), SafetySelection::default()),
|
||||
)
|
||||
.await
|
||||
.ok();
|
||||
|
||||
let routing_domain = get_debug_argument_at(
|
||||
&args,
|
||||
0,
|
||||
1,
|
||||
"debug_relay",
|
||||
"routing_domain",
|
||||
get_routing_domain,
|
||||
|
Loading…
x
Reference in New Issue
Block a user