various fixes, including node_ref last_connection sorting problem

This commit is contained in:
John Smith 2022-10-04 11:27:38 -04:00
parent 0a01c0d23e
commit 4b2164a546
16 changed files with 740 additions and 517 deletions

936
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -77,4 +77,5 @@ rustup target add aarch64-linux-android armv7-linux-androideabi i686-linux-andro
cargo install wasm-bindgen-cli cargo install wasm-bindgen-cli
# Ensure packages are installed # Ensure packages are installed
sudo apt-get install libc6-dev-i386 libc6:i386 libncurses5:i386 libstdc++6:i386 lib32z1 libbz2-1.0:i386 openjdk-11-jdk llvm wabt capnproto sudo apt-get install libc6-dev-i386 libc6:i386 libncurses5:i386 libstdc++6:i386 lib32z1 libbz2-1.0:i386 openjdk-11-jdk llvm wabt capnproto protobuf-compiler

View File

@ -114,5 +114,5 @@ if [ "$BREW_USER" == "" ]; then
BREW_USER=`whoami` BREW_USER=`whoami`
fi fi
fi fi
sudo -H -u $BREW_USER brew install capnp cmake wabt llvm sudo -H -u $BREW_USER brew install capnp cmake wabt llvm protobuf

View File

@ -26,6 +26,7 @@ struct ConnectionManagerArc {
connection_initial_timeout_ms: u32, connection_initial_timeout_ms: u32,
connection_inactivity_timeout_ms: u32, connection_inactivity_timeout_ms: u32,
connection_table: ConnectionTable, connection_table: ConnectionTable,
address_lock_table: AsyncTagLockTable<SocketAddr>,
inner: Mutex<Option<ConnectionManagerInner>>, inner: Mutex<Option<ConnectionManagerInner>>,
} }
impl core::fmt::Debug for ConnectionManagerArc { impl core::fmt::Debug for ConnectionManagerArc {
@ -69,6 +70,7 @@ impl ConnectionManager {
connection_initial_timeout_ms, connection_initial_timeout_ms,
connection_inactivity_timeout_ms, connection_inactivity_timeout_ms,
connection_table: ConnectionTable::new(config), connection_table: ConnectionTable::new(config),
address_lock_table: AsyncTagLockTable::new(),
inner: Mutex::new(None), inner: Mutex::new(None),
} }
} }
@ -196,7 +198,7 @@ impl ConnectionManager {
} }
// Returns a network connection if one already is established // Returns a network connection if one already is established
#[instrument(level = "trace", skip(self), ret)] //#[instrument(level = "trace", skip(self), ret)]
pub fn get_connection(&self, descriptor: ConnectionDescriptor) -> Option<ConnectionHandle> { pub fn get_connection(&self, descriptor: ConnectionDescriptor) -> Option<ConnectionHandle> {
self.arc self.arc
.connection_table .connection_table
@ -236,11 +238,6 @@ impl ConnectionManager {
did_kill did_kill
} }
/// Locak remote address
// async fn lock_remote_address(&self, remote_addr: SocketAddr) -> {
// }
/// Called when we want to create a new connection or get the current one that already exists /// Called when we want to create a new connection or get the current one that already exists
/// This will kill off any connections that are in conflict with the new connection to be made /// This will kill off any connections that are in conflict with the new connection to be made
/// in order to make room for the new connection in the system's connection table /// in order to make room for the new connection in the system's connection table
@ -251,18 +248,17 @@ impl ConnectionManager {
local_addr: Option<SocketAddr>, local_addr: Option<SocketAddr>,
dial_info: DialInfo, dial_info: DialInfo,
) -> EyreResult<NetworkResult<ConnectionHandle>> { ) -> EyreResult<NetworkResult<ConnectionHandle>> {
warn!( // Async lock on the remote address for atomicity per remote
let peer_address = dial_info.to_peer_address();
let remote_addr = peer_address.to_socket_addr();
let _lock_guard = self.arc.address_lock_table.lock_tag(remote_addr);
log_net!(
"== get_or_create_connection local_addr={:?} dial_info={:?}", "== get_or_create_connection local_addr={:?} dial_info={:?}",
local_addr.green(), local_addr.green(),
dial_info.green() dial_info.green()
); );
// Make a connection descriptor for this dialinfo
let peer_address = dial_info.to_peer_address();
// Async lock on the remote address for atomicity
//let _lock_guard = self.lock_remote_address(peer_address.to_socket_addr());
// Kill off any possibly conflicting connections // Kill off any possibly conflicting connections
let did_kill = self.kill_off_colliding_connections(&dial_info).await; let did_kill = self.kill_off_colliding_connections(&dial_info).await;
let mut retry_count = if did_kill { 2 } else { 0 }; let mut retry_count = if did_kill { 2 } else { 0 };
@ -299,6 +295,22 @@ impl ConnectionManager {
} }
Err(e) => { Err(e) => {
if retry_count == 0 { if retry_count == 0 {
// Try one last time to return a connection from the table, in case
// an 'accept' happened at literally the same time as our connect
if let Some(conn) = self
.arc
.connection_table
.get_last_connection_by_remote(peer_address)
{
log_net!(
"== Returning existing connection in race local_addr={:?} peer_address={:?}",
local_addr.green(),
peer_address.green()
);
return Ok(NetworkResult::Value(conn));
}
return Err(e).wrap_err("failed to connect"); return Err(e).wrap_err("failed to connect");
} }
} }

View File

@ -144,7 +144,7 @@ impl ConnectionTable {
let mut out_conn = None; let mut out_conn = None;
if inner.conn_by_id[protocol_index].len() > inner.max_connections[protocol_index] { if inner.conn_by_id[protocol_index].len() > inner.max_connections[protocol_index] {
if let Some((lruk, lru_conn)) = inner.conn_by_id[protocol_index].remove_lru() { if let Some((lruk, lru_conn)) = inner.conn_by_id[protocol_index].remove_lru() {
debug!("connection lru out: {:?}", lru_conn); log_net!(debug "connection lru out: {:?}", lru_conn);
out_conn = Some(lru_conn); out_conn = Some(lru_conn);
Self::remove_connection_records(&mut *inner, lruk); Self::remove_connection_records(&mut *inner, lruk);
} }
@ -158,7 +158,8 @@ impl ConnectionTable {
Ok(out_conn) Ok(out_conn)
} }
#[instrument(level = "trace", skip(self), ret)] //#[instrument(level = "trace", skip(self), ret)]
#[allow(dead_code)]
pub fn get_connection_by_id(&self, id: NetworkConnectionId) -> Option<ConnectionHandle> { pub fn get_connection_by_id(&self, id: NetworkConnectionId) -> Option<ConnectionHandle> {
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
let protocol_index = *inner.protocol_index_by_id.get(&id)?; let protocol_index = *inner.protocol_index_by_id.get(&id)?;
@ -166,7 +167,7 @@ impl ConnectionTable {
Some(out.get_handle()) Some(out.get_handle())
} }
#[instrument(level = "trace", skip(self), ret)] //#[instrument(level = "trace", skip(self), ret)]
pub fn get_connection_by_descriptor( pub fn get_connection_by_descriptor(
&self, &self,
descriptor: ConnectionDescriptor, descriptor: ConnectionDescriptor,
@ -179,7 +180,7 @@ impl ConnectionTable {
Some(out.get_handle()) Some(out.get_handle())
} }
#[instrument(level = "trace", skip(self), ret)] //#[instrument(level = "trace", skip(self), ret)]
pub fn get_last_connection_by_remote(&self, remote: PeerAddress) -> Option<ConnectionHandle> { pub fn get_last_connection_by_remote(&self, remote: PeerAddress) -> Option<ConnectionHandle> {
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
@ -189,7 +190,8 @@ impl ConnectionTable {
Some(out.get_handle()) Some(out.get_handle())
} }
#[instrument(level = "trace", skip(self), ret)] //#[instrument(level = "trace", skip(self), ret)]
#[allow(dead_code)]
pub fn get_connection_ids_by_remote(&self, remote: PeerAddress) -> Vec<NetworkConnectionId> { pub fn get_connection_ids_by_remote(&self, remote: PeerAddress) -> Vec<NetworkConnectionId> {
let inner = self.inner.lock(); let inner = self.inner.lock();
inner inner

View File

@ -1949,47 +1949,53 @@ impl NetworkManager {
.clone() .clone()
.unlocked_inner .unlocked_inner
.node_info_update_single_future .node_info_update_single_future
.single_spawn(async move { .single_spawn(
// Only update if we actually have valid signed node info for this routing domain async move {
if !this.routing_table().has_valid_own_node_info(routing_domain) { // Only update if we actually have valid signed node info for this routing domain
trace!( if !this.routing_table().has_valid_own_node_info(routing_domain) {
trace!(
"not sending node info update because our network class is not yet valid" "not sending node info update because our network class is not yet valid"
); );
return; return;
}
// Get the list of refs to all nodes to update
let cur_ts = intf::get_timestamp();
let node_refs =
this.routing_table()
.get_nodes_needing_updates(routing_domain, cur_ts, all);
// Send the updates
log_net!(debug "Sending node info updates to {} nodes", node_refs.len());
let mut unord = FuturesUnordered::new();
for nr in node_refs {
let rpc = this.rpc_processor();
unord.push(
async move {
// Update the node
if let Err(e) = rpc
.rpc_call_node_info_update(nr.clone(), routing_domain)
.await
{
// Not fatal, but we should be able to see if this is happening
trace!("failed to send node info update to {:?}: {}", nr, e);
return;
}
// Mark the node as having seen our node info
nr.set_seen_our_node_info(routing_domain);
}
.instrument(Span::current()),
);
}
// Wait for futures to complete
while unord.next().await.is_some() {}
log_rtab!(debug "Finished sending node updates");
} }
.instrument(Span::current()),
// Get the list of refs to all nodes to update )
let cur_ts = intf::get_timestamp();
let node_refs =
this.routing_table()
.get_nodes_needing_updates(routing_domain, cur_ts, all);
// Send the updates
log_net!(debug "Sending node info updates to {} nodes", node_refs.len());
let mut unord = FuturesUnordered::new();
for nr in node_refs {
let rpc = this.rpc_processor();
unord.push(async move {
// Update the node
if let Err(e) = rpc
.rpc_call_node_info_update(nr.clone(), routing_domain)
.await
{
// Not fatal, but we should be able to see if this is happening
trace!("failed to send node info update to {:?}: {}", nr, e);
return;
}
// Mark the node as having seen our node info
nr.set_seen_our_node_info(routing_domain);
});
}
// Wait for futures to complete
while unord.next().await.is_some() {}
log_rtab!(debug "Finished sending node updates");
})
.await; .await;
} }
} }

View File

@ -255,6 +255,13 @@ impl DiscoveryContext {
{ {
return Some(external_mapped_dial_info); return Some(external_mapped_dial_info);
} else { } else {
warn!("UPNP port mapping succeeded but port {}/{} is still unreachable.\nYou may need to add a local firewall allowed port on this machine.\n",
local_port, match llpt {
LowLevelProtocolType::UDP => "udp",
LowLevelProtocolType::TCP => "tcp",
}
);
// release the mapping if we're still unreachable // release the mapping if we're still unreachable
let _ = self let _ = self
.net .net
@ -628,6 +635,7 @@ impl Network {
} }
Some(vec![udpv4_context]) Some(vec![udpv4_context])
} }
.instrument(trace_span!("do_public_dial_info_check UDPv4"))
.boxed(), .boxed(),
); );
} }
@ -647,6 +655,7 @@ impl Network {
} }
Some(vec![udpv6_context]) Some(vec![udpv6_context])
} }
.instrument(trace_span!("do_public_dial_info_check UDPv6"))
.boxed(), .boxed(),
); );
} }
@ -669,6 +678,7 @@ impl Network {
} }
Some(vec![tcpv4_context]) Some(vec![tcpv4_context])
} }
.instrument(trace_span!("do_public_dial_info_check TCPv4"))
.boxed(), .boxed(),
); );
} }
@ -688,6 +698,7 @@ impl Network {
} }
Some(vec![wsv4_context]) Some(vec![wsv4_context])
} }
.instrument(trace_span!("do_public_dial_info_check WSv4"))
.boxed(), .boxed(),
); );
} }
@ -710,6 +721,7 @@ impl Network {
} }
Some(vec![tcpv6_context]) Some(vec![tcpv6_context])
} }
.instrument(trace_span!("do_public_dial_info_check TCPv6"))
.boxed(), .boxed(),
); );
} }
@ -729,6 +741,7 @@ impl Network {
} }
Some(vec![wsv6_context]) Some(vec![wsv6_context])
} }
.instrument(trace_span!("do_public_dial_info_check WSv6"))
.boxed(), .boxed(),
); );
} }

View File

@ -109,11 +109,11 @@ impl Network {
}; };
// XXX // XXX
warn!( // warn!(
"DEBUGACCEPT: local={} remote={}", // "DEBUGACCEPT: local={} remote={}",
tcp_stream.local_addr().unwrap(), // tcp_stream.local_addr().unwrap(),
tcp_stream.peer_addr().unwrap(), // tcp_stream.peer_addr().unwrap(),
); // );
let listener_state = listener_state.clone(); let listener_state = listener_state.clone();
let connection_manager = connection_manager.clone(); let connection_manager = connection_manager.clone();

View File

@ -84,7 +84,7 @@ impl Network {
} }
} }
} }
}; }.instrument(Span::current());
protocol_handlers_unordered.push(ph_future); protocol_handlers_unordered.push(ph_future);
} }

View File

@ -185,7 +185,7 @@ pub async fn nonblocking_connect(
let socket2_addr = socket2::SockAddr::from(addr); let socket2_addr = socket2::SockAddr::from(addr);
// XXX // XXX
let bind_local_addr = socket.local_addr().unwrap().as_socket().unwrap(); //let bind_local_addr = socket.local_addr().unwrap().as_socket().unwrap();
// Connect to the remote address // Connect to the remote address
match socket.connect(&socket2_addr) { match socket.connect(&socket2_addr) {
@ -197,24 +197,24 @@ pub async fn nonblocking_connect(
} }
.map_err(|e| { .map_err(|e| {
// XXX // XXX
warn!( // warn!(
"DEBUGCONNECT XXXFAILXXX: bind={} local={} remote={}\nbacktrace={:?}", // "DEBUGCONNECT XXXFAILXXX: bind={} local={} remote={}\nbacktrace={:?}",
bind_local_addr, // bind_local_addr,
socket.local_addr().unwrap().as_socket().unwrap(), // socket.local_addr().unwrap().as_socket().unwrap(),
addr, // addr,
backtrace::Backtrace::new(), // backtrace::Backtrace::new(),
); // );
e e
})?; })?;
// XXX // XXX
warn!( // warn!(
"DEBUGCONNECT: bind={} local={} remote={}\nbacktrace={:?}", // "DEBUGCONNECT: bind={} local={} remote={}\nbacktrace={:?}",
bind_local_addr, // bind_local_addr,
socket.local_addr().unwrap().as_socket().unwrap(), // socket.local_addr().unwrap().as_socket().unwrap(),
addr, // addr,
backtrace::Backtrace::new(), // backtrace::Backtrace::new(),
); // );
let async_stream = Async::new(std::net::TcpStream::from(socket))?; let async_stream = Async::new(std::net::TcpStream::from(socket))?;

View File

@ -210,11 +210,13 @@ impl NetworkConnection {
Ok(NetworkResult::Value(out)) Ok(NetworkResult::Value(out))
} }
#[allow(dead_code)]
pub fn stats(&self) -> NetworkConnectionStats { pub fn stats(&self) -> NetworkConnectionStats {
let stats = self.stats.lock(); let stats = self.stats.lock();
stats.clone() stats.clone()
} }
#[allow(dead_code)]
pub fn established_time(&self) -> u64 { pub fn established_time(&self) -> u64 {
self.established_time self.established_time
} }
@ -260,10 +262,11 @@ impl NetworkConnection {
need_sender = false; need_sender = false;
let sender_fut = receiver.recv_async().then(|res| async { let sender_fut = receiver.recv_async().then(|res| async {
match res { match res {
Ok((span_id, message)) => { Ok((_span_id, message)) => {
let recv_span = span!(parent: None, Level::TRACE, "process_connection recv"); let recv_span = span!(Level::TRACE, "process_connection recv");
recv_span.follows_from(span_id); // xxx: causes crash (Missing otel data span extensions)
// recv_span.follows_from(span_id);
// send the packet // send the packet
if let Err(e) = Self::send_internal( if let Err(e) = Self::send_internal(

View File

@ -246,7 +246,7 @@ impl ReceiptManager {
if let Some(callback) = if let Some(callback) =
Self::perform_callback(ReceiptEvent::Expired, &mut expired_record_mut) Self::perform_callback(ReceiptEvent::Expired, &mut expired_record_mut)
{ {
callbacks.push(callback) callbacks.push(callback.instrument(Span::current()))
} }
} }

View File

@ -264,32 +264,62 @@ impl BucketEntryInner {
self.last_connections.clear(); self.last_connections.clear();
} }
// Gets the best 'last connection' that matches a set of routing domain, protocol types and address types // Gets the 'last connection' that matches a specific connection key
pub(super) fn last_connection( // pub(super) fn last_connection(
// &self,
// protocol_type: ProtocolType,
// address_type: AddressType,
// ) -> Option<(ConnectionDescriptor, u64)> {
// let key = LastConnectionKey(protocol_type, address_type);
// self.last_connections.get(&key).cloned()
// }
// Gets all the 'last connections' that match a particular filter
pub(super) fn last_connections(
&self, &self,
routing_table_inner: &RoutingTableInner, routing_table_inner: &RoutingTableInner,
node_ref_filter: Option<NodeRefFilter>, filter: Option<NodeRefFilter>,
) -> Option<(ConnectionDescriptor, u64)> { ) -> Vec<(ConnectionDescriptor, u64)> {
// Iterate peer scopes and protocol types and address type in order to ensure we pick the preferred protocols if all else is the same let mut out: Vec<(ConnectionDescriptor, u64)> = self
let nrf = node_ref_filter.unwrap_or_default(); .last_connections
for pt in nrf.dial_info_filter.protocol_type_set { .iter()
for at in nrf.dial_info_filter.address_type_set { .filter_map(|(k, v)| {
let key = LastConnectionKey(pt, at); let include = if let Some(filter) = &filter {
if let Some(v) = self.last_connections.get(&key) { let remote_address = v.0.remote_address().address();
// Verify this connection could be in the filtered routing domain if let Some(routing_domain) = RoutingTable::routing_domain_for_address_inner(
let address = v.0.remote_address().address(); routing_table_inner,
if let Some(rd) = remote_address,
RoutingTable::routing_domain_for_address_inner(routing_table_inner, address) ) {
{ if filter.routing_domain_set.contains(routing_domain)
if nrf.routing_domain_set.contains(rd) { && filter.dial_info_filter.protocol_type_set.contains(k.0)
return Some(*v); && filter.dial_info_filter.address_type_set.contains(k.1)
{
// matches filter
true
} else {
// does not match filter
false
} }
} else {
// no valid routing domain
false
} }
} else {
// no filter
true
};
if include {
Some(v.clone())
} else {
None
} }
} })
} .collect();
None // Sort with newest timestamps first
out.sort_by(|a, b| b.1.cmp(&a.1));
out
} }
pub fn set_min_max_version(&mut self, min_max_version: (u8, u8)) { pub fn set_min_max_version(&mut self, min_max_version: (u8, u8)) {
self.min_max_version = Some(min_max_version); self.min_max_version = Some(min_max_version);
} }

View File

@ -318,24 +318,29 @@ impl NodeRef {
} }
pub fn last_connection(&self) -> Option<ConnectionDescriptor> { pub fn last_connection(&self) -> Option<ConnectionDescriptor> {
// Get the last connection and the last time we saw anything with this connection // Get the last connections and the last time we saw anything with this connection
let (last_connection, last_seen) = // Filtered first and then sorted by most recent
self.operate(|rti, e| e.last_connection(rti, self.filter.clone()))?; let last_connections = self.operate(|rti, e| e.last_connections(rti, self.filter.clone()));
// Should we check the connection table? // Do some checks to ensure these are possibly still 'live'
if last_connection.protocol_type().is_connection_oriented() { for (last_connection, last_seen) in last_connections {
// Look the connection up in the connection manager and see if it's still there // Should we check the connection table?
let connection_manager = self.routing_table.network_manager().connection_manager(); if last_connection.protocol_type().is_connection_oriented() {
connection_manager.get_connection(last_connection)?; // Look the connection up in the connection manager and see if it's still there
} else { let connection_manager = self.routing_table.network_manager().connection_manager();
// If this is not connection oriented, then we check our last seen time if connection_manager.get_connection(last_connection).is_some() {
// to see if this mapping has expired (beyond our timeout) return Some(last_connection);
let cur_ts = intf::get_timestamp(); }
if (last_seen + (CONNECTIONLESS_TIMEOUT_SECS as u64 * 1_000_000u64)) < cur_ts { } else {
return None; // If this is not connection oriented, then we check our last seen time
// to see if this mapping has expired (beyond our timeout)
let cur_ts = intf::get_timestamp();
if (last_seen + (CONNECTIONLESS_TIMEOUT_SECS as u64 * 1_000_000u64)) >= cur_ts {
return Some(last_connection);
}
} }
} }
Some(last_connection) None
} }
pub fn clear_last_connections(&self) { pub fn clear_last_connections(&self) {

View File

@ -941,13 +941,12 @@ impl RPCProcessor {
stop_token: StopToken, stop_token: StopToken,
receiver: flume::Receiver<(Option<Id>, RPCMessageEncoded)>, receiver: flume::Receiver<(Option<Id>, RPCMessageEncoded)>,
) { ) {
while let Ok(Ok((span_id, msg))) = while let Ok(Ok((_span_id, msg))) =
receiver.recv_async().timeout_at(stop_token.clone()).await receiver.recv_async().timeout_at(stop_token.clone()).await
{ {
let rpc_worker_span = span!(parent: None, Level::TRACE, "rpc_worker"); let rpc_worker_span = span!(parent: None, Level::TRACE, "rpc_worker recv");
//let rpc_worker_span = span!(Level::TRACE, "rpc_worker"); // xxx: causes crash (Missing otel data span extensions)
// fixme: causes crashes? "Missing otel data span extensions"?? // rpc_worker_span.follows_from(span_id);
rpc_worker_span.follows_from(span_id);
let _ = self let _ = self
.process_rpc_message(msg) .process_rpc_message(msg)
.instrument(rpc_worker_span) .instrument(rpc_worker_span)

View File

@ -127,8 +127,8 @@ where
let (_span_id, ret) = res.take_value().unwrap(); let (_span_id, ret) = res.take_value().unwrap();
let end_ts = intf::get_timestamp(); let end_ts = intf::get_timestamp();
// fixme: causes crashes? "Missing otel data span extensions"?? //xxx: causes crash (Missing otel data span extensions)
//Span::current().follows_from(span_id); // Span::current().follows_from(span_id);
(ret, end_ts - start_ts) (ret, end_ts - start_ts)
})) }))