mirror of
https://gitlab.com/veilid/veilid.git
synced 2024-12-25 23:39:22 -05:00
fix asyncpeekstream
This commit is contained in:
parent
cd99a4210b
commit
ce19a1bfbf
@ -68,7 +68,6 @@ impl RawTcpNetworkConnection {
|
|||||||
let mut header = [0u8; 4];
|
let mut header = [0u8; 4];
|
||||||
|
|
||||||
network_result_try!(stream.read_exact(&mut header).await.into_network_result()?);
|
network_result_try!(stream.read_exact(&mut header).await.into_network_result()?);
|
||||||
|
|
||||||
if header[0] != b'V' || header[1] != b'L' {
|
if header[0] != b'V' || header[1] != b'L' {
|
||||||
bail_io_error_other!("received invalid TCP frame header");
|
bail_io_error_other!("received invalid TCP frame header");
|
||||||
}
|
}
|
||||||
@ -78,7 +77,8 @@ impl RawTcpNetworkConnection {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let mut out: Vec<u8> = vec![0u8; len];
|
let mut out: Vec<u8> = vec![0u8; len];
|
||||||
network_result_try!(stream.read_exact(&mut out).await.into_network_result()?);
|
let nrout = stream.read_exact(&mut out).await.into_network_result()?;
|
||||||
|
network_result_try!(nrout);
|
||||||
|
|
||||||
Ok(NetworkResult::Value(out))
|
Ok(NetworkResult::Value(out))
|
||||||
}
|
}
|
||||||
|
@ -81,6 +81,14 @@ where
|
|||||||
Ok(v) => NetworkResult::value(v),
|
Ok(v) => NetworkResult::value(v),
|
||||||
Err(e) => err_to_network_result(e),
|
Err(e) => err_to_network_result(e),
|
||||||
};
|
};
|
||||||
|
if !out.is_value() {
|
||||||
|
tracing::Span::current().record("network_result", &tracing::field::display(&out));
|
||||||
|
return Ok(out);
|
||||||
|
}
|
||||||
|
let out = match self.stream.clone().flush().await {
|
||||||
|
Ok(v) => NetworkResult::value(v),
|
||||||
|
Err(e) => err_to_network_result(e),
|
||||||
|
};
|
||||||
tracing::Span::current().record("network_result", &tracing::field::display(&out));
|
tracing::Span::current().record("network_result", &tracing::field::display(&out));
|
||||||
Ok(out)
|
Ok(out)
|
||||||
}
|
}
|
||||||
|
@ -294,6 +294,51 @@ pub async fn test_peek_some_read_peek_some_read_all_read() {
|
|||||||
assert_eq!(inbuf3, outbuf[peeksize1..outbuf.len()].to_vec());
|
assert_eq!(inbuf3, outbuf[peeksize1..outbuf.len()].to_vec());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn test_peek_exact_read_peek_exact_read_all_read() {
|
||||||
|
info!("test_peek_exact_read_peek_exact_read_all_read");
|
||||||
|
|
||||||
|
let (mut a, mut c) = make_async_peek_stream_loopback().await;
|
||||||
|
|
||||||
|
// write everything
|
||||||
|
let outbuf = MESSAGE.to_vec();
|
||||||
|
a.write_all(&outbuf).await.unwrap();
|
||||||
|
|
||||||
|
// peek partially
|
||||||
|
let mut peekbuf1: Vec<u8> = Vec::new();
|
||||||
|
peekbuf1.resize(outbuf.len() / 4, 0u8);
|
||||||
|
let peeksize1 = c.peek_exact(&mut peekbuf1).await.unwrap();
|
||||||
|
assert_eq!(peeksize1, peekbuf1.len());
|
||||||
|
|
||||||
|
// read partially
|
||||||
|
let mut inbuf1: Vec<u8> = Vec::new();
|
||||||
|
inbuf1.resize(peeksize1 - 1, 0u8);
|
||||||
|
c.read_exact(&mut inbuf1).await.unwrap();
|
||||||
|
|
||||||
|
// peek partially
|
||||||
|
let mut peekbuf2: Vec<u8> = Vec::new();
|
||||||
|
peekbuf2.resize(2, 0u8);
|
||||||
|
let peeksize2 = c.peek_exact(&mut peekbuf2).await.unwrap();
|
||||||
|
assert_eq!(peeksize2, peekbuf2.len());
|
||||||
|
// read partially
|
||||||
|
let mut inbuf2: Vec<u8> = Vec::new();
|
||||||
|
inbuf2.resize(1, 0u8);
|
||||||
|
c.read_exact(&mut inbuf2).await.unwrap();
|
||||||
|
|
||||||
|
// read remaining
|
||||||
|
let mut inbuf3: Vec<u8> = Vec::new();
|
||||||
|
inbuf3.resize(outbuf.len() - peeksize1, 0u8);
|
||||||
|
c.read_exact(&mut inbuf3).await.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(peekbuf1, outbuf[0..peeksize1].to_vec());
|
||||||
|
assert_eq!(inbuf1, outbuf[0..peeksize1 - 1].to_vec());
|
||||||
|
assert_eq!(
|
||||||
|
peekbuf2[0..peeksize2].to_vec(),
|
||||||
|
outbuf[peeksize1 - 1..peeksize1 + 1].to_vec()
|
||||||
|
);
|
||||||
|
assert_eq!(inbuf2, peekbuf2[0..1].to_vec());
|
||||||
|
assert_eq!(inbuf3, outbuf[peeksize1..outbuf.len()].to_vec());
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn test_all() {
|
pub async fn test_all() {
|
||||||
test_nothing().await;
|
test_nothing().await;
|
||||||
test_no_peek().await;
|
test_no_peek().await;
|
||||||
@ -303,4 +348,5 @@ pub async fn test_all() {
|
|||||||
test_peek_some_read_peek_some_read().await;
|
test_peek_some_read_peek_some_read().await;
|
||||||
test_peek_some_read_peek_all_read().await;
|
test_peek_some_read_peek_all_read().await;
|
||||||
test_peek_some_read_peek_some_read_all_read().await;
|
test_peek_some_read_peek_some_read_all_read().await;
|
||||||
|
test_peek_exact_read_peek_exact_read_all_read().await;
|
||||||
}
|
}
|
||||||
|
@ -1,5 +1,4 @@
|
|||||||
use super::*;
|
use super::*;
|
||||||
use futures_util::AsyncReadExt;
|
|
||||||
use std::io;
|
use std::io;
|
||||||
use task::{Context, Poll};
|
use task::{Context, Poll};
|
||||||
|
|
||||||
@ -32,10 +31,10 @@ impl Future for Peek<'_> {
|
|||||||
if buf_len > inner.peekbuf_len {
|
if buf_len > inner.peekbuf_len {
|
||||||
//
|
//
|
||||||
inner.peekbuf.resize(buf_len, 0u8);
|
inner.peekbuf.resize(buf_len, 0u8);
|
||||||
let mut read_future = inner
|
let read_len = match Pin::new(&mut inner.stream).poll_read(
|
||||||
.stream
|
cx,
|
||||||
.read(&mut inner.peekbuf.as_mut_slice()[inner.peekbuf_len..buf_len]);
|
&mut inner.peekbuf.as_mut_slice()[inner.peekbuf_len..buf_len],
|
||||||
let read_len = match Pin::new(&mut read_future).poll(cx) {
|
) {
|
||||||
Poll::Pending => {
|
Poll::Pending => {
|
||||||
inner.peekbuf.resize(inner.peekbuf_len, 0u8);
|
inner.peekbuf.resize(inner.peekbuf_len, 0u8);
|
||||||
return Poll::Pending;
|
return Poll::Pending;
|
||||||
@ -60,6 +59,7 @@ impl Future for Peek<'_> {
|
|||||||
pub struct PeekExact<'a> {
|
pub struct PeekExact<'a> {
|
||||||
aps: AsyncPeekStream,
|
aps: AsyncPeekStream,
|
||||||
buf: &'a mut [u8],
|
buf: &'a mut [u8],
|
||||||
|
cur_read: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Unpin for PeekExact<'_> {}
|
impl Unpin for PeekExact<'_> {}
|
||||||
@ -78,10 +78,10 @@ impl Future for PeekExact<'_> {
|
|||||||
if buf_len > inner.peekbuf_len {
|
if buf_len > inner.peekbuf_len {
|
||||||
//
|
//
|
||||||
inner.peekbuf.resize(buf_len, 0u8);
|
inner.peekbuf.resize(buf_len, 0u8);
|
||||||
let mut read_future = inner
|
let read_len = match Pin::new(&mut inner.stream).poll_read(
|
||||||
.stream
|
cx,
|
||||||
.read_exact(&mut inner.peekbuf.as_mut_slice()[inner.peekbuf_len..buf_len]);
|
&mut inner.peekbuf.as_mut_slice()[inner.peekbuf_len..buf_len],
|
||||||
match Pin::new(&mut read_future).poll(cx) {
|
) {
|
||||||
Poll::Pending => {
|
Poll::Pending => {
|
||||||
inner.peekbuf.resize(inner.peekbuf_len, 0u8);
|
inner.peekbuf.resize(inner.peekbuf_len, 0u8);
|
||||||
return Poll::Pending;
|
return Poll::Pending;
|
||||||
@ -89,13 +89,19 @@ impl Future for PeekExact<'_> {
|
|||||||
Poll::Ready(Err(e)) => {
|
Poll::Ready(Err(e)) => {
|
||||||
return Poll::Ready(Err(e));
|
return Poll::Ready(Err(e));
|
||||||
}
|
}
|
||||||
Poll::Ready(Ok(())) => (),
|
Poll::Ready(Ok(v)) => v,
|
||||||
};
|
};
|
||||||
inner.peekbuf_len = buf_len;
|
inner.peekbuf_len += read_len;
|
||||||
|
inner.peekbuf.resize(inner.peekbuf_len, 0u8);
|
||||||
copy_len = inner.peekbuf_len;
|
copy_len = inner.peekbuf_len;
|
||||||
}
|
}
|
||||||
this.buf[..copy_len].copy_from_slice(&inner.peekbuf[..copy_len]);
|
this.buf[this.cur_read..copy_len].copy_from_slice(&inner.peekbuf[this.cur_read..copy_len]);
|
||||||
Poll::Ready(Ok(copy_len))
|
this.cur_read = copy_len;
|
||||||
|
if this.cur_read == buf_len {
|
||||||
|
Poll::Ready(Ok(buf_len))
|
||||||
|
} else {
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
/////////
|
/////////
|
||||||
@ -139,6 +145,7 @@ impl AsyncPeekStream {
|
|||||||
PeekExact::<'a> {
|
PeekExact::<'a> {
|
||||||
aps: self.clone(),
|
aps: self.clone(),
|
||||||
buf,
|
buf,
|
||||||
|
cur_read: 0,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user