From 92b3df4158645d09d0d60035e99646a5cb9525d7 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Tue, 16 Feb 2021 16:37:44 +1100 Subject: [PATCH] Introduce dynamic rates --- Cargo.lock | 64 ++++++++++++ swap/Cargo.toml | 1 + swap/src/asb.rs | 10 ++ swap/src/asb/amounts.rs | 67 +++++++++++++ swap/src/asb/fixed_rate.rs | 20 ++++ swap/src/asb/kraken.rs | 137 ++++++++++++++++++++++++++ swap/src/bin/asb.rs | 4 + swap/src/monero.rs | 1 + swap/src/protocol/alice/event_loop.rs | 19 ++-- swap/tests/testutils/mod.rs | 11 +-- 10 files changed, 320 insertions(+), 14 deletions(-) create mode 100644 swap/src/asb/amounts.rs create mode 100644 swap/src/asb/fixed_rate.rs create mode 100644 swap/src/asb/kraken.rs diff --git a/Cargo.lock b/Cargo.lock index e1691b3f..350e77f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1504,6 +1504,15 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "input_buffer" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f97967975f448f1a7ddb12b0bc41069d09ed6a1c161a92687e057325db35d413" +dependencies = [ + "bytes", +] + [[package]] name = "instant" version = "0.1.9" @@ -3163,6 +3172,19 @@ dependencies = [ "serde", ] +[[package]] +name = "sha-1" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfebf75d25bd900fd1e7d11501efab59bc846dbc76196839663e6637bba9f25f" +dependencies = [ + "block-buffer 0.9.0", + "cfg-if 1.0.0", + "cpuid-bool 0.1.2", + "digest 0.9.0", + "opaque-debug 0.3.0", +] + [[package]] name = "sha1" version = "0.6.0" @@ -3501,6 +3523,7 @@ dependencies = [ "thiserror", "time", "tokio", + "tokio-tungstenite", "toml", "tracing", "tracing-core", @@ -3749,6 +3772,21 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1a5f475f1b9d077ea1017ecbc60890fda8e54942d680ca0b1d2b47cfa2d861b" +dependencies = [ + "futures-util", + "log", + "native-tls", + "pin-project 1.0.4", + "tokio", + "tokio-native-tls", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.6.1" @@ -3856,6 +3894,26 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" +[[package]] +name = "tungstenite" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ada8297e8d70872fa9a551d93250a9f407beb9f37ef86494eb20012a2ff7c24" +dependencies = [ + "base64 0.13.0", + "byteorder", + "bytes", + "http", + "httparse", + "input_buffer", + "log", + "native-tls", + "rand 0.8.2", + "sha-1", + "url", + "utf-8", +] + [[package]] name = "typenum" version = "1.12.0" @@ -3969,6 +4027,12 @@ dependencies = [ "serde", ] +[[package]] +name = "utf-8" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05e42f7c18b8f902290b009cde6d651262f956c98bc51bca4cd1d511c9cd85c7" + [[package]] name = "uuid" version = "0.8.2" diff --git a/swap/Cargo.toml b/swap/Cargo.toml index 597e235f..af389fdb 100644 --- a/swap/Cargo.toml +++ b/swap/Cargo.toml @@ -55,6 +55,7 @@ tempfile = "3" thiserror = "1" time = "0.2" tokio = { version = "1.0", features = ["rt-multi-thread", "time", "macros", "sync"] } +tokio-tungstenite = { version = "0.13", features = [ "tls" ] } toml = "0.5" tracing = { version = "0.1", features = ["attributes"] } tracing-core = "0.1" diff --git a/swap/src/asb.rs b/swap/src/asb.rs index 6d982acc..d788b27a 100644 --- a/swap/src/asb.rs +++ b/swap/src/asb.rs @@ -1,2 +1,12 @@ pub mod command; pub mod config; +pub mod fixed_rate; +pub mod kraken; + +mod amounts; + +pub use amounts::Rate; + +pub trait LatestRate { + fn latest_rate(&mut self) -> Rate; +} diff --git a/swap/src/asb/amounts.rs b/swap/src/asb/amounts.rs new file mode 100644 index 00000000..fc43ffaf --- /dev/null +++ b/swap/src/asb/amounts.rs @@ -0,0 +1,67 @@ +use crate::{bitcoin, monero}; +use anyhow::{anyhow, Result}; +use rust_decimal::{prelude::ToPrimitive, Decimal}; +use std::fmt::Debug; + +/// Prices at which 1 XMR will be traded, in BTC (XMR/BTC pair) +/// The `ask` represents the minimum price in BTC for which we are willing to +/// sell 1 XMR. +#[derive(Debug, Clone, Copy, PartialEq)] +pub struct Rate { + pub ask: bitcoin::Amount, +} + +impl Rate { + pub const ZERO: Rate = Rate { + ask: bitcoin::Amount::ZERO, + }; + + // This function takes the quote amount as it is what Bob sends to Alice in the + // swap request + pub fn sell_quote(&self, quote: bitcoin::Amount) -> Result { + Self::quote(self.ask, quote) + } + + fn quote(rate: bitcoin::Amount, quote: bitcoin::Amount) -> Result { + // quote (btc) = rate * base (xmr) + // base = quote / rate + + let quote_in_sats = quote.as_sat(); + let quote_in_btc = Decimal::from(quote_in_sats) + .checked_div(Decimal::from(bitcoin::Amount::ONE_BTC.as_sat())) + .ok_or_else(|| anyhow!("division overflow"))?; + + let rate_in_btc = Decimal::from(rate.as_sat()) + .checked_div(Decimal::from(bitcoin::Amount::ONE_BTC.as_sat())) + .ok_or_else(|| anyhow!("division overflow"))?; + + let base_in_xmr = quote_in_btc + .checked_div(rate_in_btc) + .ok_or_else(|| anyhow!("division overflow"))?; + let base_in_piconero = base_in_xmr * Decimal::from(monero::Amount::ONE_XMR.as_piconero()); + + let base_in_piconero = base_in_piconero + .to_u64() + .ok_or_else(|| anyhow!("decimal cannot be represented as u64"))?; + + Ok(monero::Amount::from_piconero(base_in_piconero)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn sell_quote() { + let rate = Rate { + ask: bitcoin::Amount::from_btc(0.002_500).unwrap(), + }; + + let btc_amount = bitcoin::Amount::from_btc(2.5).unwrap(); + + let xmr_amount = rate.sell_quote(btc_amount).unwrap(); + + assert_eq!(xmr_amount, monero::Amount::from_monero(1000.0).unwrap()) + } +} diff --git a/swap/src/asb/fixed_rate.rs b/swap/src/asb/fixed_rate.rs new file mode 100644 index 00000000..2dc28a2c --- /dev/null +++ b/swap/src/asb/fixed_rate.rs @@ -0,0 +1,20 @@ +use crate::asb::{LatestRate, Rate}; + +pub const RATE: f64 = 0.01; + +#[derive(Clone)] +pub struct RateService(Rate); + +impl LatestRate for RateService { + fn latest_rate(&mut self) -> Rate { + self.0 + } +} + +impl Default for RateService { + fn default() -> Self { + Self(Rate { + ask: bitcoin::Amount::from_btc(RATE).expect("Static value should never fail"), + }) + } +} diff --git a/swap/src/asb/kraken.rs b/swap/src/asb/kraken.rs new file mode 100644 index 00000000..35da8e29 --- /dev/null +++ b/swap/src/asb/kraken.rs @@ -0,0 +1,137 @@ +use crate::asb::{LatestRate, Rate}; +use anyhow::{anyhow, bail, Result}; +use futures::{SinkExt, StreamExt}; +use reqwest::Url; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use std::convert::TryFrom; +use tokio::sync::watch; +use tokio_tungstenite::tungstenite::Message; +use watch::Receiver; + +const KRAKEN_WS_URL: &str = "wss://ws.kraken.com"; +const SUBSCRIBE_XMR_BTC_TICKER_PAYLOAD: &str = r#" +{ "event": "subscribe", + "pair": [ "XMR/XBT" ], + "subscription": { + "name": "ticker" + } +}"#; + +#[derive(Clone)] +pub struct RateService { + receiver: Receiver, +} + +impl LatestRate for RateService { + fn latest_rate(&mut self) -> Rate { + *self.receiver.borrow() + } +} + +impl RateService { + pub async fn new() -> Result { + let (tx, rx) = watch::channel(Rate::ZERO); + + let (ws, _response) = + tokio_tungstenite::connect_async(Url::parse(KRAKEN_WS_URL).expect("valid url")).await?; + + let (mut write, mut read) = ws.split(); + + // TODO: Handle the possibility of losing the connection + // to the Kraken WS. Currently the stream would produce no + // further items, and consumers would assume that the rate + // is up to date + tokio::spawn(async move { + while let Some(msg) = read.next().await { + let msg = match msg { + Ok(Message::Text(msg)) => msg, + _ => continue, + }; + + let ticker = match serde_json::from_str::(&msg) { + Ok(ticker) => ticker, + _ => continue, + }; + + let rate = match Rate::try_from(ticker) { + Ok(rate) => rate, + Err(e) => { + log::error!("could not get rate from ticker update: {}", e); + continue; + } + }; + + let _ = tx.send(rate); + } + }); + + write.send(SUBSCRIBE_XMR_BTC_TICKER_PAYLOAD.into()).await?; + + Ok(Self { receiver: rx }) + } +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(transparent)] +struct TickerUpdate(Vec); + +#[derive(Debug, Serialize, Deserialize)] +#[serde(untagged)] +enum TickerField { + Data(TickerData), + Metadata(Value), +} + +#[derive(Debug, Serialize, Deserialize)] +struct TickerData { + #[serde(rename = "a")] + ask: Vec, + #[serde(rename = "b")] + bid: Vec, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(untagged)] +enum RateElement { + Text(String), + Number(u64), +} + +impl TryFrom for Rate { + type Error = anyhow::Error; + + fn try_from(value: TickerUpdate) -> Result { + let data = value + .0 + .iter() + .find_map(|field| match field { + TickerField::Data(data) => Some(data), + TickerField::Metadata(_) => None, + }) + .ok_or_else(|| anyhow!("ticker update does not contain data"))?; + + let ask = data.ask.first().ok_or_else(|| anyhow!("no ask price"))?; + let ask = match ask { + RateElement::Text(ask) => { + bitcoin::Amount::from_str_in(ask, ::bitcoin::Denomination::Bitcoin)? + } + _ => bail!("unexpected ask rate element"), + }; + + Ok(Self { ask }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn deserialize_ticker_update() { + let sample_response = r#" +[2308,{"a":["18215.60000",0,"0.27454523"],"b":["18197.50000",0,"0.63711255"],"c":["18197.50000","0.00413060"],"v":["2.78915585","156.15766485"],"p":["18200.94036","18275.19149"],"t":[22,1561],"l":["18162.40000","17944.90000"],"h":["18220.90000","18482.60000"],"o":["18220.90000","18478.90000"]},"ticker","XBT/USDT"]"#; + + let _ = serde_json::from_str::(sample_response).unwrap(); + } +} diff --git a/swap/src/bin/asb.rs b/swap/src/bin/asb.rs index b3898bf2..30fe5ae7 100644 --- a/swap/src/bin/asb.rs +++ b/swap/src/bin/asb.rs @@ -24,6 +24,7 @@ use swap::{ initial_setup, query_user_for_initial_testnet_config, read_config, Config, ConfigNotInitialized, }, + kraken, }, bitcoin, database::Database, @@ -96,6 +97,8 @@ async fn main() -> Result<()> { bitcoin_wallet.new_address().await? ); + let rate_service = kraken::RateService::new().await?; + let (mut event_loop, _) = EventLoop::new( config.network.listen, seed, @@ -103,6 +106,7 @@ async fn main() -> Result<()> { Arc::new(bitcoin_wallet), Arc::new(monero_wallet), Arc::new(db), + rate_service, ) .unwrap(); diff --git a/swap/src/monero.rs b/swap/src/monero.rs index bc40b4a0..da2c6994 100644 --- a/swap/src/monero.rs +++ b/swap/src/monero.rs @@ -77,6 +77,7 @@ pub struct Amount(u64); impl Amount { pub const ZERO: Self = Self(0); + pub const ONE_XMR: Self = Self(PICONERO_OFFSET); /// Create an [Amount] with piconero precision and the given number of /// piconeros. /// diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index a02bb24d..0e836451 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -1,4 +1,5 @@ use crate::{ + asb::LatestRate, bitcoin, database::Database, execution_params::ExecutionParams, @@ -24,9 +25,6 @@ use tokio::sync::{broadcast, mpsc, mpsc::error::SendError}; use tracing::{debug, error, trace}; use uuid::Uuid; -// TODO: Use dynamic -pub const RATE: u32 = 100; - #[allow(missing_debug_implementations)] pub struct MpscChannels { sender: mpsc::Sender, @@ -79,7 +77,7 @@ impl EventLoopHandle { } #[allow(missing_debug_implementations)] -pub struct EventLoop { +pub struct EventLoop { swarm: libp2p::Swarm, peer_id: PeerId, execution_params: ExecutionParams, @@ -87,6 +85,7 @@ pub struct EventLoop { monero_wallet: Arc, db: Arc, listen_address: Multiaddr, + rate_service: RS, recv_encrypted_signature: broadcast::Sender, send_transfer_proof: mpsc::Receiver<(PeerId, TransferProof)>, @@ -97,7 +96,10 @@ pub struct EventLoop { swap_handle_sender: mpsc::Sender>>, } -impl EventLoop { +impl EventLoop +where + RS: LatestRate, +{ pub fn new( listen_address: Multiaddr, seed: Seed, @@ -105,6 +107,7 @@ impl EventLoop { bitcoin_wallet: Arc, monero_wallet: Arc, db: Arc, + rate_service: RS, ) -> Result<(Self, mpsc::Receiver>>)> { let identity = network::Seed::new(seed).derive_libp2p_identity(); let behaviour = Behaviour::default(); @@ -132,6 +135,7 @@ impl EventLoop { monero_wallet, db, listen_address, + rate_service, recv_encrypted_signature: recv_encrypted_signature.sender, send_transfer_proof: send_transfer_proof.receiver, send_transfer_proof_sender: send_transfer_proof.sender, @@ -199,9 +203,10 @@ impl EventLoop { // 1. Check if acceptable request // 2. Send response + let rate = self.rate_service.latest_rate(); + let btc_amount = quote_request.btc_amount; - let xmr_amount = btc_amount.as_btc() * RATE as f64; - let xmr_amount = monero::Amount::from_monero(xmr_amount)?; + let xmr_amount = rate.sell_quote(btc_amount)?; let quote_response = QuoteResponse { xmr_amount }; self.swarm diff --git a/swap/tests/testutils/mod.rs b/swap/tests/testutils/mod.rs index 1dbdc3b7..85a8fc30 100644 --- a/swap/tests/testutils/mod.rs +++ b/swap/tests/testutils/mod.rs @@ -14,18 +14,14 @@ use std::{ time::Duration, }; use swap::{ + asb::{fixed_rate, fixed_rate::RATE}, bitcoin, bitcoin::{CancelTimelock, PunishTimelock}, database::Database, execution_params, execution_params::{ExecutionParams, GetExecutionParams}, monero, - protocol::{ - alice, - alice::{event_loop::RATE, AliceState}, - bob, - bob::BobState, - }, + protocol::{alice, alice::AliceState, bob, bob::BobState}, seed::Seed, }; use tempfile::tempdir; @@ -324,7 +320,7 @@ where let (monero, containers) = testutils::init_containers(&cli).await; let btc_amount = bitcoin::Amount::from_sat(1_000_000); - let xmr_amount = monero::Amount::from_monero(btc_amount.as_btc() * RATE as f64).unwrap(); + let xmr_amount = monero::Amount::from_monero(btc_amount.as_btc() / RATE).unwrap(); let alice_starting_balances = StartingBalances { xmr: xmr_amount * 10, @@ -390,6 +386,7 @@ where alice_bitcoin_wallet.clone(), alice_monero_wallet.clone(), alice_db, + fixed_rate::RateService::default(), ) .unwrap();