fix errors

This commit is contained in:
John Smith 2022-07-22 14:08:46 -04:00
parent 6f6ec298cf
commit 4fd8a562ba
3 changed files with 27 additions and 42 deletions

View File

@ -17,23 +17,15 @@ cfg_if! {
} }
} }
fn to_io(err: async_tungstenite::tungstenite::Error) -> io::Error { fn err_to_network_result<T>(err: async_tungstenite::tungstenite::Error) -> NetworkResult<T> {
let kind = match err { match err {
async_tungstenite::tungstenite::Error::ConnectionClosed => io::ErrorKind::ConnectionReset, async_tungstenite::tungstenite::Error::ConnectionClosed
async_tungstenite::tungstenite::Error::AlreadyClosed => io::ErrorKind::NotConnected, | async_tungstenite::tungstenite::Error::AlreadyClosed
async_tungstenite::tungstenite::Error::Io(x) => { | async_tungstenite::tungstenite::Error::Io(_) => {
return x; NetworkResult::NoConnection(to_io_error_other(err))
} }
async_tungstenite::tungstenite::Error::Tls(_) => io::ErrorKind::InvalidData, _ => NetworkResult::InvalidMessage(err.to_string()),
async_tungstenite::tungstenite::Error::Capacity(_) => io::ErrorKind::Other, }
async_tungstenite::tungstenite::Error::Protocol(_) => io::ErrorKind::Other,
async_tungstenite::tungstenite::Error::SendQueueFull(_) => io::ErrorKind::Other,
async_tungstenite::tungstenite::Error::Utf8 => io::ErrorKind::Other,
async_tungstenite::tungstenite::Error::Url(_) => io::ErrorKind::Other,
async_tungstenite::tungstenite::Error::Http(_) => io::ErrorKind::Other,
async_tungstenite::tungstenite::Error::HttpFormat(_) => io::ErrorKind::Other,
};
io::Error::new(kind, err)
} }
pub type WebSocketNetworkConnectionAccepted = WebsocketNetworkConnection<AsyncPeekStream>; pub type WebSocketNetworkConnectionAccepted = WebsocketNetworkConnection<AsyncPeekStream>;
@ -73,11 +65,11 @@ where
// #[instrument(level = "trace", err, skip(self))] // #[instrument(level = "trace", err, skip(self))]
// pub async fn close(&self) -> io::Result<()> { // pub async fn close(&self) -> io::Result<()> {
// // Make an attempt to flush the stream // // Make an attempt to flush the stream
// self.stream.clone().close().await.map_err(to_io)?; // self.stream.clone().close().await.map_err(to_io_error_other)?;
// // Then forcibly close the socket // // Then forcibly close the socket
// self.tcp_stream // self.tcp_stream
// .shutdown(Shutdown::Both) // .shutdown(Shutdown::Both)
// .map_err(to_io) // .map_err(to_io_error_other)
// } // }
#[instrument(level = "trace", err, skip(self, message), fields(network_result, message.len = message.len()))] #[instrument(level = "trace", err, skip(self, message), fields(network_result, message.len = message.len()))]
@ -85,13 +77,10 @@ where
if message.len() > MAX_MESSAGE_SIZE { if message.len() > MAX_MESSAGE_SIZE {
bail_io_error_other!("received too large WS message"); bail_io_error_other!("received too large WS message");
} }
let out = self let out = match self.stream.clone().send(Message::binary(message)).await {
.stream Ok(v) => NetworkResult::value(v),
.clone() Err(e) => err_to_network_result(e),
.send(Message::binary(message)) };
.await
.map_err(to_io)
.into_network_result()?;
tracing::Span::current().record("network_result", &tracing::field::display(&out)); tracing::Span::current().record("network_result", &tracing::field::display(&out));
Ok(out) Ok(out)
} }
@ -112,16 +101,14 @@ where
io::ErrorKind::ConnectionReset, io::ErrorKind::ConnectionReset,
"closeframe", "closeframe",
)), )),
Some(Ok(x)) => { Some(Ok(x)) => NetworkResult::NoConnection(io::Error::new(
return Err(io::Error::new( io::ErrorKind::ConnectionReset,
io::ErrorKind::InvalidData, format!("Unexpected WS message type: {:?}", x),
format!("Unexpected WS message type: {:?}", x), )),
)); Some(Err(e)) => err_to_network_result(e),
}
Some(Err(e)) => return Err(to_io(e)),
None => NetworkResult::NoConnection(io::Error::new( None => NetworkResult::NoConnection(io::Error::new(
io::ErrorKind::ConnectionReset, io::ErrorKind::ConnectionReset,
"connection ended", "connection ended normally",
)), )),
}; };

View File

@ -289,7 +289,7 @@ impl NetworkManager {
) { ) {
// Add this our futures to process in parallel // Add this our futures to process in parallel
let routing_table = routing_table.clone(); let routing_table = routing_table.clone();
unord.push(intf::spawn(async move { unord.push(async move {
// Need VALID signed peer info, so ask bootstrap to find_node of itself // Need VALID signed peer info, so ask bootstrap to find_node of itself
// which will ensure it has the bootstrap's signed peer info as part of the response // which will ensure it has the bootstrap's signed peer info as part of the response
let _ = routing_table.find_target(nr.clone()).await; let _ = routing_table.find_target(nr.clone()).await;
@ -305,7 +305,7 @@ impl NetworkManager {
// otherwise this bootstrap is valid, lets ask it to find ourselves now // otherwise this bootstrap is valid, lets ask it to find ourselves now
routing_table.reverse_find_node(nr, true).await routing_table.reverse_find_node(nr, true).await
} }
})); });
} }
} }
@ -333,7 +333,7 @@ impl NetworkManager {
let node_refs = routing_table.get_nodes_needing_ping(cur_ts, relay_node_id); let node_refs = routing_table.get_nodes_needing_ping(cur_ts, relay_node_id);
for nr in node_refs { for nr in node_refs {
let rpc = rpc.clone(); let rpc = rpc.clone();
unord.push(intf::spawn(async move { rpc.rpc_call_status(nr).await })); unord.push(async move { rpc.rpc_call_status(nr).await });
} }
// Wait for futures to complete // Wait for futures to complete
@ -360,9 +360,7 @@ impl NetworkManager {
for nr in noderefs { for nr in noderefs {
log_net!("--- peer minimum search with {:?}", nr); log_net!("--- peer minimum search with {:?}", nr);
let routing_table = routing_table.clone(); let routing_table = routing_table.clone();
unord.push(intf::spawn(async move { unord.push(async move { routing_table.reverse_find_node(nr, false).await });
routing_table.reverse_find_node(nr, false).await
}));
} }
while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {} while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {}
@ -373,7 +371,7 @@ impl NetworkManager {
#[instrument(level = "trace", skip(self), err)] #[instrument(level = "trace", skip(self), err)]
pub(super) async fn relay_management_task_routine( pub(super) async fn relay_management_task_routine(
self, self,
stop_token: StopToken, _stop_token: StopToken,
_last_ts: u64, _last_ts: u64,
cur_ts: u64, cur_ts: u64,
) -> EyreResult<()> { ) -> EyreResult<()> {
@ -455,7 +453,7 @@ impl NetworkManager {
#[instrument(level = "trace", skip(self), err)] #[instrument(level = "trace", skip(self), err)]
pub(super) async fn rolling_transfers_task_routine( pub(super) async fn rolling_transfers_task_routine(
self, self,
stop_token: StopToken, _stop_token: StopToken,
last_ts: u64, last_ts: u64,
cur_ts: u64, cur_ts: u64,
) -> EyreResult<()> { ) -> EyreResult<()> {

View File

@ -195,7 +195,7 @@ impl<T> From<NetworkResult<T>> for Option<T> {
// match self { // match self {
// Self::Timeout => Self::Timeout, // Self::Timeout => Self::Timeout,
// Self::NoConnection(e) => Self::NoConnection(e.clone()), // Self::NoConnection(e) => Self::NoConnection(e.clone()),
// Self::InvalidResponse(k, s) => Self::InvalidResponse(k, s.clone()), // Self::InvalidMessage(s) => Self::InvalidMessage(s.clone()),
// Self::Value(t) => Self::Value(t.clone()), // Self::Value(t) => Self::Value(t.clone()),
// } // }
// } // }