From 92b3df4158645d09d0d60035e99646a5cb9525d7 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Tue, 16 Feb 2021 16:37:44 +1100 Subject: [PATCH 1/7] Introduce dynamic rates --- Cargo.lock | 64 ++++++++++++ swap/Cargo.toml | 1 + swap/src/asb.rs | 10 ++ swap/src/asb/amounts.rs | 67 +++++++++++++ swap/src/asb/fixed_rate.rs | 20 ++++ swap/src/asb/kraken.rs | 137 ++++++++++++++++++++++++++ swap/src/bin/asb.rs | 4 + swap/src/monero.rs | 1 + swap/src/protocol/alice/event_loop.rs | 19 ++-- swap/tests/testutils/mod.rs | 11 +-- 10 files changed, 320 insertions(+), 14 deletions(-) create mode 100644 swap/src/asb/amounts.rs create mode 100644 swap/src/asb/fixed_rate.rs create mode 100644 swap/src/asb/kraken.rs diff --git a/Cargo.lock b/Cargo.lock index e1691b3f..350e77f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1504,6 +1504,15 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "input_buffer" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f97967975f448f1a7ddb12b0bc41069d09ed6a1c161a92687e057325db35d413" +dependencies = [ + "bytes", +] + [[package]] name = "instant" version = "0.1.9" @@ -3163,6 +3172,19 @@ dependencies = [ "serde", ] +[[package]] +name = "sha-1" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfebf75d25bd900fd1e7d11501efab59bc846dbc76196839663e6637bba9f25f" +dependencies = [ + "block-buffer 0.9.0", + "cfg-if 1.0.0", + "cpuid-bool 0.1.2", + "digest 0.9.0", + "opaque-debug 0.3.0", +] + [[package]] name = "sha1" version = "0.6.0" @@ -3501,6 +3523,7 @@ dependencies = [ "thiserror", "time", "tokio", + "tokio-tungstenite", "toml", "tracing", "tracing-core", @@ -3749,6 +3772,21 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1a5f475f1b9d077ea1017ecbc60890fda8e54942d680ca0b1d2b47cfa2d861b" +dependencies = [ + "futures-util", + "log", + "native-tls", + "pin-project 1.0.4", + "tokio", + "tokio-native-tls", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.6.1" @@ -3856,6 +3894,26 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" +[[package]] +name = "tungstenite" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ada8297e8d70872fa9a551d93250a9f407beb9f37ef86494eb20012a2ff7c24" +dependencies = [ + "base64 0.13.0", + "byteorder", + "bytes", + "http", + "httparse", + "input_buffer", + "log", + "native-tls", + "rand 0.8.2", + "sha-1", + "url", + "utf-8", +] + [[package]] name = "typenum" version = "1.12.0" @@ -3969,6 +4027,12 @@ dependencies = [ "serde", ] +[[package]] +name = "utf-8" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05e42f7c18b8f902290b009cde6d651262f956c98bc51bca4cd1d511c9cd85c7" + [[package]] name = "uuid" version = "0.8.2" diff --git a/swap/Cargo.toml b/swap/Cargo.toml index 597e235f..af389fdb 100644 --- a/swap/Cargo.toml +++ b/swap/Cargo.toml @@ -55,6 +55,7 @@ tempfile = "3" thiserror = "1" time = "0.2" tokio = { version = "1.0", features = ["rt-multi-thread", "time", "macros", "sync"] } +tokio-tungstenite = { version = "0.13", features = [ "tls" ] } toml = "0.5" tracing = { version = "0.1", features = ["attributes"] } tracing-core = "0.1" diff --git a/swap/src/asb.rs b/swap/src/asb.rs index 6d982acc..d788b27a 100644 --- a/swap/src/asb.rs +++ b/swap/src/asb.rs @@ -1,2 +1,12 @@ pub mod command; pub mod config; +pub mod fixed_rate; +pub mod kraken; + +mod amounts; + +pub use amounts::Rate; + +pub trait LatestRate { + fn latest_rate(&mut self) -> Rate; +} diff --git a/swap/src/asb/amounts.rs b/swap/src/asb/amounts.rs new file mode 100644 index 00000000..fc43ffaf --- /dev/null +++ b/swap/src/asb/amounts.rs @@ -0,0 +1,67 @@ +use crate::{bitcoin, monero}; +use anyhow::{anyhow, Result}; +use rust_decimal::{prelude::ToPrimitive, Decimal}; +use std::fmt::Debug; + +/// Prices at which 1 XMR will be traded, in BTC (XMR/BTC pair) +/// The `ask` represents the minimum price in BTC for which we are willing to +/// sell 1 XMR. +#[derive(Debug, Clone, Copy, PartialEq)] +pub struct Rate { + pub ask: bitcoin::Amount, +} + +impl Rate { + pub const ZERO: Rate = Rate { + ask: bitcoin::Amount::ZERO, + }; + + // This function takes the quote amount as it is what Bob sends to Alice in the + // swap request + pub fn sell_quote(&self, quote: bitcoin::Amount) -> Result { + Self::quote(self.ask, quote) + } + + fn quote(rate: bitcoin::Amount, quote: bitcoin::Amount) -> Result { + // quote (btc) = rate * base (xmr) + // base = quote / rate + + let quote_in_sats = quote.as_sat(); + let quote_in_btc = Decimal::from(quote_in_sats) + .checked_div(Decimal::from(bitcoin::Amount::ONE_BTC.as_sat())) + .ok_or_else(|| anyhow!("division overflow"))?; + + let rate_in_btc = Decimal::from(rate.as_sat()) + .checked_div(Decimal::from(bitcoin::Amount::ONE_BTC.as_sat())) + .ok_or_else(|| anyhow!("division overflow"))?; + + let base_in_xmr = quote_in_btc + .checked_div(rate_in_btc) + .ok_or_else(|| anyhow!("division overflow"))?; + let base_in_piconero = base_in_xmr * Decimal::from(monero::Amount::ONE_XMR.as_piconero()); + + let base_in_piconero = base_in_piconero + .to_u64() + .ok_or_else(|| anyhow!("decimal cannot be represented as u64"))?; + + Ok(monero::Amount::from_piconero(base_in_piconero)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn sell_quote() { + let rate = Rate { + ask: bitcoin::Amount::from_btc(0.002_500).unwrap(), + }; + + let btc_amount = bitcoin::Amount::from_btc(2.5).unwrap(); + + let xmr_amount = rate.sell_quote(btc_amount).unwrap(); + + assert_eq!(xmr_amount, monero::Amount::from_monero(1000.0).unwrap()) + } +} diff --git a/swap/src/asb/fixed_rate.rs b/swap/src/asb/fixed_rate.rs new file mode 100644 index 00000000..2dc28a2c --- /dev/null +++ b/swap/src/asb/fixed_rate.rs @@ -0,0 +1,20 @@ +use crate::asb::{LatestRate, Rate}; + +pub const RATE: f64 = 0.01; + +#[derive(Clone)] +pub struct RateService(Rate); + +impl LatestRate for RateService { + fn latest_rate(&mut self) -> Rate { + self.0 + } +} + +impl Default for RateService { + fn default() -> Self { + Self(Rate { + ask: bitcoin::Amount::from_btc(RATE).expect("Static value should never fail"), + }) + } +} diff --git a/swap/src/asb/kraken.rs b/swap/src/asb/kraken.rs new file mode 100644 index 00000000..35da8e29 --- /dev/null +++ b/swap/src/asb/kraken.rs @@ -0,0 +1,137 @@ +use crate::asb::{LatestRate, Rate}; +use anyhow::{anyhow, bail, Result}; +use futures::{SinkExt, StreamExt}; +use reqwest::Url; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use std::convert::TryFrom; +use tokio::sync::watch; +use tokio_tungstenite::tungstenite::Message; +use watch::Receiver; + +const KRAKEN_WS_URL: &str = "wss://ws.kraken.com"; +const SUBSCRIBE_XMR_BTC_TICKER_PAYLOAD: &str = r#" +{ "event": "subscribe", + "pair": [ "XMR/XBT" ], + "subscription": { + "name": "ticker" + } +}"#; + +#[derive(Clone)] +pub struct RateService { + receiver: Receiver, +} + +impl LatestRate for RateService { + fn latest_rate(&mut self) -> Rate { + *self.receiver.borrow() + } +} + +impl RateService { + pub async fn new() -> Result { + let (tx, rx) = watch::channel(Rate::ZERO); + + let (ws, _response) = + tokio_tungstenite::connect_async(Url::parse(KRAKEN_WS_URL).expect("valid url")).await?; + + let (mut write, mut read) = ws.split(); + + // TODO: Handle the possibility of losing the connection + // to the Kraken WS. Currently the stream would produce no + // further items, and consumers would assume that the rate + // is up to date + tokio::spawn(async move { + while let Some(msg) = read.next().await { + let msg = match msg { + Ok(Message::Text(msg)) => msg, + _ => continue, + }; + + let ticker = match serde_json::from_str::(&msg) { + Ok(ticker) => ticker, + _ => continue, + }; + + let rate = match Rate::try_from(ticker) { + Ok(rate) => rate, + Err(e) => { + log::error!("could not get rate from ticker update: {}", e); + continue; + } + }; + + let _ = tx.send(rate); + } + }); + + write.send(SUBSCRIBE_XMR_BTC_TICKER_PAYLOAD.into()).await?; + + Ok(Self { receiver: rx }) + } +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(transparent)] +struct TickerUpdate(Vec); + +#[derive(Debug, Serialize, Deserialize)] +#[serde(untagged)] +enum TickerField { + Data(TickerData), + Metadata(Value), +} + +#[derive(Debug, Serialize, Deserialize)] +struct TickerData { + #[serde(rename = "a")] + ask: Vec, + #[serde(rename = "b")] + bid: Vec, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(untagged)] +enum RateElement { + Text(String), + Number(u64), +} + +impl TryFrom for Rate { + type Error = anyhow::Error; + + fn try_from(value: TickerUpdate) -> Result { + let data = value + .0 + .iter() + .find_map(|field| match field { + TickerField::Data(data) => Some(data), + TickerField::Metadata(_) => None, + }) + .ok_or_else(|| anyhow!("ticker update does not contain data"))?; + + let ask = data.ask.first().ok_or_else(|| anyhow!("no ask price"))?; + let ask = match ask { + RateElement::Text(ask) => { + bitcoin::Amount::from_str_in(ask, ::bitcoin::Denomination::Bitcoin)? + } + _ => bail!("unexpected ask rate element"), + }; + + Ok(Self { ask }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn deserialize_ticker_update() { + let sample_response = r#" +[2308,{"a":["18215.60000",0,"0.27454523"],"b":["18197.50000",0,"0.63711255"],"c":["18197.50000","0.00413060"],"v":["2.78915585","156.15766485"],"p":["18200.94036","18275.19149"],"t":[22,1561],"l":["18162.40000","17944.90000"],"h":["18220.90000","18482.60000"],"o":["18220.90000","18478.90000"]},"ticker","XBT/USDT"]"#; + + let _ = serde_json::from_str::(sample_response).unwrap(); + } +} diff --git a/swap/src/bin/asb.rs b/swap/src/bin/asb.rs index b3898bf2..30fe5ae7 100644 --- a/swap/src/bin/asb.rs +++ b/swap/src/bin/asb.rs @@ -24,6 +24,7 @@ use swap::{ initial_setup, query_user_for_initial_testnet_config, read_config, Config, ConfigNotInitialized, }, + kraken, }, bitcoin, database::Database, @@ -96,6 +97,8 @@ async fn main() -> Result<()> { bitcoin_wallet.new_address().await? ); + let rate_service = kraken::RateService::new().await?; + let (mut event_loop, _) = EventLoop::new( config.network.listen, seed, @@ -103,6 +106,7 @@ async fn main() -> Result<()> { Arc::new(bitcoin_wallet), Arc::new(monero_wallet), Arc::new(db), + rate_service, ) .unwrap(); diff --git a/swap/src/monero.rs b/swap/src/monero.rs index bc40b4a0..da2c6994 100644 --- a/swap/src/monero.rs +++ b/swap/src/monero.rs @@ -77,6 +77,7 @@ pub struct Amount(u64); impl Amount { pub const ZERO: Self = Self(0); + pub const ONE_XMR: Self = Self(PICONERO_OFFSET); /// Create an [Amount] with piconero precision and the given number of /// piconeros. /// diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index a02bb24d..0e836451 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -1,4 +1,5 @@ use crate::{ + asb::LatestRate, bitcoin, database::Database, execution_params::ExecutionParams, @@ -24,9 +25,6 @@ use tokio::sync::{broadcast, mpsc, mpsc::error::SendError}; use tracing::{debug, error, trace}; use uuid::Uuid; -// TODO: Use dynamic -pub const RATE: u32 = 100; - #[allow(missing_debug_implementations)] pub struct MpscChannels { sender: mpsc::Sender, @@ -79,7 +77,7 @@ impl EventLoopHandle { } #[allow(missing_debug_implementations)] -pub struct EventLoop { +pub struct EventLoop { swarm: libp2p::Swarm, peer_id: PeerId, execution_params: ExecutionParams, @@ -87,6 +85,7 @@ pub struct EventLoop { monero_wallet: Arc, db: Arc, listen_address: Multiaddr, + rate_service: RS, recv_encrypted_signature: broadcast::Sender, send_transfer_proof: mpsc::Receiver<(PeerId, TransferProof)>, @@ -97,7 +96,10 @@ pub struct EventLoop { swap_handle_sender: mpsc::Sender>>, } -impl EventLoop { +impl EventLoop +where + RS: LatestRate, +{ pub fn new( listen_address: Multiaddr, seed: Seed, @@ -105,6 +107,7 @@ impl EventLoop { bitcoin_wallet: Arc, monero_wallet: Arc, db: Arc, + rate_service: RS, ) -> Result<(Self, mpsc::Receiver>>)> { let identity = network::Seed::new(seed).derive_libp2p_identity(); let behaviour = Behaviour::default(); @@ -132,6 +135,7 @@ impl EventLoop { monero_wallet, db, listen_address, + rate_service, recv_encrypted_signature: recv_encrypted_signature.sender, send_transfer_proof: send_transfer_proof.receiver, send_transfer_proof_sender: send_transfer_proof.sender, @@ -199,9 +203,10 @@ impl EventLoop { // 1. Check if acceptable request // 2. Send response + let rate = self.rate_service.latest_rate(); + let btc_amount = quote_request.btc_amount; - let xmr_amount = btc_amount.as_btc() * RATE as f64; - let xmr_amount = monero::Amount::from_monero(xmr_amount)?; + let xmr_amount = rate.sell_quote(btc_amount)?; let quote_response = QuoteResponse { xmr_amount }; self.swarm diff --git a/swap/tests/testutils/mod.rs b/swap/tests/testutils/mod.rs index 1dbdc3b7..85a8fc30 100644 --- a/swap/tests/testutils/mod.rs +++ b/swap/tests/testutils/mod.rs @@ -14,18 +14,14 @@ use std::{ time::Duration, }; use swap::{ + asb::{fixed_rate, fixed_rate::RATE}, bitcoin, bitcoin::{CancelTimelock, PunishTimelock}, database::Database, execution_params, execution_params::{ExecutionParams, GetExecutionParams}, monero, - protocol::{ - alice, - alice::{event_loop::RATE, AliceState}, - bob, - bob::BobState, - }, + protocol::{alice, alice::AliceState, bob, bob::BobState}, seed::Seed, }; use tempfile::tempdir; @@ -324,7 +320,7 @@ where let (monero, containers) = testutils::init_containers(&cli).await; let btc_amount = bitcoin::Amount::from_sat(1_000_000); - let xmr_amount = monero::Amount::from_monero(btc_amount.as_btc() * RATE as f64).unwrap(); + let xmr_amount = monero::Amount::from_monero(btc_amount.as_btc() / RATE).unwrap(); let alice_starting_balances = StartingBalances { xmr: xmr_amount * 10, @@ -390,6 +386,7 @@ where alice_bitcoin_wallet.clone(), alice_monero_wallet.clone(), alice_db, + fixed_rate::RateService::default(), ) .unwrap(); From 644f4c1732fd260225d51d52efbead2b0b15c344 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Thu, 18 Feb 2021 12:27:15 +1100 Subject: [PATCH 2/7] Bubble up ws error to consumer Note that because we are using `watch` channel, only a reference to the channel value can be returned. Hence, using custom Error that can be cloned to be able to pass `Result` through the channel. --- swap/src/asb.rs | 4 +- swap/src/asb/fixed_rate.rs | 6 +- swap/src/asb/kraken.rs | 83 ++++++++++++++++++++------- swap/src/protocol/alice/event_loop.rs | 11 +++- 4 files changed, 76 insertions(+), 28 deletions(-) diff --git a/swap/src/asb.rs b/swap/src/asb.rs index d788b27a..2fda0610 100644 --- a/swap/src/asb.rs +++ b/swap/src/asb.rs @@ -8,5 +8,7 @@ mod amounts; pub use amounts::Rate; pub trait LatestRate { - fn latest_rate(&mut self) -> Rate; + type Error: std::fmt::Debug; + + fn latest_rate(&mut self) -> Result; } diff --git a/swap/src/asb/fixed_rate.rs b/swap/src/asb/fixed_rate.rs index 2dc28a2c..31ce83bf 100644 --- a/swap/src/asb/fixed_rate.rs +++ b/swap/src/asb/fixed_rate.rs @@ -6,8 +6,10 @@ pub const RATE: f64 = 0.01; pub struct RateService(Rate); impl LatestRate for RateService { - fn latest_rate(&mut self) -> Rate { - self.0 + type Error = anyhow::Error; + + fn latest_rate(&mut self) -> anyhow::Result { + Ok(self.0) } } diff --git a/swap/src/asb/kraken.rs b/swap/src/asb/kraken.rs index 35da8e29..6113fabb 100644 --- a/swap/src/asb/kraken.rs +++ b/swap/src/asb/kraken.rs @@ -1,5 +1,5 @@ use crate::asb::{LatestRate, Rate}; -use anyhow::{anyhow, bail, Result}; +use bitcoin::util::amount::ParseAmountError; use futures::{SinkExt, StreamExt}; use reqwest::Url; use serde::{Deserialize, Serialize}; @@ -20,49 +20,89 @@ const SUBSCRIBE_XMR_BTC_TICKER_PAYLOAD: &str = r#" #[derive(Clone)] pub struct RateService { - receiver: Receiver, + receiver: Receiver>, } impl LatestRate for RateService { - fn latest_rate(&mut self) -> Rate { - *self.receiver.borrow() + type Error = Error; + + fn latest_rate(&mut self) -> Result { + (*self.receiver.borrow()).clone() + } +} + +#[derive(Clone, Debug, thiserror::Error)] +pub enum Error { + #[error("Rate has not yet been retrieved from Kraken websocket API")] + NotYetRetrieved, + #[error("Message is not text")] + NonTextMessage, + #[error("Websocket: ")] + WebSocket(String), + #[error("Serde: ")] + Serde(String), + #[error("Data field is missing")] + DataFieldMissing, + #[error("Ask Rate Element is of unexpected type")] + UnexpectedAskRateElementType, + #[error("Ask Rate Element is missing")] + MissingAskRateElementType, + #[error("Bitcoin amount parse error: ")] + BitcoinParseAmount(#[from] ParseAmountError), +} + +impl From for Error { + fn from(err: tokio_tungstenite::tungstenite::Error) -> Self { + Error::WebSocket(format!("{:#}", err)) + } +} + +impl From for Error { + fn from(err: serde_json::Error) -> Self { + Error::Serde(format!("{:#}", err)) } } impl RateService { - pub async fn new() -> Result { - let (tx, rx) = watch::channel(Rate::ZERO); + pub async fn new() -> anyhow::Result { + let (tx, rx) = watch::channel(Err(Error::NotYetRetrieved)); let (ws, _response) = tokio_tungstenite::connect_async(Url::parse(KRAKEN_WS_URL).expect("valid url")).await?; let (mut write, mut read) = ws.split(); - // TODO: Handle the possibility of losing the connection - // to the Kraken WS. Currently the stream would produce no - // further items, and consumers would assume that the rate - // is up to date tokio::spawn(async move { while let Some(msg) = read.next().await { let msg = match msg { Ok(Message::Text(msg)) => msg, - _ => continue, + Ok(_) => { + let _ = tx.send(Err(Error::NonTextMessage)); + continue; + } + Err(e) => { + let _ = tx.send(Err(e.into())); + continue; + } }; let ticker = match serde_json::from_str::(&msg) { Ok(ticker) => ticker, - _ => continue, + Err(e) => { + let _ = tx.send(Err(e.into())); + continue; + } }; let rate = match Rate::try_from(ticker) { Ok(rate) => rate, Err(e) => { - log::error!("could not get rate from ticker update: {}", e); + let _ = tx.send(Err(e)); continue; } }; - let _ = tx.send(rate); + let _ = tx.send(Ok(rate)); } }); @@ -99,9 +139,9 @@ enum RateElement { } impl TryFrom for Rate { - type Error = anyhow::Error; + type Error = Error; - fn try_from(value: TickerUpdate) -> Result { + fn try_from(value: TickerUpdate) -> Result { let data = value .0 .iter() @@ -109,14 +149,14 @@ impl TryFrom for Rate { TickerField::Data(data) => Some(data), TickerField::Metadata(_) => None, }) - .ok_or_else(|| anyhow!("ticker update does not contain data"))?; - - let ask = data.ask.first().ok_or_else(|| anyhow!("no ask price"))?; + .ok_or(Error::DataFieldMissing)?; + // TODO: Ensure whether heartbeats returned by the api are being filtered. + let ask = data.ask.first().ok_or(Error::MissingAskRateElementType)?; let ask = match ask { RateElement::Text(ask) => { bitcoin::Amount::from_str_in(ask, ::bitcoin::Denomination::Bitcoin)? } - _ => bail!("unexpected ask rate element"), + _ => return Err(Error::UnexpectedAskRateElementType), }; Ok(Self { ask }) @@ -129,8 +169,7 @@ mod tests { #[tokio::test] async fn deserialize_ticker_update() { - let sample_response = r#" -[2308,{"a":["18215.60000",0,"0.27454523"],"b":["18197.50000",0,"0.63711255"],"c":["18197.50000","0.00413060"],"v":["2.78915585","156.15766485"],"p":["18200.94036","18275.19149"],"t":[22,1561],"l":["18162.40000","17944.90000"],"h":["18220.90000","18482.60000"],"o":["18220.90000","18478.90000"]},"ticker","XBT/USDT"]"#; + let sample_response = r#"[980,{"a":["0.00521900",4,"4.84775132"],"b":["0.00520600",70,"70.35668921"],"c":["0.00520700","0.00000186"],"v":["18530.40510860","18531.94887860"],"p":["0.00489493","0.00489490"],"t":[5017,5018],"l":["0.00448300","0.00448300"],"h":["0.00525000","0.00525000"],"o":["0.00450000","0.00451000"]},"ticker","XMR/XBT"]"#; let _ = serde_json::from_str::(sample_response).unwrap(); } diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index 0e836451..2cc498bf 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -14,7 +14,7 @@ use crate::{ }, seed::Seed, }; -use anyhow::{Context, Result}; +use anyhow::{anyhow, Context, Result}; use futures::future::RemoteHandle; use libp2p::{ core::Multiaddr, futures::FutureExt, request_response::ResponseChannel, PeerId, Swarm, @@ -164,7 +164,9 @@ where debug!("Connection Established with {}", alice); } OutEvent::QuoteRequest { msg, channel, bob_peer_id } => { - let _ = self.handle_quote_request(msg, channel, bob_peer_id).await; + if let Err(error) = self.handle_quote_request(msg, channel, bob_peer_id).await { + error!("Failed to handle quote request: {:#}", error); + } } OutEvent::ExecutionSetupDone{bob_peer_id, state3} => { let _ = self.handle_execution_setup_done(bob_peer_id, *state3).await; @@ -203,7 +205,10 @@ where // 1. Check if acceptable request // 2. Send response - let rate = self.rate_service.latest_rate(); + let rate = self + .rate_service + .latest_rate() + .map_err(|e| anyhow!("Failed to get latest rate: {:?}", e))?; let btc_amount = quote_request.btc_amount; let xmr_amount = rate.sell_quote(btc_amount)?; From b20c16df787e58e64ef9dd1f8b15567eb0d49f41 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Fri, 19 Feb 2021 09:57:46 +1100 Subject: [PATCH 3/7] Improving logging on failure --- swap/tests/testutils/mod.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/swap/tests/testutils/mod.rs b/swap/tests/testutils/mod.rs index 85a8fc30..ad2a3550 100644 --- a/swap/tests/testutils/mod.rs +++ b/swap/tests/testutils/mod.rs @@ -144,7 +144,13 @@ impl TestContext { .get_balance() .await .unwrap(); - assert!(xmr_balance_after_swap <= self.alice_starting_balances.xmr - self.xmr_amount); + assert!( + xmr_balance_after_swap <= self.alice_starting_balances.xmr - self.xmr_amount, + "{} !< {} - {}", + xmr_balance_after_swap, + self.alice_starting_balances.xmr, + self.xmr_amount + ); } pub async fn assert_alice_refunded(&mut self) { From 519d1a5701947999c9dd94cac47aa6f30fde7aea Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Fri, 19 Feb 2021 10:25:59 +1100 Subject: [PATCH 4/7] Log rate and amounts for Alice when doing execution setup --- swap/src/asb/amounts.rs | 8 +++++++- swap/src/protocol/alice/behaviour.rs | 1 - swap/src/protocol/alice/event_loop.rs | 7 ++++++- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/swap/src/asb/amounts.rs b/swap/src/asb/amounts.rs index fc43ffaf..228672d9 100644 --- a/swap/src/asb/amounts.rs +++ b/swap/src/asb/amounts.rs @@ -1,7 +1,7 @@ use crate::{bitcoin, monero}; use anyhow::{anyhow, Result}; use rust_decimal::{prelude::ToPrimitive, Decimal}; -use std::fmt::Debug; +use std::fmt::{Debug, Display, Formatter}; /// Prices at which 1 XMR will be traded, in BTC (XMR/BTC pair) /// The `ask` represents the minimum price in BTC for which we are willing to @@ -48,6 +48,12 @@ impl Rate { } } +impl Display for Rate { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.ask) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/swap/src/protocol/alice/behaviour.rs b/swap/src/protocol/alice/behaviour.rs index ea5a814f..eef2350c 100644 --- a/swap/src/protocol/alice/behaviour.rs +++ b/swap/src/protocol/alice/behaviour.rs @@ -127,7 +127,6 @@ impl Behaviour { pub fn start_execution_setup(&mut self, bob_peer_id: PeerId, state0: State0) { self.execution_setup.run(bob_peer_id, state0); - info!("Start execution setup with {}", bob_peer_id); } /// Send Transfer Proof to Bob. diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index 2cc498bf..b0a03d71 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -22,7 +22,7 @@ use libp2p::{ use rand::rngs::OsRng; use std::sync::Arc; use tokio::sync::{broadcast, mpsc, mpsc::error::SendError}; -use tracing::{debug, error, trace}; +use tracing::{debug, error, info, trace}; use uuid::Uuid; #[allow(missing_debug_implementations)] @@ -229,6 +229,11 @@ where ) .await?; + info!( + "Starting execution setup to sell {} for {} (rate of {}) with {}", + xmr_amount, btc_amount, rate, bob_peer_id + ); + self.swarm.start_execution_setup(bob_peer_id, state0); // Continues once the execution setup protocol is done Ok(()) From b47b06aa2353d97b5defa5554cce95a949c24464 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 19 Feb 2021 17:00:45 +1100 Subject: [PATCH 5/7] Import anyhow::Result across the codebase There is no need to fully qualify this type because it is a type alias for std::Result. We can mix and match the two as we want. --- swap/src/asb/fixed_rate.rs | 3 ++- swap/src/asb/kraken.rs | 3 ++- swap/src/bitcoin.rs | 2 +- swap/src/cli/command.rs | 3 ++- swap/src/database.rs | 6 +++--- swap/src/fs.rs | 4 ++-- swap/src/monero.rs | 8 ++++---- swap/src/protocol/alice/behaviour.rs | 2 +- swap/src/protocol/bob/state.rs | 2 +- swap/src/trace.rs | 3 ++- swap/tests/testutils/mod.rs | 2 +- 11 files changed, 21 insertions(+), 17 deletions(-) diff --git a/swap/src/asb/fixed_rate.rs b/swap/src/asb/fixed_rate.rs index 31ce83bf..53d42308 100644 --- a/swap/src/asb/fixed_rate.rs +++ b/swap/src/asb/fixed_rate.rs @@ -1,4 +1,5 @@ use crate::asb::{LatestRate, Rate}; +use anyhow::Result; pub const RATE: f64 = 0.01; @@ -8,7 +9,7 @@ pub struct RateService(Rate); impl LatestRate for RateService { type Error = anyhow::Error; - fn latest_rate(&mut self) -> anyhow::Result { + fn latest_rate(&mut self) -> Result { Ok(self.0) } } diff --git a/swap/src/asb/kraken.rs b/swap/src/asb/kraken.rs index 6113fabb..f1f38732 100644 --- a/swap/src/asb/kraken.rs +++ b/swap/src/asb/kraken.rs @@ -1,4 +1,5 @@ use crate::asb::{LatestRate, Rate}; +use anyhow::Result; use bitcoin::util::amount::ParseAmountError; use futures::{SinkExt, StreamExt}; use reqwest::Url; @@ -64,7 +65,7 @@ impl From for Error { } impl RateService { - pub async fn new() -> anyhow::Result { + pub async fn new() -> Result { let (tx, rx) = watch::channel(Err(Error::NotYetRetrieved)); let (ws, _response) = diff --git a/swap/src/bitcoin.rs b/swap/src/bitcoin.rs index f308574a..9d12d8b6 100644 --- a/swap/src/bitcoin.rs +++ b/swap/src/bitcoin.rs @@ -283,7 +283,7 @@ pub async fn current_epoch( cancel_timelock: CancelTimelock, punish_timelock: PunishTimelock, lock_tx_id: ::bitcoin::Txid, -) -> anyhow::Result +) -> Result where W: WatchForRawTransaction + TransactionBlockHeight + GetBlockHeight, { diff --git a/swap/src/cli/command.rs b/swap/src/cli/command.rs index 1b4c8f45..88348f21 100644 --- a/swap/src/cli/command.rs +++ b/swap/src/cli/command.rs @@ -1,4 +1,5 @@ use crate::bitcoin; +use anyhow::Result; use libp2p::{core::Multiaddr, PeerId}; use std::path::PathBuf; use uuid::Uuid; @@ -85,7 +86,7 @@ pub enum Refund { }, } -fn parse_btc(str: &str) -> anyhow::Result { +fn parse_btc(str: &str) -> Result { let amount = bitcoin::Amount::from_str_in(str, ::bitcoin::Denomination::Bitcoin)?; Ok(amount) } diff --git a/swap/src/database.rs b/swap/src/database.rs index 33c02b0c..8d963918 100644 --- a/swap/src/database.rs +++ b/swap/src/database.rs @@ -65,7 +65,7 @@ impl Database { .context("Could not flush db") } - pub fn get_state(&self, swap_id: Uuid) -> anyhow::Result { + pub fn get_state(&self, swap_id: Uuid) -> Result { let key = serialize(&swap_id)?; let encoded = self @@ -97,14 +97,14 @@ impl Database { } } -pub fn serialize(t: &T) -> anyhow::Result> +pub fn serialize(t: &T) -> Result> where T: Serialize, { Ok(serde_cbor::to_vec(t)?) } -pub fn deserialize(v: &[u8]) -> anyhow::Result +pub fn deserialize(v: &[u8]) -> Result where T: DeserializeOwned, { diff --git a/swap/src/fs.rs b/swap/src/fs.rs index fc4dedbe..38be1bca 100644 --- a/swap/src/fs.rs +++ b/swap/src/fs.rs @@ -1,4 +1,4 @@ -use anyhow::Context; +use anyhow::{Context, Result}; use directories_next::ProjectDirs; use std::path::{Path, PathBuf}; @@ -9,7 +9,7 @@ fn default_config_dir() -> Option { ProjectDirs::from("", "", "xmr-btc-swap").map(|proj_dirs| proj_dirs.config_dir().to_path_buf()) } -pub fn default_config_path() -> anyhow::Result { +pub fn default_config_path() -> Result { default_config_dir() .map(|dir| Path::join(&dir, "config.toml")) .context("Could not generate default configuration path") diff --git a/swap/src/monero.rs b/swap/src/monero.rs index da2c6994..a01c68c3 100644 --- a/swap/src/monero.rs +++ b/swap/src/monero.rs @@ -186,7 +186,7 @@ pub trait Transfer { public_spend_key: PublicKey, public_view_key: PublicViewKey, amount: Amount, - ) -> anyhow::Result<(TransferProof, Amount)>; + ) -> Result<(TransferProof, Amount)>; } #[async_trait] @@ -215,17 +215,17 @@ pub trait CreateWalletForOutput { private_spend_key: PrivateKey, private_view_key: PrivateViewKey, restore_height: Option, - ) -> anyhow::Result<()>; + ) -> Result<()>; } #[async_trait] pub trait OpenWallet { - async fn open_wallet(&self, file_name: &str) -> anyhow::Result<()>; + async fn open_wallet(&self, file_name: &str) -> Result<()>; } #[async_trait] pub trait CreateWallet { - async fn create_wallet(&self, file_name: &str) -> anyhow::Result<()>; + async fn create_wallet(&self, file_name: &str) -> Result<()>; } #[derive(thiserror::Error, Debug, Clone, PartialEq)] diff --git a/swap/src/protocol/alice/behaviour.rs b/swap/src/protocol/alice/behaviour.rs index eef2350c..8aa59904 100644 --- a/swap/src/protocol/alice/behaviour.rs +++ b/swap/src/protocol/alice/behaviour.rs @@ -119,7 +119,7 @@ impl Behaviour { &mut self, channel: ResponseChannel, quote_response: QuoteResponse, - ) -> anyhow::Result<()> { + ) -> Result<()> { self.quote_response.send(channel, quote_response)?; info!("Sent quote response"); Ok(()) diff --git a/swap/src/protocol/bob/state.rs b/swap/src/protocol/bob/state.rs index 5f33db16..ee30275f 100644 --- a/swap/src/protocol/bob/state.rs +++ b/swap/src/protocol/bob/state.rs @@ -140,7 +140,7 @@ impl State0 { } } - pub async fn receive(self, wallet: &W, msg: Message1) -> anyhow::Result + pub async fn receive(self, wallet: &W, msg: Message1) -> Result where W: BuildTxLockPsbt + GetNetwork, { diff --git a/swap/src/trace.rs b/swap/src/trace.rs index 252d57fc..d711c148 100644 --- a/swap/src/trace.rs +++ b/swap/src/trace.rs @@ -1,10 +1,11 @@ +use anyhow::Result; use atty::{self}; use log::LevelFilter; use tracing::{info, subscriber}; use tracing_log::LogTracer; use tracing_subscriber::FmtSubscriber; -pub fn init_tracing(level: LevelFilter) -> anyhow::Result<()> { +pub fn init_tracing(level: LevelFilter) -> Result<()> { if level == LevelFilter::Off { return Ok(()); } diff --git a/swap/tests/testutils/mod.rs b/swap/tests/testutils/mod.rs index ad2a3550..02464e39 100644 --- a/swap/tests/testutils/mod.rs +++ b/swap/tests/testutils/mod.rs @@ -84,7 +84,7 @@ pub struct TestContext { alice_starting_balances: StartingBalances, alice_bitcoin_wallet: Arc, alice_monero_wallet: Arc, - alice_swap_handle: mpsc::Receiver>>, + alice_swap_handle: mpsc::Receiver>>, bob_params: BobParams, bob_starting_balances: StartingBalances, From a8bfc1d686f737ec8363499988290cf0a9f3a160 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 19 Feb 2021 17:04:42 +1100 Subject: [PATCH 6/7] Make LatestRate::Error require std::error::Error trait bound This allows us to use .context instead of .map_err when calling `latest_rate()`. For the static rate module, we simply fill in `Infallible` which is actually better suited because it describes that we are never using this error. --- swap/src/asb.rs | 2 +- swap/src/asb/fixed_rate.rs | 6 +++--- swap/src/protocol/alice/event_loop.rs | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/swap/src/asb.rs b/swap/src/asb.rs index 2fda0610..77006526 100644 --- a/swap/src/asb.rs +++ b/swap/src/asb.rs @@ -8,7 +8,7 @@ mod amounts; pub use amounts::Rate; pub trait LatestRate { - type Error: std::fmt::Debug; + type Error: std::error::Error + Send + Sync + 'static; fn latest_rate(&mut self) -> Result; } diff --git a/swap/src/asb/fixed_rate.rs b/swap/src/asb/fixed_rate.rs index 53d42308..1bf56364 100644 --- a/swap/src/asb/fixed_rate.rs +++ b/swap/src/asb/fixed_rate.rs @@ -1,5 +1,5 @@ use crate::asb::{LatestRate, Rate}; -use anyhow::Result; +use std::convert::Infallible; pub const RATE: f64 = 0.01; @@ -7,9 +7,9 @@ pub const RATE: f64 = 0.01; pub struct RateService(Rate); impl LatestRate for RateService { - type Error = anyhow::Error; + type Error = Infallible; - fn latest_rate(&mut self) -> Result { + fn latest_rate(&mut self) -> Result { Ok(self.0) } } diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index b0a03d71..57c6f546 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -14,7 +14,7 @@ use crate::{ }, seed::Seed, }; -use anyhow::{anyhow, Context, Result}; +use anyhow::{Context, Result}; use futures::future::RemoteHandle; use libp2p::{ core::Multiaddr, futures::FutureExt, request_response::ResponseChannel, PeerId, Swarm, @@ -208,7 +208,7 @@ where let rate = self .rate_service .latest_rate() - .map_err(|e| anyhow!("Failed to get latest rate: {:?}", e))?; + .context("Failed to get latest rate")?; let btc_amount = quote_request.btc_amount; let xmr_amount = rate.sell_quote(btc_amount)?; From 9496dce91708f6196074097320387c221f48ad10 Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Mon, 22 Feb 2021 14:51:22 +1100 Subject: [PATCH 7/7] Skip heartbeat messages --- swap/src/asb/kraken.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/swap/src/asb/kraken.rs b/swap/src/asb/kraken.rs index f1f38732..62aa4d4d 100644 --- a/swap/src/asb/kraken.rs +++ b/swap/src/asb/kraken.rs @@ -87,6 +87,11 @@ impl RateService { } }; + // If we encounter a heartbeat we skip it and iterate again + if msg.eq(r#"{"event":"heartbeat"}"#) { + continue; + } + let ticker = match serde_json::from_str::(&msg) { Ok(ticker) => ticker, Err(e) => { @@ -151,7 +156,6 @@ impl TryFrom for Rate { TickerField::Metadata(_) => None, }) .ok_or(Error::DataFieldMissing)?; - // TODO: Ensure whether heartbeats returned by the api are being filtered. let ask = data.ask.first().ok_or(Error::MissingAskRateElementType)?; let ask = match ask { RateElement::Text(ask) => {