mirror of
https://gitlab.com/veilid/veilid.git
synced 2024-10-01 01:26:08 -04:00
dial info failure reprioritization
This commit is contained in:
parent
10ec693fb4
commit
0249b7c7ae
@ -61,7 +61,7 @@ pub fn veilid_version() -> (u32, u32, u32) {
|
||||
#[cfg(target_os = "android")]
|
||||
pub use intf::android::veilid_core_setup_android;
|
||||
|
||||
pub static DEFAULT_LOG_IGNORE_LIST: [&str; 21] = [
|
||||
pub static DEFAULT_LOG_IGNORE_LIST: [&str; 23] = [
|
||||
"mio",
|
||||
"h2",
|
||||
"hyper",
|
||||
@ -83,6 +83,8 @@ pub static DEFAULT_LOG_IGNORE_LIST: [&str; 21] = [
|
||||
"trust_dns_resolver",
|
||||
"trust_dns_proto",
|
||||
"attohttpc",
|
||||
"ws_stream_wasm",
|
||||
"keyvaluedb_web",
|
||||
];
|
||||
|
||||
use cfg_if::*;
|
||||
|
@ -4,6 +4,8 @@ use alloc::collections::btree_map::Entry;
|
||||
// XXX: Move to config eventually?
|
||||
const PUNISHMENT_DURATION_MIN: usize = 60;
|
||||
const MAX_PUNISHMENTS_BY_NODE_ID: usize = 65536;
|
||||
const DIAL_INFO_FAILURE_DURATION_MIN: usize = 10;
|
||||
const MAX_DIAL_INFO_FAILURES: usize = 65536;
|
||||
|
||||
#[derive(ThisError, Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum AddressFilterError {
|
||||
@ -28,6 +30,7 @@ struct AddressFilterInner {
|
||||
punishments_by_ip4: BTreeMap<Ipv4Addr, Timestamp>,
|
||||
punishments_by_ip6_prefix: BTreeMap<Ipv6Addr, Timestamp>,
|
||||
punishments_by_node_id: BTreeMap<TypedKey, Timestamp>,
|
||||
dial_info_failures: BTreeMap<DialInfo, Timestamp>,
|
||||
}
|
||||
|
||||
struct AddressFilterUnlockedInner {
|
||||
@ -36,6 +39,7 @@ struct AddressFilterUnlockedInner {
|
||||
max_connections_per_ip6_prefix_size: usize,
|
||||
max_connection_frequency_per_min: usize,
|
||||
punishment_duration_min: usize,
|
||||
dial_info_failure_duration_min: usize,
|
||||
routing_table: RoutingTable,
|
||||
}
|
||||
|
||||
@ -56,6 +60,10 @@ impl fmt::Debug for AddressFilterUnlockedInner {
|
||||
&self.max_connection_frequency_per_min,
|
||||
)
|
||||
.field("punishment_duration_min", &self.punishment_duration_min)
|
||||
.field(
|
||||
"dial_info_failure_duration_min",
|
||||
&self.dial_info_failure_duration_min,
|
||||
)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
@ -78,6 +86,7 @@ impl AddressFilter {
|
||||
max_connection_frequency_per_min: c.network.max_connection_frequency_per_min
|
||||
as usize,
|
||||
punishment_duration_min: PUNISHMENT_DURATION_MIN,
|
||||
dial_info_failure_duration_min: DIAL_INFO_FAILURE_DURATION_MIN,
|
||||
routing_table,
|
||||
}),
|
||||
inner: Arc::new(Mutex::new(AddressFilterInner {
|
||||
@ -88,10 +97,17 @@ impl AddressFilter {
|
||||
punishments_by_ip4: BTreeMap::new(),
|
||||
punishments_by_ip6_prefix: BTreeMap::new(),
|
||||
punishments_by_node_id: BTreeMap::new(),
|
||||
dial_info_failures: BTreeMap::new(),
|
||||
})),
|
||||
}
|
||||
}
|
||||
|
||||
// When the network restarts, some of the address filter can be cleared
|
||||
pub fn restart(&self) {
|
||||
let mut inner = self.inner.lock();
|
||||
inner.dial_info_failures.clear();
|
||||
}
|
||||
|
||||
fn purge_old_timestamps(&self, inner: &mut AddressFilterInner, cur_ts: Timestamp) {
|
||||
// v4
|
||||
{
|
||||
@ -180,6 +196,22 @@ impl AddressFilter {
|
||||
}
|
||||
}
|
||||
}
|
||||
// dial info
|
||||
{
|
||||
let mut dead_keys = Vec::<DialInfo>::new();
|
||||
for (key, value) in &mut inner.dial_info_failures {
|
||||
// Drop failures older than the failure duration
|
||||
if cur_ts.as_u64().saturating_sub(value.as_u64())
|
||||
> self.unlocked_inner.dial_info_failure_duration_min as u64 * 60_000_000u64
|
||||
{
|
||||
dead_keys.push(key.clone());
|
||||
}
|
||||
}
|
||||
for key in dead_keys {
|
||||
log_net!(debug ">>> DIALINFO PERMIT: {}", key);
|
||||
inner.dial_info_failures.remove(&key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn is_ip_addr_punished_inner(&self, inner: &AddressFilterInner, ipblock: IpAddr) -> bool {
|
||||
@ -198,6 +230,14 @@ impl AddressFilter {
|
||||
false
|
||||
}
|
||||
|
||||
fn get_dial_info_failed_ts_inner(
|
||||
&self,
|
||||
inner: &AddressFilterInner,
|
||||
dial_info: &DialInfo,
|
||||
) -> Option<Timestamp> {
|
||||
inner.dial_info_failures.get(dial_info).copied()
|
||||
}
|
||||
|
||||
pub fn is_ip_addr_punished(&self, addr: IpAddr) -> bool {
|
||||
let inner = self.inner.lock();
|
||||
let ipblock = ip_to_ipblock(
|
||||
@ -207,6 +247,27 @@ impl AddressFilter {
|
||||
self.is_ip_addr_punished_inner(&*inner, ipblock)
|
||||
}
|
||||
|
||||
pub fn get_dial_info_failed_ts(&self, dial_info: &DialInfo) -> Option<Timestamp> {
|
||||
let inner = self.inner.lock();
|
||||
self.get_dial_info_failed_ts_inner(&*inner, dial_info)
|
||||
}
|
||||
|
||||
pub fn set_dial_info_failed(&self, dial_info: DialInfo) {
|
||||
let ts = get_aligned_timestamp();
|
||||
|
||||
let mut inner = self.inner.lock();
|
||||
if inner.dial_info_failures.len() >= MAX_DIAL_INFO_FAILURES {
|
||||
log_net!(debug ">>> DIALINFO FAILURE TABLE FULL: {}", dial_info);
|
||||
return;
|
||||
}
|
||||
log_net!(debug ">>> DIALINFO FAILURE: {:?}", dial_info);
|
||||
inner
|
||||
.dial_info_failures
|
||||
.entry(dial_info)
|
||||
.and_modify(|v| *v = ts)
|
||||
.or_insert(ts);
|
||||
}
|
||||
|
||||
pub fn punish_ip_addr(&self, addr: IpAddr) {
|
||||
log_net!(debug ">>> PUNISHED: {}", addr);
|
||||
let ts = get_aligned_timestamp();
|
||||
|
@ -380,6 +380,9 @@ impl NetworkManager {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Clean address filter for things that should not be persistent
|
||||
self.address_filter().restart();
|
||||
|
||||
// Create network components
|
||||
let connection_manager = ConnectionManager::new(self.clone());
|
||||
let net = Network::new(
|
||||
|
@ -384,6 +384,21 @@ impl Network {
|
||||
|
||||
////////////////////////////////////////////////////////////
|
||||
|
||||
// Record DialInfo failures
|
||||
pub async fn record_dial_info_failure<T, F: Future<Output = EyreResult<NetworkResult<T>>>>(
|
||||
&self,
|
||||
dial_info: DialInfo,
|
||||
fut: F,
|
||||
) -> EyreResult<NetworkResult<T>> {
|
||||
let network_result = fut.await?;
|
||||
if matches!(network_result, NetworkResult::NoConnection(_)) {
|
||||
self.network_manager()
|
||||
.address_filter()
|
||||
.set_dial_info_failed(dial_info);
|
||||
}
|
||||
Ok(network_result)
|
||||
}
|
||||
|
||||
// Send data to a dial info, unbound, using a new connection from a random port
|
||||
// This creates a short-lived connection in the case of connection-oriented protocols
|
||||
// for the purpose of sending this one message.
|
||||
@ -394,59 +409,62 @@ impl Network {
|
||||
dial_info: DialInfo,
|
||||
data: Vec<u8>,
|
||||
) -> EyreResult<NetworkResult<()>> {
|
||||
let data_len = data.len();
|
||||
let connect_timeout_ms = {
|
||||
let c = self.config.get();
|
||||
c.network.connection_initial_timeout_ms
|
||||
};
|
||||
self.record_dial_info_failure(dial_info.clone(), async move {
|
||||
let data_len = data.len();
|
||||
let connect_timeout_ms = {
|
||||
let c = self.config.get();
|
||||
c.network.connection_initial_timeout_ms
|
||||
};
|
||||
|
||||
if self
|
||||
.network_manager()
|
||||
.address_filter()
|
||||
.is_ip_addr_punished(dial_info.address().to_ip_addr())
|
||||
{
|
||||
return Ok(NetworkResult::no_connection_other("punished"));
|
||||
}
|
||||
if self
|
||||
.network_manager()
|
||||
.address_filter()
|
||||
.is_ip_addr_punished(dial_info.address().to_ip_addr())
|
||||
{
|
||||
return Ok(NetworkResult::no_connection_other("punished"));
|
||||
}
|
||||
|
||||
match dial_info.protocol_type() {
|
||||
ProtocolType::UDP => {
|
||||
let peer_socket_addr = dial_info.to_socket_addr();
|
||||
let h = RawUdpProtocolHandler::new_unspecified_bound_handler(&peer_socket_addr)
|
||||
match dial_info.protocol_type() {
|
||||
ProtocolType::UDP => {
|
||||
let peer_socket_addr = dial_info.to_socket_addr();
|
||||
let h = RawUdpProtocolHandler::new_unspecified_bound_handler(&peer_socket_addr)
|
||||
.await
|
||||
.wrap_err("create socket failure")?;
|
||||
let _ = network_result_try!(h
|
||||
.send_message(data, peer_socket_addr)
|
||||
.await
|
||||
.map(NetworkResult::Value)
|
||||
.wrap_err("send message failure")?);
|
||||
}
|
||||
ProtocolType::TCP => {
|
||||
let peer_socket_addr = dial_info.to_socket_addr();
|
||||
let pnc = network_result_try!(RawTcpProtocolHandler::connect(
|
||||
None,
|
||||
peer_socket_addr,
|
||||
connect_timeout_ms
|
||||
)
|
||||
.await
|
||||
.wrap_err("create socket failure")?;
|
||||
let _ = network_result_try!(h
|
||||
.send_message(data, peer_socket_addr)
|
||||
.wrap_err("connect failure")?);
|
||||
network_result_try!(pnc.send(data).await.wrap_err("send failure")?);
|
||||
}
|
||||
ProtocolType::WS | ProtocolType::WSS => {
|
||||
let pnc = network_result_try!(WebsocketProtocolHandler::connect(
|
||||
None,
|
||||
&dial_info,
|
||||
connect_timeout_ms
|
||||
)
|
||||
.await
|
||||
.map(NetworkResult::Value)
|
||||
.wrap_err("send message failure")?);
|
||||
.wrap_err("connect failure")?);
|
||||
network_result_try!(pnc.send(data).await.wrap_err("send failure")?);
|
||||
}
|
||||
}
|
||||
ProtocolType::TCP => {
|
||||
let peer_socket_addr = dial_info.to_socket_addr();
|
||||
let pnc = network_result_try!(RawTcpProtocolHandler::connect(
|
||||
None,
|
||||
peer_socket_addr,
|
||||
connect_timeout_ms
|
||||
)
|
||||
.await
|
||||
.wrap_err("connect failure")?);
|
||||
network_result_try!(pnc.send(data).await.wrap_err("send failure")?);
|
||||
}
|
||||
ProtocolType::WS | ProtocolType::WSS => {
|
||||
let pnc = network_result_try!(WebsocketProtocolHandler::connect(
|
||||
None,
|
||||
&dial_info,
|
||||
connect_timeout_ms
|
||||
)
|
||||
.await
|
||||
.wrap_err("connect failure")?);
|
||||
network_result_try!(pnc.send(data).await.wrap_err("send failure")?);
|
||||
}
|
||||
}
|
||||
// Network accounting
|
||||
self.network_manager()
|
||||
.stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64));
|
||||
// Network accounting
|
||||
self.network_manager()
|
||||
.stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64));
|
||||
|
||||
Ok(NetworkResult::Value(()))
|
||||
Ok(NetworkResult::Value(()))
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
// Send data to a dial info, unbound, using a new connection from a random port
|
||||
@ -461,85 +479,95 @@ impl Network {
|
||||
data: Vec<u8>,
|
||||
timeout_ms: u32,
|
||||
) -> EyreResult<NetworkResult<Vec<u8>>> {
|
||||
let data_len = data.len();
|
||||
let connect_timeout_ms = {
|
||||
let c = self.config.get();
|
||||
c.network.connection_initial_timeout_ms
|
||||
};
|
||||
self.record_dial_info_failure(dial_info.clone(), async move {
|
||||
let data_len = data.len();
|
||||
let connect_timeout_ms = {
|
||||
let c = self.config.get();
|
||||
c.network.connection_initial_timeout_ms
|
||||
};
|
||||
|
||||
if self
|
||||
.network_manager()
|
||||
.address_filter()
|
||||
.is_ip_addr_punished(dial_info.address().to_ip_addr())
|
||||
{
|
||||
return Ok(NetworkResult::no_connection_other("punished"));
|
||||
}
|
||||
|
||||
match dial_info.protocol_type() {
|
||||
ProtocolType::UDP => {
|
||||
let peer_socket_addr = dial_info.to_socket_addr();
|
||||
let h = RawUdpProtocolHandler::new_unspecified_bound_handler(&peer_socket_addr)
|
||||
.await
|
||||
.wrap_err("create socket failure")?;
|
||||
network_result_try!(h
|
||||
.send_message(data, peer_socket_addr)
|
||||
.await
|
||||
.wrap_err("send message failure")?);
|
||||
self.network_manager()
|
||||
.stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64));
|
||||
|
||||
// receive single response
|
||||
let mut out = vec![0u8; MAX_MESSAGE_SIZE];
|
||||
let (recv_len, recv_addr) = network_result_try!(timeout(
|
||||
timeout_ms,
|
||||
h.recv_message(&mut out).instrument(Span::current())
|
||||
)
|
||||
.await
|
||||
.into_network_result())
|
||||
.wrap_err("recv_message failure")?;
|
||||
|
||||
let recv_socket_addr = recv_addr.remote_address().to_socket_addr();
|
||||
self.network_manager()
|
||||
.stats_packet_rcvd(recv_socket_addr.ip(), ByteCount::new(recv_len as u64));
|
||||
|
||||
// if the from address is not the same as the one we sent to, then drop this
|
||||
if recv_socket_addr != peer_socket_addr {
|
||||
bail!("wrong address");
|
||||
}
|
||||
out.resize(recv_len, 0u8);
|
||||
Ok(NetworkResult::Value(out))
|
||||
if self
|
||||
.network_manager()
|
||||
.address_filter()
|
||||
.is_ip_addr_punished(dial_info.address().to_ip_addr())
|
||||
{
|
||||
return Ok(NetworkResult::no_connection_other("punished"));
|
||||
}
|
||||
ProtocolType::TCP | ProtocolType::WS | ProtocolType::WSS => {
|
||||
let pnc = network_result_try!(match dial_info.protocol_type() {
|
||||
ProtocolType::UDP => unreachable!(),
|
||||
ProtocolType::TCP => {
|
||||
let peer_socket_addr = dial_info.to_socket_addr();
|
||||
RawTcpProtocolHandler::connect(None, peer_socket_addr, connect_timeout_ms)
|
||||
.await
|
||||
.wrap_err("connect failure")?
|
||||
}
|
||||
ProtocolType::WS | ProtocolType::WSS => {
|
||||
WebsocketProtocolHandler::connect(None, &dial_info, connect_timeout_ms)
|
||||
.await
|
||||
.wrap_err("connect failure")?
|
||||
}
|
||||
});
|
||||
|
||||
network_result_try!(pnc.send(data).await.wrap_err("send failure")?);
|
||||
self.network_manager()
|
||||
.stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64));
|
||||
match dial_info.protocol_type() {
|
||||
ProtocolType::UDP => {
|
||||
let peer_socket_addr = dial_info.to_socket_addr();
|
||||
let h = RawUdpProtocolHandler::new_unspecified_bound_handler(&peer_socket_addr)
|
||||
.await
|
||||
.wrap_err("create socket failure")?;
|
||||
network_result_try!(h
|
||||
.send_message(data, peer_socket_addr)
|
||||
.await
|
||||
.wrap_err("send message failure")?);
|
||||
self.network_manager()
|
||||
.stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64));
|
||||
|
||||
let out = network_result_try!(network_result_try!(timeout(timeout_ms, pnc.recv())
|
||||
// receive single response
|
||||
let mut out = vec![0u8; MAX_MESSAGE_SIZE];
|
||||
let (recv_len, recv_addr) = network_result_try!(timeout(
|
||||
timeout_ms,
|
||||
h.recv_message(&mut out).instrument(Span::current())
|
||||
)
|
||||
.await
|
||||
.into_network_result())
|
||||
.wrap_err("recv failure")?);
|
||||
.wrap_err("recv_message failure")?;
|
||||
|
||||
self.network_manager()
|
||||
.stats_packet_rcvd(dial_info.to_ip_addr(), ByteCount::new(out.len() as u64));
|
||||
let recv_socket_addr = recv_addr.remote_address().to_socket_addr();
|
||||
self.network_manager()
|
||||
.stats_packet_rcvd(recv_socket_addr.ip(), ByteCount::new(recv_len as u64));
|
||||
|
||||
Ok(NetworkResult::Value(out))
|
||||
// if the from address is not the same as the one we sent to, then drop this
|
||||
if recv_socket_addr != peer_socket_addr {
|
||||
bail!("wrong address");
|
||||
}
|
||||
out.resize(recv_len, 0u8);
|
||||
Ok(NetworkResult::Value(out))
|
||||
}
|
||||
ProtocolType::TCP | ProtocolType::WS | ProtocolType::WSS => {
|
||||
let pnc = network_result_try!(match dial_info.protocol_type() {
|
||||
ProtocolType::UDP => unreachable!(),
|
||||
ProtocolType::TCP => {
|
||||
let peer_socket_addr = dial_info.to_socket_addr();
|
||||
RawTcpProtocolHandler::connect(
|
||||
None,
|
||||
peer_socket_addr,
|
||||
connect_timeout_ms,
|
||||
)
|
||||
.await
|
||||
.wrap_err("connect failure")?
|
||||
}
|
||||
ProtocolType::WS | ProtocolType::WSS => {
|
||||
WebsocketProtocolHandler::connect(None, &dial_info, connect_timeout_ms)
|
||||
.await
|
||||
.wrap_err("connect failure")?
|
||||
}
|
||||
});
|
||||
|
||||
network_result_try!(pnc.send(data).await.wrap_err("send failure")?);
|
||||
self.network_manager()
|
||||
.stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64));
|
||||
|
||||
let out =
|
||||
network_result_try!(network_result_try!(timeout(timeout_ms, pnc.recv())
|
||||
.await
|
||||
.into_network_result())
|
||||
.wrap_err("recv failure")?);
|
||||
|
||||
self.network_manager().stats_packet_rcvd(
|
||||
dial_info.to_ip_addr(),
|
||||
ByteCount::new(out.len() as u64),
|
||||
);
|
||||
|
||||
Ok(NetworkResult::Value(out))
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
#[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))]
|
||||
@ -609,41 +637,44 @@ impl Network {
|
||||
dial_info: DialInfo,
|
||||
data: Vec<u8>,
|
||||
) -> EyreResult<NetworkResult<ConnectionDescriptor>> {
|
||||
let data_len = data.len();
|
||||
let connection_descriptor;
|
||||
if dial_info.protocol_type() == ProtocolType::UDP {
|
||||
// Handle connectionless protocol
|
||||
let peer_socket_addr = dial_info.to_socket_addr();
|
||||
let ph = match self.find_best_udp_protocol_handler(&peer_socket_addr, &None) {
|
||||
Some(ph) => ph,
|
||||
None => bail!("no appropriate UDP protocol handler for dial_info"),
|
||||
};
|
||||
connection_descriptor = network_result_try!(ph
|
||||
.send_message(data, peer_socket_addr)
|
||||
.await
|
||||
.wrap_err("failed to send data to dial info")?);
|
||||
} else {
|
||||
// Handle connection-oriented protocols
|
||||
let conn = network_result_try!(
|
||||
self.connection_manager()
|
||||
.get_or_create_connection(dial_info.clone())
|
||||
.await?
|
||||
);
|
||||
self.record_dial_info_failure(dial_info.clone(), async move {
|
||||
let data_len = data.len();
|
||||
let connection_descriptor;
|
||||
if dial_info.protocol_type() == ProtocolType::UDP {
|
||||
// Handle connectionless protocol
|
||||
let peer_socket_addr = dial_info.to_socket_addr();
|
||||
let ph = match self.find_best_udp_protocol_handler(&peer_socket_addr, &None) {
|
||||
Some(ph) => ph,
|
||||
None => bail!("no appropriate UDP protocol handler for dial_info"),
|
||||
};
|
||||
connection_descriptor = network_result_try!(ph
|
||||
.send_message(data, peer_socket_addr)
|
||||
.await
|
||||
.wrap_err("failed to send data to dial info")?);
|
||||
} else {
|
||||
// Handle connection-oriented protocols
|
||||
let conn = network_result_try!(
|
||||
self.connection_manager()
|
||||
.get_or_create_connection(dial_info.clone())
|
||||
.await?
|
||||
);
|
||||
|
||||
if let ConnectionHandleSendResult::NotSent(_) = conn.send_async(data).await {
|
||||
return Ok(NetworkResult::NoConnection(io::Error::new(
|
||||
io::ErrorKind::ConnectionReset,
|
||||
"failed to send",
|
||||
)));
|
||||
if let ConnectionHandleSendResult::NotSent(_) = conn.send_async(data).await {
|
||||
return Ok(NetworkResult::NoConnection(io::Error::new(
|
||||
io::ErrorKind::ConnectionReset,
|
||||
"failed to send",
|
||||
)));
|
||||
}
|
||||
connection_descriptor = conn.connection_descriptor();
|
||||
}
|
||||
connection_descriptor = conn.connection_descriptor();
|
||||
}
|
||||
|
||||
// Network accounting
|
||||
self.network_manager()
|
||||
.stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64));
|
||||
// Network accounting
|
||||
self.network_manager()
|
||||
.stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64));
|
||||
|
||||
Ok(NetworkResult::value(connection_descriptor))
|
||||
Ok(NetworkResult::value(connection_descriptor))
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/////////////////////////////////////////////////////////////////
|
||||
|
@ -357,6 +357,24 @@ impl NetworkManager {
|
||||
// log_net!(debug "Node contact failing over to Ordered for {}", target_node_ref.to_string().cyan());
|
||||
// sequencing = Sequencing::PreferOrdered;
|
||||
// }
|
||||
|
||||
// Deprioritize dial info that have recently failed
|
||||
let address_filter = self.address_filter();
|
||||
let mut dial_info_failures_map = BTreeMap::<DialInfo, Timestamp>::new();
|
||||
for did in peer_b.signed_node_info().node_info().all_filtered_dial_info_details(DialInfoDetail::NO_SORT, |_| true) {
|
||||
if let Some(ts) = address_filter.get_dial_info_failed_ts(&did.dial_info) {
|
||||
dial_info_failures_map.insert(did.dial_info, ts);
|
||||
}
|
||||
}
|
||||
let dif_sort: Option<Arc<dyn Fn(&DialInfoDetail, &DialInfoDetail) -> core::cmp::Ordering>> = if dial_info_failures_map.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(Arc::new(move |a: &DialInfoDetail, b: &DialInfoDetail| {
|
||||
let ats = dial_info_failures_map.get(&a.dial_info).copied().unwrap_or_default();
|
||||
let bts = dial_info_failures_map.get(&b.dial_info).copied().unwrap_or_default();
|
||||
ats.cmp(&bts)
|
||||
}))
|
||||
};
|
||||
|
||||
// Get the best contact method with these parameters from the routing domain
|
||||
let cm = routing_table.get_contact_method(
|
||||
@ -365,6 +383,7 @@ impl NetworkManager {
|
||||
&peer_b,
|
||||
dial_info_filter,
|
||||
sequencing,
|
||||
dif_sort,
|
||||
);
|
||||
|
||||
// Translate the raw contact method to a referenced contact method
|
||||
|
@ -118,47 +118,66 @@ impl Network {
|
||||
|
||||
/////////////////////////////////////////////////////////////////
|
||||
|
||||
// Record DialInfo failures
|
||||
pub async fn record_dial_info_failure<T, F: Future<Output = EyreResult<NetworkResult<T>>>>(
|
||||
&self,
|
||||
dial_info: DialInfo,
|
||||
fut: F,
|
||||
) -> EyreResult<NetworkResult<T>> {
|
||||
let network_result = fut.await?;
|
||||
if matches!(network_result, NetworkResult::NoConnection(_)) {
|
||||
self.network_manager()
|
||||
.address_filter()
|
||||
.set_dial_info_failed(dial_info);
|
||||
}
|
||||
Ok(network_result)
|
||||
}
|
||||
|
||||
#[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))]
|
||||
pub async fn send_data_unbound_to_dial_info(
|
||||
&self,
|
||||
dial_info: DialInfo,
|
||||
data: Vec<u8>,
|
||||
) -> EyreResult<NetworkResult<()>> {
|
||||
let data_len = data.len();
|
||||
let timeout_ms = {
|
||||
let c = self.config.get();
|
||||
c.network.connection_initial_timeout_ms
|
||||
};
|
||||
self.record_dial_info_failure(dial_info.clone(), async move {
|
||||
let data_len = data.len();
|
||||
let timeout_ms = {
|
||||
let c = self.config.get();
|
||||
c.network.connection_initial_timeout_ms
|
||||
};
|
||||
|
||||
if self
|
||||
.network_manager()
|
||||
.address_filter()
|
||||
.is_ip_addr_punished(dial_info.address().to_ip_addr())
|
||||
{
|
||||
return Ok(NetworkResult::no_connection_other("punished"));
|
||||
}
|
||||
|
||||
match dial_info.protocol_type() {
|
||||
ProtocolType::UDP => {
|
||||
bail!("no support for UDP protocol")
|
||||
if self
|
||||
.network_manager()
|
||||
.address_filter()
|
||||
.is_ip_addr_punished(dial_info.address().to_ip_addr())
|
||||
{
|
||||
return Ok(NetworkResult::no_connection_other("punished"));
|
||||
}
|
||||
ProtocolType::TCP => {
|
||||
bail!("no support for TCP protocol")
|
||||
}
|
||||
ProtocolType::WS | ProtocolType::WSS => {
|
||||
let pnc =
|
||||
network_result_try!(WebsocketProtocolHandler::connect(&dial_info, timeout_ms)
|
||||
.await
|
||||
.wrap_err("connect failure")?);
|
||||
network_result_try!(pnc.send(data).await.wrap_err("send failure")?);
|
||||
}
|
||||
};
|
||||
|
||||
// Network accounting
|
||||
self.network_manager()
|
||||
.stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64));
|
||||
match dial_info.protocol_type() {
|
||||
ProtocolType::UDP => {
|
||||
bail!("no support for UDP protocol")
|
||||
}
|
||||
ProtocolType::TCP => {
|
||||
bail!("no support for TCP protocol")
|
||||
}
|
||||
ProtocolType::WS | ProtocolType::WSS => {
|
||||
let pnc = network_result_try!(WebsocketProtocolHandler::connect(
|
||||
&dial_info, timeout_ms
|
||||
)
|
||||
.await
|
||||
.wrap_err("connect failure")?);
|
||||
network_result_try!(pnc.send(data).await.wrap_err("send failure")?);
|
||||
}
|
||||
};
|
||||
|
||||
Ok(NetworkResult::Value(()))
|
||||
// Network accounting
|
||||
self.network_manager()
|
||||
.stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64));
|
||||
|
||||
Ok(NetworkResult::Value(()))
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
// Send data to a dial info, unbound, using a new connection from a random port
|
||||
@ -173,53 +192,59 @@ impl Network {
|
||||
data: Vec<u8>,
|
||||
timeout_ms: u32,
|
||||
) -> EyreResult<NetworkResult<Vec<u8>>> {
|
||||
let data_len = data.len();
|
||||
let connect_timeout_ms = {
|
||||
let c = self.config.get();
|
||||
c.network.connection_initial_timeout_ms
|
||||
};
|
||||
self.record_dial_info_failure(dial_info.clone(), async move {
|
||||
let data_len = data.len();
|
||||
let connect_timeout_ms = {
|
||||
let c = self.config.get();
|
||||
c.network.connection_initial_timeout_ms
|
||||
};
|
||||
|
||||
if self
|
||||
.network_manager()
|
||||
.address_filter()
|
||||
.is_ip_addr_punished(dial_info.address().to_ip_addr())
|
||||
{
|
||||
return Ok(NetworkResult::no_connection_other("punished"));
|
||||
}
|
||||
if self
|
||||
.network_manager()
|
||||
.address_filter()
|
||||
.is_ip_addr_punished(dial_info.address().to_ip_addr())
|
||||
{
|
||||
return Ok(NetworkResult::no_connection_other("punished"));
|
||||
}
|
||||
|
||||
match dial_info.protocol_type() {
|
||||
ProtocolType::UDP => {
|
||||
bail!("no support for UDP protocol")
|
||||
}
|
||||
ProtocolType::TCP => {
|
||||
bail!("no support for TCP protocol")
|
||||
}
|
||||
ProtocolType::WS | ProtocolType::WSS => {
|
||||
let pnc = network_result_try!(match dial_info.protocol_type() {
|
||||
ProtocolType::UDP => unreachable!(),
|
||||
ProtocolType::TCP => unreachable!(),
|
||||
ProtocolType::WS | ProtocolType::WSS => {
|
||||
WebsocketProtocolHandler::connect(&dial_info, connect_timeout_ms)
|
||||
match dial_info.protocol_type() {
|
||||
ProtocolType::UDP => {
|
||||
bail!("no support for UDP protocol")
|
||||
}
|
||||
ProtocolType::TCP => {
|
||||
bail!("no support for TCP protocol")
|
||||
}
|
||||
ProtocolType::WS | ProtocolType::WSS => {
|
||||
let pnc = network_result_try!(match dial_info.protocol_type() {
|
||||
ProtocolType::UDP => unreachable!(),
|
||||
ProtocolType::TCP => unreachable!(),
|
||||
ProtocolType::WS | ProtocolType::WSS => {
|
||||
WebsocketProtocolHandler::connect(&dial_info, connect_timeout_ms)
|
||||
.await
|
||||
.wrap_err("connect failure")?
|
||||
}
|
||||
});
|
||||
|
||||
network_result_try!(pnc.send(data).await.wrap_err("send failure")?);
|
||||
self.network_manager()
|
||||
.stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64));
|
||||
|
||||
let out =
|
||||
network_result_try!(network_result_try!(timeout(timeout_ms, pnc.recv())
|
||||
.await
|
||||
.wrap_err("connect failure")?
|
||||
}
|
||||
});
|
||||
.into_network_result())
|
||||
.wrap_err("recv failure")?);
|
||||
|
||||
network_result_try!(pnc.send(data).await.wrap_err("send failure")?);
|
||||
self.network_manager()
|
||||
.stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64));
|
||||
self.network_manager().stats_packet_rcvd(
|
||||
dial_info.to_ip_addr(),
|
||||
ByteCount::new(out.len() as u64),
|
||||
);
|
||||
|
||||
let out = network_result_try!(network_result_try!(timeout(timeout_ms, pnc.recv())
|
||||
.await
|
||||
.into_network_result())
|
||||
.wrap_err("recv failure")?);
|
||||
|
||||
self.network_manager()
|
||||
.stats_packet_rcvd(dial_info.to_ip_addr(), ByteCount::new(out.len() as u64));
|
||||
|
||||
Ok(NetworkResult::Value(out))
|
||||
Ok(NetworkResult::Value(out))
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
#[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))]
|
||||
@ -273,34 +298,37 @@ impl Network {
|
||||
dial_info: DialInfo,
|
||||
data: Vec<u8>,
|
||||
) -> EyreResult<NetworkResult<ConnectionDescriptor>> {
|
||||
let data_len = data.len();
|
||||
if dial_info.protocol_type() == ProtocolType::UDP {
|
||||
bail!("no support for UDP protocol");
|
||||
}
|
||||
if dial_info.protocol_type() == ProtocolType::TCP {
|
||||
bail!("no support for TCP protocol");
|
||||
}
|
||||
self.record_dial_info_failure(dial_info.clone(), async move {
|
||||
let data_len = data.len();
|
||||
if dial_info.protocol_type() == ProtocolType::UDP {
|
||||
bail!("no support for UDP protocol");
|
||||
}
|
||||
if dial_info.protocol_type() == ProtocolType::TCP {
|
||||
bail!("no support for TCP protocol");
|
||||
}
|
||||
|
||||
// Handle connection-oriented protocols
|
||||
let conn = network_result_try!(
|
||||
self.connection_manager()
|
||||
.get_or_create_connection(dial_info.clone())
|
||||
.await?
|
||||
);
|
||||
// Handle connection-oriented protocols
|
||||
let conn = network_result_try!(
|
||||
self.connection_manager()
|
||||
.get_or_create_connection(dial_info.clone())
|
||||
.await?
|
||||
);
|
||||
|
||||
if let ConnectionHandleSendResult::NotSent(_) = conn.send_async(data).await {
|
||||
return Ok(NetworkResult::NoConnection(io::Error::new(
|
||||
io::ErrorKind::ConnectionReset,
|
||||
"failed to send",
|
||||
)));
|
||||
}
|
||||
let connection_descriptor = conn.connection_descriptor();
|
||||
if let ConnectionHandleSendResult::NotSent(_) = conn.send_async(data).await {
|
||||
return Ok(NetworkResult::NoConnection(io::Error::new(
|
||||
io::ErrorKind::ConnectionReset,
|
||||
"failed to send",
|
||||
)));
|
||||
}
|
||||
let connection_descriptor = conn.connection_descriptor();
|
||||
|
||||
// Network accounting
|
||||
self.network_manager()
|
||||
.stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64));
|
||||
// Network accounting
|
||||
self.network_manager()
|
||||
.stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64));
|
||||
|
||||
Ok(NetworkResult::value(connection_descriptor))
|
||||
Ok(NetworkResult::value(connection_descriptor))
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/////////////////////////////////////////////////////////////////
|
||||
|
@ -538,6 +538,7 @@ impl RoutingTable {
|
||||
peer_b: &PeerInfo,
|
||||
dial_info_filter: DialInfoFilter,
|
||||
sequencing: Sequencing,
|
||||
dif_sort: Option<Arc<dyn Fn(&DialInfoDetail, &DialInfoDetail) -> core::cmp::Ordering>>,
|
||||
) -> ContactMethod {
|
||||
self.inner.read().get_contact_method(
|
||||
routing_domain,
|
||||
@ -545,6 +546,7 @@ impl RoutingTable {
|
||||
peer_b,
|
||||
dial_info_filter,
|
||||
sequencing,
|
||||
dif_sort,
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -401,6 +401,7 @@ impl RouteSpecStore {
|
||||
current_node,
|
||||
DialInfoFilter::all(),
|
||||
sequencing,
|
||||
None,
|
||||
);
|
||||
if matches!(cm, ContactMethod::Unreachable) {
|
||||
reachable = false;
|
||||
@ -415,6 +416,7 @@ impl RouteSpecStore {
|
||||
current_node,
|
||||
DialInfoFilter::all(),
|
||||
Sequencing::EnsureOrdered,
|
||||
None,
|
||||
);
|
||||
if matches!(cm, ContactMethod::Unreachable) {
|
||||
can_do_sequenced = false;
|
||||
@ -438,6 +440,7 @@ impl RouteSpecStore {
|
||||
current_node,
|
||||
DialInfoFilter::all(),
|
||||
sequencing,
|
||||
None,
|
||||
);
|
||||
if matches!(cm, ContactMethod::Unreachable) {
|
||||
reachable = false;
|
||||
@ -452,6 +455,7 @@ impl RouteSpecStore {
|
||||
current_node,
|
||||
DialInfoFilter::all(),
|
||||
Sequencing::EnsureOrdered,
|
||||
None,
|
||||
);
|
||||
if matches!(cm, ContactMethod::Unreachable) {
|
||||
can_do_sequenced = false;
|
||||
|
@ -220,6 +220,7 @@ pub trait RoutingDomainDetail {
|
||||
peer_b: &PeerInfo,
|
||||
dial_info_filter: DialInfoFilter,
|
||||
sequencing: Sequencing,
|
||||
dif_sort: Option<Arc<dyn Fn(&DialInfoDetail, &DialInfoDetail) -> core::cmp::Ordering>>,
|
||||
) -> ContactMethod;
|
||||
}
|
||||
|
||||
@ -245,6 +246,7 @@ fn first_filtered_dial_info_detail_between_nodes(
|
||||
to_node: &NodeInfo,
|
||||
dial_info_filter: &DialInfoFilter,
|
||||
sequencing: Sequencing,
|
||||
dif_sort: Option<Arc<dyn Fn(&DialInfoDetail, &DialInfoDetail) -> core::cmp::Ordering>>
|
||||
) -> Option<DialInfoDetail> {
|
||||
let dial_info_filter = dial_info_filter.clone().filtered(
|
||||
&DialInfoFilter::all()
|
||||
@ -253,11 +255,28 @@ fn first_filtered_dial_info_detail_between_nodes(
|
||||
);
|
||||
|
||||
// Apply sequencing and get sort
|
||||
// Include sorting by external dial info sort for rotating through dialinfo
|
||||
// based on an external preference table, for example the one kept by
|
||||
// AddressFilter to deprioritize dialinfo that have recently failed to connect
|
||||
let (ordered, dial_info_filter) = dial_info_filter.with_sequencing(sequencing);
|
||||
let sort = if ordered {
|
||||
Some(DialInfoDetail::ordered_sequencing_sort)
|
||||
let sort: Option<Box<dyn Fn(&DialInfoDetail, &DialInfoDetail) -> core::cmp::Ordering>> = if ordered {
|
||||
if let Some(dif_sort) = dif_sort {
|
||||
Some(Box::new(move |a, b| {
|
||||
let mut ord = dif_sort(a,b);
|
||||
if ord == core::cmp::Ordering::Equal {
|
||||
ord = DialInfoDetail::ordered_sequencing_sort(a,b);
|
||||
}
|
||||
ord
|
||||
}))
|
||||
} else {
|
||||
Some(Box::new(move |a,b| { DialInfoDetail::ordered_sequencing_sort(a,b) }))
|
||||
}
|
||||
} else {
|
||||
None
|
||||
if let Some(dif_sort) = dif_sort {
|
||||
Some(Box::new(move |a,b| { dif_sort(a,b) }))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
// If the filter is dead then we won't be able to connect
|
||||
@ -287,6 +306,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
|
||||
peer_b: &PeerInfo,
|
||||
dial_info_filter: DialInfoFilter,
|
||||
sequencing: Sequencing,
|
||||
dif_sort: Option<Arc<dyn Fn(&DialInfoDetail, &DialInfoDetail) -> core::cmp::Ordering>>,
|
||||
) -> ContactMethod {
|
||||
// Get the nodeinfos for convenience
|
||||
let node_a = peer_a.signed_node_info().node_info();
|
||||
@ -304,7 +324,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
|
||||
|
||||
// Get the best match dial info for node B if we have it
|
||||
if let Some(target_did) =
|
||||
first_filtered_dial_info_detail_between_nodes(node_a, node_b, &dial_info_filter, sequencing)
|
||||
first_filtered_dial_info_detail_between_nodes(node_a, node_b, &dial_info_filter, sequencing, dif_sort.clone())
|
||||
{
|
||||
// Do we need to signal before going inbound?
|
||||
if !target_did.class.requires_signal() {
|
||||
@ -334,6 +354,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
|
||||
node_b_relay,
|
||||
&dial_info_filter,
|
||||
sequencing,
|
||||
dif_sort.clone(),
|
||||
)
|
||||
.is_some()
|
||||
{
|
||||
@ -347,6 +368,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
|
||||
node_a,
|
||||
&dial_info_filter,
|
||||
sequencing,
|
||||
dif_sort.clone()
|
||||
) {
|
||||
// Ensure we aren't on the same public IP address (no hairpin nat)
|
||||
if reverse_did.dial_info.to_ip_addr()
|
||||
@ -373,6 +395,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
|
||||
node_b,
|
||||
&udp_dial_info_filter,
|
||||
sequencing,
|
||||
dif_sort.clone()
|
||||
) {
|
||||
// Does node A have a direct udp dialinfo that node B can reach?
|
||||
if let Some(reverse_udp_did) = first_filtered_dial_info_detail_between_nodes(
|
||||
@ -380,6 +403,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
|
||||
node_a,
|
||||
&udp_dial_info_filter,
|
||||
sequencing,
|
||||
dif_sort.clone(),
|
||||
) {
|
||||
// Ensure we aren't on the same public IP address (no hairpin nat)
|
||||
if reverse_udp_did.dial_info.to_ip_addr()
|
||||
@ -422,6 +446,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
|
||||
&node_b_relay,
|
||||
&dial_info_filter,
|
||||
sequencing,
|
||||
dif_sort.clone()
|
||||
)
|
||||
.is_some()
|
||||
{
|
||||
@ -496,6 +521,7 @@ impl RoutingDomainDetail for LocalNetworkRoutingDomainDetail {
|
||||
peer_b: &PeerInfo,
|
||||
dial_info_filter: DialInfoFilter,
|
||||
sequencing: Sequencing,
|
||||
dif_sort: Option<Arc<dyn Fn(&DialInfoDetail, &DialInfoDetail) -> core::cmp::Ordering>>,
|
||||
) -> ContactMethod {
|
||||
// Scope the filter down to protocols node A can do outbound
|
||||
let dial_info_filter = dial_info_filter.filtered(
|
||||
@ -504,20 +530,31 @@ impl RoutingDomainDetail for LocalNetworkRoutingDomainDetail {
|
||||
.with_protocol_type_set(peer_a.signed_node_info().node_info().outbound_protocols()),
|
||||
);
|
||||
|
||||
// Get first filtered dialinfo
|
||||
let (sort, dial_info_filter) = match sequencing {
|
||||
Sequencing::NoPreference => (None, dial_info_filter),
|
||||
Sequencing::PreferOrdered => (
|
||||
Some(DialInfoDetail::ordered_sequencing_sort),
|
||||
dial_info_filter,
|
||||
),
|
||||
Sequencing::EnsureOrdered => (
|
||||
Some(DialInfoDetail::ordered_sequencing_sort),
|
||||
dial_info_filter.filtered(
|
||||
&DialInfoFilter::all().with_protocol_type_set(ProtocolType::all_ordered_set()),
|
||||
),
|
||||
),
|
||||
// Apply sequencing and get sort
|
||||
// Include sorting by external dial info sort for rotating through dialinfo
|
||||
// based on an external preference table, for example the one kept by
|
||||
// AddressFilter to deprioritize dialinfo that have recently failed to connect
|
||||
let (ordered, dial_info_filter) = dial_info_filter.with_sequencing(sequencing);
|
||||
let sort: Option<Box<dyn Fn(&DialInfoDetail, &DialInfoDetail) -> core::cmp::Ordering>> = if ordered {
|
||||
if let Some(dif_sort) = dif_sort {
|
||||
Some(Box::new(move |a, b| {
|
||||
let mut ord = dif_sort(a,b);
|
||||
if ord == core::cmp::Ordering::Equal {
|
||||
ord = DialInfoDetail::ordered_sequencing_sort(a,b);
|
||||
}
|
||||
ord
|
||||
}))
|
||||
} else {
|
||||
Some(Box::new(move |a,b| { DialInfoDetail::ordered_sequencing_sort(a,b) }))
|
||||
}
|
||||
} else {
|
||||
if let Some(dif_sort) = dif_sort {
|
||||
Some(Box::new(move |a,b| { dif_sort(a,b) }))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
// If the filter is dead then we won't be able to connect
|
||||
if dial_info_filter.is_dead() {
|
||||
return ContactMethod::Unreachable;
|
||||
|
@ -226,9 +226,10 @@ impl RoutingTableInner {
|
||||
peer_b: &PeerInfo,
|
||||
dial_info_filter: DialInfoFilter,
|
||||
sequencing: Sequencing,
|
||||
dif_sort: Option<Arc<dyn Fn(&DialInfoDetail, &DialInfoDetail) -> core::cmp::Ordering>>,
|
||||
) -> ContactMethod {
|
||||
self.with_routing_domain(routing_domain, |rdd| {
|
||||
rdd.get_contact_method(self, peer_a, peer_b, dial_info_filter, sequencing)
|
||||
rdd.get_contact_method(self, peer_a, peer_b, dial_info_filter, sequencing, dif_sort)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -403,7 +403,7 @@ packages:
|
||||
path: ".."
|
||||
relative: true
|
||||
source: path
|
||||
version: "0.1.9"
|
||||
version: "0.1.10"
|
||||
web:
|
||||
dependency: transitive
|
||||
description:
|
||||
|
Loading…
Reference in New Issue
Block a user