diff --git a/.vscode/launch.json b/.vscode/launch.json index b0f99ce6..84701cd6 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -103,6 +103,28 @@ ], "cwd": "${workspaceFolder}/target/debug/" }, + { + "type": "lldb", + "request": "launch", + "name": "Debug veilid-tools unit test", + "cargo": { + "args": [ + "test", + "--no-run", + "--features=rt-tokio", + "--manifest-path", + "veilid-tools/Cargo.toml" + ], + "filter": { + "kind": "cdylib", + "name": "veilid-tools" + } + }, + "args": [ + "${selectedText}" + ], + "cwd": "${workspaceFolder}/target/debug/" + }, { "type": "lldb", "request": "launch", diff --git a/veilid-tools/src/assembly_buffer.rs b/veilid-tools/src/assembly_buffer.rs index 3bc8abe5..d60e7108 100644 --- a/veilid-tools/src/assembly_buffer.rs +++ b/veilid-tools/src/assembly_buffer.rs @@ -282,7 +282,17 @@ impl AssemblyBuffer { let peer_messages = e.get_mut(); // Insert the fragment and see what comes out - peer_messages.insert_fragment(seq, off, len, chunk) + let out = peer_messages.insert_fragment(seq, off, len, chunk); + + // If we are returning a message, see if there are any more assemblies for this peer + // If not, remove the peer + if out.is_some() { + if peer_messages.assemblies.len() == 0 { + e.remove(); + } + } + + out } std::collections::hash_map::Entry::Vacant(v) => { // See if we have room for one more @@ -331,10 +341,10 @@ impl AssemblyBuffer { &self, data: Vec, remote_addr: SocketAddr, - sender: S, + mut sender: S, ) -> std::io::Result> where - S: Fn(Vec, SocketAddr) -> F, + S: FnMut(Vec, SocketAddr) -> F, F: Future>>, { if data.len() > MAX_LEN { diff --git a/veilid-tools/src/tests/native/test_assembly_buffer.rs b/veilid-tools/src/tests/native/test_assembly_buffer.rs index 45c448ac..52550dc3 100644 --- a/veilid-tools/src/tests/native/test_assembly_buffer.rs +++ b/veilid-tools/src/tests/native/test_assembly_buffer.rs @@ -1,3 +1,5 @@ +use rand::seq::SliceRandom; + use crate::*; fn random_sockaddr() -> SocketAddr { @@ -15,6 +17,7 @@ fn random_sockaddr() -> SocketAddr { } pub async fn test_single_out_in() { + info!("-- test_single_out_in"); let assbuf_out = AssemblyBuffer::new(); let assbuf_in = AssemblyBuffer::new(); let (net_tx, net_rx) = flume::unbounded(); @@ -30,7 +33,9 @@ pub async fn test_single_out_in() { }; for _ in 0..1000 { - let message = vec![1u8; 1000]; + let random_len = (get_random_u32() % 1000) as usize; + let mut message = vec![1u8; random_len]; + random_bytes(&mut message); let remote_addr = random_sockaddr(); // Send single message below fragmentation limit @@ -58,6 +63,344 @@ pub async fn test_single_out_in() { assert!(net_rx.is_empty()) } +pub async fn test_one_frag_out_in() { + info!("-- test_one_frag_out_in"); + let assbuf_out = AssemblyBuffer::new(); + let assbuf_in = AssemblyBuffer::new(); + let (net_tx, net_rx) = flume::unbounded(); + let sender = |framed_chunk: Vec, remote_addr: SocketAddr| { + let net_tx = net_tx.clone(); + async move { + net_tx + .send_async((framed_chunk, remote_addr)) + .await + .expect("should send"); + Ok(NetworkResult::value(())) + } + }; + + let mut all_sent = HashSet::new(); + + // Sending + println!("sending"); + for _ in 0..10000 { + let random_len = (get_random_u32() % 1000) as usize + 1280; + let mut message = vec![1u8; random_len]; + random_bytes(&mut message); + let remote_addr = random_sockaddr(); + + // Send single message above fragmentation limit + all_sent.insert((message.clone(), remote_addr)); + assert!(matches!( + assbuf_out + .split_message(message.clone(), remote_addr, sender) + .await, + Ok(NetworkResult::Value(())) + )); + } + + println!("all_sent len={}", all_sent.len()); + + println!("fragments sent = {}", net_rx.len()); + + drop(net_tx); + + // Receiving + println!("receiving"); + + while let Ok((frame, r_remote_addr)) = net_rx.recv_async().await { + // Send to input + let r_message = assbuf_in.insert_frame(&frame, r_remote_addr); + + // We should have gotten the same message + if let Some(r_message) = r_message { + assert!(all_sent.remove(&(r_message, r_remote_addr))); + } + } + println!("all_sent len={}", all_sent.len()); + + // Shoud have dropped no packets + assert_eq!(all_sent.len(), 0); +} + +pub async fn test_many_frags_out_in() { + info!("-- test_many_frags_out_in"); + let assbuf_out = AssemblyBuffer::new(); + let assbuf_in = AssemblyBuffer::new(); + let (net_tx, net_rx) = flume::unbounded(); + let sender = |framed_chunk: Vec, remote_addr: SocketAddr| { + let net_tx = net_tx.clone(); + async move { + net_tx + .send_async((framed_chunk, remote_addr)) + .await + .expect("should send"); + Ok(NetworkResult::value(())) + } + }; + + let mut all_sent = HashSet::new(); + + // Sending + let mut total_sent_size = 0usize; + println!("sending"); + for _ in 0..1000 { + let random_len = (get_random_u32() % 65536) as usize; + total_sent_size += random_len; + let mut message = vec![1u8; random_len]; + random_bytes(&mut message); + let remote_addr = random_sockaddr(); + + // Send single message + all_sent.insert((message.clone(), remote_addr)); + assert!(matches!( + assbuf_out + .split_message(message.clone(), remote_addr, sender) + .await, + Ok(NetworkResult::Value(())) + )); + } + + println!("all_sent len={}", all_sent.len()); + println!("total_sent_size = {}", total_sent_size); + println!("fragments sent = {}", net_rx.len()); + + drop(net_tx); + + // Receiving + println!("receiving"); + + while let Ok((frame, r_remote_addr)) = net_rx.recv_async().await { + // Send to input + let r_message = assbuf_in.insert_frame(&frame, r_remote_addr); + + // We should have gotten the same message + if let Some(r_message) = r_message { + assert!(all_sent.remove(&(r_message, r_remote_addr))); + } + } + println!("all_sent len={}", all_sent.len()); + + // Shoud have dropped no packets + assert_eq!(all_sent.len(), 0); +} + +pub async fn test_many_frags_out_in_single_host() { + info!("-- test_many_frags_out_in_single_host"); + let assbuf_out = AssemblyBuffer::new(); + let assbuf_in = AssemblyBuffer::new(); + let (net_tx, net_rx) = flume::unbounded(); + let sender = |framed_chunk: Vec, remote_addr: SocketAddr| { + let net_tx = net_tx.clone(); + async move { + net_tx + .send_async((framed_chunk, remote_addr)) + .await + .expect("should send"); + Ok(NetworkResult::value(())) + } + }; + + let mut all_sent = HashSet::new(); + + // Sending + let mut total_sent_size = 0usize; + println!("sending"); + for _ in 0..1000 { + let random_len = (get_random_u32() % 65536) as usize; + total_sent_size += random_len; + let mut message = vec![1u8; random_len]; + random_bytes(&mut message); + let remote_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(1, 2, 3, 4), 5678)); + + // Send single message + all_sent.insert((message.clone(), remote_addr)); + assert!(matches!( + assbuf_out + .split_message(message.clone(), remote_addr, sender) + .await, + Ok(NetworkResult::Value(())) + )); + } + + println!("all_sent len={}", all_sent.len()); + println!("total_sent_size = {}", total_sent_size); + println!("fragments sent = {}", net_rx.len()); + + drop(net_tx); + + // Receiving + println!("receiving"); + + while let Ok((frame, r_remote_addr)) = net_rx.recv_async().await { + // Send to input + let r_message = assbuf_in.insert_frame(&frame, r_remote_addr); + + // We should have gotten the same message + if let Some(r_message) = r_message { + assert!(all_sent.remove(&(r_message, r_remote_addr))); + } + } + println!("all_sent len={}", all_sent.len()); + + // Shoud have dropped no packets + assert_eq!(all_sent.len(), 0); +} + +pub async fn test_many_frags_with_drops() { + info!("-- test_many_frags_with_drops"); + let assbuf_out = AssemblyBuffer::new(); + let assbuf_in = AssemblyBuffer::new(); + let (net_tx, net_rx) = flume::unbounded(); + + let first = Arc::new(AtomicBool::new(true)); + + let sender = |framed_chunk: Vec, remote_addr: SocketAddr| { + let net_tx = net_tx.clone(); + let first = first.clone(); + async move { + // Send only first packet, drop rest + if first.swap(false, Ordering::Relaxed) { + net_tx + .send_async((framed_chunk, remote_addr)) + .await + .expect("should send"); + } + Ok(NetworkResult::value(())) + } + }; + + let mut all_sent = HashSet::new(); + + // Sending + let mut total_sent_size = 0usize; + let mut total_fragged = 0usize; + println!("sending"); + for _ in 0..1000 { + let random_len = (get_random_u32() % 65536) as usize; + if random_len > 1280 { + total_fragged += 1; + } + total_sent_size += random_len; + let mut message = vec![1u8; random_len]; + random_bytes(&mut message); + let remote_addr = random_sockaddr(); + + // Send single message + all_sent.insert((message.clone(), remote_addr)); + assert!(matches!( + assbuf_out + .split_message(message.clone(), remote_addr, sender) + .await, + Ok(NetworkResult::Value(())) + )); + + first.store(true, Ordering::Relaxed); + } + + println!("all_sent len={}", all_sent.len()); + println!("total_sent_size = {}", total_sent_size); + println!("fragments sent = {}", net_rx.len()); + println!("total_fragged = {}", total_fragged); + drop(net_tx); + + // Receiving + println!("receiving"); + + while let Ok((frame, r_remote_addr)) = net_rx.recv_async().await { + // Send to input + let r_message = assbuf_in.insert_frame(&frame, r_remote_addr); + + // We should have gotten the same message + if let Some(r_message) = r_message { + assert!(all_sent.remove(&(r_message, r_remote_addr))); + } + } + println!("all_sent len={}", all_sent.len()); + + // Shoud have dropped all fragged packets + assert_eq!(all_sent.len(), total_fragged); +} + +pub async fn test_many_frags_reordered() { + info!("-- test_many_frags_reordered"); + let assbuf_out = AssemblyBuffer::new(); + let assbuf_in = AssemblyBuffer::new(); + let (net_tx, net_rx) = flume::unbounded(); + + let reorder_buffer = Arc::new(Mutex::new(Vec::new())); + let sender = |framed_chunk: Vec, remote_addr: SocketAddr| { + let reorder_buffer = reorder_buffer.clone(); + async move { + reorder_buffer.lock().push((framed_chunk, remote_addr)); + Ok(NetworkResult::Value(())) + } + }; + + let mut all_sent = HashSet::new(); + + // Sending + let mut total_sent_size = 0usize; + let mut rng = rand::thread_rng(); + println!("sending"); + for _ in 0..1000 { + let random_len = (get_random_u32() % 65536) as usize; + total_sent_size += random_len; + let mut message = vec![1u8; random_len]; + random_bytes(&mut message); + let remote_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(1, 2, 3, 4), 5678)); + + // Send single message + all_sent.insert((message.clone(), remote_addr)); + assert!(matches!( + assbuf_out + .split_message(message.clone(), remote_addr, sender) + .await, + Ok(NetworkResult::Value(())) + )); + + // Shuffle fragments + let items = { + let mut rbinner = reorder_buffer.lock(); + rbinner.shuffle(&mut rng); + let items = rbinner.clone(); + rbinner.clear(); + items + }; + for p in items { + net_tx.send_async(p).await.expect("should send"); + } + } + + println!("all_sent len={}", all_sent.len()); + println!("total_sent_size = {}", total_sent_size); + println!("fragments sent = {}", net_rx.len()); + + drop(net_tx); + + // Receiving + println!("receiving"); + + while let Ok((frame, r_remote_addr)) = net_rx.recv_async().await { + // Send to input + let r_message = assbuf_in.insert_frame(&frame, r_remote_addr); + + // We should have gotten the same message + if let Some(r_message) = r_message { + assert!(all_sent.remove(&(r_message, r_remote_addr))); + } + } + println!("all_sent len={}", all_sent.len()); + + // Shoud have dropped no packets + assert_eq!(all_sent.len(), 0); +} + pub async fn test_all() { test_single_out_in().await; + test_one_frag_out_in().await; + test_many_frags_out_in().await; + test_many_frags_out_in_single_host().await; + test_many_frags_with_drops().await; + test_many_frags_reordered().await; }