From d750b7c5c35e0fb493a01f1ac3a338f1570f1ce1 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Mon, 30 Oct 2023 20:17:07 -0400 Subject: [PATCH] wasm dht test passes --- .../src/network_manager/connection_manager.rs | 99 +++++++++++-------- .../src/network_manager/connection_table.rs | 84 +++++++++------- .../src/network_manager/network_connection.rs | 24 ++++- veilid-core/src/routing_table/node_ref.rs | 11 --- veilid-core/src/rpc_processor/mod.rs | 17 +++- veilid-core/src/rpc_processor/rpc_app_call.rs | 2 +- .../src/rpc_processor/rpc_find_node.rs | 3 +- .../src/rpc_processor/rpc_get_value.rs | 2 +- .../src/rpc_processor/rpc_set_value.rs | 2 +- veilid-core/src/rpc_processor/rpc_status.rs | 9 +- 10 files changed, 151 insertions(+), 102 deletions(-) diff --git a/veilid-core/src/network_manager/connection_manager.rs b/veilid-core/src/network_manager/connection_manager.rs index 4aabf0b0..c47cf32e 100644 --- a/veilid-core/src/network_manager/connection_manager.rs +++ b/veilid-core/src/network_manager/connection_manager.rs @@ -1,4 +1,5 @@ use super::*; +pub(crate) use connection_table::ConnectionRefKind; use connection_table::*; use network_connection::*; use stop_token::future::FutureExt; @@ -12,6 +13,40 @@ enum ConnectionManagerEvent { Dead(NetworkConnection), } +#[derive(Debug)] +pub(crate) struct ConnectionRefScope { + connection_manager: ConnectionManager, + descriptor: ConnectionDescriptor, + protect: bool, +} + +impl ConnectionRefScope { + pub fn new( + connection_manager: ConnectionManager, + descriptor: ConnectionDescriptor, + protect: bool, + ) -> Self { + connection_manager.connection_ref(descriptor, ConnectionRefKind::AddRef, protect); + Self { + connection_manager, + descriptor, + protect, + } + } +} + +impl Drop for ConnectionRefScope { + fn drop(&mut self) { + if !self.protect { + self.connection_manager.connection_ref( + self.descriptor, + ConnectionRefKind::RemoveRef, + false, + ); + } + } +} + #[derive(Debug)] struct ConnectionManagerInner { next_id: NetworkConnectionId, @@ -134,36 +169,6 @@ impl ConnectionManager { debug!("finished connection manager shutdown"); } - // Internal routine to see if we should keep this connection - // from being LRU removed. Used on our initiated relay connections and allocated routes - fn should_protect_connection(&self, conn: &NetworkConnection) -> bool { - let netman = self.network_manager(); - let routing_table = netman.routing_table(); - - // See if this is a relay connection - let remote_address = conn.connection_descriptor().remote_address().address(); - let Some(routing_domain) = routing_table.routing_domain_for_address(remote_address) else { - return false; - }; - let Some(rn) = routing_table.relay_node(routing_domain) else { - return false; - }; - let relay_nr = rn.filtered_clone( - NodeRefFilter::new() - .with_routing_domain(routing_domain) - .with_address_type(conn.connection_descriptor().address_type()) - .with_protocol_type(conn.connection_descriptor().protocol_type()), - ); - let dids = relay_nr.all_filtered_dial_info_details(); - for did in dids { - if did.dial_info.address() == remote_address { - return true; - } - } - - false - } - // Internal routine to register new connection atomically. // Registers connection in the connection table for later access // and spawns a message processing loop for the connection @@ -188,16 +193,9 @@ impl ConnectionManager { None => bail!("not creating connection because we are stopping"), }; - let mut conn = NetworkConnection::from_protocol(self.clone(), stop_token, prot_conn, id); + let conn = NetworkConnection::from_protocol(self.clone(), stop_token, prot_conn, id); let handle = conn.get_handle(); - // See if this should be a protected connection - let protect = self.should_protect_connection(&conn); - if protect { - log_net!(debug "== PROTECTING connection: {} -> {}", id, conn.debug_print(get_aligned_timestamp())); - conn.protect(); - } - // Add to the connection table match self.arc.connection_table.add_connection(conn) { Ok(None) => { @@ -227,6 +225,15 @@ impl ConnectionManager { desc ))); } + Err(ConnectionTableAddError::TableFull(conn)) => { + // Connection table is full + let desc = conn.connection_descriptor(); + let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn)); + return Ok(NetworkResult::no_connection_other(format!( + "connection table is full: {:?}", + desc + ))); + } }; Ok(NetworkResult::Value(handle)) } @@ -239,10 +246,22 @@ impl ConnectionManager { } // Protects a network connection if one already is established - pub fn protect_connection(&self, descriptor: ConnectionDescriptor) -> bool { + fn connection_ref( + &self, + descriptor: ConnectionDescriptor, + kind: ConnectionRefKind, + protect: bool, + ) { self.arc .connection_table - .protect_connection_by_descriptor(descriptor) + .ref_connection_by_descriptor(descriptor, kind, protect); + } + pub fn connection_ref_scope( + &self, + descriptor: ConnectionDescriptor, + protect: bool, + ) -> ConnectionRefScope { + ConnectionRefScope::new(self.clone(), descriptor, protect) } /// Called when we want to create a new connection or get the current one that already exists diff --git a/veilid-core/src/network_manager/connection_table.rs b/veilid-core/src/network_manager/connection_table.rs index 47bbd8b2..eeda548e 100644 --- a/veilid-core/src/network_manager/connection_table.rs +++ b/veilid-core/src/network_manager/connection_table.rs @@ -9,6 +9,8 @@ pub(in crate::network_manager) enum ConnectionTableAddError { AlreadyExists(NetworkConnection), #[error("Connection address was filtered")] AddressFilter(NetworkConnection, AddressFilterError), + #[error("Connection table is full")] + TableFull(NetworkConnection), } impl ConnectionTableAddError { @@ -18,6 +20,14 @@ impl ConnectionTableAddError { pub fn address_filter(conn: NetworkConnection, err: AddressFilterError) -> Self { ConnectionTableAddError::AddressFilter(conn, err) } + pub fn table_full(conn: NetworkConnection) -> Self { + ConnectionTableAddError::TableFull(conn) + } +} + +pub(crate) enum ConnectionRefKind { + AddRef, + RemoveRef, } /////////////////////////////////////////////////////////////////////////////// @@ -176,31 +186,35 @@ impl ConnectionTable { } }; + // if we have reached the maximum number of connections per protocol type + // then drop the least recently used connection that is not protected or referenced + let mut out_conn = None; + if inner.conn_by_id[protocol_index].len() >= inner.max_connections[protocol_index] { + // Find a free connection to terminate to make room + let dead_k = { + let Some(lruk) = inner.conn_by_id[protocol_index].iter().find_map(|(k, v)| { + if !v.is_in_use() { + Some(*k) + } else { + None + } + }) else { + // Can't make room, connection table is full + return Err(ConnectionTableAddError::table_full(network_connection)); + }; + lruk + }; + + let dead_conn = Self::remove_connection_records(&mut inner, dead_k); + log_net!(debug "== LRU Connection Killed: {} -> {}", dead_k, dead_conn.debug_print(get_aligned_timestamp())); + + out_conn = Some(dead_conn); + } + // Add the connection to the table let res = inner.conn_by_id[protocol_index].insert(id, network_connection); assert!(res.is_none()); - // if we have reached the maximum number of connections per protocol type - // then drop the least recently used connection - let mut out_conn = None; - if inner.conn_by_id[protocol_index].len() > inner.max_connections[protocol_index] { - while let Some((lruk, lru_conn)) = inner.conn_by_id[protocol_index].peek_lru() { - let lruk = *lruk; - - // Don't LRU protected connections - if lru_conn.is_protected() { - // Mark as recently used - log_net!(debug "== No LRU Out for PROTECTED connection: {} -> {}", lruk, lru_conn.debug_print(get_aligned_timestamp())); - inner.conn_by_id[protocol_index].get(&lruk); - continue; - } - - log_net!(debug "== LRU Connection Killed: {} -> {}", lruk, lru_conn.debug_print(get_aligned_timestamp())); - out_conn = Some(Self::remove_connection_records(&mut inner, lruk)); - break; - } - } - // add connection records inner.protocol_index_by_id.insert(id, protocol_index); inner.id_by_descriptor.insert(descriptor, id); @@ -218,18 +232,6 @@ impl ConnectionTable { Some(out.get_handle()) } - //#[instrument(level = "trace", skip(self), ret)] - #[allow(dead_code)] - pub fn protect_connection_by_id(&self, id: NetworkConnectionId) -> bool { - let mut inner = self.inner.lock(); - let Some(protocol_index) = inner.protocol_index_by_id.get(&id).copied() else { - return false; - }; - let out = inner.conn_by_id[protocol_index].get_mut(&id).unwrap(); - out.protect(); - true - } - //#[instrument(level = "trace", skip(self), ret)] pub fn get_connection_by_descriptor( &self, @@ -244,7 +246,12 @@ impl ConnectionTable { } //#[instrument(level = "trace", skip(self), ret)] - pub fn protect_connection_by_descriptor(&self, descriptor: ConnectionDescriptor) -> bool { + pub fn ref_connection_by_descriptor( + &self, + descriptor: ConnectionDescriptor, + ref_type: ConnectionRefKind, + protect: bool, + ) -> bool { let mut inner = self.inner.lock(); let Some(id) = inner.id_by_descriptor.get(&descriptor).copied() else { @@ -252,7 +259,14 @@ impl ConnectionTable { }; let protocol_index = Self::protocol_to_index(descriptor.protocol_type()); let out = inner.conn_by_id[protocol_index].get_mut(&id).unwrap(); - out.protect(); + match ref_type { + ConnectionRefKind::AddRef => out.change_ref_count(true), + ConnectionRefKind::RemoveRef => out.change_ref_count(false), + } + if protect { + out.protect(); + } + true } diff --git a/veilid-core/src/network_manager/network_connection.rs b/veilid-core/src/network_manager/network_connection.rs index 158a8e7f..8a782bdc 100644 --- a/veilid-core/src/network_manager/network_connection.rs +++ b/veilid-core/src/network_manager/network_connection.rs @@ -95,8 +95,18 @@ pub struct NetworkConnection { sender: flume::Sender<(Option, Vec)>, stop_source: Option, protected: bool, + ref_count: usize, } +impl Drop for NetworkConnection { + fn drop(&mut self) { + if self.ref_count != 0 && self.stop_source.is_some() { + log_net!(error "ref_count for network connection should be zero: {:?}", self.ref_count); + } + } +} + + impl NetworkConnection { pub(super) fn dummy(id: NetworkConnectionId, descriptor: ConnectionDescriptor) -> Self { // Create handle for sending (dummy is immediately disconnected) @@ -114,6 +124,7 @@ impl NetworkConnection { sender, stop_source: None, protected: false, + ref_count: 0, } } @@ -160,6 +171,7 @@ impl NetworkConnection { sender, stop_source: Some(stop_source), protected: false, + ref_count: 0, } } @@ -175,14 +187,22 @@ impl NetworkConnection { ConnectionHandle::new(self.connection_id, self.descriptor, self.sender.clone()) } - pub fn is_protected(&self) -> bool { - self.protected + pub fn is_in_use(&self) -> bool { + self.protected || self.ref_count > 0 } pub fn protect(&mut self) { self.protected = true; } + pub fn change_ref_count(&mut self, add: bool) { + if add { + self.ref_count += 1; + } else { + self.ref_count -= 1; + } + } + pub fn close(&mut self) { if let Some(stop_source) = self.stop_source.take() { // drop the stopper diff --git a/veilid-core/src/routing_table/node_ref.rs b/veilid-core/src/routing_table/node_ref.rs index d7993c1a..97104d3b 100644 --- a/veilid-core/src/routing_table/node_ref.rs +++ b/veilid-core/src/routing_table/node_ref.rs @@ -308,17 +308,6 @@ pub(crate) trait NodeRefBase: Sized { }) } - fn protect_last_connection(&self) -> bool { - if let Some(descriptor) = self.last_connection() { - self.routing_table() - .network_manager() - .connection_manager() - .protect_connection(descriptor) - } else { - false - } - } - fn has_any_dial_info(&self) -> bool { self.operate(|_rti, e| { for rtd in RoutingDomain::all() { diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 288d1167..aff24769 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -189,6 +189,7 @@ struct WaitableReply { safety_route: Option, remote_private_route: Option, reply_private_route: Option, + _connection_ref_scope: ConnectionRefScope, } ///////////////////////////////////////////////////////////////////// @@ -1147,7 +1148,8 @@ impl RPCProcessor { dest: Destination, question: RPCQuestion, context: Option, - ) ->RPCNetworkResult { + protect: bool, + ) -> RPCNetworkResult { // Get sender peer info if we should send that let spi = self.get_sender_peer_info(&dest); @@ -1157,7 +1159,7 @@ impl RPCProcessor { // Log rpc send #[cfg(feature = "verbose-tracing")] - debug!(target: "rpc_message", dir = "send", kind = "question", op_id = op_id.as_u64(), desc = operation.kind().desc(), ?dest); + debug!(target: "rpc_message", dir = "send", kind = "question", op_id = op_id.as_u64(), desc = operation.kind().desc(), ?dest, protect); // Produce rendered operation let RenderedOperation { @@ -1221,6 +1223,16 @@ impl RPCProcessor { remote_private_route, ); + + // Ref the connection so it doesn't go away until we're done with the waitable reply + let connection_ref_scope = self + .network_manager() + .connection_manager() + .connection_ref_scope( + send_data_method.connection_descriptor, + protect, + ); + // Pass back waitable reply completion Ok(NetworkResult::value(WaitableReply { handle, @@ -1231,6 +1243,7 @@ impl RPCProcessor { safety_route, remote_private_route, reply_private_route, + _connection_ref_scope: connection_ref_scope, })) } diff --git a/veilid-core/src/rpc_processor/rpc_app_call.rs b/veilid-core/src/rpc_processor/rpc_app_call.rs index 239768ca..c465f696 100644 --- a/veilid-core/src/rpc_processor/rpc_app_call.rs +++ b/veilid-core/src/rpc_processor/rpc_app_call.rs @@ -21,7 +21,7 @@ impl RPCProcessor { ); // Send the app call question - let waitable_reply = network_result_try!(self.question(dest, question, None).await?); + let waitable_reply = network_result_try!(self.question(dest, question, None, false).await?); // Wait for reply let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? { diff --git a/veilid-core/src/rpc_processor/rpc_find_node.rs b/veilid-core/src/rpc_processor/rpc_find_node.rs index ded13285..64dc41ca 100644 --- a/veilid-core/src/rpc_processor/rpc_find_node.rs +++ b/veilid-core/src/rpc_processor/rpc_find_node.rs @@ -41,7 +41,8 @@ impl RPCProcessor { let debug_string = format!("FindNode(node_id={}) => {}", node_id, dest); // Send the find_node request - let waitable_reply = network_result_try!(self.question(dest, find_node_q, None).await?); + let waitable_reply = + network_result_try!(self.question(dest, find_node_q, None, false).await?); // Wait for reply let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? { diff --git a/veilid-core/src/rpc_processor/rpc_get_value.rs b/veilid-core/src/rpc_processor/rpc_get_value.rs index 055aaa45..34faf1ed 100644 --- a/veilid-core/src/rpc_processor/rpc_get_value.rs +++ b/veilid-core/src/rpc_processor/rpc_get_value.rs @@ -78,7 +78,7 @@ impl RPCProcessor { log_rpc!(debug "{}", debug_string); let waitable_reply = network_result_try!( - self.question(dest.clone(), question, Some(question_context)) + self.question(dest.clone(), question, Some(question_context), false) .await? ); diff --git a/veilid-core/src/rpc_processor/rpc_set_value.rs b/veilid-core/src/rpc_processor/rpc_set_value.rs index b507b73e..a48c1037 100644 --- a/veilid-core/src/rpc_processor/rpc_set_value.rs +++ b/veilid-core/src/rpc_processor/rpc_set_value.rs @@ -92,7 +92,7 @@ impl RPCProcessor { log_rpc!(debug "{}", debug_string); let waitable_reply = network_result_try!( - self.question(dest.clone(), question, Some(question_context)) + self.question(dest.clone(), question, Some(question_context), false) .await? ); diff --git a/veilid-core/src/rpc_processor/rpc_status.rs b/veilid-core/src/rpc_processor/rpc_status.rs index 9b831571..76f77c79 100644 --- a/veilid-core/src/rpc_processor/rpc_status.rs +++ b/veilid-core/src/rpc_processor/rpc_status.rs @@ -109,14 +109,7 @@ impl RPCProcessor { // Send the info request let waitable_reply = - network_result_try!(self.question(dest.clone(), question, None).await?); - - // Optionally protect the connection in the event this for a relay or route keepalive - if protect { - self.network_manager() - .connection_manager() - .protect_connection(waitable_reply.send_data_method.connection_descriptor); - } + network_result_try!(self.question(dest.clone(), question, None, protect).await?); // Note what kind of ping this was and to what peer scope let send_data_method = waitable_reply.send_data_method.clone();