From e4f97cfefa6399b4d167de3bd1628c9c27d303d0 Mon Sep 17 00:00:00 2001 From: John Smith Date: Fri, 23 Jun 2023 12:05:28 -0400 Subject: [PATCH] assembly buffer --- Cargo.lock | 2 + .../network_manager/native/protocol/mod.rs | 2 - .../network_manager/native/protocol/udp.rs | 1 - veilid-tools/Cargo.toml | 2 + .../src}/assembly_buffer.rs | 144 +++++++++++++++++- veilid-tools/src/lib.rs | 2 + veilid-tools/src/tests/native/mod.rs | 12 ++ .../src/tests/native/test_assembly_buffer.rs | 63 ++++++++ 8 files changed, 219 insertions(+), 9 deletions(-) rename {veilid-core/src/network_manager/native/protocol => veilid-tools/src}/assembly_buffer.rs (62%) create mode 100644 veilid-tools/src/tests/native/test_assembly_buffer.rs diff --git a/Cargo.lock b/Cargo.lock index f7d5ed30..3c0b0907 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6608,6 +6608,7 @@ dependencies = [ "cfg-if 1.0.0", "console_error_panic_hook", "eyre", + "flume", "fn_name", "futures-util", "jni 0.21.1", @@ -6626,6 +6627,7 @@ dependencies = [ "paranoid-android", "parking_lot 0.11.2", "rand 0.7.3", + "range-set-blaze", "rust-fsm", "send_wrapper 0.6.0", "serial_test", diff --git a/veilid-core/src/network_manager/native/protocol/mod.rs b/veilid-core/src/network_manager/native/protocol/mod.rs index e7de0f50..0a41a77b 100644 --- a/veilid-core/src/network_manager/native/protocol/mod.rs +++ b/veilid-core/src/network_manager/native/protocol/mod.rs @@ -4,8 +4,6 @@ pub mod udp; pub mod wrtc; pub mod ws; -mod assembly_buffer; - use super::*; use std::io; diff --git a/veilid-core/src/network_manager/native/protocol/udp.rs b/veilid-core/src/network_manager/native/protocol/udp.rs index 5375921d..e29369ee 100644 --- a/veilid-core/src/network_manager/native/protocol/udp.rs +++ b/veilid-core/src/network_manager/native/protocol/udp.rs @@ -1,5 +1,4 @@ use super::*; -use assembly_buffer::*; use sockets::*; #[derive(Clone)] diff --git a/veilid-tools/Cargo.toml b/veilid-tools/Cargo.toml index 1deb901a..f92d6ad2 100644 --- a/veilid-tools/Cargo.toml +++ b/veilid-tools/Cargo.toml @@ -36,6 +36,8 @@ rand = "^0.7" rust-fsm = "^0" backtrace = "^0" fn_name = "^0" +range-set-blaze = "0.1.5" +flume = { version = "^0", features = ["async"] } # Dependencies for native builds only # Linux, Windows, Mac, iOS, Android diff --git a/veilid-core/src/network_manager/native/protocol/assembly_buffer.rs b/veilid-tools/src/assembly_buffer.rs similarity index 62% rename from veilid-core/src/network_manager/native/protocol/assembly_buffer.rs rename to veilid-tools/src/assembly_buffer.rs index 94eceb22..3bc8abe5 100644 --- a/veilid-core/src/network_manager/native/protocol/assembly_buffer.rs +++ b/veilid-tools/src/assembly_buffer.rs @@ -11,7 +11,7 @@ const HEADER_LEN: usize = 8; const MAX_LEN: usize = LengthType::MAX as usize; // XXX: keep statistics on all drops and why we dropped them -// XXX: move to config +// XXX: move to config eventually? const FRAGMENT_LEN: usize = 1280 - HEADER_LEN; const MAX_CONCURRENT_HOSTS: usize = 256; const MAX_ASSEMBLIES_PER_HOST: usize = 256; @@ -27,7 +27,7 @@ struct PeerKey { #[derive(Clone, Eq, PartialEq)] struct MessageAssembly { - timestamp: Timestamp, + timestamp: u64, seq: SequenceType, data: Vec, parts: RangeSetBlaze, @@ -35,13 +35,115 @@ struct MessageAssembly { #[derive(Clone, Eq, PartialEq)] struct PeerMessages { - assemblies: LinkedList, + total_buffer: usize, + assemblies: VecDeque, } impl PeerMessages { pub fn new() -> Self { Self { - assemblies: LinkedList::new(), + total_buffer: 0, + assemblies: VecDeque::new(), + } + } + + fn merge_in_data( + &mut self, + timestamp: u64, + ass: usize, + off: LengthType, + len: LengthType, + chunk: &[u8], + ) -> bool { + let assembly = &mut self.assemblies[ass]; + + // Ensure the new fragment hasn't redefined the message length, reusing the same seq + if assembly.data.len() != len as usize { + // Drop the assembly and just go with the new fragment as starting a new assembly + let seq = assembly.seq; + drop(assembly); + self.remove_assembly(ass); + self.new_assembly(timestamp, seq, off, len, chunk); + return false; + } + + let part_start = off; + let part_end = off + chunk.len() as LengthType - 1; + let part = RangeSetBlaze::from_iter([part_start..=part_end]); + + // if fragments overlap, drop the old assembly and go with a new one + if !assembly.parts.is_disjoint(&part) { + let seq = assembly.seq; + drop(assembly); + self.remove_assembly(ass); + self.new_assembly(timestamp, seq, off, len, chunk); + return false; + } + + // Merge part + assembly.parts |= part; + assembly.data[part_start as usize..=part_end as usize].copy_from_slice(chunk); + + // Check to see if this part is done + if assembly.parts.ranges_len() == 1 + && assembly.parts.first().unwrap() == 0 + && assembly.parts.last().unwrap() == len - 1 + { + return true; + } + false + } + + fn new_assembly( + &mut self, + timestamp: u64, + seq: SequenceType, + off: LengthType, + len: LengthType, + chunk: &[u8], + ) -> usize { + // ensure we have enough space for the new assembly + self.reclaim_space(len as usize); + + // make the assembly + let part_start = off; + let part_end = off + chunk.len() as LengthType - 1; + + let mut assembly = MessageAssembly { + timestamp, + seq, + data: vec![0u8; len as usize], + parts: RangeSetBlaze::from_iter([part_start..=part_end]), + }; + assembly.data[part_start as usize..=part_end as usize].copy_from_slice(chunk); + + // Add the buffer length in + self.total_buffer += assembly.data.len(); + self.assemblies.push_front(assembly); + + // Was pushed front, return the front index + 0 + } + + fn remove_assembly(&mut self, index: usize) -> MessageAssembly { + let assembly = self.assemblies.remove(index).unwrap(); + self.total_buffer -= assembly.data.len(); + assembly + } + + fn truncate_assemblies(&mut self, new_len: usize) { + for an in new_len..self.assemblies.len() { + self.total_buffer -= self.assemblies[an].data.len(); + } + self.assemblies.truncate(new_len); + } + + fn reclaim_space(&mut self, needed_space: usize) { + // If we have too many assemblies or too much buffer rotate some out + while self.assemblies.len() > (MAX_ASSEMBLIES_PER_HOST - 1) + || self.total_buffer > (MAX_BUFFER_PER_HOST - needed_space) + { + self.remove_assembly(self.assemblies.len() - 1); } } @@ -56,7 +158,37 @@ impl PeerMessages { let cur_ts = get_timestamp(); // Get the assembly this belongs to by its sequence number - for a in self.assemblies {} + let mut ass = None; + for an in 0..self.assemblies.len() { + // If this assembly's timestamp is too old, then everything after it will be too, drop em all + let age = cur_ts.saturating_sub(self.assemblies[an].timestamp); + if age > MAX_ASSEMBLY_AGE_US { + self.truncate_assemblies(an); + break; + } + // If this assembly has a matching seq, then assemble with it + if self.assemblies[an].seq == seq { + ass = Some(an); + } + } + if ass.is_none() { + // Add a new assembly to the front and return the first index + self.new_assembly(cur_ts, seq, off, len, chunk); + return None; + } + let ass = ass.unwrap(); + + // Now that we have an assembly, merge in the fragment + let done = self.merge_in_data(cur_ts, ass, off, len, chunk); + + // If the assembly is now equal to the entire range, then return it + if done { + let assembly = self.remove_assembly(ass); + return Some(assembly.data); + } + + // Otherwise, do nothing + None } } @@ -128,7 +260,7 @@ impl AssemblyBuffer { // See if we have a whole message and not a fragment if off == 0 && len as usize == chunk.len() { - return Some(frame.to_vec()); + return Some(chunk.to_vec()); } // Drop fragments with offsets greater than or equal to the message length diff --git a/veilid-tools/src/lib.rs b/veilid-tools/src/lib.rs index 0e2f1d94..a2844887 100644 --- a/veilid-tools/src/lib.rs +++ b/veilid-tools/src/lib.rs @@ -1,4 +1,5 @@ // mod bump_port; +mod assembly_buffer; mod async_peek_stream; mod async_tag_lock; mod callback_state_machine; @@ -88,6 +89,7 @@ cfg_if! { } // pub use bump_port::*; +pub use assembly_buffer::*; pub use async_peek_stream::*; pub use async_tag_lock::*; pub use callback_state_machine::*; diff --git a/veilid-tools/src/tests/native/mod.rs b/veilid-tools/src/tests/native/mod.rs index 75a481a8..65da6ca5 100644 --- a/veilid-tools/src/tests/native/mod.rs +++ b/veilid-tools/src/tests/native/mod.rs @@ -1,6 +1,7 @@ //! Test suite for Native #![cfg(not(target_arch = "wasm32"))] +mod test_assembly_buffer; mod test_async_peek_stream; use super::*; @@ -16,6 +17,8 @@ pub async fn run_all_tests() { test_async_peek_stream::test_all().await; info!("TEST: exec_test_async_tag_lock"); test_async_tag_lock::test_all().await; + info!("TEST: exec_test_assembly_buffer"); + test_assembly_buffer::test_all().await; info!("Finished unit tests"); } @@ -96,5 +99,14 @@ cfg_if! { test_async_tag_lock::test_all().await; }); } + + #[test] + #[serial] + fn run_test_assembly_buffer() { + setup(); + block_on(async { + test_assembly_buffer::test_all().await; + }); + } } } diff --git a/veilid-tools/src/tests/native/test_assembly_buffer.rs b/veilid-tools/src/tests/native/test_assembly_buffer.rs new file mode 100644 index 00000000..45c448ac --- /dev/null +++ b/veilid-tools/src/tests/native/test_assembly_buffer.rs @@ -0,0 +1,63 @@ +use crate::*; + +fn random_sockaddr() -> SocketAddr { + if get_random_u32() & 1 == 0 { + let mut addr = [0u8; 16]; + random_bytes(&mut addr); + let port = get_random_u32() as u16; + SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::from(addr), port, 0, 0)) + } else { + let mut addr = [0u8; 4]; + random_bytes(&mut addr); + let port = get_random_u32() as u16; + SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::from(addr), port)) + } +} + +pub async fn test_single_out_in() { + let assbuf_out = AssemblyBuffer::new(); + let assbuf_in = AssemblyBuffer::new(); + let (net_tx, net_rx) = flume::unbounded(); + let sender = |framed_chunk: Vec, remote_addr: SocketAddr| { + let net_tx = net_tx.clone(); + async move { + net_tx + .send_async((framed_chunk, remote_addr)) + .await + .expect("should send"); + Ok(NetworkResult::value(())) + } + }; + + for _ in 0..1000 { + let message = vec![1u8; 1000]; + let remote_addr = random_sockaddr(); + + // Send single message below fragmentation limit + assert!(matches!( + assbuf_out + .split_message(message.clone(), remote_addr, sender) + .await, + Ok(NetworkResult::Value(())) + )); + + // Ensure we didn't fragment + let (frame, r_remote_addr) = net_rx.recv_async().await.expect("should recv"); + + // Send to input + let r_message = assbuf_in + .insert_frame(&frame, r_remote_addr) + .expect("should get one out"); + + // We should have gotten the same message + assert_eq!(r_message, message); + assert_eq!(r_remote_addr, remote_addr); + } + + // Shoud have consumed everything + assert!(net_rx.is_empty()) +} + +pub async fn test_all() { + test_single_out_in().await; +}