Rename variables to add to understanding the code

This commit is contained in:
Daniel Karzel 2021-02-22 13:43:57 +11:00
parent 9496dce917
commit 151f33ba10

View File

@ -66,23 +66,23 @@ impl From<serde_json::Error> for Error {
impl RateService { impl RateService {
pub async fn new() -> Result<Self> { pub async fn new() -> Result<Self> {
let (tx, rx) = watch::channel(Err(Error::NotYetRetrieved)); let (rate_update, rate_update_receiver) = watch::channel(Err(Error::NotYetRetrieved));
let (ws, _response) = let (rate_stream, _response) =
tokio_tungstenite::connect_async(Url::parse(KRAKEN_WS_URL).expect("valid url")).await?; tokio_tungstenite::connect_async(Url::parse(KRAKEN_WS_URL).expect("valid url")).await?;
let (mut write, mut read) = ws.split(); let (mut rate_stream_sink, mut rate_stream) = rate_stream.split();
tokio::spawn(async move { tokio::spawn(async move {
while let Some(msg) = read.next().await { while let Some(msg) = rate_stream.next().await {
let msg = match msg { let msg = match msg {
Ok(Message::Text(msg)) => msg, Ok(Message::Text(msg)) => msg,
Ok(_) => { Ok(_) => {
let _ = tx.send(Err(Error::NonTextMessage)); let _ = rate_update.send(Err(Error::NonTextMessage));
continue; continue;
} }
Err(e) => { Err(e) => {
let _ = tx.send(Err(e.into())); let _ = rate_update.send(Err(e.into()));
continue; continue;
} }
}; };
@ -95,7 +95,7 @@ impl RateService {
let ticker = match serde_json::from_str::<TickerUpdate>(&msg) { let ticker = match serde_json::from_str::<TickerUpdate>(&msg) {
Ok(ticker) => ticker, Ok(ticker) => ticker,
Err(e) => { Err(e) => {
let _ = tx.send(Err(e.into())); let _ = rate_update.send(Err(e.into()));
continue; continue;
} }
}; };
@ -103,18 +103,22 @@ impl RateService {
let rate = match Rate::try_from(ticker) { let rate = match Rate::try_from(ticker) {
Ok(rate) => rate, Ok(rate) => rate,
Err(e) => { Err(e) => {
let _ = tx.send(Err(e)); let _ = rate_update.send(Err(e));
continue; continue;
} }
}; };
let _ = tx.send(Ok(rate)); let _ = rate_update.send(Ok(rate));
} }
}); });
write.send(SUBSCRIBE_XMR_BTC_TICKER_PAYLOAD.into()).await?; rate_stream_sink
.send(SUBSCRIBE_XMR_BTC_TICKER_PAYLOAD.into())
.await?;
Ok(Self { receiver: rx }) Ok(Self {
receiver: rate_update_receiver,
})
} }
} }