From f6ed4d65b56ba981d39de6facb2a28c0635620d1 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 5 Mar 2021 13:40:39 +1100 Subject: [PATCH] Properly deal with additional messages sent from kraken --- swap/src/kraken.rs | 97 +++++++++++++++++++++++++++++++--------------- 1 file changed, 65 insertions(+), 32 deletions(-) diff --git a/swap/src/kraken.rs b/swap/src/kraken.rs index f0813022..ad66d2c6 100644 --- a/swap/src/kraken.rs +++ b/swap/src/kraken.rs @@ -7,8 +7,7 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use std::convert::TryFrom; use tokio::sync::watch; -use tokio_tungstenite::tungstenite::protocol::CloseFrame; -use tokio_tungstenite::tungstenite::Message; +use tokio_tungstenite::tungstenite; use tracing::{error, trace}; pub async fn connect() -> Result>> { @@ -22,9 +21,9 @@ pub async fn connect() -> Result>> { tokio::spawn(async move { while let Some(msg) = rate_stream.next().await { let msg = match msg { - Ok(Message::Text(msg)) => msg, - Ok(Message::Close(close_frame)) => { - if let Some(CloseFrame { code, reason }) = close_frame { + Ok(tungstenite::Message::Text(msg)) => msg, + Ok(tungstenite::Message::Close(close_frame)) => { + if let Some(tungstenite::protocol::CloseFrame { code, reason }) = close_frame { error!( "Kraken rate stream was closed with code {} and reason: {}", code, reason @@ -32,7 +31,7 @@ pub async fn connect() -> Result>> { } else { error!("Kraken rate stream was closed without code and reason"); } - let _ = rate_update.send(Err(Error::CloseMessage)); + let _ = rate_update.send(Err(Error::ConnectionClosed)); continue; } Ok(msg) => { @@ -43,26 +42,37 @@ pub async fn connect() -> Result>> { continue; } Err(e) => { - error!("Error when reading from Kraken rate stream: {}", e); + error!(%e, "Error when reading from Kraken rate stream"); let _ = rate_update.send(Err(e.into())); continue; } }; - // If we encounter a heartbeat we skip it and iterate again - if msg.eq(r#"{"event":"heartbeat"}"#) { - continue; - } - - let ticker = match serde_json::from_str::(&msg) { - Ok(ticker) => ticker, - Err(e) => { - let _ = rate_update.send(Err(e.into())); + let update = match serde_json::from_str::(&msg) { + Ok(Event::SystemStatus) => { + tracing::debug!("Connected to Kraken websocket API"); continue; } + Ok(Event::SubscriptionStatus) => { + tracing::debug!("Subscribed to updates for ticker"); + continue; + } + Ok(Event::Heartbeat) => { + tracing::trace!("Received heartbeat message"); + continue; + } + // if the message is not an event, it is a ticker update or an unknown event + Err(_) => match serde_json::from_str::(&msg) { + Ok(ticker) => ticker, + Err(e) => { + tracing::warn!(%e, "Failed to deserialize message '{}' as ticker update", msg); + let _ = rate_update.send(Err(Error::UnknownMessage { msg })); + continue; + } + }, }; - let rate = match Rate::try_from(ticker) { + let rate = match Rate::try_from(update) { Ok(rate) => rate, Err(e) => { let _ = rate_update.send(Err(e)); @@ -94,12 +104,12 @@ const SUBSCRIBE_XMR_BTC_TICKER_PAYLOAD: &str = r#" pub enum Error { #[error("Rate has not yet been retrieved from Kraken websocket API")] NotYetRetrieved, - #[error("Received close message from Kraken")] - CloseMessage, - #[error("Websocket: ")] + #[error("The Kraken server closed the websocket connection")] + ConnectionClosed, + #[error("Websocket: {0}")] WebSocket(String), - #[error("Serde: ")] - Serde(String), + #[error("Received unknown message from Kraken: {msg}")] + UnknownMessage { msg: String }, #[error("Data field is missing")] DataFieldMissing, #[error("Ask Rate Element is of unexpected type")] @@ -110,16 +120,21 @@ pub enum Error { BitcoinParseAmount(#[from] ParseAmountError), } -impl From for Error { - fn from(err: tokio_tungstenite::tungstenite::Error) -> Self { +impl From for Error { + fn from(err: tungstenite::Error) -> Self { Error::WebSocket(format!("{:#}", err)) } } -impl From for Error { - fn from(err: serde_json::Error) -> Self { - Error::Serde(format!("{:#}", err)) - } +#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[serde(tag = "event")] +enum Event { + #[serde(rename = "systemStatus")] + SystemStatus, + #[serde(rename = "heartbeat")] + Heartbeat, + #[serde(rename = "subscriptionStatus")] + SubscriptionStatus, } #[derive(Debug, Serialize, Deserialize)] @@ -176,10 +191,28 @@ impl TryFrom for Rate { mod tests { use super::*; - #[tokio::test] - async fn deserialize_ticker_update() { - let sample_response = r#"[980,{"a":["0.00521900",4,"4.84775132"],"b":["0.00520600",70,"70.35668921"],"c":["0.00520700","0.00000186"],"v":["18530.40510860","18531.94887860"],"p":["0.00489493","0.00489490"],"t":[5017,5018],"l":["0.00448300","0.00448300"],"h":["0.00525000","0.00525000"],"o":["0.00450000","0.00451000"]},"ticker","XMR/XBT"]"#; + #[test] + fn can_deserialize_system_status_event() { + let event = r#"{"connectionID":14859574189081089471,"event":"systemStatus","status":"online","version":"1.8.1"}"#; - let _ = serde_json::from_str::(sample_response).unwrap(); + let event = serde_json::from_str::(event).unwrap(); + + assert_eq!(event, Event::SystemStatus) + } + + #[test] + fn can_deserialize_subscription_status_event() { + let event = r#"{"channelID":980,"channelName":"ticker","event":"subscriptionStatus","pair":"XMR/XBT","status":"subscribed","subscription":{"name":"ticker"}}"#; + + let event = serde_json::from_str::(event).unwrap(); + + assert_eq!(event, Event::SubscriptionStatus) + } + + #[test] + fn deserialize_ticker_update() { + let message = r#"[980,{"a":["0.00440700",7,"7.35318535"],"b":["0.00440200",7,"7.57416678"],"c":["0.00440700","0.22579000"],"v":["273.75489000","4049.91233351"],"p":["0.00446205","0.00441699"],"t":[123,1310],"l":["0.00439400","0.00429900"],"h":["0.00450000","0.00450000"],"o":["0.00449100","0.00433700"]},"ticker","XMR/XBT"]"#; + + let _ = serde_json::from_str::(message).unwrap(); } }