From ce19a1bfbf76f02108105ae086581cbf747e6dbf Mon Sep 17 00:00:00 2001 From: John Smith Date: Wed, 24 Aug 2022 13:49:05 -0400 Subject: [PATCH] fix asyncpeekstream --- .../network_manager/native/protocol/tcp.rs | 4 +- .../src/network_manager/native/protocol/ws.rs | 8 ++++ .../tests/native/test_async_peek_stream.rs | 46 +++++++++++++++++++ veilid-core/src/xx/async_peek_stream.rs | 33 +++++++------ 4 files changed, 76 insertions(+), 15 deletions(-) diff --git a/veilid-core/src/network_manager/native/protocol/tcp.rs b/veilid-core/src/network_manager/native/protocol/tcp.rs index 2749bf8e..1237044d 100644 --- a/veilid-core/src/network_manager/native/protocol/tcp.rs +++ b/veilid-core/src/network_manager/native/protocol/tcp.rs @@ -68,7 +68,6 @@ impl RawTcpNetworkConnection { let mut header = [0u8; 4]; network_result_try!(stream.read_exact(&mut header).await.into_network_result()?); - if header[0] != b'V' || header[1] != b'L' { bail_io_error_other!("received invalid TCP frame header"); } @@ -78,7 +77,8 @@ impl RawTcpNetworkConnection { } let mut out: Vec = vec![0u8; len]; - network_result_try!(stream.read_exact(&mut out).await.into_network_result()?); + let nrout = stream.read_exact(&mut out).await.into_network_result()?; + network_result_try!(nrout); Ok(NetworkResult::Value(out)) } diff --git a/veilid-core/src/network_manager/native/protocol/ws.rs b/veilid-core/src/network_manager/native/protocol/ws.rs index 69ff5d8a..8a6d8909 100644 --- a/veilid-core/src/network_manager/native/protocol/ws.rs +++ b/veilid-core/src/network_manager/native/protocol/ws.rs @@ -81,6 +81,14 @@ where Ok(v) => NetworkResult::value(v), Err(e) => err_to_network_result(e), }; + if !out.is_value() { + tracing::Span::current().record("network_result", &tracing::field::display(&out)); + return Ok(out); + } + let out = match self.stream.clone().flush().await { + Ok(v) => NetworkResult::value(v), + Err(e) => err_to_network_result(e), + }; tracing::Span::current().record("network_result", &tracing::field::display(&out)); Ok(out) } diff --git a/veilid-core/src/tests/native/test_async_peek_stream.rs b/veilid-core/src/tests/native/test_async_peek_stream.rs index b23e0759..f73f95b2 100644 --- a/veilid-core/src/tests/native/test_async_peek_stream.rs +++ b/veilid-core/src/tests/native/test_async_peek_stream.rs @@ -294,6 +294,51 @@ pub async fn test_peek_some_read_peek_some_read_all_read() { assert_eq!(inbuf3, outbuf[peeksize1..outbuf.len()].to_vec()); } +pub async fn test_peek_exact_read_peek_exact_read_all_read() { + info!("test_peek_exact_read_peek_exact_read_all_read"); + + let (mut a, mut c) = make_async_peek_stream_loopback().await; + + // write everything + let outbuf = MESSAGE.to_vec(); + a.write_all(&outbuf).await.unwrap(); + + // peek partially + let mut peekbuf1: Vec = Vec::new(); + peekbuf1.resize(outbuf.len() / 4, 0u8); + let peeksize1 = c.peek_exact(&mut peekbuf1).await.unwrap(); + assert_eq!(peeksize1, peekbuf1.len()); + + // read partially + let mut inbuf1: Vec = Vec::new(); + inbuf1.resize(peeksize1 - 1, 0u8); + c.read_exact(&mut inbuf1).await.unwrap(); + + // peek partially + let mut peekbuf2: Vec = Vec::new(); + peekbuf2.resize(2, 0u8); + let peeksize2 = c.peek_exact(&mut peekbuf2).await.unwrap(); + assert_eq!(peeksize2, peekbuf2.len()); + // read partially + let mut inbuf2: Vec = Vec::new(); + inbuf2.resize(1, 0u8); + c.read_exact(&mut inbuf2).await.unwrap(); + + // read remaining + let mut inbuf3: Vec = Vec::new(); + inbuf3.resize(outbuf.len() - peeksize1, 0u8); + c.read_exact(&mut inbuf3).await.unwrap(); + + assert_eq!(peekbuf1, outbuf[0..peeksize1].to_vec()); + assert_eq!(inbuf1, outbuf[0..peeksize1 - 1].to_vec()); + assert_eq!( + peekbuf2[0..peeksize2].to_vec(), + outbuf[peeksize1 - 1..peeksize1 + 1].to_vec() + ); + assert_eq!(inbuf2, peekbuf2[0..1].to_vec()); + assert_eq!(inbuf3, outbuf[peeksize1..outbuf.len()].to_vec()); +} + pub async fn test_all() { test_nothing().await; test_no_peek().await; @@ -303,4 +348,5 @@ pub async fn test_all() { test_peek_some_read_peek_some_read().await; test_peek_some_read_peek_all_read().await; test_peek_some_read_peek_some_read_all_read().await; + test_peek_exact_read_peek_exact_read_all_read().await; } diff --git a/veilid-core/src/xx/async_peek_stream.rs b/veilid-core/src/xx/async_peek_stream.rs index 57537e75..dd64ea26 100644 --- a/veilid-core/src/xx/async_peek_stream.rs +++ b/veilid-core/src/xx/async_peek_stream.rs @@ -1,5 +1,4 @@ use super::*; -use futures_util::AsyncReadExt; use std::io; use task::{Context, Poll}; @@ -32,10 +31,10 @@ impl Future for Peek<'_> { if buf_len > inner.peekbuf_len { // inner.peekbuf.resize(buf_len, 0u8); - let mut read_future = inner - .stream - .read(&mut inner.peekbuf.as_mut_slice()[inner.peekbuf_len..buf_len]); - let read_len = match Pin::new(&mut read_future).poll(cx) { + let read_len = match Pin::new(&mut inner.stream).poll_read( + cx, + &mut inner.peekbuf.as_mut_slice()[inner.peekbuf_len..buf_len], + ) { Poll::Pending => { inner.peekbuf.resize(inner.peekbuf_len, 0u8); return Poll::Pending; @@ -60,6 +59,7 @@ impl Future for Peek<'_> { pub struct PeekExact<'a> { aps: AsyncPeekStream, buf: &'a mut [u8], + cur_read: usize, } impl Unpin for PeekExact<'_> {} @@ -78,10 +78,10 @@ impl Future for PeekExact<'_> { if buf_len > inner.peekbuf_len { // inner.peekbuf.resize(buf_len, 0u8); - let mut read_future = inner - .stream - .read_exact(&mut inner.peekbuf.as_mut_slice()[inner.peekbuf_len..buf_len]); - match Pin::new(&mut read_future).poll(cx) { + let read_len = match Pin::new(&mut inner.stream).poll_read( + cx, + &mut inner.peekbuf.as_mut_slice()[inner.peekbuf_len..buf_len], + ) { Poll::Pending => { inner.peekbuf.resize(inner.peekbuf_len, 0u8); return Poll::Pending; @@ -89,13 +89,19 @@ impl Future for PeekExact<'_> { Poll::Ready(Err(e)) => { return Poll::Ready(Err(e)); } - Poll::Ready(Ok(())) => (), + Poll::Ready(Ok(v)) => v, }; - inner.peekbuf_len = buf_len; + inner.peekbuf_len += read_len; + inner.peekbuf.resize(inner.peekbuf_len, 0u8); copy_len = inner.peekbuf_len; } - this.buf[..copy_len].copy_from_slice(&inner.peekbuf[..copy_len]); - Poll::Ready(Ok(copy_len)) + this.buf[this.cur_read..copy_len].copy_from_slice(&inner.peekbuf[this.cur_read..copy_len]); + this.cur_read = copy_len; + if this.cur_read == buf_len { + Poll::Ready(Ok(buf_len)) + } else { + Poll::Pending + } } } ///////// @@ -139,6 +145,7 @@ impl AsyncPeekStream { PeekExact::<'a> { aps: self.clone(), buf, + cur_read: 0, } } }