From fc2c08c7c9ca7d42f70629275ab108a9696f0daa Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Thu, 25 Feb 2021 12:52:38 +1100 Subject: [PATCH 1/2] Error only on close message when fetching the rate Messages Ping, Pong and Binary are ignored and not reported as error. --- swap/src/asb/kraken.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/swap/src/asb/kraken.rs b/swap/src/asb/kraken.rs index 4f8e36b0..4cdad0ec 100644 --- a/swap/src/asb/kraken.rs +++ b/swap/src/asb/kraken.rs @@ -36,8 +36,8 @@ impl LatestRate for RateService { pub enum Error { #[error("Rate has not yet been retrieved from Kraken websocket API")] NotYetRetrieved, - #[error("Message is not text")] - NonTextMessage, + #[error("Received close message from Kraken")] + CloseMessage, #[error("Websocket: ")] WebSocket(String), #[error("Serde: ")] @@ -77,8 +77,11 @@ impl RateService { while let Some(msg) = rate_stream.next().await { let msg = match msg { Ok(Message::Text(msg)) => msg, + Ok(Message::Close(..)) => { + let _ = rate_update.send(Err(Error::CloseMessage)); + continue; + } Ok(_) => { - let _ = rate_update.send(Err(Error::NonTextMessage)); continue; } Err(e) => { From 1f1b3a95bcbee0bf2e5247344b1f14223944e312 Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Fri, 26 Feb 2021 12:44:49 +1100 Subject: [PATCH 2/2] Logging for different scenarios when reading from rate stream --- swap/src/asb/kraken.rs | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/swap/src/asb/kraken.rs b/swap/src/asb/kraken.rs index 4cdad0ec..27020597 100644 --- a/swap/src/asb/kraken.rs +++ b/swap/src/asb/kraken.rs @@ -7,7 +7,8 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use std::convert::TryFrom; use tokio::sync::watch; -use tokio_tungstenite::tungstenite::Message; +use tokio_tungstenite::tungstenite::{protocol::CloseFrame, Message}; +use tracing::{error, trace}; use watch::Receiver; const KRAKEN_WS_URL: &str = "wss://ws.kraken.com"; @@ -77,14 +78,27 @@ impl RateService { while let Some(msg) = rate_stream.next().await { let msg = match msg { Ok(Message::Text(msg)) => msg, - Ok(Message::Close(..)) => { + Ok(Message::Close(close_frame)) => { + if let Some(CloseFrame { code, reason }) = close_frame { + error!( + "Kraken rate stream was closed with code {} and reason: {}", + code, reason + ); + } else { + error!("Kraken rate stream was closed without code and reason"); + } let _ = rate_update.send(Err(Error::CloseMessage)); continue; } - Ok(_) => { + Ok(msg) => { + trace!( + "Kraken rate stream returned non text message that will be ignored: {}", + msg + ); continue; } Err(e) => { + error!("Error when reading from Kraken rate stream: {}", e); let _ = rate_update.send(Err(e.into())); continue; }