From fee1c9f21589201c833d52f80387f303ca9abfe5 Mon Sep 17 00:00:00 2001 From: Philipp Hoenisch Date: Thu, 3 Jun 2021 14:46:56 +1000 Subject: [PATCH] Dedicated quote websocket Adds a websocket that one can subscribe to for quotes. Whenever there is a price change the quote is calculated. Includes automatic configuration for the websocket port as Tor hidden service (when running over Tor). --- Cargo.lock | 150 +++++++++++++++++++++++++++- swap/Cargo.toml | 1 + swap/src/asb.rs | 1 + swap/src/asb/config.rs | 4 + swap/src/asb/quote_websocket.rs | 86 ++++++++++++++++ swap/src/bin/asb.rs | 35 ++++++- swap/src/kraken.rs | 15 ++- swap/src/protocol/bob/event_loop.rs | 2 + 8 files changed, 283 insertions(+), 11 deletions(-) create mode 100644 swap/src/asb/quote_websocket.rs diff --git a/Cargo.lock b/Cargo.lock index a92f090a..548ae6c0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -474,6 +474,16 @@ dependencies = [ "serde", ] +[[package]] +name = "buf_redux" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b953a6887648bb07a535631f2bc00fbdb2a2216f135552cb3f534ed136b9c07f" +dependencies = [ + "memchr", + "safemem", +] + [[package]] name = "bumpalo" version = "3.6.1" @@ -1338,6 +1348,31 @@ dependencies = [ "ahash", ] +[[package]] +name = "headers" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0b7591fb62902706ae8e7aaff416b1b0fa2c0fd0878b46dc13baa3712d8a855" +dependencies = [ + "base64 0.13.0", + "bitflags", + "bytes 1.0.1", + "headers-core", + "http", + "mime 0.3.16", + "sha-1", + "time 0.1.43", +] + +[[package]] +name = "headers-core" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" +dependencies = [ + "http", +] + [[package]] name = "heck" version = "0.3.2" @@ -1474,7 +1509,7 @@ dependencies = [ "time 0.1.43", "traitobject", "typeable", - "unicase", + "unicase 1.4.2", "url 1.7.2", ] @@ -2152,6 +2187,16 @@ version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" +[[package]] +name = "mime_guess" +version = "2.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2684d4c2e97d99848d30b324b00c8fcc7e5c897b7cbb5819b09e7c90e8baf212" +dependencies = [ + "mime 0.3.16", + "unicase 2.6.0", +] + [[package]] name = "minicbor" version = "0.8.0" @@ -2322,6 +2367,24 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" +[[package]] +name = "multipart" +version = "0.17.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d050aeedc89243f5347c3e237e3e13dc76fbe4ae3742a57b94dc14f69acf76d4" +dependencies = [ + "buf_redux", + "httparse", + "log 0.4.14", + "mime 0.3.16", + "mime_guess", + "quick-error 1.2.3", + "rand 0.7.3", + "safemem", + "tempfile", + "twoway", +] + [[package]] name = "multistream-select" version = "0.10.2" @@ -3934,7 +3997,7 @@ dependencies = [ "tokio", "tokio-socks", "tokio-tar", - "tokio-tungstenite", + "tokio-tungstenite 0.14.0", "tokio-util", "toml", "torut", @@ -3945,6 +4008,7 @@ dependencies = [ "url 2.2.2", "uuid", "void", + "warp", "zip", ] @@ -4238,6 +4302,19 @@ dependencies = [ "xattr", ] +[[package]] +name = "tokio-tungstenite" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1a5f475f1b9d077ea1017ecbc60890fda8e54942d680ca0b1d2b47cfa2d861b" +dependencies = [ + "futures-util", + "log 0.4.14", + "pin-project 1.0.5", + "tokio", + "tungstenite 0.12.0", +] + [[package]] name = "tokio-tungstenite" version = "0.14.0" @@ -4250,7 +4327,7 @@ dependencies = [ "rustls 0.19.0", "tokio", "tokio-rustls", - "tungstenite", + "tungstenite 0.13.0", "webpki", "webpki-roots 0.21.0", ] @@ -4309,6 +4386,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09adeb8c97449311ccd28a427f96fb563e7fd31aabf994189879d9da2394b89d" dependencies = [ "cfg-if 1.0.0", + "log 0.4.14", "pin-project-lite 0.2.6", "tracing-attributes", "tracing-core", @@ -4456,6 +4534,25 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" +[[package]] +name = "tungstenite" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ada8297e8d70872fa9a551d93250a9f407beb9f37ef86494eb20012a2ff7c24" +dependencies = [ + "base64 0.13.0", + "byteorder", + "bytes 1.0.1", + "http", + "httparse", + "input_buffer", + "log 0.4.14", + "rand 0.8.3", + "sha-1", + "url 2.2.2", + "utf-8", +] + [[package]] name = "tungstenite" version = "0.13.0" @@ -4479,6 +4576,15 @@ dependencies = [ "webpki-roots 0.21.0", ] +[[package]] +name = "twoway" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59b11b2b5241ba34be09c3cc85a36e56e48f9888862e19cedf23336d35316ed1" +dependencies = [ + "memchr", +] + [[package]] name = "typeable" version = "0.1.2" @@ -4518,6 +4624,15 @@ dependencies = [ "version_check 0.1.5", ] +[[package]] +name = "unicase" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6" +dependencies = [ + "version_check 0.9.3", +] + [[package]] name = "unicode-bidi" version = "0.3.4" @@ -4683,6 +4798,35 @@ dependencies = [ "try-lock", ] +[[package]] +name = "warp" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "332d47745e9a0c38636dbd454729b147d16bd1ed08ae67b3ab281c4506771054" +dependencies = [ + "bytes 1.0.1", + "futures", + "headers", + "http", + "hyper 0.14.8", + "log 0.4.14", + "mime 0.3.16", + "mime_guess", + "multipart", + "percent-encoding 2.1.0", + "pin-project 1.0.5", + "scoped-tls", + "serde", + "serde_json", + "serde_urlencoded", + "tokio", + "tokio-stream", + "tokio-tungstenite 0.13.0", + "tokio-util", + "tower-service", + "tracing", +] + [[package]] name = "wasi" version = "0.9.0+wasi-snapshot-preview1" diff --git a/swap/Cargo.toml b/swap/Cargo.toml index ba8ed7b3..f5575eb7 100644 --- a/swap/Cargo.toml +++ b/swap/Cargo.toml @@ -65,6 +65,7 @@ tracing-subscriber = { version = "0.2", default-features = false, features = [ " url = { version = "2", features = [ "serde" ] } uuid = { version = "0.8", features = [ "serde", "v4" ] } void = "1" +warp = "0.3" [target.'cfg(not(windows))'.dependencies] tokio-tar = "0.3" diff --git a/swap/src/asb.rs b/swap/src/asb.rs index 222046c1..ed46c1d3 100644 --- a/swap/src/asb.rs +++ b/swap/src/asb.rs @@ -1,5 +1,6 @@ pub mod command; pub mod config; +pub mod quote_websocket; mod rate; pub mod tracing; diff --git a/swap/src/asb/config.rs b/swap/src/asb/config.rs index 984bc3ee..e2ebd81b 100644 --- a/swap/src/asb/config.rs +++ b/swap/src/asb/config.rs @@ -151,6 +151,7 @@ pub struct Maker { #[serde(with = "::bitcoin::util::amount::serde::as_btc")] pub max_buy_btc: bitcoin::Amount, pub ask_spread: Decimal, + pub quote_websocket_port: Option, } impl Default for TorConf { @@ -307,6 +308,7 @@ pub fn query_user_for_initial_config(testnet: bool) -> Result { min_buy_btc: min_buy, max_buy_btc: max_buy, ask_spread, + quote_websocket_port: None, }, }) } @@ -347,6 +349,7 @@ mod tests { min_buy_btc: bitcoin::Amount::from_btc(DEFAULT_MIN_BUY_AMOUNT).unwrap(), max_buy_btc: bitcoin::Amount::from_btc(DEFAULT_MAX_BUY_AMOUNT).unwrap(), ask_spread: Decimal::from_f64(DEFAULT_SPREAD).unwrap(), + quote_websocket_port: None, }, }; @@ -387,6 +390,7 @@ mod tests { min_buy_btc: bitcoin::Amount::from_btc(DEFAULT_MIN_BUY_AMOUNT).unwrap(), max_buy_btc: bitcoin::Amount::from_btc(DEFAULT_MAX_BUY_AMOUNT).unwrap(), ask_spread: Decimal::from_f64(DEFAULT_SPREAD).unwrap(), + quote_websocket_port: None, }, }; diff --git a/swap/src/asb/quote_websocket.rs b/swap/src/asb/quote_websocket.rs new file mode 100644 index 00000000..f1bdda0e --- /dev/null +++ b/swap/src/asb/quote_websocket.rs @@ -0,0 +1,86 @@ +use crate::asb::Rate; +use crate::kraken::PriceUpdates; +use crate::network::quote::BidQuote; +use futures::{stream, StreamExt, TryStreamExt}; +use rust_decimal::Decimal; +use serde::Serialize; +use warp::ws::{Message, WebSocket}; +use warp::Filter; + +pub async fn setup_quote_websocket( + price_updates: PriceUpdates, + port: u16, + spread: Decimal, + min: bitcoin::Amount, + max: bitcoin::Amount, +) { + let latest_quote = warp::get() + .and(warp::path!("api" / "quote" / "xmr-btc")) + .and(warp::ws()) + .map(move |ws: warp::ws::Ws| { + let price_updates = price_updates.clone(); + tracing::info!("New quote websocket connection"); + ws.on_upgrade(move |socket| quote_stream(socket, price_updates, spread, min, max)) + }); + tokio::spawn(async move { + warp::serve(latest_quote).run(([0, 0, 0, 0], port)).await; + }); +} + +async fn quote_stream( + ws: WebSocket, + subscription: PriceUpdates, + spread: Decimal, + min: bitcoin::Amount, + max: bitcoin::Amount, +) { + let stream = stream::try_unfold(subscription.inner, move |mut receiver| async move { + if let Err(e) = receiver.changed().await { + tracing::error!( + "Failed to initialize price update stream for quote websocket: {:#}", + e + ); + } + + let quote = match receiver.borrow().clone() { + Ok(latest_price_update) => match Rate::new(latest_price_update.ask, spread).ask() { + Ok(amount) => WebsocketBidQuote::Quote(BidQuote { + price: amount, + min_quantity: min, + max_quantity: max, + }), + Err(e) => { + tracing::error!("Failed to create quote for quote websocket: {:#}", e); + WebsocketBidQuote::Error(Error::BidQuoteError) + } + }, + Err(e) => { + tracing::error!( + "Failed to fetch latest price update for quote websocket: {:#}", + e + ); + WebsocketBidQuote::Error(Error::PriceUpdateError) + } + }; + + let msg = Message::text(serde_json::to_string("e).expect("quote to serialize")); + + Ok(Some((msg, receiver))) + }) + .into_stream(); + + let (ws_tx, mut _ws_rx) = ws.split(); + tokio::task::spawn(stream.forward(ws_tx)); +} + +#[derive(Serialize, Debug, Clone)] +enum WebsocketBidQuote { + Quote(BidQuote), + Error(Error), +} + +#[derive(Clone, Debug, Serialize)] +enum Error { + PriceUpdateError, + BidQuoteError, +} diff --git a/swap/src/bin/asb.rs b/swap/src/bin/asb.rs index eea3bcce..77abf95b 100644 --- a/swap/src/bin/asb.rs +++ b/swap/src/bin/asb.rs @@ -26,6 +26,7 @@ use swap::asb::command::{parse_args, Arguments, Command}; use swap::asb::config::{ initial_setup, query_user_for_initial_config, read_config, Config, ConfigNotInitialized, }; +use swap::asb::quote_websocket::setup_quote_websocket; use swap::database::Database; use swap::monero::Amount; use swap::network::swarm; @@ -127,15 +128,30 @@ async fn main() -> Result<()> { let kraken_price_updates = kraken::connect()?; + if let Some(quote_websocket_port) = config.maker.quote_websocket_port { + setup_quote_websocket( + kraken_price_updates.clone(), + quote_websocket_port, + config.maker.ask_spread, + config.maker.min_buy_btc, + config.maker.max_buy_btc, + ) + .await; + } + // setup Tor hidden services let tor_client = tor::Client::new(config.tor.socks5_port).with_control_port(config.tor.control_port); let _ac = match tor_client.assert_tor_running().await { Ok(_) => { tracing::info!("Tor found. Setting up hidden service"); - let ac = - register_tor_services(config.network.clone().listen, tor_client, &seed) - .await?; + let ac = register_tor_services( + config.network.clone().listen, + tor_client, + &seed, + config.maker.quote_websocket_port, + ) + .await?; Some(ac) } Err(_) => { @@ -340,10 +356,11 @@ async fn register_tor_services( networks: Vec, tor_client: tor::Client, seed: &Seed, + quote_websocket_port: Option, ) -> Result { let mut ac = tor_client.into_authenticated_client().await?; - let hidden_services_details = networks + let mut hidden_services_details = networks .iter() .flat_map(|network| { network.iter().map(|protocol| match protocol { @@ -360,6 +377,16 @@ async fn register_tor_services( .flatten() .collect::>(); + if let Some(quote_websocket_port) = quote_websocket_port { + hidden_services_details.push(( + quote_websocket_port, + SocketAddr::new( + IpAddr::from(Ipv4Addr::new(127, 0, 0, 1)), + quote_websocket_port, + ), + )); + } + let key = seed.derive_torv3_key(); ac.add_services(&hidden_services_details, &key).await?; diff --git a/swap/src/kraken.rs b/swap/src/kraken.rs index 001696ff..768a7028 100644 --- a/swap/src/kraken.rs +++ b/swap/src/kraken.rs @@ -1,6 +1,6 @@ use anyhow::{anyhow, Context, Result}; use futures::{SinkExt, StreamExt, TryStreamExt}; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::convert::{Infallible, TryFrom}; use std::sync::Arc; use std::time::Duration; @@ -70,7 +70,7 @@ pub fn connect() -> Result { #[derive(Clone, Debug)] pub struct PriceUpdates { - inner: watch::Receiver, + pub inner: watch::Receiver, } impl PriceUpdates { @@ -85,7 +85,13 @@ impl PriceUpdates { } } -#[derive(Clone, Debug, thiserror::Error)] +impl From> for PriceUpdates { + fn from(inner: watch::Receiver) -> Self { + Self { inner } + } +} + +#[derive(Clone, Debug, thiserror::Error, Serialize)] pub enum Error { #[error("Rate is not yet available")] NotYetAvailable, @@ -248,9 +254,10 @@ mod wire { } /// Represents an update within the price ticker. - #[derive(Clone, Debug, Deserialize)] + #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(try_from = "TickerUpdate")] pub struct PriceUpdate { + #[serde(with = "::bitcoin::util::amount::serde::as_sat")] pub ask: bitcoin::Amount, } diff --git a/swap/src/protocol/bob/event_loop.rs b/swap/src/protocol/bob/event_loop.rs index 4fe347bf..e396684c 100644 --- a/swap/src/protocol/bob/event_loop.rs +++ b/swap/src/protocol/bob/event_loop.rs @@ -122,6 +122,8 @@ impl EventLoop { } } SwarmEvent::Behaviour(OutEvent::TransferProofReceived { msg, channel, peer }) => { + tracing::info!(%msg.swap_id, %peer, "Received transfer proof msg for"); + let swap_id = msg.swap_id; if peer != self.alice_peer_id {