Add websocket support using warp

This commit is contained in:
Philipp Hoenisch 2021-06-04 08:35:23 +10:00
parent 20ea217e7b
commit 686880a140
No known key found for this signature in database
GPG Key ID: E5F8E74C672BC666
2 changed files with 25 additions and 55 deletions

View File

@ -13,15 +13,13 @@
#![allow(non_snake_case)] #![allow(non_snake_case)]
use anyhow::{bail, Context, Result}; use anyhow::{bail, Context, Result};
use futures::{StreamExt, TryStreamExt};
use libp2p::core::multiaddr::Protocol; use libp2p::core::multiaddr::Protocol;
use libp2p::core::Multiaddr; use libp2p::core::Multiaddr;
use libp2p::Swarm; use libp2p::Swarm;
use prettytable::{row, Table}; use prettytable::{row, Table};
use std::error::Error; use std::env;
use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc; use std::sync::Arc;
use std::{env, fmt};
use structopt::clap; use structopt::clap;
use structopt::clap::ErrorKind; use structopt::clap::ErrorKind;
use swap::asb::command::{parse_args, Arguments, Command}; use swap::asb::command::{parse_args, Arguments, Command};
@ -29,7 +27,7 @@ use swap::asb::config::{
initial_setup, query_user_for_initial_config, read_config, Config, ConfigNotInitialized, initial_setup, query_user_for_initial_config, read_config, Config, ConfigNotInitialized,
}; };
use swap::database::Database; use swap::database::Database;
use swap::kraken::PriceUpdates; use swap::kraken::latest_rate;
use swap::monero::Amount; use swap::monero::Amount;
use swap::network::swarm; use swap::network::swarm;
use swap::protocol::alice; use swap::protocol::alice;
@ -40,7 +38,7 @@ use swap::tor::AuthenticatedClient;
use swap::{asb, bitcoin, kraken, monero, tor}; use swap::{asb, bitcoin, kraken, monero, tor};
use tracing::{debug, info, warn}; use tracing::{debug, info, warn};
use tracing_subscriber::filter::LevelFilter; use tracing_subscriber::filter::LevelFilter;
use warp::{Filter, Reply}; use warp::Filter;
#[macro_use] #[macro_use]
extern crate prettytable; extern crate prettytable;
@ -134,9 +132,14 @@ async fn main() -> Result<()> {
let updates = kraken_price_updates.clone(); let updates = kraken_price_updates.clone();
let latest_rate = warp::get() let latest_rate = warp::get()
.and(warp::path!("api" / "rate" / "btc-xmr")) .and(warp::path!("api" / "rate" / "btc-xmr"))
.map(move || latest_rate(updates.clone())); .and(warp::ws())
.map(move |ws: warp::ws::Ws| {
let updates = updates.clone();
info!("New connection received");
ws.on_upgrade(move |socket| latest_rate(socket, updates))
});
tokio::spawn(async move { tokio::spawn(async move {
warp::serve(latest_rate).run(([127, 0, 0, 1], 3030)).await; warp::serve(latest_rate).run(([0, 0, 0, 0], 3030)).await;
}); });
// setup Tor hidden services // setup Tor hidden services
@ -388,45 +391,3 @@ async fn register_tor_services(
Ok(ac) Ok(ac)
} }
fn latest_rate(subscription: PriceUpdates) -> impl Reply {
let stream = subscription
.into_stream()
.map_ok(|data| {
let event = warp::sse::Event::default()
.id("rate")
.json_data(data)
.context("failed to attach json data to sse event")?;
Ok(event)
})
.map(|result| match result {
Ok(Ok(ok)) => Ok(ok),
Ok(Err(e)) => Err(e),
Err(e) => Err(e),
})
.err_into::<RateStreamError>();
warp::sse::reply(warp::sse::keep_alive().stream(stream))
}
#[derive(Debug)]
struct RateStreamError(anyhow::Error);
impl fmt::Display for RateStreamError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:#}", self.0)
}
}
impl Error for RateStreamError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
self.0.source()
}
}
impl From<anyhow::Error> for RateStreamError {
fn from(e: anyhow::Error) -> Self {
RateStreamError(e)
}
}

View File

@ -6,6 +6,7 @@ use std::convert::{Infallible, TryFrom};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::sync::watch; use tokio::sync::watch;
use warp::ws::{Message, WebSocket};
/// Connect to Kraken websocket API for a constant stream of rate updates. /// Connect to Kraken websocket API for a constant stream of rate updates.
/// ///
@ -71,7 +72,7 @@ pub fn connect() -> Result<PriceUpdates> {
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct PriceUpdates { pub struct PriceUpdates {
inner: watch::Receiver<PriceUpdate>, pub inner: watch::Receiver<PriceUpdate>,
} }
impl PriceUpdates { impl PriceUpdates {
@ -85,23 +86,31 @@ impl PriceUpdates {
self.inner.borrow().clone() self.inner.borrow().clone()
} }
pub fn into_stream(self) -> impl Stream<Item = Result<PriceUpdate>> { pub fn into_ws_stream(self) -> impl Stream<Item = Result<Message, warp::Error>> {
stream::try_unfold(self.inner, |mut receiver| async move { stream::try_unfold(self.inner, |mut receiver| async move {
// todo print error message but don't forward it to the user and don't panic
receiver receiver
.changed() .changed()
.await .await
.context("failed to receive latest rate update")?; .context("failed to receive latest rate update")
.expect("Should not fail :)");
let latest_rate = receiver let latest_rate = receiver
.borrow() .borrow()
.clone() .clone()
.map_err(|_e| Error::NotYetAvailable); .map_err(|_e| Error::NotYetAvailable)
.expect("Should work");
Ok(Some((latest_rate, receiver))) let msg = Message::text(serde_json::to_string(&latest_rate).expect("to serialize"));
Ok(Some((msg, receiver)))
}) })
} }
} }
pub async fn latest_rate(ws: WebSocket, subscription: PriceUpdates) {
let (ws_tx, mut _ws_rx) = ws.split();
tokio::task::spawn(subscription.into_ws_stream().forward(ws_tx));
}
impl From<watch::Receiver<PriceUpdate>> for PriceUpdates { impl From<watch::Receiver<PriceUpdate>> for PriceUpdates {
fn from(inner: watch::Receiver<PriceUpdate>) -> Self { fn from(inner: watch::Receiver<PriceUpdate>) -> Self {
Self { inner } Self { inner }