Dedicated quote websocket

Adds a websocket that one can subscribe to for quotes.
Whenever there is a price change the quote is calculated.
Includes automatic configuration for the websocket port as Tor hidden service (when running over Tor).
This commit is contained in:
Philipp Hoenisch 2021-06-03 14:46:56 +10:00 committed by Daniel Karzel
parent d5d0dda6e7
commit fee1c9f215
No known key found for this signature in database
GPG Key ID: 30C3FC2E438ADB6E
8 changed files with 283 additions and 11 deletions

150
Cargo.lock generated
View File

@ -474,6 +474,16 @@ dependencies = [
"serde",
]
[[package]]
name = "buf_redux"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b953a6887648bb07a535631f2bc00fbdb2a2216f135552cb3f534ed136b9c07f"
dependencies = [
"memchr",
"safemem",
]
[[package]]
name = "bumpalo"
version = "3.6.1"
@ -1338,6 +1348,31 @@ dependencies = [
"ahash",
]
[[package]]
name = "headers"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0b7591fb62902706ae8e7aaff416b1b0fa2c0fd0878b46dc13baa3712d8a855"
dependencies = [
"base64 0.13.0",
"bitflags",
"bytes 1.0.1",
"headers-core",
"http",
"mime 0.3.16",
"sha-1",
"time 0.1.43",
]
[[package]]
name = "headers-core"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429"
dependencies = [
"http",
]
[[package]]
name = "heck"
version = "0.3.2"
@ -1474,7 +1509,7 @@ dependencies = [
"time 0.1.43",
"traitobject",
"typeable",
"unicase",
"unicase 1.4.2",
"url 1.7.2",
]
@ -2152,6 +2187,16 @@ version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"
[[package]]
name = "mime_guess"
version = "2.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2684d4c2e97d99848d30b324b00c8fcc7e5c897b7cbb5819b09e7c90e8baf212"
dependencies = [
"mime 0.3.16",
"unicase 2.6.0",
]
[[package]]
name = "minicbor"
version = "0.8.0"
@ -2322,6 +2367,24 @@ version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
[[package]]
name = "multipart"
version = "0.17.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d050aeedc89243f5347c3e237e3e13dc76fbe4ae3742a57b94dc14f69acf76d4"
dependencies = [
"buf_redux",
"httparse",
"log 0.4.14",
"mime 0.3.16",
"mime_guess",
"quick-error 1.2.3",
"rand 0.7.3",
"safemem",
"tempfile",
"twoway",
]
[[package]]
name = "multistream-select"
version = "0.10.2"
@ -3934,7 +3997,7 @@ dependencies = [
"tokio",
"tokio-socks",
"tokio-tar",
"tokio-tungstenite",
"tokio-tungstenite 0.14.0",
"tokio-util",
"toml",
"torut",
@ -3945,6 +4008,7 @@ dependencies = [
"url 2.2.2",
"uuid",
"void",
"warp",
"zip",
]
@ -4238,6 +4302,19 @@ dependencies = [
"xattr",
]
[[package]]
name = "tokio-tungstenite"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1a5f475f1b9d077ea1017ecbc60890fda8e54942d680ca0b1d2b47cfa2d861b"
dependencies = [
"futures-util",
"log 0.4.14",
"pin-project 1.0.5",
"tokio",
"tungstenite 0.12.0",
]
[[package]]
name = "tokio-tungstenite"
version = "0.14.0"
@ -4250,7 +4327,7 @@ dependencies = [
"rustls 0.19.0",
"tokio",
"tokio-rustls",
"tungstenite",
"tungstenite 0.13.0",
"webpki",
"webpki-roots 0.21.0",
]
@ -4309,6 +4386,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09adeb8c97449311ccd28a427f96fb563e7fd31aabf994189879d9da2394b89d"
dependencies = [
"cfg-if 1.0.0",
"log 0.4.14",
"pin-project-lite 0.2.6",
"tracing-attributes",
"tracing-core",
@ -4456,6 +4534,25 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
[[package]]
name = "tungstenite"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ada8297e8d70872fa9a551d93250a9f407beb9f37ef86494eb20012a2ff7c24"
dependencies = [
"base64 0.13.0",
"byteorder",
"bytes 1.0.1",
"http",
"httparse",
"input_buffer",
"log 0.4.14",
"rand 0.8.3",
"sha-1",
"url 2.2.2",
"utf-8",
]
[[package]]
name = "tungstenite"
version = "0.13.0"
@ -4479,6 +4576,15 @@ dependencies = [
"webpki-roots 0.21.0",
]
[[package]]
name = "twoway"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59b11b2b5241ba34be09c3cc85a36e56e48f9888862e19cedf23336d35316ed1"
dependencies = [
"memchr",
]
[[package]]
name = "typeable"
version = "0.1.2"
@ -4518,6 +4624,15 @@ dependencies = [
"version_check 0.1.5",
]
[[package]]
name = "unicase"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6"
dependencies = [
"version_check 0.9.3",
]
[[package]]
name = "unicode-bidi"
version = "0.3.4"
@ -4683,6 +4798,35 @@ dependencies = [
"try-lock",
]
[[package]]
name = "warp"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "332d47745e9a0c38636dbd454729b147d16bd1ed08ae67b3ab281c4506771054"
dependencies = [
"bytes 1.0.1",
"futures",
"headers",
"http",
"hyper 0.14.8",
"log 0.4.14",
"mime 0.3.16",
"mime_guess",
"multipart",
"percent-encoding 2.1.0",
"pin-project 1.0.5",
"scoped-tls",
"serde",
"serde_json",
"serde_urlencoded",
"tokio",
"tokio-stream",
"tokio-tungstenite 0.13.0",
"tokio-util",
"tower-service",
"tracing",
]
[[package]]
name = "wasi"
version = "0.9.0+wasi-snapshot-preview1"

View File

@ -65,6 +65,7 @@ tracing-subscriber = { version = "0.2", default-features = false, features = [ "
url = { version = "2", features = [ "serde" ] }
uuid = { version = "0.8", features = [ "serde", "v4" ] }
void = "1"
warp = "0.3"
[target.'cfg(not(windows))'.dependencies]
tokio-tar = "0.3"

View File

@ -1,5 +1,6 @@
pub mod command;
pub mod config;
pub mod quote_websocket;
mod rate;
pub mod tracing;

View File

@ -151,6 +151,7 @@ pub struct Maker {
#[serde(with = "::bitcoin::util::amount::serde::as_btc")]
pub max_buy_btc: bitcoin::Amount,
pub ask_spread: Decimal,
pub quote_websocket_port: Option<u16>,
}
impl Default for TorConf {
@ -307,6 +308,7 @@ pub fn query_user_for_initial_config(testnet: bool) -> Result<Config> {
min_buy_btc: min_buy,
max_buy_btc: max_buy,
ask_spread,
quote_websocket_port: None,
},
})
}
@ -347,6 +349,7 @@ mod tests {
min_buy_btc: bitcoin::Amount::from_btc(DEFAULT_MIN_BUY_AMOUNT).unwrap(),
max_buy_btc: bitcoin::Amount::from_btc(DEFAULT_MAX_BUY_AMOUNT).unwrap(),
ask_spread: Decimal::from_f64(DEFAULT_SPREAD).unwrap(),
quote_websocket_port: None,
},
};
@ -387,6 +390,7 @@ mod tests {
min_buy_btc: bitcoin::Amount::from_btc(DEFAULT_MIN_BUY_AMOUNT).unwrap(),
max_buy_btc: bitcoin::Amount::from_btc(DEFAULT_MAX_BUY_AMOUNT).unwrap(),
ask_spread: Decimal::from_f64(DEFAULT_SPREAD).unwrap(),
quote_websocket_port: None,
},
};

View File

@ -0,0 +1,86 @@
use crate::asb::Rate;
use crate::kraken::PriceUpdates;
use crate::network::quote::BidQuote;
use futures::{stream, StreamExt, TryStreamExt};
use rust_decimal::Decimal;
use serde::Serialize;
use warp::ws::{Message, WebSocket};
use warp::Filter;
pub async fn setup_quote_websocket(
price_updates: PriceUpdates,
port: u16,
spread: Decimal,
min: bitcoin::Amount,
max: bitcoin::Amount,
) {
let latest_quote = warp::get()
.and(warp::path!("api" / "quote" / "xmr-btc"))
.and(warp::ws())
.map(move |ws: warp::ws::Ws| {
let price_updates = price_updates.clone();
tracing::info!("New quote websocket connection");
ws.on_upgrade(move |socket| quote_stream(socket, price_updates, spread, min, max))
});
tokio::spawn(async move {
warp::serve(latest_quote).run(([0, 0, 0, 0], port)).await;
});
}
async fn quote_stream(
ws: WebSocket,
subscription: PriceUpdates,
spread: Decimal,
min: bitcoin::Amount,
max: bitcoin::Amount,
) {
let stream = stream::try_unfold(subscription.inner, move |mut receiver| async move {
if let Err(e) = receiver.changed().await {
tracing::error!(
"Failed to initialize price update stream for quote websocket: {:#}",
e
);
}
let quote = match receiver.borrow().clone() {
Ok(latest_price_update) => match Rate::new(latest_price_update.ask, spread).ask() {
Ok(amount) => WebsocketBidQuote::Quote(BidQuote {
price: amount,
min_quantity: min,
max_quantity: max,
}),
Err(e) => {
tracing::error!("Failed to create quote for quote websocket: {:#}", e);
WebsocketBidQuote::Error(Error::BidQuoteError)
}
},
Err(e) => {
tracing::error!(
"Failed to fetch latest price update for quote websocket: {:#}",
e
);
WebsocketBidQuote::Error(Error::PriceUpdateError)
}
};
let msg = Message::text(serde_json::to_string(&quote).expect("quote to serialize"));
Ok(Some((msg, receiver)))
})
.into_stream();
let (ws_tx, mut _ws_rx) = ws.split();
tokio::task::spawn(stream.forward(ws_tx));
}
#[derive(Serialize, Debug, Clone)]
enum WebsocketBidQuote {
Quote(BidQuote),
Error(Error),
}
#[derive(Clone, Debug, Serialize)]
enum Error {
PriceUpdateError,
BidQuoteError,
}

View File

@ -26,6 +26,7 @@ use swap::asb::command::{parse_args, Arguments, Command};
use swap::asb::config::{
initial_setup, query_user_for_initial_config, read_config, Config, ConfigNotInitialized,
};
use swap::asb::quote_websocket::setup_quote_websocket;
use swap::database::Database;
use swap::monero::Amount;
use swap::network::swarm;
@ -127,15 +128,30 @@ async fn main() -> Result<()> {
let kraken_price_updates = kraken::connect()?;
if let Some(quote_websocket_port) = config.maker.quote_websocket_port {
setup_quote_websocket(
kraken_price_updates.clone(),
quote_websocket_port,
config.maker.ask_spread,
config.maker.min_buy_btc,
config.maker.max_buy_btc,
)
.await;
}
// setup Tor hidden services
let tor_client =
tor::Client::new(config.tor.socks5_port).with_control_port(config.tor.control_port);
let _ac = match tor_client.assert_tor_running().await {
Ok(_) => {
tracing::info!("Tor found. Setting up hidden service");
let ac =
register_tor_services(config.network.clone().listen, tor_client, &seed)
.await?;
let ac = register_tor_services(
config.network.clone().listen,
tor_client,
&seed,
config.maker.quote_websocket_port,
)
.await?;
Some(ac)
}
Err(_) => {
@ -340,10 +356,11 @@ async fn register_tor_services(
networks: Vec<Multiaddr>,
tor_client: tor::Client,
seed: &Seed,
quote_websocket_port: Option<u16>,
) -> Result<AuthenticatedClient> {
let mut ac = tor_client.into_authenticated_client().await?;
let hidden_services_details = networks
let mut hidden_services_details = networks
.iter()
.flat_map(|network| {
network.iter().map(|protocol| match protocol {
@ -360,6 +377,16 @@ async fn register_tor_services(
.flatten()
.collect::<Vec<_>>();
if let Some(quote_websocket_port) = quote_websocket_port {
hidden_services_details.push((
quote_websocket_port,
SocketAddr::new(
IpAddr::from(Ipv4Addr::new(127, 0, 0, 1)),
quote_websocket_port,
),
));
}
let key = seed.derive_torv3_key();
ac.add_services(&hidden_services_details, &key).await?;

View File

@ -1,6 +1,6 @@
use anyhow::{anyhow, Context, Result};
use futures::{SinkExt, StreamExt, TryStreamExt};
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use std::convert::{Infallible, TryFrom};
use std::sync::Arc;
use std::time::Duration;
@ -70,7 +70,7 @@ pub fn connect() -> Result<PriceUpdates> {
#[derive(Clone, Debug)]
pub struct PriceUpdates {
inner: watch::Receiver<PriceUpdate>,
pub inner: watch::Receiver<PriceUpdate>,
}
impl PriceUpdates {
@ -85,7 +85,13 @@ impl PriceUpdates {
}
}
#[derive(Clone, Debug, thiserror::Error)]
impl From<watch::Receiver<PriceUpdate>> for PriceUpdates {
fn from(inner: watch::Receiver<PriceUpdate>) -> Self {
Self { inner }
}
}
#[derive(Clone, Debug, thiserror::Error, Serialize)]
pub enum Error {
#[error("Rate is not yet available")]
NotYetAvailable,
@ -248,9 +254,10 @@ mod wire {
}
/// Represents an update within the price ticker.
#[derive(Clone, Debug, Deserialize)]
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(try_from = "TickerUpdate")]
pub struct PriceUpdate {
#[serde(with = "::bitcoin::util::amount::serde::as_sat")]
pub ask: bitcoin::Amount,
}

View File

@ -122,6 +122,8 @@ impl EventLoop {
}
}
SwarmEvent::Behaviour(OutEvent::TransferProofReceived { msg, channel, peer }) => {
tracing::info!(%msg.swap_id, %peer, "Received transfer proof msg for");
let swap_id = msg.swap_id;
if peer != self.alice_peer_id {