From facb34316075ee587be96d5984e0eaa4f420bae9 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Fri, 3 Nov 2023 15:57:40 -0400 Subject: [PATCH] clean up protect/refs --- .../src/network_manager/connection_manager.rs | 76 ++++++++++------- .../src/network_manager/connection_table.rs | 10 +-- .../native/discovery_context.rs | 2 +- .../network_manager/native/protocol/tcp.rs | 4 +- .../src/network_manager/network_connection.rs | 39 +++++---- veilid-core/src/network_manager/stats.rs | 2 + .../tasks/public_address_check.rs | 4 +- veilid-core/src/routing_table/node_ref.rs | 4 + .../src/routing_table/route_spec_store/mod.rs | 4 +- .../src/routing_table/tasks/ping_validator.rs | 6 +- veilid-core/src/rpc_processor/mod.rs | 6 +- veilid-core/src/rpc_processor/rpc_app_call.rs | 2 +- .../src/rpc_processor/rpc_find_node.rs | 3 +- .../src/rpc_processor/rpc_get_value.rs | 2 +- .../src/rpc_processor/rpc_set_value.rs | 2 +- veilid-core/src/rpc_processor/rpc_status.rs | 3 +- veilid-core/src/veilid_api/debug.rs | 2 +- veilid-tools/src/must_join_single_future.rs | 14 +--- .../tests/native/test_async_peek_stream.rs | 81 +++++++------------ 19 files changed, 128 insertions(+), 138 deletions(-) diff --git a/veilid-core/src/network_manager/connection_manager.rs b/veilid-core/src/network_manager/connection_manager.rs index c76cb74a..c2c53eb6 100644 --- a/veilid-core/src/network_manager/connection_manager.rs +++ b/veilid-core/src/network_manager/connection_manager.rs @@ -17,33 +17,22 @@ enum ConnectionManagerEvent { pub(crate) struct ConnectionRefScope { connection_manager: ConnectionManager, descriptor: ConnectionDescriptor, - protect: bool, } impl ConnectionRefScope { - pub fn new( - connection_manager: ConnectionManager, - descriptor: ConnectionDescriptor, - protect: bool, - ) -> Self { - connection_manager.connection_ref(descriptor, ConnectionRefKind::AddRef, protect); + pub fn new(connection_manager: ConnectionManager, descriptor: ConnectionDescriptor) -> Self { + connection_manager.connection_ref(descriptor, ConnectionRefKind::AddRef); Self { connection_manager, descriptor, - protect, } } } impl Drop for ConnectionRefScope { fn drop(&mut self) { - if !self.protect { - self.connection_manager.connection_ref( - self.descriptor, - ConnectionRefKind::RemoveRef, - false, - ); - } + self.connection_manager + .connection_ref(self.descriptor, ConnectionRefKind::RemoveRef); } } @@ -169,6 +158,33 @@ impl ConnectionManager { debug!("finished connection manager shutdown"); } + // Internal routine to see if we should keep this connection + // from being LRU removed. Used on our initiated relay connections. + fn should_protect_connection(&self, conn: &NetworkConnection) -> Option { + let netman = self.network_manager(); + let routing_table = netman.routing_table(); + let remote_address = conn.connection_descriptor().remote_address().address(); + let Some(routing_domain) = routing_table.routing_domain_for_address(remote_address) else { + return None; + }; + let Some(rn) = routing_table.relay_node(routing_domain) else { + return None; + }; + let relay_nr = rn.filtered_clone( + NodeRefFilter::new() + .with_routing_domain(routing_domain) + .with_address_type(conn.connection_descriptor().address_type()) + .with_protocol_type(conn.connection_descriptor().protocol_type()), + ); + let dids = relay_nr.all_filtered_dial_info_details(); + for did in dids { + if did.dial_info.address() == remote_address { + return Some(relay_nr); + } + } + None + } + // Internal routine to register new connection atomically. // Registers connection in the connection table for later access // and spawns a message processing loop for the connection @@ -193,9 +209,15 @@ impl ConnectionManager { None => bail!("not creating connection because we are stopping"), }; - let conn = NetworkConnection::from_protocol(self.clone(), stop_token, prot_conn, id); + let mut conn = NetworkConnection::from_protocol(self.clone(), stop_token, prot_conn, id); let handle = conn.get_handle(); + // See if this should be a protected connection + if let Some(protect_nr) = self.should_protect_connection(&conn) { + log_net!(debug "== PROTECTING connection: {} -> {} for node {}", id, conn.debug_print(get_aligned_timestamp()), protect_nr); + conn.protect(protect_nr); + } + // Add to the connection table match self.arc.connection_table.add_connection(conn) { Ok(None) => { @@ -253,22 +275,13 @@ impl ConnectionManager { } // Protects a network connection if one already is established - fn connection_ref( - &self, - descriptor: ConnectionDescriptor, - kind: ConnectionRefKind, - protect: bool, - ) { + fn connection_ref(&self, descriptor: ConnectionDescriptor, kind: ConnectionRefKind) { self.arc .connection_table - .ref_connection_by_descriptor(descriptor, kind, protect); + .ref_connection_by_descriptor(descriptor, kind); } - pub fn connection_ref_scope( - &self, - descriptor: ConnectionDescriptor, - protect: bool, - ) -> ConnectionRefScope { - ConnectionRefScope::new(self.clone(), descriptor, protect) + pub fn connection_ref_scope(&self, descriptor: ConnectionDescriptor) -> ConnectionRefScope { + ConnectionRefScope::new(self.clone(), descriptor) } /// Called when we want to create a new connection or get the current one that already exists @@ -446,6 +459,11 @@ impl ConnectionManager { // Inform the processor of the event if let Some(conn) = conn { + // If the connection closed while it was protected, report it on the node the connection was established on + // In-use connections will already get reported because they will cause a 'question_lost' stat on the remote node + if let Some(protect_nr) = conn.protected_node_ref() { + protect_nr.report_protected_connection_dropped(); + } let _ = sender.send_async(ConnectionManagerEvent::Dead(conn)).await; } } diff --git a/veilid-core/src/network_manager/connection_table.rs b/veilid-core/src/network_manager/connection_table.rs index ef04af41..88aa8bc8 100644 --- a/veilid-core/src/network_manager/connection_table.rs +++ b/veilid-core/src/network_manager/connection_table.rs @@ -195,7 +195,7 @@ impl ConnectionTable { // Find a free connection to terminate to make room let dead_k = { let Some(lruk) = inner.conn_by_id[protocol_index].iter().find_map(|(k, v)| { - if !v.is_in_use() { + if !v.is_in_use() && v.protected_node_ref().is_none() { Some(*k) } else { None @@ -254,7 +254,6 @@ impl ConnectionTable { &self, descriptor: ConnectionDescriptor, ref_type: ConnectionRefKind, - protect: bool, ) -> bool { if descriptor.protocol_type() == ProtocolType::UDP { return false; @@ -269,11 +268,8 @@ impl ConnectionTable { let protocol_index = Self::protocol_to_index(descriptor.protocol_type()); let out = inner.conn_by_id[protocol_index].get_mut(&id).unwrap(); match ref_type { - ConnectionRefKind::AddRef => out.change_ref_count(true), - ConnectionRefKind::RemoveRef => out.change_ref_count(false), - } - if protect { - out.protect(); + ConnectionRefKind::AddRef => out.add_ref(), + ConnectionRefKind::RemoveRef => out.remove_ref(), } true diff --git a/veilid-core/src/network_manager/native/discovery_context.rs b/veilid-core/src/network_manager/native/discovery_context.rs index fe42f46f..b2713a3b 100644 --- a/veilid-core/src/network_manager/native/discovery_context.rs +++ b/veilid-core/src/network_manager/native/discovery_context.rs @@ -135,7 +135,7 @@ impl DiscoveryContext { async fn request_public_address(&self, node_ref: NodeRef) -> Option { let rpc = self.unlocked_inner.routing_table.rpc_processor(); - let res = network_result_value_or_log!(match rpc.rpc_call_status(Destination::direct(node_ref.clone()), false).await { + let res = network_result_value_or_log!(match rpc.rpc_call_status(Destination::direct(node_ref.clone())).await { Ok(v) => v, Err(e) => { log_net!(error diff --git a/veilid-core/src/network_manager/native/protocol/tcp.rs b/veilid-core/src/network_manager/native/protocol/tcp.rs index 755d1ce0..52e1e27f 100644 --- a/veilid-core/src/network_manager/native/protocol/tcp.rs +++ b/veilid-core/src/network_manager/native/protocol/tcp.rs @@ -27,8 +27,8 @@ impl RawTcpNetworkConnection { instrument(level = "trace", err, skip(self)) )] pub async fn close(&self) -> io::Result> { - // Make an attempt to flush the stream - self.stream.clone().close().await?; + let mut stream = self.stream.clone(); + let _ = stream.close().await; Ok(NetworkResult::value(())) // // Then shut down the write side of the socket to effect a clean close diff --git a/veilid-core/src/network_manager/network_connection.rs b/veilid-core/src/network_manager/network_connection.rs index bd4f49ea..fe99179a 100644 --- a/veilid-core/src/network_manager/network_connection.rs +++ b/veilid-core/src/network_manager/network_connection.rs @@ -86,7 +86,7 @@ pub struct NetworkConnectionStats { pub type NetworkConnectionId = AlignedU64; #[derive(Debug)] -pub struct NetworkConnection { +pub(in crate::network_manager) struct NetworkConnection { connection_id: NetworkConnectionId, descriptor: ConnectionDescriptor, processor: Option>, @@ -94,7 +94,7 @@ pub struct NetworkConnection { stats: Arc>, sender: flume::Sender<(Option, Vec)>, stop_source: Option, - protected: bool, + protected_nr: Option, ref_count: usize, } @@ -123,7 +123,7 @@ impl NetworkConnection { })), sender, stop_source: None, - protected: false, + protected_nr: None, ref_count: 0, } } @@ -170,7 +170,7 @@ impl NetworkConnection { stats, sender, stop_source: Some(stop_source), - protected: false, + protected_nr: None, ref_count: 0, } } @@ -188,19 +188,23 @@ impl NetworkConnection { } pub fn is_in_use(&self) -> bool { - self.protected || self.ref_count > 0 + self.ref_count > 0 } - pub fn protect(&mut self) { - self.protected = true; + pub fn protected_node_ref(&self) -> Option{ + self.protected_nr.clone() } - pub fn change_ref_count(&mut self, add: bool) { - if add { - self.ref_count += 1; - } else { - self.ref_count -= 1; - } + pub fn protect(&mut self, protect_nr: NodeRef) { + self.protected_nr = Some(protect_nr); + } + + pub fn add_ref(&mut self) { + self.ref_count += 1; + } + + pub fn remove_ref(&mut self) { + self.ref_count -= 1; } pub fn close(&mut self) { @@ -373,7 +377,6 @@ impl NetworkConnection { // Touch the LRU for this connection connection_manager.touch_connection_by_id(connection_id); - RecvLoopAction::Recv } } @@ -439,13 +442,19 @@ impl NetworkConnection { } pub fn debug_print(&self, cur_ts: Timestamp) -> String { - format!("{} <- {} | {} | est {} sent {} rcvd {}", + format!("{} <- {} | {} | est {} sent {} rcvd {} refcount {}{}", self.descriptor.remote_address(), self.descriptor.local().map(|x| x.to_string()).unwrap_or("---".to_owned()), self.connection_id.as_u64(), debug_duration(cur_ts.as_u64().saturating_sub(self.established_time.as_u64())), self.stats().last_message_sent_time.map(|ts| debug_duration(cur_ts.as_u64().saturating_sub(ts.as_u64())) ).unwrap_or("---".to_owned()), self.stats().last_message_recv_time.map(|ts| debug_duration(cur_ts.as_u64().saturating_sub(ts.as_u64())) ).unwrap_or("---".to_owned()), + self.ref_count, + if let Some(pnr) = &self.protected_nr { + format!(" PROTECTED:{}",pnr) + } else { + "".to_owned() + } ) } } diff --git a/veilid-core/src/network_manager/stats.rs b/veilid-core/src/network_manager/stats.rs index b5296607..608cf1cf 100644 --- a/veilid-core/src/network_manager/stats.rs +++ b/veilid-core/src/network_manager/stats.rs @@ -42,6 +42,7 @@ impl NetworkManager { .self_stats .transfer_stats_accounting .add_up(bytes); + #[allow(clippy::unwrap_or_default)] inner .stats .per_address_stats @@ -58,6 +59,7 @@ impl NetworkManager { .self_stats .transfer_stats_accounting .add_down(bytes); + #[allow(clippy::unwrap_or_default)] inner .stats .per_address_stats diff --git a/veilid-core/src/network_manager/tasks/public_address_check.rs b/veilid-core/src/network_manager/tasks/public_address_check.rs index 55c9162d..e214e27a 100644 --- a/veilid-core/src/network_manager/tasks/public_address_check.rs +++ b/veilid-core/src/network_manager/tasks/public_address_check.rs @@ -192,7 +192,7 @@ impl NetworkManager { let pait = inner .public_address_inconsistencies_table .entry(addr_proto_type_key) - .or_insert_with(HashMap::new); + .or_default(); for i in &inconsistencies { pait.insert(*i, exp_ts); } @@ -204,7 +204,7 @@ impl NetworkManager { let pait = inner .public_address_inconsistencies_table .entry(addr_proto_type_key) - .or_insert_with(HashMap::new); + .or_default(); let exp_ts = get_aligned_timestamp() + PUBLIC_ADDRESS_INCONSISTENCY_PUNISHMENT_TIMEOUT_US; for i in inconsistencies { diff --git a/veilid-core/src/routing_table/node_ref.rs b/veilid-core/src/routing_table/node_ref.rs index 97104d3b..1acaef96 100644 --- a/veilid-core/src/routing_table/node_ref.rs +++ b/veilid-core/src/routing_table/node_ref.rs @@ -321,6 +321,10 @@ pub(crate) trait NodeRefBase: Sized { }) } + fn report_protected_connection_dropped(&self) { + self.stats_failed_to_send(get_aligned_timestamp(), false); + } + fn stats_question_sent(&self, ts: Timestamp, bytes: Timestamp, expects_answer: bool) { self.operate_mut(|rti, e| { rti.transfer_stats_accounting().add_up(bytes); diff --git a/veilid-core/src/routing_table/route_spec_store/mod.rs b/veilid-core/src/routing_table/route_spec_store/mod.rs index 035df5d9..23f87b30 100644 --- a/veilid-core/src/routing_table/route_spec_store/mod.rs +++ b/veilid-core/src/routing_table/route_spec_store/mod.rs @@ -703,7 +703,7 @@ impl RouteSpecStore { // Test with double-round trip ping to self let rpc_processor = self.unlocked_inner.routing_table.rpc_processor(); - let _res = match rpc_processor.rpc_call_status(dest, true).await? { + let _res = match rpc_processor.rpc_call_status(dest).await? { NetworkResult::Value(v) => v, _ => { // Did not error, but did not come back, just return false @@ -746,7 +746,7 @@ impl RouteSpecStore { // Test with double-round trip ping to self let rpc_processor = self.unlocked_inner.routing_table.rpc_processor(); - let _res = match rpc_processor.rpc_call_status(dest, true).await? { + let _res = match rpc_processor.rpc_call_status(dest).await? { NetworkResult::Value(v) => v, _ => { // Did not error, but did not come back, just return false diff --git a/veilid-core/src/routing_table/tasks/ping_validator.rs b/veilid-core/src/routing_table/tasks/ping_validator.rs index fdd54087..ff88b2d7 100644 --- a/veilid-core/src/routing_table/tasks/ping_validator.rs +++ b/veilid-core/src/routing_table/tasks/ping_validator.rs @@ -112,7 +112,7 @@ impl RoutingTable { futurequeue.push_back( async move { - rpc.rpc_call_status(Destination::direct(relay_nr_filtered), true) + rpc.rpc_call_status(Destination::direct(relay_nr_filtered)) .await } .instrument(Span::current()) @@ -148,7 +148,7 @@ impl RoutingTable { let rpc = rpc.clone(); log_rtab!("--> Validator ping to {:?}", nr); futurequeue.push_back( - async move { rpc.rpc_call_status(Destination::direct(nr), false).await } + async move { rpc.rpc_call_status(Destination::direct(nr)).await } .instrument(Span::current()) .boxed(), ); @@ -176,7 +176,7 @@ impl RoutingTable { // Just do a single ping with the best protocol for all the nodes futurequeue.push_back( - async move { rpc.rpc_call_status(Destination::direct(nr), false).await } + async move { rpc.rpc_call_status(Destination::direct(nr)).await } .instrument(Span::current()) .boxed(), ); diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index aff24769..e9c4cc19 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -1148,7 +1148,6 @@ impl RPCProcessor { dest: Destination, question: RPCQuestion, context: Option, - protect: bool, ) -> RPCNetworkResult { // Get sender peer info if we should send that let spi = self.get_sender_peer_info(&dest); @@ -1228,10 +1227,7 @@ impl RPCProcessor { let connection_ref_scope = self .network_manager() .connection_manager() - .connection_ref_scope( - send_data_method.connection_descriptor, - protect, - ); + .connection_ref_scope(send_data_method.connection_descriptor); // Pass back waitable reply completion Ok(NetworkResult::value(WaitableReply { diff --git a/veilid-core/src/rpc_processor/rpc_app_call.rs b/veilid-core/src/rpc_processor/rpc_app_call.rs index c465f696..239768ca 100644 --- a/veilid-core/src/rpc_processor/rpc_app_call.rs +++ b/veilid-core/src/rpc_processor/rpc_app_call.rs @@ -21,7 +21,7 @@ impl RPCProcessor { ); // Send the app call question - let waitable_reply = network_result_try!(self.question(dest, question, None, false).await?); + let waitable_reply = network_result_try!(self.question(dest, question, None).await?); // Wait for reply let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? { diff --git a/veilid-core/src/rpc_processor/rpc_find_node.rs b/veilid-core/src/rpc_processor/rpc_find_node.rs index 64dc41ca..ded13285 100644 --- a/veilid-core/src/rpc_processor/rpc_find_node.rs +++ b/veilid-core/src/rpc_processor/rpc_find_node.rs @@ -41,8 +41,7 @@ impl RPCProcessor { let debug_string = format!("FindNode(node_id={}) => {}", node_id, dest); // Send the find_node request - let waitable_reply = - network_result_try!(self.question(dest, find_node_q, None, false).await?); + let waitable_reply = network_result_try!(self.question(dest, find_node_q, None).await?); // Wait for reply let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? { diff --git a/veilid-core/src/rpc_processor/rpc_get_value.rs b/veilid-core/src/rpc_processor/rpc_get_value.rs index 34faf1ed..055aaa45 100644 --- a/veilid-core/src/rpc_processor/rpc_get_value.rs +++ b/veilid-core/src/rpc_processor/rpc_get_value.rs @@ -78,7 +78,7 @@ impl RPCProcessor { log_rpc!(debug "{}", debug_string); let waitable_reply = network_result_try!( - self.question(dest.clone(), question, Some(question_context), false) + self.question(dest.clone(), question, Some(question_context)) .await? ); diff --git a/veilid-core/src/rpc_processor/rpc_set_value.rs b/veilid-core/src/rpc_processor/rpc_set_value.rs index a48c1037..b507b73e 100644 --- a/veilid-core/src/rpc_processor/rpc_set_value.rs +++ b/veilid-core/src/rpc_processor/rpc_set_value.rs @@ -92,7 +92,7 @@ impl RPCProcessor { log_rpc!(debug "{}", debug_string); let waitable_reply = network_result_try!( - self.question(dest.clone(), question, Some(question_context), false) + self.question(dest.clone(), question, Some(question_context)) .await? ); diff --git a/veilid-core/src/rpc_processor/rpc_status.rs b/veilid-core/src/rpc_processor/rpc_status.rs index 76f77c79..4126d7a9 100644 --- a/veilid-core/src/rpc_processor/rpc_status.rs +++ b/veilid-core/src/rpc_processor/rpc_status.rs @@ -22,7 +22,6 @@ impl RPCProcessor { pub async fn rpc_call_status( self, dest: Destination, - protect: bool, ) -> RPCNetworkResult>> { let (opt_target_nr, routing_domain, node_status) = match dest.get_safety_selection() { SafetySelection::Unsafe(_) => { @@ -109,7 +108,7 @@ impl RPCProcessor { // Send the info request let waitable_reply = - network_result_try!(self.question(dest.clone(), question, None, protect).await?); + network_result_try!(self.question(dest.clone(), question, None).await?); // Note what kind of ping this was and to what peer scope let send_data_method = waitable_reply.send_data_method.clone(); diff --git a/veilid-core/src/veilid_api/debug.rs b/veilid-core/src/veilid_api/debug.rs index ca510a1e..730870b8 100644 --- a/veilid-core/src/veilid_api/debug.rs +++ b/veilid-core/src/veilid_api/debug.rs @@ -930,7 +930,7 @@ impl VeilidAPI { // Send a StatusQ let out = match rpc - .rpc_call_status(dest, false) + .rpc_call_status(dest) .await .map_err(VeilidAPIError::internal)? { diff --git a/veilid-tools/src/must_join_single_future.rs b/veilid-tools/src/must_join_single_future.rs index 42663ad4..d9857ad6 100644 --- a/veilid-tools/src/must_join_single_future.rs +++ b/veilid-tools/src/must_join_single_future.rs @@ -75,9 +75,7 @@ where return Err(()); } }; - if maybe_jh.is_some() { - let mut jh = maybe_jh.unwrap(); - + if let Some(mut jh) = maybe_jh { // See if we finished, if so, return the value of the last execution if let Poll::Ready(r) = poll!(&mut jh) { out = Some(r); @@ -110,8 +108,7 @@ where return Err(()); } }; - if maybe_jh.is_some() { - let jh = maybe_jh.unwrap(); + if let Some(jh) = maybe_jh { // Wait for return value of the last execution out = Some(jh.await); // Task finished, unlock with nothing @@ -141,9 +138,7 @@ where }; let mut run = true; - if maybe_jh.is_some() { - let mut jh = maybe_jh.unwrap(); - + if let Some(mut jh) = maybe_jh { // See if we finished, if so, return the value of the last execution if let Poll::Ready(r) = poll!(&mut jh) { out = Some(r); @@ -183,8 +178,7 @@ where } }; let mut run = true; - if maybe_jh.is_some() { - let mut jh = maybe_jh.unwrap(); + if let Some(mut jh) = maybe_jh { // See if we finished, if so, return the value of the last execution if let Poll::Ready(r) = poll!(&mut jh) { out = Some(r); diff --git a/veilid-tools/src/tests/native/test_async_peek_stream.rs b/veilid-tools/src/tests/native/test_async_peek_stream.rs index 0797a4f8..0d6bb881 100644 --- a/veilid-tools/src/tests/native/test_async_peek_stream.rs +++ b/veilid-tools/src/tests/native/test_async_peek_stream.rs @@ -73,8 +73,7 @@ pub async fn test_nothing() { a.write_all(&outbuf).await.unwrap(); - let mut inbuf: Vec = Vec::new(); - inbuf.resize(outbuf.len(), 0u8); + let mut inbuf: Vec = vec![0; outbuf.len()]; c.read_exact(&mut inbuf).await.unwrap(); assert_eq!(inbuf, outbuf); @@ -88,8 +87,7 @@ pub async fn test_no_peek() { a.write_all(&outbuf).await.unwrap(); - let mut inbuf: Vec = Vec::new(); - inbuf.resize(outbuf.len(), 0u8); + let mut inbuf: Vec = vec![0; outbuf.len()]; c.read_exact(&mut inbuf).await.unwrap(); assert_eq!(inbuf, outbuf); @@ -104,14 +102,12 @@ pub async fn test_peek_all_read() { a.write_all(&outbuf).await.unwrap(); // peek everything - let mut peekbuf1: Vec = Vec::new(); - peekbuf1.resize(outbuf.len(), 0u8); + let mut peekbuf1: Vec = vec![0; outbuf.len()]; let peeksize1 = c.peek(&mut peekbuf1).await.unwrap(); assert_eq!(peeksize1, peekbuf1.len()); // read everything - let mut inbuf: Vec = Vec::new(); - inbuf.resize(outbuf.len(), 0u8); + let mut inbuf: Vec = vec![0; outbuf.len()]; c.read_exact(&mut inbuf).await.unwrap(); assert_eq!(inbuf, outbuf); @@ -128,13 +124,11 @@ pub async fn test_peek_some_read() { a.write_all(&outbuf).await.unwrap(); // peek partially - let mut peekbuf1: Vec = Vec::new(); - peekbuf1.resize(outbuf.len() / 2, 0u8); + let mut peekbuf1: Vec = vec![0; outbuf.len() / 2]; let peeksize1 = c.peek(&mut peekbuf1).await.unwrap(); assert_eq!(peeksize1, peekbuf1.len()); // read everything - let mut inbuf: Vec = Vec::new(); - inbuf.resize(outbuf.len(), 0u8); + let mut inbuf: Vec = vec![0; outbuf.len()]; c.read_exact(&mut inbuf).await.unwrap(); assert_eq!(inbuf, outbuf); @@ -151,20 +145,17 @@ pub async fn test_peek_some_peek_some_read() { a.write_all(&outbuf).await.unwrap(); // peek partially - let mut peekbuf1: Vec = Vec::new(); - peekbuf1.resize(outbuf.len() / 4, 0u8); + let mut peekbuf1: Vec = vec![0; outbuf.len() / 4]; let peeksize1 = c.peek(&mut peekbuf1).await.unwrap(); assert_eq!(peeksize1, peekbuf1.len()); // peek partially - let mut peekbuf2: Vec = Vec::new(); - peekbuf2.resize(peeksize1 + 1, 0u8); + let mut peekbuf2: Vec = vec![0; peeksize1 + 1]; let peeksize2 = c.peek(&mut peekbuf2).await.unwrap(); assert_eq!(peeksize2, peekbuf2.len()); // read everything - let mut inbuf: Vec = Vec::new(); - inbuf.resize(outbuf.len(), 0u8); + let mut inbuf: Vec = vec![0; outbuf.len()]; c.read_exact(&mut inbuf).await.unwrap(); assert_eq!(inbuf, outbuf); @@ -182,25 +173,21 @@ pub async fn test_peek_some_read_peek_some_read() { a.write_all(&outbuf).await.unwrap(); // peek partially - let mut peekbuf1: Vec = Vec::new(); - peekbuf1.resize(outbuf.len() / 4, 0u8); + let mut peekbuf1: Vec = vec![0; outbuf.len() / 4]; let peeksize1 = c.peek(&mut peekbuf1).await.unwrap(); assert_eq!(peeksize1, peekbuf1.len()); // read partially - let mut inbuf1: Vec = Vec::new(); - inbuf1.resize(peeksize1 - 1, 0u8); + let mut inbuf1: Vec = vec![0; peeksize1 - 1]; c.read_exact(&mut inbuf1).await.unwrap(); // peek partially - let mut peekbuf2: Vec = Vec::new(); - peekbuf2.resize(2, 0u8); + let mut peekbuf2: Vec = vec![0; 2]; let peeksize2 = c.peek(&mut peekbuf2).await.unwrap(); assert_eq!(peeksize2, peekbuf2.len()); // read partially - let mut inbuf2: Vec = Vec::new(); - inbuf2.resize(2, 0u8); + let mut inbuf2: Vec = vec![0; 2]; c.read_exact(&mut inbuf2).await.unwrap(); assert_eq!(peekbuf1, outbuf[0..peeksize1].to_vec()); @@ -219,25 +206,21 @@ pub async fn test_peek_some_read_peek_all_read() { a.write_all(&outbuf).await.unwrap(); // peek partially - let mut peekbuf1: Vec = Vec::new(); - peekbuf1.resize(outbuf.len() / 4, 0u8); + let mut peekbuf1: Vec = vec![0; outbuf.len() / 4]; let peeksize1 = c.peek(&mut peekbuf1).await.unwrap(); assert_eq!(peeksize1, peekbuf1.len()); // read partially - let mut inbuf1: Vec = Vec::new(); - inbuf1.resize(peeksize1 + 1, 0u8); + let mut inbuf1: Vec = vec![0; peeksize1 + 1]; c.read_exact(&mut inbuf1).await.unwrap(); // peek past end - let mut peekbuf2: Vec = Vec::new(); - peekbuf2.resize(outbuf.len(), 0u8); + let mut peekbuf2: Vec = vec![0; outbuf.len()]; let peeksize2 = c.peek(&mut peekbuf2).await.unwrap(); assert_eq!(peeksize2, outbuf.len() - (peeksize1 + 1)); // read remaining - let mut inbuf2: Vec = Vec::new(); - inbuf2.resize(peeksize2, 0u8); + let mut inbuf2: Vec = vec![0; peeksize2]; c.read_exact(&mut inbuf2).await.unwrap(); assert_eq!(peekbuf1, outbuf[0..peeksize1].to_vec()); @@ -259,29 +242,24 @@ pub async fn test_peek_some_read_peek_some_read_all_read() { a.write_all(&outbuf).await.unwrap(); // peek partially - let mut peekbuf1: Vec = Vec::new(); - peekbuf1.resize(outbuf.len() / 4, 0u8); + let mut peekbuf1: Vec = vec![0; outbuf.len() / 4]; let peeksize1 = c.peek(&mut peekbuf1).await.unwrap(); assert_eq!(peeksize1, peekbuf1.len()); // read partially - let mut inbuf1: Vec = Vec::new(); - inbuf1.resize(peeksize1 - 1, 0u8); + let mut inbuf1: Vec = vec![0; peeksize1 - 1]; c.read_exact(&mut inbuf1).await.unwrap(); // peek partially - let mut peekbuf2: Vec = Vec::new(); - peekbuf2.resize(2, 0u8); + let mut peekbuf2: Vec = vec![0; 2]; let peeksize2 = c.peek(&mut peekbuf2).await.unwrap(); assert_eq!(peeksize2, peekbuf2.len()); // read partially - let mut inbuf2: Vec = Vec::new(); - inbuf2.resize(1, 0u8); + let mut inbuf2: Vec = vec![0; 1]; c.read_exact(&mut inbuf2).await.unwrap(); // read remaining - let mut inbuf3: Vec = Vec::new(); - inbuf3.resize(outbuf.len() - peeksize1, 0u8); + let mut inbuf3: Vec = vec![0; outbuf.len() - peeksize1]; c.read_exact(&mut inbuf3).await.unwrap(); assert_eq!(peekbuf1, outbuf[0..peeksize1].to_vec()); @@ -304,29 +282,24 @@ pub async fn test_peek_exact_read_peek_exact_read_all_read() { a.write_all(&outbuf).await.unwrap(); // peek partially - let mut peekbuf1: Vec = Vec::new(); - peekbuf1.resize(outbuf.len() / 4, 0u8); + let mut peekbuf1: Vec = vec![0; outbuf.len() / 4]; let peeksize1 = c.peek_exact(&mut peekbuf1).await.unwrap(); assert_eq!(peeksize1, peekbuf1.len()); // read partially - let mut inbuf1: Vec = Vec::new(); - inbuf1.resize(peeksize1 - 1, 0u8); + let mut inbuf1: Vec = vec![0; peeksize1 - 1]; c.read_exact(&mut inbuf1).await.unwrap(); // peek partially - let mut peekbuf2: Vec = Vec::new(); - peekbuf2.resize(2, 0u8); + let mut peekbuf2: Vec = vec![0; 2]; let peeksize2 = c.peek_exact(&mut peekbuf2).await.unwrap(); assert_eq!(peeksize2, peekbuf2.len()); // read partially - let mut inbuf2: Vec = Vec::new(); - inbuf2.resize(1, 0u8); + let mut inbuf2: Vec = vec![0; 1]; c.read_exact(&mut inbuf2).await.unwrap(); // read remaining - let mut inbuf3: Vec = Vec::new(); - inbuf3.resize(outbuf.len() - peeksize1, 0u8); + let mut inbuf3: Vec = vec![0; outbuf.len() - peeksize1]; c.read_exact(&mut inbuf3).await.unwrap(); assert_eq!(peekbuf1, outbuf[0..peeksize1].to_vec());