mirror of
https://gitlab.com/veilid/veilid.git
synced 2024-12-28 08:49:29 -05:00
fix connections
This commit is contained in:
parent
2b9044fdfa
commit
855a5a0756
@ -286,17 +286,12 @@ impl BucketEntryInner {
|
||||
}
|
||||
|
||||
// Check connections
|
||||
let connection_manager = rti.network_manager().connection_manager();
|
||||
let last_connections = self.last_connections(
|
||||
rti,
|
||||
true,
|
||||
Some(NodeRefFilter::new().with_routing_domain(routing_domain)),
|
||||
);
|
||||
for lc in last_connections {
|
||||
if connection_manager.get_connection(lc.0).is_some() {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
false
|
||||
!last_connections.is_empty()
|
||||
}
|
||||
|
||||
pub fn node_info(&self, routing_domain: RoutingDomain) -> Option<&NodeInfo> {
|
||||
@ -343,21 +338,21 @@ impl BucketEntryInner {
|
||||
}
|
||||
// Check connections
|
||||
let mut best_routing_domain: Option<RoutingDomain> = None;
|
||||
let connection_manager = rti.network_manager().connection_manager();
|
||||
let last_connections = self.last_connections(
|
||||
rti,
|
||||
true,
|
||||
Some(NodeRefFilter::new().with_routing_domain_set(routing_domain_set)),
|
||||
);
|
||||
for lc in last_connections {
|
||||
if connection_manager.get_connection(lc.0).is_some() {
|
||||
if let Some(rd) = rti.routing_domain_for_address(lc.0.remote_address().address()) {
|
||||
if let Some(brd) = best_routing_domain {
|
||||
if rd < brd {
|
||||
best_routing_domain = Some(rd);
|
||||
}
|
||||
} else {
|
||||
if let Some(rd) =
|
||||
rti.routing_domain_for_address(lc.0.remote_address().address())
|
||||
{
|
||||
if let Some(brd) = best_routing_domain {
|
||||
if rd < brd {
|
||||
best_routing_domain = Some(rd);
|
||||
}
|
||||
} else {
|
||||
best_routing_domain = Some(rd);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -383,12 +378,16 @@ impl BucketEntryInner {
|
||||
self.last_connections.clear();
|
||||
}
|
||||
|
||||
// Gets all the 'last connections' that match a particular filter
|
||||
// Gets all the 'last connections' that match a particular filter, and their accompanying timestamps of last use
|
||||
pub(super) fn last_connections(
|
||||
&self,
|
||||
rti: &RoutingTableInner,
|
||||
only_live: bool,
|
||||
filter: Option<NodeRefFilter>,
|
||||
) -> Vec<(ConnectionDescriptor, u64)> {
|
||||
let connection_manager =
|
||||
rti.unlocked_inner.network_manager.connection_manager();
|
||||
|
||||
let mut out: Vec<(ConnectionDescriptor, u64)> = self
|
||||
.last_connections
|
||||
.iter()
|
||||
@ -414,7 +413,29 @@ impl BucketEntryInner {
|
||||
// no filter
|
||||
true
|
||||
};
|
||||
if include {
|
||||
|
||||
if !include {
|
||||
return None;
|
||||
}
|
||||
|
||||
if !only_live {
|
||||
return Some(v.clone());
|
||||
}
|
||||
|
||||
// Check if the connection is still considered live
|
||||
let alive =
|
||||
// Should we check the connection table?
|
||||
if v.0.protocol_type().is_connection_oriented() {
|
||||
// Look the connection up in the connection manager and see if it's still there
|
||||
connection_manager.get_connection(v.0).is_some()
|
||||
} else {
|
||||
// If this is not connection oriented, then we check our last seen time
|
||||
// to see if this mapping has expired (beyond our timeout)
|
||||
let cur_ts = get_timestamp();
|
||||
(v.1 + (CONNECTIONLESS_TIMEOUT_SECS as u64 * 1_000_000u64)) >= cur_ts
|
||||
};
|
||||
|
||||
if alive {
|
||||
Some(v.clone())
|
||||
} else {
|
||||
None
|
||||
|
@ -30,9 +30,17 @@ pub use routing_table_inner::*;
|
||||
pub use stats_accounting::*;
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////
|
||||
|
||||
/// How frequently we tick the relay management routine
|
||||
pub const RELAY_MANAGEMENT_INTERVAL_SECS: u32 = 1;
|
||||
|
||||
/// How frequently we tick the private route management routine
|
||||
pub const PRIVATE_ROUTE_MANAGEMENT_INTERVAL_SECS: u32 = 1;
|
||||
|
||||
// Connectionless protocols like UDP are dependent on a NAT translation timeout
|
||||
// We should ping them with some frequency and 30 seconds is typical timeout
|
||||
pub const CONNECTIONLESS_TIMEOUT_SECS: u32 = 29;
|
||||
|
||||
pub type LowLevelProtocolPorts = BTreeSet<(LowLevelProtocolType, AddressType, u16)>;
|
||||
pub type ProtocolToPortMapping = BTreeMap<(ProtocolType, AddressType), (LowLevelProtocolType, u16)>;
|
||||
#[derive(Clone, Debug)]
|
||||
|
@ -2,10 +2,6 @@ use super::*;
|
||||
use crate::crypto::*;
|
||||
use alloc::fmt;
|
||||
|
||||
// Connectionless protocols like UDP are dependent on a NAT translation timeout
|
||||
// We should ping them with some frequency and 30 seconds is typical timeout
|
||||
const CONNECTIONLESS_TIMEOUT_SECS: u32 = 29;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
pub struct NodeRefBaseCommon {
|
||||
@ -272,28 +268,8 @@ pub trait NodeRefBase: Sized {
|
||||
// Get the last connections and the last time we saw anything with this connection
|
||||
// Filtered first and then sorted by most recent
|
||||
self.operate(|rti, e| {
|
||||
let last_connections = e.last_connections(rti, self.common().filter.clone());
|
||||
|
||||
// Do some checks to ensure these are possibly still 'live'
|
||||
for (last_connection, last_seen) in last_connections {
|
||||
// Should we check the connection table?
|
||||
if last_connection.protocol_type().is_connection_oriented() {
|
||||
// Look the connection up in the connection manager and see if it's still there
|
||||
let connection_manager =
|
||||
rti.unlocked_inner.network_manager.connection_manager();
|
||||
if connection_manager.get_connection(last_connection).is_some() {
|
||||
return Some(last_connection);
|
||||
}
|
||||
} else {
|
||||
// If this is not connection oriented, then we check our last seen time
|
||||
// to see if this mapping has expired (beyond our timeout)
|
||||
let cur_ts = get_timestamp();
|
||||
if (last_seen + (CONNECTIONLESS_TIMEOUT_SECS as u64 * 1_000_000u64)) >= cur_ts {
|
||||
return Some(last_connection);
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
let last_connections = e.last_connections(rti, true, self.common().filter.clone());
|
||||
last_connections.first().map(|x| x.0)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -686,7 +686,7 @@ impl RPCProcessor {
|
||||
|
||||
/// Get signed node info to package with RPC messages to improve
|
||||
/// routing table caching when it is okay to do so
|
||||
#[instrument(skip(self), ret, err)]
|
||||
#[instrument(level = "trace", skip(self), ret, err)]
|
||||
fn get_sender_signed_node_info(
|
||||
&self,
|
||||
dest: &Destination,
|
||||
|
Loading…
Reference in New Issue
Block a user