diff --git a/veilid-core/src/network_manager/native/protocol/assembly_buffer.rs b/veilid-core/src/network_manager/native/protocol/assembly_buffer.rs
new file mode 100644
index 00000000..1a9de554
--- /dev/null
+++ b/veilid-core/src/network_manager/native/protocol/assembly_buffer.rs
@@ -0,0 +1,224 @@
+use super::*;
+use range_set_blaze::RangeSetBlaze;
+use std::io::{Error, ErrorKind};
+use std::sync::atomic::{AtomicU16, Ordering};
+
+// AssemblyBuffer Version 1 properties
+const VERSION_1: u8 = 1;
+type LengthType = u16;
+type SequenceType = u16;
+const HEADER_LEN: usize = 8;
+const MAX_MESSAGE_LEN: usize = LengthType::MAX as usize;
+
+// XXX: keep statistics on all drops and why we dropped them
+// XXX: move to config
+const FRAGMENT_LEN: usize = 1280 - HEADER_LEN;
+const MAX_CONCURRENT_HOSTS: usize = 256;
+const MAX_ASSEMBLIES_PER_HOST: usize = 256;
+const MAX_BUFFER_PER_HOST: usize = 256 * 1024;
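+
+// Version 1 wire layout (big-endian), as written by frame_chunk() and read by
+// receive_packet() below:
+//   byte  0     u8   version (VERSION_1)
+//   byte  1     u8   reserved, currently zero
+//   bytes 2..4  u16  seq: sequence number shared by every fragment of a message
+//   bytes 4..6  u16  off: byte offset of this chunk within the message
+//   bytes 6..8  u16  len: total length of the reassembled message
+//   bytes 8..   chunk payload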
+
+/////////////////////////////////////////////////////////
+
+pub struct Message {
+    data: Vec<u8>,
+    remote_addr: SocketAddr,
+}
+
+#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
+struct PeerKey {
+    remote_addr: SocketAddr,
+}
+
+#[derive(Clone, Eq, PartialEq)]
+struct MessageAssembly {
+    seq: SequenceType,
+    data: Vec<u8>,
+    parts: RangeSetBlaze<LengthType>,
+}
+
+#[derive(Clone, Eq, PartialEq)]
+struct PeerMessages {
+    assemblies: Vec<MessageAssembly>,
+}
+
+impl PeerMessages {
+    pub fn new() -> Self {
+        Self {
+            assemblies: Vec::new(),
+        }
+    }
+}
+
+/////////////////////////////////////////////////////////
+
+struct AssemblyBufferInner {
+    peer_message_map: HashMap<PeerKey, PeerMessages>,
+}
+
+struct AssemblyBufferUnlockedInner {
+    outbound_lock_table: AsyncTagLockTable<SocketAddr>,
+    next_seq: AtomicU16,
+}
+
+/// Packet reassembly and fragmentation handler
+/// No retry, no acknowledgment, no flow control
+/// Just trying to survive lower path MTU for larger messages
+#[derive(Clone)]
+pub struct AssemblyBuffer {
+    inner: Arc<Mutex<AssemblyBufferInner>>,
+    unlocked_inner: Arc<AssemblyBufferUnlockedInner>,
+}
+
+impl AssemblyBuffer {
+    pub fn new_unlocked_inner() -> AssemblyBufferUnlockedInner {
+        AssemblyBufferUnlockedInner {
+            outbound_lock_table: AsyncTagLockTable::new(),
+            next_seq: AtomicU16::new(0),
+        }
+    }
+    pub fn new_inner() -> AssemblyBufferInner {
+        AssemblyBufferInner {
+            peer_message_map: HashMap::new(),
+        }
+    }
+
+    pub fn new() -> Self {
+        Self {
+            inner: Arc::new(Mutex::new(Self::new_inner())),
+            unlocked_inner: Arc::new(Self::new_unlocked_inner()),
+        }
+    }
+
+    /// Receive a packet chunk and add it to the message assembly;
+    /// if a message has been completely reassembled, return it
+    pub fn receive_packet(&self, frame: &[u8], remote_addr: SocketAddr) -> Option<Message> {
+        // If we receive a zero length frame, pass it through as a zero length message
+        if frame.is_empty() {
+            return Some(Message {
+                data: frame.to_vec(),
+                remote_addr,
+            });
+        }
+
+        // If we receive a frame smaller than or equal to the length of the header, drop it,
+        // or if this frame is larger than our max message length, then drop it
+        if frame.len() <= HEADER_LEN || frame.len() > MAX_MESSAGE_LEN {
+            return None;
+        }
+
+        // --- Decode the header
+
+        // Drop versions we don't understand
+        if frame[0] != VERSION_1 {
+            return None;
+        }
+        // Version 1 header
+        let seq = SequenceType::from_be_bytes(frame[2..4].try_into().unwrap());
+        let off = LengthType::from_be_bytes(frame[4..6].try_into().unwrap());
+        let len = LengthType::from_be_bytes(frame[6..HEADER_LEN].try_into().unwrap());
+        let chunk = &frame[HEADER_LEN..];
+
+        // See if we have a whole message and not a fragment
+        if off == 0 && len as usize == chunk.len() {
+            return Some(Message {
+                data: chunk.to_vec(),
+                remote_addr,
+            });
+        }
+
+        // Drop fragments with offsets greater than or equal to the message length
+        if off >= len {
+            return None;
+        }
+        // Drop fragments where the chunk would be applied beyond the message length
+        if off as usize + chunk.len() > len as usize {
+            return None;
+        }
+
+        // Get or create the peer message assemblies
+        // and drop the packet if we have too many peers
+        let mut inner = self.inner.lock();
+        let peer_count = inner.peer_message_map.len();
+        let peer_key = PeerKey { remote_addr };
+        let peer_messages = match inner.peer_message_map.entry(peer_key) {
+            std::collections::hash_map::Entry::Occupied(e) => e.into_mut(),
+            std::collections::hash_map::Entry::Vacant(v) => {
+                // See if we have room for one more
+                if peer_count == MAX_CONCURRENT_HOSTS {
+                    return None;
+                }
+                // Add the peer
+                v.insert(PeerMessages::new())
+            }
+        };
+
+        // XXX: fragment reassembly is not implemented yet; fragments are dropped here
+        let _ = peer_messages;
+        None
+    }
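+
+    // Fragmentation arithmetic with the constants above (FRAGMENT_LEN = 1272):
+    // a 4000-byte message splits into chunks of 1272, 1272, 1272, and 184 bytes,
+    // each prefixed with the 8-byte header, giving 4 packets and 4032 bytes on
+    // the wire, all sharing one seq and carrying offsets 0, 1272, 2544, and 3816.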
+
+    /// Add framing to chunk to send to the wire
+    fn frame_chunk(chunk: &[u8], offset: usize, message_len: usize, seq: SequenceType) -> Vec<u8> {
+        assert!(!chunk.is_empty());
+        assert!(message_len <= MAX_MESSAGE_LEN);
+        assert!(offset + chunk.len() <= message_len);
+
+        let off: LengthType = offset as LengthType;
+        let len: LengthType = message_len as LengthType;
+
+        unsafe {
+            // Uninitialized vector, careful!
+            let mut out = unaligned_u8_vec_uninit(chunk.len() + HEADER_LEN);
+
+            // Write out header
+            out[0] = VERSION_1;
+            out[1] = 0; // reserved
+            out[2..4].copy_from_slice(&seq.to_be_bytes()); // sequence number
+            out[4..6].copy_from_slice(&off.to_be_bytes()); // offset of chunk inside message
+            out[6..HEADER_LEN].copy_from_slice(&len.to_be_bytes()); // total length of message
+
+            // Write out body
+            out[HEADER_LEN..].copy_from_slice(chunk);
+            out
+        }
+    }
+
+    /// Split a message into packets and send them serially, ensuring
+    /// that they are sent consecutively to a particular remote address,
+    /// never interleaving packets from one message and another, to minimize reassembly problems
+    pub async fn split_message<F>(&self, message: Message, sender: F) -> std::io::Result<()>
+    where
+        F: Fn(Vec<u8>, SocketAddr) -> SendPinBoxFuture<std::io::Result<()>>,
+    {
+        if message.data.len() > MAX_MESSAGE_LEN {
+            return Err(Error::from(ErrorKind::InvalidData));
+        }
+
+        // Do not frame or split anything zero bytes long, just send it
+        if message.data.is_empty() {
+            sender(message.data, message.remote_addr).await?;
+            return Ok(());
+        }
+
+        // Lock per remote addr
+        let _tag_lock = self
+            .unlocked_inner
+            .outbound_lock_table
+            .lock_tag(message.remote_addr)
+            .await;
+
+        // Get a message seq
+        let seq = self.unlocked_inner.next_seq.fetch_add(1, Ordering::Relaxed);
+
+        // Chunk it up
+        let mut offset = 0usize;
+        let message_len = message.data.len();
+        for chunk in message.data.chunks(FRAGMENT_LEN) {
+            // Frame chunk
+            let framed_chunk = Self::frame_chunk(chunk, offset, message_len, seq);
+            // Send chunk
+            sender(framed_chunk, message.remote_addr).await?;
+            // Go to next chunk
+            offset += chunk.len();
+        }
+
+        Ok(())
+    }
+}
diff --git a/veilid-core/src/network_manager/native/protocol/mod.rs b/veilid-core/src/network_manager/native/protocol/mod.rs
index 0a41a77b..e7de0f50 100644
--- a/veilid-core/src/network_manager/native/protocol/mod.rs
+++ b/veilid-core/src/network_manager/native/protocol/mod.rs
@@ -4,6 +4,8 @@
 pub mod udp;
 pub mod wrtc;
 pub mod ws;
 
+mod assembly_buffer;
+
 use super::*;
 use std::io;
diff --git a/veilid-core/src/network_manager/native/protocol/udp.rs b/veilid-core/src/network_manager/native/protocol/udp.rs
index a8fe5ce8..662e8715 100644
--- a/veilid-core/src/network_manager/native/protocol/udp.rs
+++ b/veilid-core/src/network_manager/native/protocol/udp.rs
@@ -1,14 +1,19 @@
 use super::*;
+use super::assembly_buffer::*;
 use sockets::*;
 
 #[derive(Clone)]
 pub struct RawUdpProtocolHandler {
     socket: Arc<UdpSocket>,
+    assembly_buffer: AssemblyBuffer,
 }
 
 impl RawUdpProtocolHandler {
     pub fn new(socket: Arc<UdpSocket>) -> Self {
-        Self { socket }
+        Self {
+            socket,
+            assembly_buffer: AssemblyBuffer::new(),
+        }
     }
 
     // #[instrument(level = "trace", err, skip(self, data), fields(data.len = data.len(), ret.len, ret.descriptor))]
diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs
index f9b41323..1c49ec4a 100644
--- a/veilid-core/src/rpc_processor/mod.rs
+++ b/veilid-core/src/rpc_processor/mod.rs
@@ -878,6 +878,8 @@ impl RPCProcessor {
     ) {
         // Record for node if this was not sent via a route
         if safety_route.is_none() && remote_private_route.is_none() {
+            log_rpc!(debug "RPC Question Lost: {:?}", node_ref);
+
             node_ref.stats_question_lost();
 
             // Also clear the last_connections for the entry so we make a new connection next time
diff --git a/veilid-core/src/rpc_processor/operation_waiter.rs b/veilid-core/src/rpc_processor/operation_waiter.rs
index 2f39e200..02f402a2 100644
--- a/veilid-core/src/rpc_processor/operation_waiter.rs
+++ b/veilid-core/src/rpc_processor/operation_waiter.rs
@@ -152,7 +152,7 @@ where
             .into_timeout_or();
         Ok(res
             .on_timeout(|| {
-                log_rpc!(debug "op wait timed out: {}", handle.op_id);
+                // log_rpc!(debug "op wait timed out: {}", handle.op_id);
                 // debug_print_backtrace();
                 self.cancel_op_waiter(handle.op_id);
             })
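
A minimal round-trip sketch of the AssemblyBuffer API added above, assuming the surrounding module's scope (Arc, parking_lot-style Mutex, SocketAddr) and a hypothetical `Message::new(data, remote_addr)` constructor, which this commit does not include since `Message`'s fields stay private; framed packets are collected into a Vec instead of being written to a real socket:

    async fn roundtrip_sketch(remote_addr: SocketAddr) -> std::io::Result<()> {
        let assembly_buffer = AssemblyBuffer::new();
        let data = vec![0u8; 4000]; // framed into 4 packets (see frame_chunk)

        // Capture outbound packets instead of sending them
        let packets = Arc::new(Mutex::new(Vec::<(Vec<u8>, SocketAddr)>::new()));
        let captured = packets.clone();
        assembly_buffer
            .split_message(Message::new(data, remote_addr), move |framed, addr| {
                let captured = captured.clone();
                Box::pin(async move {
                    captured.lock().push((framed, addr));
                    Ok(())
                })
            })
            .await?;

        // Feed the frames back in; since fragment reassembly is unimplemented in
        // this commit, only a single-packet message would yield Some(..) here.
        for (framed, addr) in packets.lock().drain(..) {
            let _maybe_message = assembly_buffer.receive_packet(&framed, addr);
        }
        Ok(())
    }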