Fixes for validation of dial info

This commit is contained in:
John Smith 2022-06-05 13:23:18 -04:00
parent cfcf430a99
commit 1eb26758e9
11 changed files with 112 additions and 46 deletions

View File

@ -127,7 +127,7 @@ impl ConnectionManager {
if let Some(conn) = inner if let Some(conn) = inner
.connection_table .connection_table
.get_last_connection_by_remote(descriptor.remote) .get_last_connection_by_remote(descriptor.remote())
{ {
log_net!( log_net!(
"== Returning existing connection local_addr={:?} peer_address={:?}", "== Returning existing connection local_addr={:?} peer_address={:?}",
@ -138,34 +138,39 @@ impl ConnectionManager {
return Ok(conn); return Ok(conn);
} }
// Drop any other protocols connections that have the same local addr // Drop any other protocols connections to this remote that have the same local addr
// otherwise this connection won't succeed due to binding // otherwise this connection won't succeed due to binding
let mut killed = false;
if let Some(local_addr) = local_addr { if let Some(local_addr) = local_addr {
if local_addr.port() != 0 { if local_addr.port() != 0 {
for pt in [ProtocolType::TCP, ProtocolType::WS, ProtocolType::WSS] { for pt in [ProtocolType::TCP, ProtocolType::WS, ProtocolType::WSS] {
let pa = PeerAddress::new(descriptor.remote.socket_address, pt); let pa = PeerAddress::new(descriptor.remote_address().clone(), pt);
for desc in inner for prior_descriptor in inner
.connection_table .connection_table
.get_connection_descriptors_by_remote(pa) .get_connection_descriptors_by_remote(pa)
{ {
let mut kill = false; let mut kill = false;
if let Some(conn_local) = desc.local { // See if the local address would collide
if let Some(prior_local) = prior_descriptor.local() {
if (local_addr.ip().is_unspecified() if (local_addr.ip().is_unspecified()
|| (local_addr.ip() == conn_local.to_ip_addr())) || prior_local.to_ip_addr().is_unspecified()
&& conn_local.port() == local_addr.port() || (local_addr.ip() == prior_local.to_ip_addr()))
&& prior_local.port() == local_addr.port()
{ {
kill = true; kill = true;
} }
} }
if kill { if kill {
log_net!(debug log_net!(debug
">< Terminating connection local_addr={:?} peer_address={:?}", ">< Terminating connection prior_descriptor={:?}",
local_addr.green(), prior_descriptor
pa.green()
); );
if let Err(e) = inner.connection_table.remove_connection(descriptor) { if let Err(e) =
inner.connection_table.remove_connection(prior_descriptor)
{
log_net!(error e); log_net!(error e);
} }
killed = true;
} }
} }
} }
@ -173,7 +178,21 @@ impl ConnectionManager {
} }
// Attempt new connection // Attempt new connection
let conn = ProtocolNetworkConnection::connect(local_addr, dial_info).await?; let mut retry_count = if killed { 2 } else { 0 };
let conn = loop {
match ProtocolNetworkConnection::connect(local_addr, dial_info.clone()).await {
Ok(v) => break Ok(v),
Err(e) => {
if retry_count == 0 {
break Err(e);
}
log_net!(debug "get_or_create_connection retries left: {}", retry_count);
retry_count -= 1;
intf::sleep(500).await;
}
}
}?;
self.on_new_protocol_network_connection(&mut *inner, conn) self.on_new_protocol_network_connection(&mut *inner, conn)
} }

View File

@ -43,7 +43,7 @@ impl ConnectionTable {
pub fn add_connection(&mut self, conn: NetworkConnection) -> Result<(), String> { pub fn add_connection(&mut self, conn: NetworkConnection) -> Result<(), String> {
let descriptor = conn.connection_descriptor(); let descriptor = conn.connection_descriptor();
let ip_addr = descriptor.remote.socket_address.to_ip_addr(); let ip_addr = descriptor.remote_address().to_ip_addr();
let index = protocol_to_index(descriptor.protocol_type()); let index = protocol_to_index(descriptor.protocol_type());
if self.conn_by_descriptor[index].contains_key(&descriptor) { if self.conn_by_descriptor[index].contains_key(&descriptor) {
@ -72,7 +72,7 @@ impl ConnectionTable {
// add connection records // add connection records
let descriptors = self let descriptors = self
.descriptors_by_remote .descriptors_by_remote
.entry(descriptor.remote) .entry(descriptor.remote())
.or_default(); .or_default();
warn!("add_connection: {:?}", descriptor); warn!("add_connection: {:?}", descriptor);
@ -125,10 +125,10 @@ impl ConnectionTable {
} }
fn remove_connection_records(&mut self, descriptor: ConnectionDescriptor) { fn remove_connection_records(&mut self, descriptor: ConnectionDescriptor) {
let ip_addr = descriptor.remote.socket_address.to_ip_addr(); let ip_addr = descriptor.remote_address().to_ip_addr();
// conns_by_remote // conns_by_remote
match self.descriptors_by_remote.entry(descriptor.remote) { match self.descriptors_by_remote.entry(descriptor.remote()) {
Entry::Vacant(_) => { Entry::Vacant(_) => {
panic!("inconsistency in connection table") panic!("inconsistency in connection table")
} }

View File

@ -18,9 +18,9 @@ pub mod tests;
pub use network_connection::*; pub use network_connection::*;
//////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////
use connection_handle::*;
use connection_limits::*; use connection_limits::*;
use connection_manager::*; use connection_manager::*;
use connection_handle::*;
use dht::*; use dht::*;
use hashlink::LruCache; use hashlink::LruCache;
use intf::*; use intf::*;
@ -1048,7 +1048,7 @@ impl NetworkManager {
); );
// Network accounting // Network accounting
self.stats_packet_rcvd(descriptor.remote.to_socket_addr().ip(), data.len() as u64); self.stats_packet_rcvd(descriptor.remote_address().to_ip_addr(), data.len() as u64);
// Ensure we can read the magic number // Ensure we can read the magic number
if data.len() < 4 { if data.len() < 4 {

View File

@ -310,10 +310,10 @@ impl Network {
// Handle connectionless protocol // Handle connectionless protocol
if descriptor.protocol_type() == ProtocolType::UDP { if descriptor.protocol_type() == ProtocolType::UDP {
// send over the best udp socket we have bound since UDP is not connection oriented // send over the best udp socket we have bound since UDP is not connection oriented
let peer_socket_addr = descriptor.remote.to_socket_addr(); let peer_socket_addr = descriptor.remote().to_socket_addr();
if let Some(ph) = self.find_best_udp_protocol_handler( if let Some(ph) = self.find_best_udp_protocol_handler(
&peer_socket_addr, &peer_socket_addr,
&descriptor.local.map(|sa| sa.to_socket_addr()), &descriptor.local().map(|sa| sa.to_socket_addr()),
) { ) {
log_net!( log_net!(
"send_data_to_existing_connection connectionless to {:?}", "send_data_to_existing_connection connectionless to {:?}",
@ -345,7 +345,7 @@ impl Network {
// Network accounting // Network accounting
self.network_manager() self.network_manager()
.stats_packet_sent(descriptor.remote.to_socket_addr().ip(), data_len as u64); .stats_packet_sent(descriptor.remote().to_socket_addr().ip(), data_len as u64);
// Data was consumed // Data was consumed
Ok(None) Ok(None)

View File

@ -150,6 +150,12 @@ impl DiscoveryContext {
redirect: bool, redirect: bool,
) -> bool { ) -> bool {
let rpc = self.routing_table.rpc_processor(); let rpc = self.routing_table.rpc_processor();
// asking for node validation doesn't have to use the dial info filter of the dial info we are validating
let mut node_ref = node_ref.clone();
node_ref.set_filter(None);
// ask the node to send us a dial info validation receipt
rpc.rpc_call_validate_dial_info(node_ref.clone(), dial_info, redirect) rpc.rpc_call_validate_dial_info(node_ref.clone(), dial_info, redirect)
.await .await
.map_err(logthru_net!( .map_err(logthru_net!(
@ -229,7 +235,7 @@ impl DiscoveryContext {
// If we know we are not behind NAT, check our firewall status // If we know we are not behind NAT, check our firewall status
pub async fn protocol_process_no_nat(&self) -> Result<(), String> { pub async fn protocol_process_no_nat(&self) -> Result<(), String> {
let (node_b, external_1_dial_info) = { let (node_1, external_1_dial_info) = {
let inner = self.inner.lock(); let inner = self.inner.lock();
( (
inner.node_1.as_ref().unwrap().clone(), inner.node_1.as_ref().unwrap().clone(),
@ -239,7 +245,7 @@ impl DiscoveryContext {
// Do a validate_dial_info on the external address from a redirected node // Do a validate_dial_info on the external address from a redirected node
if self if self
.validate_dial_info(node_b.clone(), external_1_dial_info.clone(), true) .validate_dial_info(node_1.clone(), external_1_dial_info.clone(), true)
.await .await
{ {
// Add public dial info with Direct dialinfo class // Add public dial info with Direct dialinfo class

View File

@ -55,7 +55,7 @@ impl Network {
// Network accounting // Network accounting
network_manager.stats_packet_rcvd( network_manager.stats_packet_rcvd(
descriptor.remote.to_socket_addr().ip(), descriptor.remote_address().to_ip_addr(),
size as u64, size as u64,
); );

View File

@ -180,10 +180,10 @@ impl RawTcpProtocolHandler {
// Wrap the stream in a network connection and return it // Wrap the stream in a network connection and return it
let conn = ProtocolNetworkConnection::RawTcp(RawTcpNetworkConnection::new( let conn = ProtocolNetworkConnection::RawTcp(RawTcpNetworkConnection::new(
ConnectionDescriptor { ConnectionDescriptor::new(
local: Some(SocketAddress::from_socket_addr(actual_local_address)), dial_info.to_peer_address(),
remote: dial_info.to_peer_address(), SocketAddress::from_socket_addr(actual_local_address),
}, ),
ps, ps,
ts, ts,
)); ));

View File

@ -243,10 +243,10 @@ impl WebsocketProtocolHandler {
.map_err(logthru_net!())?; .map_err(logthru_net!())?;
// Make our connection descriptor // Make our connection descriptor
let descriptor = ConnectionDescriptor { let descriptor = ConnectionDescriptor::new(
local: Some(SocketAddress::from_socket_addr(actual_local_addr)), dial_info.to_peer_address(),
remote: dial_info.to_peer_address(), SocketAddress::from_socket_addr(actual_local_addr),
}; );
// Negotiate TLS if this is WSS // Negotiate TLS if this is WSS
if tls { if tls {
let connector = TlsConnector::default(); let connector = TlsConnector::default();

View File

@ -317,9 +317,8 @@ impl NetworkConnection {
} }
log_net!( log_net!(
"== Connection loop finished local_addr={:?} remote={:?}", "== Connection loop finished descriptor={:?}",
descriptor.local.green(), descriptor.green()
descriptor.remote.green()
); );
connection_manager connection_manager

View File

@ -799,7 +799,7 @@ impl RPCProcessor {
let socket_address = peer_noderef let socket_address = peer_noderef
.last_connection() .last_connection()
.await .await
.map(|c| c.remote.socket_address); .map(|c| c.remote_address().clone());
SenderInfo { socket_address } SenderInfo { socket_address }
} }
@ -905,27 +905,51 @@ impl RPCProcessor {
// Redirect this request if we are asked to // Redirect this request if we are asked to
if redirect { if redirect {
// Find peers capable of validating this dial info
// We filter on the -outgoing- protocol capability status not the node's dial info
// Use the address type though, to ensure we reach an ipv6 capable node if this is
// an ipv6 address
let routing_table = self.routing_table(); let routing_table = self.routing_table();
let filter = dial_info.make_filter(true); let filter = DialInfoFilter::global().with_address_type(dial_info.address_type());
let sender_id = rpcreader.header.envelope.get_sender_id(); let sender_id = rpcreader.header.envelope.get_sender_id();
let peers = routing_table.find_fast_public_nodes_filtered(&filter); let mut peers = routing_table.find_fast_public_nodes_filtered(&filter);
if peers.is_empty() { if peers.is_empty() {
return Err(rpc_error_internal(format!( return Err(rpc_error_internal(format!(
"no peers matching filter '{:?}'", "no peers matching filter '{:?}'",
filter filter
))); )));
} }
for peer in peers { for peer in &mut peers {
// Ensure the peer is not the one asking for the validation // Ensure the peer is not the one asking for the validation
if peer.node_id() == sender_id { if peer.node_id() == sender_id {
continue; continue;
} }
// Release the filter on the peer because we don't need to send the redirect with the filter
// we just wanted to make sure we only selected nodes that were capable of
// using the correct protocol for the dial info being validated
peer.set_filter(None);
// Ensure the peer's status is known and that it is capable of
// making outbound connections for the dial info we want to verify
// and if this peer can validate dial info
let can_contact_dial_info = peer.operate(|e: &mut BucketEntry| {
if let Some(ni) = &e.node_info() {
ni.outbound_protocols.contains(dial_info.protocol_type()) && ni.can_validate_dial_info()
} else {
false
}
});
if !can_contact_dial_info {
continue;
}
// See if this peer will validate dial info // See if this peer will validate dial info
let will_validate_dial_info = peer.operate(|e: &mut BucketEntry| { let will_validate_dial_info = peer.operate(|e: &mut BucketEntry| {
if let Some(ni) = &e.peer_stats().status { if let Some(status) = &e.peer_stats().status {
ni.will_validate_dial_info status.will_validate_dial_info
} else { } else {
true true
} }
@ -933,6 +957,7 @@ impl RPCProcessor {
if !will_validate_dial_info { if !will_validate_dial_info {
continue; continue;
} }
// Make a copy of the request, without the redirect flag // Make a copy of the request, without the redirect flag
let vdi_msg_reader = { let vdi_msg_reader = {
let mut vdi_msg = ::capnp::message::Builder::new_default(); let mut vdi_msg = ::capnp::message::Builder::new_default();

View File

@ -1315,8 +1315,8 @@ impl PeerInfo {
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash, Serialize, Deserialize)] #[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash, Serialize, Deserialize)]
pub struct PeerAddress { pub struct PeerAddress {
pub socket_address: SocketAddress, socket_address: SocketAddress,
pub protocol_type: ProtocolType, protocol_type: ProtocolType,
} }
impl PeerAddress { impl PeerAddress {
@ -1327,6 +1327,14 @@ impl PeerAddress {
} }
} }
pub fn socket_address(&self) -> &SocketAddress {
&self.socket_address
}
pub fn protocol_type(&self) -> ProtocolType {
self.protocol_type
}
pub fn to_socket_addr(&self) -> SocketAddr { pub fn to_socket_addr(&self) -> SocketAddr {
self.socket_address.to_socket_addr() self.socket_address.to_socket_addr()
} }
@ -1338,8 +1346,8 @@ impl PeerAddress {
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct ConnectionDescriptor { pub struct ConnectionDescriptor {
pub remote: PeerAddress, remote: PeerAddress,
pub local: Option<SocketAddress>, local: Option<SocketAddress>,
} }
impl ConnectionDescriptor { impl ConnectionDescriptor {
@ -1355,6 +1363,15 @@ impl ConnectionDescriptor {
local: None, local: None,
} }
} }
pub fn remote(&self) -> PeerAddress {
self.remote
}
pub fn remote_address(&self) -> &SocketAddress {
self.remote.socket_address()
}
pub fn local(&self) -> Option<SocketAddress> {
self.local
}
pub fn protocol_type(&self) -> ProtocolType { pub fn protocol_type(&self) -> ProtocolType {
self.remote.protocol_type self.remote.protocol_type
} }