diff --git a/swap/src/asb.rs b/swap/src/asb.rs index 222046c1..2a808ac7 100644 --- a/swap/src/asb.rs +++ b/swap/src/asb.rs @@ -1,5 +1,6 @@ pub mod command; pub mod config; +pub mod price_websocket; mod rate; pub mod tracing; diff --git a/swap/src/asb/price_websocket.rs b/swap/src/asb/price_websocket.rs new file mode 100644 index 00000000..bdc40dca --- /dev/null +++ b/swap/src/asb/price_websocket.rs @@ -0,0 +1,28 @@ +use crate::kraken::PriceUpdates; +use anyhow::Context; +use futures::{stream, StreamExt, TryStreamExt}; +use warp::ws::{Message, WebSocket}; + +pub async fn latest_rate(ws: WebSocket, subscription: PriceUpdates) { + let stream = stream::try_unfold(subscription.inner, |mut receiver| async move { + // todo print error message but don't forward it to the user and don't panic + receiver + .changed() + .await + .context("failed to receive latest rate update") + .expect("Should not fail :)"); + + let latest_rate = receiver.borrow().clone().expect("Should work"); + + // TODO: Proper definition of what to send over the wire + // TODO: Properly calculate the actual rate (using spread) and add min and max + // amounts tradeable + let msg = Message::text(serde_json::to_string(&latest_rate).expect("to serialize")); + + Ok(Some((msg, receiver))) + }) + .into_stream(); + + let (ws_tx, mut _ws_rx) = ws.split(); + tokio::task::spawn(stream.forward(ws_tx)); +} diff --git a/swap/src/bin/asb.rs b/swap/src/bin/asb.rs index f85fa805..d30a06c2 100644 --- a/swap/src/bin/asb.rs +++ b/swap/src/bin/asb.rs @@ -26,8 +26,9 @@ use swap::asb::command::{parse_args, Arguments, Command}; use swap::asb::config::{ initial_setup, query_user_for_initial_config, read_config, Config, ConfigNotInitialized, }; +use swap::asb::price_websocket::latest_rate; use swap::database::Database; -use swap::kraken::latest_rate; +use swap::kraken::PriceUpdates; use swap::monero::Amount; use swap::network::swarm; use swap::protocol::alice; @@ -129,18 +130,7 @@ async fn main() -> Result<()> { let kraken_price_updates = kraken::connect()?; - let updates = kraken_price_updates.clone(); - let latest_rate = warp::get() - .and(warp::path!("api" / "rate" / "btc-xmr")) - .and(warp::ws()) - .map(move |ws: warp::ws::Ws| { - let updates = updates.clone(); - info!("New connection received"); - ws.on_upgrade(move |socket| latest_rate(socket, updates)) - }); - tokio::spawn(async move { - warp::serve(latest_rate).run(([0, 0, 0, 0], 3030)).await; - }); + setup_price_websocket(kraken_price_updates.clone()).await; // setup Tor hidden services let tor_client = @@ -311,6 +301,20 @@ async fn main() -> Result<()> { Ok(()) } +async fn setup_price_websocket(price_updates: PriceUpdates) { + let latest_rate = warp::get() + .and(warp::path!("api" / "price" / "xmr-btc")) + .and(warp::ws()) + .map(move |ws: warp::ws::Ws| { + let price_updates = price_updates.clone(); + info!("New price websocket connection"); + ws.on_upgrade(move |socket| latest_rate(socket, price_updates)) + }); + tokio::spawn(async move { + warp::serve(latest_rate).run(([0, 0, 0, 0], 3030)).await; + }); +} + async fn init_bitcoin_wallet( config: &Config, seed: &Seed, @@ -376,7 +380,10 @@ async fn register_tor_services( .collect::>(); // TODO: Only configure if feature flag is set in config - hidden_services_details.push((3030, SocketAddr::new(IpAddr::from(Ipv4Addr::new(127, 0, 0, 1)), 3030))); + hidden_services_details.push(( + 3030, + SocketAddr::new(IpAddr::from(Ipv4Addr::new(127, 0, 0, 1)), 3030), + )); let key = seed.derive_torv3_key(); diff --git a/swap/src/kraken.rs b/swap/src/kraken.rs index 93ec5a2b..768a7028 100644 --- a/swap/src/kraken.rs +++ b/swap/src/kraken.rs @@ -1,12 +1,10 @@ use anyhow::{anyhow, Context, Result}; -use futures::{stream, SinkExt, Stream, StreamExt, TryStreamExt}; -use serde::Deserialize; -use serde::Serialize; +use futures::{SinkExt, StreamExt, TryStreamExt}; +use serde::{Deserialize, Serialize}; use std::convert::{Infallible, TryFrom}; use std::sync::Arc; use std::time::Duration; use tokio::sync::watch; -use warp::ws::{Message, WebSocket}; /// Connect to Kraken websocket API for a constant stream of rate updates. /// @@ -85,30 +83,6 @@ impl PriceUpdates { pub fn latest_update(&mut self) -> PriceUpdate { self.inner.borrow().clone() } - - pub fn into_ws_stream(self) -> impl Stream> { - stream::try_unfold(self.inner, |mut receiver| async move { - // todo print error message but don't forward it to the user and don't panic - receiver - .changed() - .await - .context("failed to receive latest rate update") - .expect("Should not fail :)"); - - let latest_rate = receiver - .borrow() - .clone() - .map_err(|_e| Error::NotYetAvailable) - .expect("Should work"); - let msg = Message::text(serde_json::to_string(&latest_rate).expect("to serialize")); - Ok(Some((msg, receiver))) - }) - } -} - -pub async fn latest_rate(ws: WebSocket, subscription: PriceUpdates) { - let (ws_tx, mut _ws_rx) = ws.split(); - tokio::task::spawn(subscription.into_ws_stream().forward(ws_tx)); } impl From> for PriceUpdates {