diff --git a/swap/src/bin/asb.rs b/swap/src/bin/asb.rs index 6534ba3e..a4528f03 100644 --- a/swap/src/bin/asb.rs +++ b/swap/src/bin/asb.rs @@ -13,15 +13,13 @@ #![allow(non_snake_case)] use anyhow::{bail, Context, Result}; -use futures::{StreamExt, TryStreamExt}; use libp2p::core::multiaddr::Protocol; use libp2p::core::Multiaddr; use libp2p::Swarm; use prettytable::{row, Table}; -use std::error::Error; +use std::env; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::Arc; -use std::{env, fmt}; use structopt::clap; use structopt::clap::ErrorKind; use swap::asb::command::{parse_args, Arguments, Command}; @@ -29,7 +27,7 @@ use swap::asb::config::{ initial_setup, query_user_for_initial_config, read_config, Config, ConfigNotInitialized, }; use swap::database::Database; -use swap::kraken::PriceUpdates; +use swap::kraken::latest_rate; use swap::monero::Amount; use swap::network::swarm; use swap::protocol::alice; @@ -40,7 +38,7 @@ use swap::tor::AuthenticatedClient; use swap::{asb, bitcoin, kraken, monero, tor}; use tracing::{debug, info, warn}; use tracing_subscriber::filter::LevelFilter; -use warp::{Filter, Reply}; +use warp::Filter; #[macro_use] extern crate prettytable; @@ -134,9 +132,14 @@ async fn main() -> Result<()> { let updates = kraken_price_updates.clone(); let latest_rate = warp::get() .and(warp::path!("api" / "rate" / "btc-xmr")) - .map(move || latest_rate(updates.clone())); + .and(warp::ws()) + .map(move |ws: warp::ws::Ws| { + let updates = updates.clone(); + info!("New connection received"); + ws.on_upgrade(move |socket| latest_rate(socket, updates)) + }); tokio::spawn(async move { - warp::serve(latest_rate).run(([127, 0, 0, 1], 3030)).await; + warp::serve(latest_rate).run(([0, 0, 0, 0], 3030)).await; }); // setup Tor hidden services @@ -388,45 +391,3 @@ async fn register_tor_services( Ok(ac) } - -fn latest_rate(subscription: PriceUpdates) -> impl Reply { - let stream = subscription - .into_stream() - .map_ok(|data| { - let event = warp::sse::Event::default() - .id("rate") - .json_data(data) - .context("failed to attach json data to sse event")?; - - Ok(event) - }) - .map(|result| match result { - Ok(Ok(ok)) => Ok(ok), - Ok(Err(e)) => Err(e), - Err(e) => Err(e), - }) - .err_into::(); - - warp::sse::reply(warp::sse::keep_alive().stream(stream)) -} - -#[derive(Debug)] -struct RateStreamError(anyhow::Error); - -impl fmt::Display for RateStreamError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{:#}", self.0) - } -} - -impl Error for RateStreamError { - fn source(&self) -> Option<&(dyn Error + 'static)> { - self.0.source() - } -} - -impl From for RateStreamError { - fn from(e: anyhow::Error) -> Self { - RateStreamError(e) - } -} diff --git a/swap/src/kraken.rs b/swap/src/kraken.rs index 9ab0ae3f..93ec5a2b 100644 --- a/swap/src/kraken.rs +++ b/swap/src/kraken.rs @@ -6,6 +6,7 @@ use std::convert::{Infallible, TryFrom}; use std::sync::Arc; use std::time::Duration; use tokio::sync::watch; +use warp::ws::{Message, WebSocket}; /// Connect to Kraken websocket API for a constant stream of rate updates. /// @@ -71,7 +72,7 @@ pub fn connect() -> Result { #[derive(Clone, Debug)] pub struct PriceUpdates { - inner: watch::Receiver, + pub inner: watch::Receiver, } impl PriceUpdates { @@ -85,23 +86,31 @@ impl PriceUpdates { self.inner.borrow().clone() } - pub fn into_stream(self) -> impl Stream> { + pub fn into_ws_stream(self) -> impl Stream> { stream::try_unfold(self.inner, |mut receiver| async move { + // todo print error message but don't forward it to the user and don't panic receiver .changed() .await - .context("failed to receive latest rate update")?; + .context("failed to receive latest rate update") + .expect("Should not fail :)"); let latest_rate = receiver .borrow() .clone() - .map_err(|_e| Error::NotYetAvailable); - - Ok(Some((latest_rate, receiver))) + .map_err(|_e| Error::NotYetAvailable) + .expect("Should work"); + let msg = Message::text(serde_json::to_string(&latest_rate).expect("to serialize")); + Ok(Some((msg, receiver))) }) } } +pub async fn latest_rate(ws: WebSocket, subscription: PriceUpdates) { + let (ws_tx, mut _ws_rx) = ws.split(); + tokio::task::spawn(subscription.into_ws_stream().forward(ws_tx)); +} + impl From> for PriceUpdates { fn from(inner: watch::Receiver) -> Self { Self { inner }