Extract price websocket into separate module

The kraken stream should not be concerned with this.
We will have to extend the websocket to actually return quotes.
This commit is contained in:
Daniel Karzel 2021-06-04 13:50:18 +10:00
parent 4dfe24773f
commit b8f689bda0
No known key found for this signature in database
GPG Key ID: 30C3FC2E438ADB6E
4 changed files with 52 additions and 42 deletions

View File

@ -1,5 +1,6 @@
pub mod command;
pub mod config;
pub mod price_websocket;
mod rate;
pub mod tracing;

View File

@ -0,0 +1,28 @@
use crate::kraken::PriceUpdates;
use anyhow::Context;
use futures::{stream, StreamExt, TryStreamExt};
use warp::ws::{Message, WebSocket};
pub async fn latest_rate(ws: WebSocket, subscription: PriceUpdates) {
let stream = stream::try_unfold(subscription.inner, |mut receiver| async move {
// todo print error message but don't forward it to the user and don't panic
receiver
.changed()
.await
.context("failed to receive latest rate update")
.expect("Should not fail :)");
let latest_rate = receiver.borrow().clone().expect("Should work");
// TODO: Proper definition of what to send over the wire
// TODO: Properly calculate the actual rate (using spread) and add min and max
// amounts tradeable
let msg = Message::text(serde_json::to_string(&latest_rate).expect("to serialize"));
Ok(Some((msg, receiver)))
})
.into_stream();
let (ws_tx, mut _ws_rx) = ws.split();
tokio::task::spawn(stream.forward(ws_tx));
}

View File

@ -26,8 +26,9 @@ use swap::asb::command::{parse_args, Arguments, Command};
use swap::asb::config::{
initial_setup, query_user_for_initial_config, read_config, Config, ConfigNotInitialized,
};
use swap::asb::price_websocket::latest_rate;
use swap::database::Database;
use swap::kraken::latest_rate;
use swap::kraken::PriceUpdates;
use swap::monero::Amount;
use swap::network::swarm;
use swap::protocol::alice;
@ -129,18 +130,7 @@ async fn main() -> Result<()> {
let kraken_price_updates = kraken::connect()?;
let updates = kraken_price_updates.clone();
let latest_rate = warp::get()
.and(warp::path!("api" / "rate" / "btc-xmr"))
.and(warp::ws())
.map(move |ws: warp::ws::Ws| {
let updates = updates.clone();
info!("New connection received");
ws.on_upgrade(move |socket| latest_rate(socket, updates))
});
tokio::spawn(async move {
warp::serve(latest_rate).run(([0, 0, 0, 0], 3030)).await;
});
setup_price_websocket(kraken_price_updates.clone()).await;
// setup Tor hidden services
let tor_client =
@ -311,6 +301,20 @@ async fn main() -> Result<()> {
Ok(())
}
async fn setup_price_websocket(price_updates: PriceUpdates) {
let latest_rate = warp::get()
.and(warp::path!("api" / "price" / "xmr-btc"))
.and(warp::ws())
.map(move |ws: warp::ws::Ws| {
let price_updates = price_updates.clone();
info!("New price websocket connection");
ws.on_upgrade(move |socket| latest_rate(socket, price_updates))
});
tokio::spawn(async move {
warp::serve(latest_rate).run(([0, 0, 0, 0], 3030)).await;
});
}
async fn init_bitcoin_wallet(
config: &Config,
seed: &Seed,
@ -376,7 +380,10 @@ async fn register_tor_services(
.collect::<Vec<_>>();
// TODO: Only configure if feature flag is set in config
hidden_services_details.push((3030, SocketAddr::new(IpAddr::from(Ipv4Addr::new(127, 0, 0, 1)), 3030)));
hidden_services_details.push((
3030,
SocketAddr::new(IpAddr::from(Ipv4Addr::new(127, 0, 0, 1)), 3030),
));
let key = seed.derive_torv3_key();

View File

@ -1,12 +1,10 @@
use anyhow::{anyhow, Context, Result};
use futures::{stream, SinkExt, Stream, StreamExt, TryStreamExt};
use serde::Deserialize;
use serde::Serialize;
use futures::{SinkExt, StreamExt, TryStreamExt};
use serde::{Deserialize, Serialize};
use std::convert::{Infallible, TryFrom};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;
use warp::ws::{Message, WebSocket};
/// Connect to Kraken websocket API for a constant stream of rate updates.
///
@ -85,30 +83,6 @@ impl PriceUpdates {
pub fn latest_update(&mut self) -> PriceUpdate {
self.inner.borrow().clone()
}
pub fn into_ws_stream(self) -> impl Stream<Item = Result<Message, warp::Error>> {
stream::try_unfold(self.inner, |mut receiver| async move {
// todo print error message but don't forward it to the user and don't panic
receiver
.changed()
.await
.context("failed to receive latest rate update")
.expect("Should not fail :)");
let latest_rate = receiver
.borrow()
.clone()
.map_err(|_e| Error::NotYetAvailable)
.expect("Should work");
let msg = Message::text(serde_json::to_string(&latest_rate).expect("to serialize"));
Ok(Some((msg, receiver)))
})
}
}
pub async fn latest_rate(ws: WebSocket, subscription: PriceUpdates) {
let (ws_tx, mut _ws_rx) = ws.split();
tokio::task::spawn(subscription.into_ws_stream().forward(ws_tx));
}
impl From<watch::Receiver<PriceUpdate>> for PriceUpdates {