From f6497778ed5dfe616b950b496b25cb081f1df083 Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Thu, 29 Apr 2021 19:26:19 +1000 Subject: [PATCH 1/6] Add resume-only mode for the ASB Resume-only is a maintenance mode where no swaps are accepted but unfinished swaps are resumed. This is achieve by ignoring incoming spot-price requests (that would lead to execution setup) in the event-loop. --- CHANGELOG.md | 5 +++++ swap/src/asb/command.rs | 6 ++++++ swap/src/bin/asb.rs | 2 ++ swap/src/network/spot_price.rs | 11 ++++++++++- swap/src/protocol/alice/event_loop.rs | 20 +++++++++++++++++++- swap/src/protocol/bob/event_loop.rs | 19 +++++++++++++++---- swap/tests/harness/mod.rs | 1 + 7 files changed, 58 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4ee491b2..f601fd4f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Resume-only mode for the ASB. + When started with `--resume-only` the ASB does not accept new, incoming swap requests but only finishes swaps that are resumed upon startup. + ### Fixed - An issue where both the ASB and the CLI point to the same default directory `xmr-btc-swap` for storing data. diff --git a/swap/src/asb/command.rs b/swap/src/asb/command.rs index 298f5570..85c82d1f 100644 --- a/swap/src/asb/command.rs +++ b/swap/src/asb/command.rs @@ -34,6 +34,12 @@ pub enum Command { default_value = "0.02" )] ask_spread: Decimal, + + #[structopt( + long = "resume-only", + help = "For maintenance only. When set, no new swap requests will be accepted, but existing unfinished swaps will be resumed." + )] + resume_only: bool, }, History, WithdrawBtc { diff --git a/swap/src/bin/asb.rs b/swap/src/bin/asb.rs index 4d99aadb..80416503 100644 --- a/swap/src/bin/asb.rs +++ b/swap/src/bin/asb.rs @@ -81,6 +81,7 @@ async fn main() -> Result<()> { Command::Start { max_buy, ask_spread, + resume_only, } => { let bitcoin_wallet = init_bitcoin_wallet(&config, &seed, env_config).await?; let monero_wallet = init_monero_wallet(&config, env_config).await?; @@ -133,6 +134,7 @@ async fn main() -> Result<()> { Arc::new(db), KrakenRate::new(ask_spread, kraken_price_updates), max_buy, + resume_only, ) .unwrap(); diff --git a/swap/src/network/spot_price.rs b/swap/src/network/spot_price.rs index 560f8b17..bd2ca0a7 100644 --- a/swap/src/network/spot_price.rs +++ b/swap/src/network/spot_price.rs @@ -41,7 +41,16 @@ pub struct Request { #[derive(Serialize, Deserialize, Debug, Clone)] pub struct Response { - pub xmr: monero::Amount, + pub xmr: Option, + pub error: Option, +} + +#[derive(Clone, Debug, thiserror::Error, Serialize, Deserialize)] +pub enum Error { + #[error( + "This seller currently does not accept incoming swap requests, please try again later" + )] + MaintenanceMode, } /// Constructs a new instance of the `spot-price` behaviour to be used by Alice. diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index 45258ef8..b81fecfe 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -42,6 +42,7 @@ pub struct EventLoop { max_buy: bitcoin::Amount, swap_sender: mpsc::Sender, + resume_only: bool, /// Stores incoming [`EncryptedSignature`]s per swap. recv_encrypted_signature: HashMap>, @@ -62,6 +63,7 @@ impl EventLoop where LR: LatestRate, { + #[allow(clippy::too_many_arguments)] pub fn new( swarm: Swarm, env_config: Config, @@ -70,6 +72,7 @@ where db: Arc, latest_rate: LR, max_buy: bitcoin::Amount, + resume_only: bool, ) -> Result<(Self, mpsc::Receiver)> { let swap_channel = MpscChannels::default(); @@ -81,6 +84,7 @@ where db, latest_rate, swap_sender: swap_channel.sender, + resume_only, max_buy, recv_encrypted_signature: Default::default(), inflight_encrypted_signatures: Default::default(), @@ -144,6 +148,20 @@ where swarm_event = self.swarm.next_event() => { match swarm_event { SwarmEvent::Behaviour(OutEvent::SpotPriceRequested { request, channel, peer }) => { + if self.resume_only { + tracing::warn!(%peer, "Ignoring spot price request from {} because ASB started in resume-only mode", peer); + + match self.swarm.behaviour_mut().spot_price.send_response(channel, spot_price::Response { xmr: None, error: Some(spot_price::Error::MaintenanceMode) }) { + Ok(_) => {}, + Err(_) => { + tracing::debug!(%peer, "Failed to respond with error to spot price request"); + continue; + } + } + + continue; + } + let btc = request.btc; let xmr = match self.handle_spot_price_request(btc, self.monero_wallet.clone()).await { Ok(xmr) => xmr, @@ -153,7 +171,7 @@ where } }; - match self.swarm.behaviour_mut().spot_price.send_response(channel, spot_price::Response { xmr }) { + match self.swarm.behaviour_mut().spot_price.send_response(channel, spot_price::Response { xmr: Some(xmr), error: None }) { Ok(_) => {}, Err(_) => { // if we can't respond, the peer probably just disconnected so it is not a huge deal, only log this on debug diff --git a/swap/src/protocol/bob/event_loop.rs b/swap/src/protocol/bob/event_loop.rs index c4bfc0a8..1b978efe 100644 --- a/swap/src/protocol/bob/event_loop.rs +++ b/swap/src/protocol/bob/event_loop.rs @@ -3,7 +3,7 @@ use crate::network::quote::BidQuote; use crate::network::{encrypted_signature, spot_price}; use crate::protocol::bob::{Behaviour, OutEvent, State0, State2}; use crate::{bitcoin, monero}; -use anyhow::{Context, Result}; +use anyhow::{bail, Context, Result}; use futures::future::{BoxFuture, OptionFuture}; use futures::{FutureExt, StreamExt}; use libp2p::request_response::{RequestId, ResponseChannel}; @@ -261,11 +261,22 @@ impl EventLoopHandle { } pub async fn request_spot_price(&mut self, btc: bitcoin::Amount) -> Result { - Ok(self + let response = self .spot_price .send_receive(spot_price::Request { btc }) - .await? - .xmr) + .await?; + + match (response.xmr, response.error) { + (Some(xmr), None) => Ok(xmr), + (_, Some(error)) => { + bail!(error); + } + (None, None) => { + bail!( + "Unexpected response for spot-price request, neither price nor error received" + ); + } + } } pub async fn request_quote(&mut self) -> Result { diff --git a/swap/tests/harness/mod.rs b/swap/tests/harness/mod.rs index cadce4c4..de8bd925 100644 --- a/swap/tests/harness/mod.rs +++ b/swap/tests/harness/mod.rs @@ -234,6 +234,7 @@ fn start_alice( db, FixedRate::default(), bitcoin::Amount::ONE_BTC, + false, ) .unwrap(); From ea76ae582158a248bf6ed9542e6cdb87cb4e543d Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Fri, 30 Apr 2021 12:36:26 +1000 Subject: [PATCH 2/6] Return proper error to CLI for all expected scenarios When a CLI requests a spot price have some errors that are expected, where we can provide a proper error message for the CLI: - Balance of ASB too low - Buy amount sent by CLI exceeds maximum buy amount accepted by ASB - ASB is running in maintenance mode and does not accept incoming swap requests All of these errors returns a proper error to the CLI and prints a warning in the ASB logs. Any other unexpected error will result in closing the channel with the CLI and printing an error in the ASB logs. --- CHANGELOG.md | 6 +++ swap/src/monero.rs | 6 --- swap/src/network/spot_price.rs | 20 +++++++-- swap/src/protocol/alice/event_loop.rs | 64 ++++++++++++--------------- swap/src/protocol/bob/event_loop.rs | 12 ++--- 5 files changed, 55 insertions(+), 53 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f601fd4f..37152d5a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 If you want to access data created by a previous version you will have to rename the data folder or one of the following: 1. For the CLI you can use `--data-dir` to point to the old directory. 2. For the ASB you can change the data-dir in the config file of the ASB. +- The CLI receives proper Error messages if setting up a swap with the ASB fails. + This is a breaking change because the spot-price protocol response changed. + Expected errors scenarios that are now reported back to the CLI: + 1. Balance of ASB too low + 2. Buy amount sent by CLI exceeds maximum buy amount accepted by ASB + 3. ASB is running in resume-only mode and does not accept incoming swap requests ## [0.5.0] - 2021-04-17 diff --git a/swap/src/monero.rs b/swap/src/monero.rs index c52bd0b5..6886cbf1 100644 --- a/swap/src/monero.rs +++ b/swap/src/monero.rs @@ -192,12 +192,6 @@ pub struct InsufficientFunds { pub actual: Amount, } -#[derive(Debug, Clone, Copy, thiserror::Error)] -#[error("The balance is too low, current balance: {balance}")] -pub struct BalanceTooLow { - pub balance: Amount, -} - #[derive(thiserror::Error, Debug, Clone, PartialEq)] #[error("Overflow, cannot convert {0} to u64")] pub struct OverflowError(pub String); diff --git a/swap/src/network/spot_price.rs b/swap/src/network/spot_price.rs index bd2ca0a7..24671481 100644 --- a/swap/src/network/spot_price.rs +++ b/swap/src/network/spot_price.rs @@ -1,6 +1,6 @@ +use crate::monero; use crate::network::cbor_request_response::CborCodec; use crate::protocol::{alice, bob}; -use crate::{bitcoin, monero}; use libp2p::core::ProtocolName; use libp2p::request_response::{ ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent, @@ -40,9 +40,9 @@ pub struct Request { } #[derive(Serialize, Deserialize, Debug, Clone)] -pub struct Response { - pub xmr: Option, - pub error: Option, +pub enum Response { + Xmr(monero::Amount), + Error(Error), } #[derive(Clone, Debug, thiserror::Error, Serialize, Deserialize)] @@ -51,6 +51,18 @@ pub enum Error { "This seller currently does not accept incoming swap requests, please try again later" )] MaintenanceMode, + #[error("Seller refused to buy {buy} because the maximum configured buy limit is {max}")] + MaxBuyAmountExceeded { + #[serde(with = "::bitcoin::util::amount::serde::as_sat")] + max: bitcoin::Amount, + #[serde(with = "::bitcoin::util::amount::serde::as_sat")] + buy: bitcoin::Amount, + }, + #[error("This seller's XMR balance is currently too low to fulfill the swap request to buy {buy}, please try again later")] + BalanceTooLow { + #[serde(with = "::bitcoin::util::amount::serde::as_sat")] + buy: bitcoin::Amount, + }, } /// Constructs a new instance of the `spot-price` behaviour to be used by Alice. diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index b81fecfe..bf2b5ba0 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -1,12 +1,11 @@ use crate::asb::Rate; use crate::database::Database; use crate::env::Config; -use crate::monero::BalanceTooLow; use crate::network::quote::BidQuote; use crate::network::{spot_price, transfer_proof}; use crate::protocol::alice::{AliceState, Behaviour, OutEvent, State0, State3, Swap}; use crate::{bitcoin, kraken, monero}; -use anyhow::{bail, Context, Result}; +use anyhow::{Context, Result}; use futures::future; use futures::future::{BoxFuture, FutureExt}; use futures::stream::{FuturesUnordered, StreamExt}; @@ -148,30 +147,30 @@ where swarm_event = self.swarm.next_event() => { match swarm_event { SwarmEvent::Behaviour(OutEvent::SpotPriceRequested { request, channel, peer }) => { - if self.resume_only { - tracing::warn!(%peer, "Ignoring spot price request from {} because ASB started in resume-only mode", peer); - - match self.swarm.behaviour_mut().spot_price.send_response(channel, spot_price::Response { xmr: None, error: Some(spot_price::Error::MaintenanceMode) }) { - Ok(_) => {}, - Err(_) => { - tracing::debug!(%peer, "Failed to respond with error to spot price request"); - continue; - } - } - - continue; - } - let btc = request.btc; let xmr = match self.handle_spot_price_request(btc, self.monero_wallet.clone()).await { - Ok(xmr) => xmr, + Ok(xmr) => match xmr { + Ok(xmr) => xmr, + Err(e) => { + tracing::warn!(%peer, "Ignoring spot price request from {} because: {:#}", peer, e); + match self.swarm.behaviour_mut().spot_price.send_response(channel, spot_price::Response::Error(e)) { + Ok(_) => { + continue; + }, + Err(_) => { + tracing::debug!(%peer, "Failed to respond with error to spot price request"); + continue; + } + } + } + }, Err(e) => { - tracing::warn!(%peer, "Failed to produce spot price for {}: {:#}", btc, e); + tracing::error!(%peer, "Unrecoverable error while producing spot price for {}: {:#}", btc, e); continue; } }; - match self.swarm.behaviour_mut().spot_price.send_response(channel, spot_price::Response { xmr: Some(xmr), error: None }) { + match self.swarm.behaviour_mut().spot_price.send_response(channel, spot_price::Response::Xmr(xmr)) { Ok(_) => {}, Err(_) => { // if we can't respond, the peer probably just disconnected so it is not a huge deal, only log this on debug @@ -365,17 +364,21 @@ where &mut self, btc: bitcoin::Amount, monero_wallet: Arc, - ) -> Result { + ) -> Result> { + if self.resume_only { + return Ok(Err(spot_price::Error::MaintenanceMode)); + } + let rate = self .latest_rate .latest_rate() .context("Failed to get latest rate")?; if btc > self.max_buy { - bail!(MaximumBuyAmountExceeded { - actual: btc, - max: self.max_buy - }) + return Ok(Err(spot_price::Error::MaxBuyAmountExceeded { + buy: btc, + max: self.max_buy, + })); } let xmr_balance = monero_wallet.get_balance().await?; @@ -383,12 +386,10 @@ where let xmr = rate.sell_quote(btc)?; if xmr_balance < xmr + xmr_lock_fees { - bail!(BalanceTooLow { - balance: xmr_balance - }) + return Ok(Err(spot_price::Error::BalanceTooLow { buy: btc })); } - Ok(xmr) + Ok(Ok(xmr)) } async fn make_quote(&mut self, max_buy: bitcoin::Amount) -> Result { @@ -569,13 +570,6 @@ impl EventLoopHandle { } } -#[derive(Debug, Clone, Copy, thiserror::Error)] -#[error("Refusing to buy {actual} because the maximum configured limit is {max}")] -pub struct MaximumBuyAmountExceeded { - pub max: bitcoin::Amount, - pub actual: bitcoin::Amount, -} - #[allow(missing_debug_implementations)] struct MpscChannels { sender: mpsc::Sender, diff --git a/swap/src/protocol/bob/event_loop.rs b/swap/src/protocol/bob/event_loop.rs index 1b978efe..2045a94f 100644 --- a/swap/src/protocol/bob/event_loop.rs +++ b/swap/src/protocol/bob/event_loop.rs @@ -1,5 +1,6 @@ use crate::bitcoin::EncryptedSignature; use crate::network::quote::BidQuote; +use crate::network::spot_price::Response; use crate::network::{encrypted_signature, spot_price}; use crate::protocol::bob::{Behaviour, OutEvent, State0, State2}; use crate::{bitcoin, monero}; @@ -266,16 +267,11 @@ impl EventLoopHandle { .send_receive(spot_price::Request { btc }) .await?; - match (response.xmr, response.error) { - (Some(xmr), None) => Ok(xmr), - (_, Some(error)) => { + match response { + Response::Xmr(xmr) => Ok(xmr), + Response::Error(error) => { bail!(error); } - (None, None) => { - bail!( - "Unexpected response for spot-price request, neither price nor error received" - ); - } } } From 52f648e1de9c08c34d39523b99e5c6d94a934796 Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Tue, 4 May 2021 14:57:50 +1000 Subject: [PATCH 3/6] Alice's spot price logic into dedicated behaviour Move Alice's spot price logic into a dedicated network behaviour that handles all the logic. The new behaviour encapsulates the complete state necessary for spot price request decision making. The network behaviour cannot handle asynchronous calls, thus the balance is managed inside the spot price and has to updated regularly from the outside to ensure the spot price balance check has up to date data. At the moment the balance is updated upon an incoming quote requests. Code that is relevant for both ASB and CLI remains in the `network::spot_price` module (e.g. `network::spot_price::Error`). --- swap/src/bin/asb.rs | 15 +- swap/src/network/spot_price.rs | 37 +---- swap/src/network/swarm.rs | 20 ++- swap/src/protocol/alice.rs | 1 + swap/src/protocol/alice/behaviour.rs | 37 +++-- swap/src/protocol/alice/event_loop.rs | 96 +++---------- swap/src/protocol/alice/spot_price.rs | 199 ++++++++++++++++++++++++++ swap/tests/harness/mod.rs | 25 +++- 8 files changed, 304 insertions(+), 126 deletions(-) create mode 100644 swap/src/protocol/alice/spot_price.rs diff --git a/swap/src/bin/asb.rs b/swap/src/bin/asb.rs index 80416503..5d099055 100644 --- a/swap/src/bin/asb.rs +++ b/swap/src/bin/asb.rs @@ -119,7 +119,17 @@ async fn main() -> Result<()> { } }; - let mut swarm = swarm::alice(&seed)?; + let current_balance = monero_wallet.get_balance().await?; + let lock_fee = monero_wallet.static_tx_fee_estimate(); + let kraken_rate = KrakenRate::new(ask_spread, kraken_price_updates); + let mut swarm = swarm::alice( + &seed, + current_balance, + lock_fee, + max_buy, + kraken_rate.clone(), + resume_only, + )?; for listen in config.network.listen { Swarm::listen_on(&mut swarm, listen.clone()) @@ -132,9 +142,8 @@ async fn main() -> Result<()> { Arc::new(bitcoin_wallet), Arc::new(monero_wallet), Arc::new(db), - KrakenRate::new(ask_spread, kraken_price_updates), + kraken_rate, max_buy, - resume_only, ) .unwrap(); diff --git a/swap/src/network/spot_price.rs b/swap/src/network/spot_price.rs index 24671481..04aaaf67 100644 --- a/swap/src/network/spot_price.rs +++ b/swap/src/network/spot_price.rs @@ -1,6 +1,6 @@ use crate::monero; use crate::network::cbor_request_response::CborCodec; -use crate::protocol::{alice, bob}; +use crate::protocol::bob; use libp2p::core::ProtocolName; use libp2p::request_response::{ ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent, @@ -10,7 +10,7 @@ use libp2p::PeerId; use serde::{Deserialize, Serialize}; const PROTOCOL: &str = "/comit/xmr/btc/spot-price/1.0.0"; -type OutEvent = RequestResponseEvent; +pub type OutEvent = RequestResponseEvent; type Message = RequestResponseMessage; pub type Behaviour = RequestResponse>; @@ -50,7 +50,7 @@ pub enum Error { #[error( "This seller currently does not accept incoming swap requests, please try again later" )] - MaintenanceMode, + NoSwapsAccepted, #[error("Seller refused to buy {buy} because the maximum configured buy limit is {max}")] MaxBuyAmountExceeded { #[serde(with = "::bitcoin::util::amount::serde::as_sat")] @@ -63,18 +63,11 @@ pub enum Error { #[serde(with = "::bitcoin::util::amount::serde::as_sat")] buy: bitcoin::Amount, }, -} -/// Constructs a new instance of the `spot-price` behaviour to be used by Alice. -/// -/// Alice only supports inbound connections, i.e. providing spot prices for BTC -/// in XMR. -pub fn alice() -> Behaviour { - Behaviour::new( - CborCodec::default(), - vec![(SpotPriceProtocol, ProtocolSupport::Inbound)], - RequestResponseConfig::default(), - ) + /// To be used for errors that cannot be explained on the CLI side (e.g. + /// rate update problems on the seller side) + #[error("The seller encountered a problem, please try again later.")] + Other, } /// Constructs a new instance of the `spot-price` behaviour to be used by Bob. @@ -89,22 +82,6 @@ pub fn bob() -> Behaviour { ) } -impl From<(PeerId, Message)> for alice::OutEvent { - fn from((peer, message): (PeerId, Message)) -> Self { - match message { - Message::Request { - request, channel, .. - } => Self::SpotPriceRequested { - request, - channel, - peer, - }, - Message::Response { .. } => Self::unexpected_response(peer), - } - } -} -crate::impl_from_rr_event!(OutEvent, alice::OutEvent, PROTOCOL); - impl From<(PeerId, Message)> for bob::OutEvent { fn from((peer, message): (PeerId, Message)) -> Self { match message { diff --git a/swap/src/network/swarm.rs b/swap/src/network/swarm.rs index ebcce5fa..75ea57af 100644 --- a/swap/src/network/swarm.rs +++ b/swap/src/network/swarm.rs @@ -1,13 +1,27 @@ use crate::network::transport; +use crate::protocol::alice::event_loop::LatestRate; use crate::protocol::{alice, bob}; use crate::seed::Seed; -use crate::tor; +use crate::{monero, tor}; use anyhow::Result; use libp2p::swarm::{NetworkBehaviour, SwarmBuilder}; use libp2p::{PeerId, Swarm}; -pub fn alice(seed: &Seed) -> Result> { - with_clear_net(seed, alice::Behaviour::default()) +pub fn alice( + seed: &Seed, + balance: monero::Amount, + lock_fee: monero::Amount, + max_buy: bitcoin::Amount, + latest_rate: LR, + resume_only: bool, +) -> Result>> +where + LR: LatestRate + Send + 'static, +{ + with_clear_net( + seed, + alice::Behaviour::new(balance, lock_fee, max_buy, latest_rate, resume_only), + ) } pub async fn bob( diff --git a/swap/src/protocol/alice.rs b/swap/src/protocol/alice.rs index 5909a701..2410e289 100644 --- a/swap/src/protocol/alice.rs +++ b/swap/src/protocol/alice.rs @@ -14,6 +14,7 @@ pub use self::swap::{run, run_until}; mod behaviour; pub mod event_loop; mod execution_setup; +mod spot_price; pub mod state; pub mod swap; diff --git a/swap/src/protocol/alice/behaviour.rs b/swap/src/protocol/alice/behaviour.rs index fb38403b..b12db8c9 100644 --- a/swap/src/protocol/alice/behaviour.rs +++ b/swap/src/protocol/alice/behaviour.rs @@ -1,6 +1,8 @@ +use crate::monero; use crate::network::quote::BidQuote; -use crate::network::{encrypted_signature, quote, spot_price, transfer_proof}; -use crate::protocol::alice::{execution_setup, State3}; +use crate::network::{encrypted_signature, quote, transfer_proof}; +use crate::protocol::alice::event_loop::LatestRate; +use crate::protocol::alice::{execution_setup, spot_price, State3}; use anyhow::{anyhow, Error}; use libp2p::request_response::{RequestId, ResponseChannel}; use libp2p::{NetworkBehaviour, PeerId}; @@ -8,10 +10,10 @@ use uuid::Uuid; #[derive(Debug)] pub enum OutEvent { - SpotPriceRequested { - request: spot_price::Request, - channel: ResponseChannel, + ExecutionSetupStart { peer: PeerId, + btc: bitcoin::Amount, + xmr: monero::Amount, }, QuoteRequested { channel: ResponseChannel, @@ -60,19 +62,34 @@ impl OutEvent { #[derive(NetworkBehaviour)] #[behaviour(out_event = "OutEvent", event_process = false)] #[allow(missing_debug_implementations)] -pub struct Behaviour { +pub struct Behaviour { pub quote: quote::Behaviour, - pub spot_price: spot_price::Behaviour, + pub spot_price: spot_price::Behaviour, pub execution_setup: execution_setup::Behaviour, pub transfer_proof: transfer_proof::Behaviour, pub encrypted_signature: encrypted_signature::Behaviour, } -impl Default for Behaviour { - fn default() -> Self { +impl Behaviour +where + LR: LatestRate + Send + 'static, +{ + pub fn new( + balance: monero::Amount, + lock_fee: monero::Amount, + max_buy: bitcoin::Amount, + latest_rate: LR, + resume_only: bool, + ) -> Self { Self { quote: quote::alice(), - spot_price: spot_price::alice(), + spot_price: spot_price::Behaviour::new( + balance, + lock_fee, + max_buy, + latest_rate, + resume_only, + ), execution_setup: Default::default(), transfer_proof: transfer_proof::alice(), encrypted_signature: encrypted_signature::alice(), diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index bf2b5ba0..049c7af5 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -2,7 +2,7 @@ use crate::asb::Rate; use crate::database::Database; use crate::env::Config; use crate::network::quote::BidQuote; -use crate::network::{spot_price, transfer_proof}; +use crate::network::transfer_proof; use crate::protocol::alice::{AliceState, Behaviour, OutEvent, State0, State3, Swap}; use crate::{bitcoin, kraken, monero}; use anyhow::{Context, Result}; @@ -31,17 +31,16 @@ type OutgoingTransferProof = BoxFuture<'static, Result<(PeerId, transfer_proof::Request, bmrng::Responder<()>)>>; #[allow(missing_debug_implementations)] -pub struct EventLoop { - swarm: libp2p::Swarm, +pub struct EventLoop { + swarm: libp2p::Swarm>, env_config: Config, bitcoin_wallet: Arc, monero_wallet: Arc, db: Arc, - latest_rate: RS, + latest_rate: LR, max_buy: bitcoin::Amount, swap_sender: mpsc::Sender, - resume_only: bool, /// Stores incoming [`EncryptedSignature`]s per swap. recv_encrypted_signature: HashMap>, @@ -60,18 +59,17 @@ pub struct EventLoop { impl EventLoop where - LR: LatestRate, + LR: LatestRate + Send + 'static, { #[allow(clippy::too_many_arguments)] pub fn new( - swarm: Swarm, + swarm: Swarm>, env_config: Config, bitcoin_wallet: Arc, monero_wallet: Arc, db: Arc, latest_rate: LR, max_buy: bitcoin::Amount, - resume_only: bool, ) -> Result<(Self, mpsc::Receiver)> { let swap_channel = MpscChannels::default(); @@ -83,7 +81,6 @@ where db, latest_rate, swap_sender: swap_channel.sender, - resume_only, max_buy, recv_encrypted_signature: Default::default(), inflight_encrypted_signatures: Default::default(), @@ -146,38 +143,8 @@ where tokio::select! { swarm_event = self.swarm.next_event() => { match swarm_event { - SwarmEvent::Behaviour(OutEvent::SpotPriceRequested { request, channel, peer }) => { - let btc = request.btc; - let xmr = match self.handle_spot_price_request(btc, self.monero_wallet.clone()).await { - Ok(xmr) => match xmr { - Ok(xmr) => xmr, - Err(e) => { - tracing::warn!(%peer, "Ignoring spot price request from {} because: {:#}", peer, e); - match self.swarm.behaviour_mut().spot_price.send_response(channel, spot_price::Response::Error(e)) { - Ok(_) => { - continue; - }, - Err(_) => { - tracing::debug!(%peer, "Failed to respond with error to spot price request"); - continue; - } - } - } - }, - Err(e) => { - tracing::error!(%peer, "Unrecoverable error while producing spot price for {}: {:#}", btc, e); - continue; - } - }; + SwarmEvent::Behaviour(OutEvent::ExecutionSetupStart { peer, btc, xmr }) => { - match self.swarm.behaviour_mut().spot_price.send_response(channel, spot_price::Response::Xmr(xmr)) { - Ok(_) => {}, - Err(_) => { - // if we can't respond, the peer probably just disconnected so it is not a huge deal, only log this on debug - tracing::debug!(%peer, "Failed to respond with spot price"); - continue; - } - } let tx_redeem_fee = self.bitcoin_wallet .estimate_fee(bitcoin::TxRedeem::weight(), btc) .await; @@ -195,7 +162,7 @@ where (redeem_address, punish_address) } _ => { - tracing::error!("Could not get new address."); + tracing::error!(%peer, "Failed to get new address during execution setup."); continue; } }; @@ -208,7 +175,7 @@ where (tx_redeem_fee, tx_punish_fee) } _ => { - tracing::error!("Could not calculate transaction fees."); + tracing::error!(%peer, "Failed to calculate transaction fees during execution setup."); continue; } }; @@ -233,6 +200,17 @@ where self.swarm.behaviour_mut().execution_setup.run(peer, state0); } SwarmEvent::Behaviour(OutEvent::QuoteRequested { channel, peer }) => { + // TODO: Move the spot-price update into dedicated update stream to decouple it from quote requests + let current_balance = self.monero_wallet.get_balance().await; + match current_balance { + Ok(balance) => { + self.swarm.behaviour_mut().spot_price.update_balance(balance); + } + Err(e) => { + tracing::error!("Failed to fetch Monero balance: {:#}", e); + } + } + let quote = match self.make_quote(self.max_buy).await { Ok(quote) => quote, Err(e) => { @@ -360,38 +338,6 @@ where } } - async fn handle_spot_price_request( - &mut self, - btc: bitcoin::Amount, - monero_wallet: Arc, - ) -> Result> { - if self.resume_only { - return Ok(Err(spot_price::Error::MaintenanceMode)); - } - - let rate = self - .latest_rate - .latest_rate() - .context("Failed to get latest rate")?; - - if btc > self.max_buy { - return Ok(Err(spot_price::Error::MaxBuyAmountExceeded { - buy: btc, - max: self.max_buy, - })); - } - - let xmr_balance = monero_wallet.get_balance().await?; - let xmr_lock_fees = monero_wallet.static_tx_fee_estimate(); - let xmr = rate.sell_quote(btc)?; - - if xmr_balance < xmr + xmr_lock_fees { - return Ok(Err(spot_price::Error::BalanceTooLow { buy: btc })); - } - - Ok(Ok(xmr)) - } - async fn make_quote(&mut self, max_buy: bitcoin::Amount) -> Result { let rate = self .latest_rate @@ -510,7 +456,7 @@ impl LatestRate for FixedRate { /// Produces [`Rate`]s based on [`PriceUpdate`]s from kraken and a configured /// spread. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct KrakenRate { ask_spread: Decimal, price_updates: kraken::PriceUpdates, diff --git a/swap/src/protocol/alice/spot_price.rs b/swap/src/protocol/alice/spot_price.rs new file mode 100644 index 00000000..6bf64e6f --- /dev/null +++ b/swap/src/protocol/alice/spot_price.rs @@ -0,0 +1,199 @@ +use crate::monero; +use crate::network::cbor_request_response::CborCodec; +use crate::network::spot_price; +use crate::network::spot_price::SpotPriceProtocol; +use crate::protocol::alice; +use crate::protocol::alice::event_loop::LatestRate; +use libp2p::request_response::{ + ProtocolSupport, RequestResponseConfig, RequestResponseEvent, RequestResponseMessage, + ResponseChannel, +}; +use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}; +use libp2p::{NetworkBehaviour, PeerId}; +use std::collections::VecDeque; +use std::task::{Context, Poll}; + +pub struct OutEvent { + peer: PeerId, + btc: bitcoin::Amount, + xmr: monero::Amount, +} + +#[derive(NetworkBehaviour)] +#[behaviour(out_event = "OutEvent", poll_method = "poll", event_process = true)] +#[allow(missing_debug_implementations)] +pub struct Behaviour { + behaviour: spot_price::Behaviour, + + #[behaviour(ignore)] + events: VecDeque, + + #[behaviour(ignore)] + balance: monero::Amount, + #[behaviour(ignore)] + lock_fee: monero::Amount, + #[behaviour(ignore)] + max_buy: bitcoin::Amount, + #[behaviour(ignore)] + latest_rate: LR, + #[behaviour(ignore)] + resume_only: bool, +} + +/// Behaviour that handles spot prices. +/// All the logic how to react to a spot price request is contained here, events +/// reporting the successful handling of a spot price request or a failure are +/// bubbled up to the parent behaviour. +impl Behaviour +where + LR: LatestRate + Send + 'static, +{ + pub fn new( + balance: monero::Amount, + lock_fee: monero::Amount, + max_buy: bitcoin::Amount, + latest_rate: LR, + resume_only: bool, + ) -> Self { + Self { + behaviour: spot_price::Behaviour::new( + CborCodec::default(), + vec![(SpotPriceProtocol, ProtocolSupport::Inbound)], + RequestResponseConfig::default(), + ), + events: Default::default(), + balance, + lock_fee, + max_buy, + latest_rate, + resume_only, + } + } + + pub fn update_balance(&mut self, balance: monero::Amount) { + self.balance = balance; + } + + fn send_error_response( + &mut self, + peer: PeerId, + channel: ResponseChannel, + error: spot_price::Error, + ) { + if self + .behaviour + .send_response(channel, spot_price::Response::Error(error)) + .is_err() + { + tracing::debug!(%peer, "Unable to send error response for spot price request"); + } + } + + fn poll( + &mut self, + _cx: &mut Context<'_>, + _params: &mut impl PollParameters, + ) -> Poll> { + if let Some(event) = self.events.pop_front() { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + } + + // We trust in libp2p to poll us. + Poll::Pending + } +} + +impl NetworkBehaviourEventProcess for Behaviour +where + LR: LatestRate + Send + 'static, +{ + fn inject_event(&mut self, event: spot_price::OutEvent) { + let (peer, message) = match event { + RequestResponseEvent::Message { peer, message } => (peer, message), + RequestResponseEvent::OutboundFailure { peer, error, .. } => { + tracing::error!(%peer, "Failure sending spot price response: {:#}", error); + return; + } + RequestResponseEvent::InboundFailure { peer, error, .. } => { + tracing::warn!(%peer, "Inbound failure when handling spot price request: {:#}", error); + return; + } + RequestResponseEvent::ResponseSent { peer, .. } => { + tracing::debug!(%peer, "Spot price response sent"); + return; + } + }; + + let (request, channel) = match message { + RequestResponseMessage::Request { + request, channel, .. + } => (request, channel), + RequestResponseMessage::Response { .. } => { + tracing::error!("Unexpected message"); + return; + } + }; + + if self.resume_only { + tracing::warn!(%peer, "Ignoring spot price request from {} because ASB is running in resume-only mode", peer); + self.send_error_response(peer, channel, spot_price::Error::NoSwapsAccepted); + return; + } + + let btc = request.btc; + if btc > self.max_buy { + tracing::warn!(%peer, "Ignoring spot price request from {} because max muy amount exceeded", peer); + self.send_error_response(peer, channel, spot_price::Error::MaxBuyAmountExceeded { + max: self.max_buy, + buy: btc, + }); + return; + } + + let rate = match self.latest_rate.latest_rate() { + Ok(rate) => rate, + Err(e) => { + tracing::error!(%peer, "Ignoring spot price request from {} because we encountered a problem with fetching the latest rate: {:#}", peer, e); + self.send_error_response(peer, channel, spot_price::Error::Other); + return; + } + }; + let xmr = match rate.sell_quote(btc) { + Ok(xmr) => xmr, + Err(e) => { + tracing::error!(%peer, "Ignoring spot price request from {} because we encountered a problem with calculating the amount from rate: {:#}", peer, e); + self.send_error_response(peer, channel, spot_price::Error::Other); + return; + } + }; + + let xmr_balance = self.balance; + let xmr_lock_fees = self.lock_fee; + + if xmr_balance < xmr + xmr_lock_fees { + tracing::error!(%peer, "Ignoring spot price request from {} because the XMR balance is too low to fulfill the swap: {}", peer, xmr_balance); + self.send_error_response(peer, channel, spot_price::Error::BalanceTooLow { buy: btc }); + return; + } + + if self + .behaviour + .send_response(channel, spot_price::Response::Xmr(xmr)) + .is_err() + { + tracing::error!(%peer, "Failed to send spot price response of {} for {}", xmr, btc) + } + + self.events.push_back(OutEvent { peer, btc, xmr }); + } +} + +impl From for alice::OutEvent { + fn from(event: OutEvent) -> Self { + Self::ExecutionSetupStart { + peer: event.peer, + btc: event.btc, + xmr: event.xmr, + } + } +} diff --git a/swap/tests/harness/mod.rs b/swap/tests/harness/mod.rs index de8bd925..c70d979d 100644 --- a/swap/tests/harness/mod.rs +++ b/swap/tests/harness/mod.rs @@ -90,7 +90,8 @@ where env_config, alice_bitcoin_wallet.clone(), alice_monero_wallet.clone(), - ); + ) + .await; let bob_seed = Seed::random().unwrap(); let bob_starting_balances = StartingBalances::new(btc_amount * 10, monero::Amount::ZERO, None); @@ -213,7 +214,7 @@ pub async fn init_electrs_container( Ok(docker) } -fn start_alice( +async fn start_alice( seed: &Seed, db_path: PathBuf, listen_address: Multiaddr, @@ -223,7 +224,21 @@ fn start_alice( ) -> (AliceApplicationHandle, Receiver) { let db = Arc::new(Database::open(db_path.as_path()).unwrap()); - let mut swarm = swarm::alice(&seed).unwrap(); + let current_balance = monero_wallet.get_balance().await.unwrap(); + let lock_fee = monero_wallet.static_tx_fee_estimate(); + let max_buy = bitcoin::Amount::from_sat(u64::MAX); + let latest_rate = FixedRate::default(); + let resume_only = false; + + let mut swarm = swarm::alice( + &seed, + current_balance, + lock_fee, + max_buy, + latest_rate, + resume_only, + ) + .unwrap(); swarm.listen_on(listen_address).unwrap(); let (event_loop, swap_handle) = alice::EventLoop::new( @@ -234,7 +249,6 @@ fn start_alice( db, FixedRate::default(), bitcoin::Amount::ONE_BTC, - false, ) .unwrap(); @@ -497,7 +511,8 @@ impl TestContext { self.env_config, self.alice_bitcoin_wallet.clone(), self.alice_monero_wallet.clone(), - ); + ) + .await; self.alice_handle = alice_handle; self.alice_swap_handle = alice_swap_handle; From 5aac76598d1a44765c44c91ce601536a29e334cf Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Wed, 5 May 2021 13:04:55 +1000 Subject: [PATCH 4/6] Decouple ASB/CLI Errors from Error sent over wire What goes over the wire should not be coupled to the errors being printed. For the CLI and ASB we introduce a separate error enum that is used for logging. When sending over the wire the errors are mapped to and from the `network::spot_price::Error`. As part of Bob-specific spot_price code was moved from the network into bob. Clearly separation of the network API from bob/alice. --- swap/src/network/spot_price.rs | 77 +++++++++++++------------- swap/src/network/swarm.rs | 3 +- swap/src/protocol/alice/behaviour.rs | 8 ++- swap/src/protocol/alice/event_loop.rs | 8 ++- swap/src/protocol/alice/spot_price.rs | 80 ++++++++++++++++++++++----- swap/src/protocol/bob.rs | 1 + swap/src/protocol/bob/behaviour.rs | 3 +- swap/src/protocol/bob/event_loop.rs | 2 + swap/src/protocol/bob/spot_price.rs | 69 +++++++++++++++++++++++ 9 files changed, 190 insertions(+), 61 deletions(-) create mode 100644 swap/src/protocol/bob/spot_price.rs diff --git a/swap/src/network/spot_price.rs b/swap/src/network/spot_price.rs index 04aaaf67..e0c5c29b 100644 --- a/swap/src/network/spot_price.rs +++ b/swap/src/network/spot_price.rs @@ -1,17 +1,12 @@ use crate::monero; use crate::network::cbor_request_response::CborCodec; -use crate::protocol::bob; use libp2p::core::ProtocolName; -use libp2p::request_response::{ - ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent, - RequestResponseMessage, -}; -use libp2p::PeerId; +use libp2p::request_response::{RequestResponse, RequestResponseEvent, RequestResponseMessage}; use serde::{Deserialize, Serialize}; -const PROTOCOL: &str = "/comit/xmr/btc/spot-price/1.0.0"; +pub const PROTOCOL: &str = "/comit/xmr/btc/spot-price/1.0.0"; pub type OutEvent = RequestResponseEvent; -type Message = RequestResponseMessage; +pub type Message = RequestResponseMessage; pub type Behaviour = RequestResponse>; @@ -45,55 +40,57 @@ pub enum Response { Error(Error), } -#[derive(Clone, Debug, thiserror::Error, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub enum Error { - #[error( - "This seller currently does not accept incoming swap requests, please try again later" - )] NoSwapsAccepted, - #[error("Seller refused to buy {buy} because the maximum configured buy limit is {max}")] MaxBuyAmountExceeded { #[serde(with = "::bitcoin::util::amount::serde::as_sat")] max: bitcoin::Amount, #[serde(with = "::bitcoin::util::amount::serde::as_sat")] buy: bitcoin::Amount, }, - #[error("This seller's XMR balance is currently too low to fulfill the swap request to buy {buy}, please try again later")] BalanceTooLow { #[serde(with = "::bitcoin::util::amount::serde::as_sat")] buy: bitcoin::Amount, }, - /// To be used for errors that cannot be explained on the CLI side (e.g. /// rate update problems on the seller side) - #[error("The seller encountered a problem, please try again later.")] Other, } -/// Constructs a new instance of the `spot-price` behaviour to be used by Bob. -/// -/// Bob only supports outbound connections, i.e. requesting a spot price for a -/// given amount of BTC in XMR. -pub fn bob() -> Behaviour { - Behaviour::new( - CborCodec::default(), - vec![(SpotPriceProtocol, ProtocolSupport::Outbound)], - RequestResponseConfig::default(), - ) -} +#[cfg(test)] +mod tests { + use super::*; + use crate::monero; -impl From<(PeerId, Message)> for bob::OutEvent { - fn from((peer, message): (PeerId, Message)) -> Self { - match message { - Message::Request { .. } => Self::unexpected_request(peer), - Message::Response { - response, - request_id, - } => Self::SpotPriceReceived { - id: request_id, - response, - }, - } + #[test] + fn snapshot_test_serialize() { + let amount = monero::Amount::from_piconero(100_000u64); + let xmr = r#"{"Xmr":100000}"#.to_string(); + let serialized = serde_json::to_string(&Response::Xmr(amount)).unwrap(); + assert_eq!(xmr, serialized); + + let error = r#"{"Error":"NoSwapsAccepted"}"#.to_string(); + let serialized = serde_json::to_string(&Response::Error(Error::NoSwapsAccepted)).unwrap(); + assert_eq!(error, serialized); + + let error = r#"{"Error":{"MaxBuyAmountExceeded":{"max":0,"buy":0}}}"#.to_string(); + let serialized = serde_json::to_string(&Response::Error(Error::MaxBuyAmountExceeded { + max: Default::default(), + buy: Default::default(), + })) + .unwrap(); + assert_eq!(error, serialized); + + let error = r#"{"Error":{"BalanceTooLow":{"buy":0}}}"#.to_string(); + let serialized = serde_json::to_string(&Response::Error(Error::BalanceTooLow { + buy: Default::default(), + })) + .unwrap(); + assert_eq!(error, serialized); + + let error = r#"{"Error":"Other"}"#.to_string(); + let serialized = serde_json::to_string(&Response::Error(Error::Other)).unwrap(); + assert_eq!(error, serialized); } } -crate::impl_from_rr_event!(OutEvent, bob::OutEvent, PROTOCOL); diff --git a/swap/src/network/swarm.rs b/swap/src/network/swarm.rs index 75ea57af..818a1210 100644 --- a/swap/src/network/swarm.rs +++ b/swap/src/network/swarm.rs @@ -6,6 +6,7 @@ use crate::{monero, tor}; use anyhow::Result; use libp2p::swarm::{NetworkBehaviour, SwarmBuilder}; use libp2p::{PeerId, Swarm}; +use std::fmt::Debug; pub fn alice( seed: &Seed, @@ -16,7 +17,7 @@ pub fn alice( resume_only: bool, ) -> Result>> where - LR: LatestRate + Send + 'static, + LR: LatestRate + Send + 'static + Debug, { with_clear_net( seed, diff --git a/swap/src/protocol/alice/behaviour.rs b/swap/src/protocol/alice/behaviour.rs index b12db8c9..edabe71b 100644 --- a/swap/src/protocol/alice/behaviour.rs +++ b/swap/src/protocol/alice/behaviour.rs @@ -6,6 +6,7 @@ use crate::protocol::alice::{execution_setup, spot_price, State3}; use anyhow::{anyhow, Error}; use libp2p::request_response::{RequestId, ResponseChannel}; use libp2p::{NetworkBehaviour, PeerId}; +use std::fmt::Debug; use uuid::Uuid; #[derive(Debug)] @@ -62,7 +63,10 @@ impl OutEvent { #[derive(NetworkBehaviour)] #[behaviour(out_event = "OutEvent", event_process = false)] #[allow(missing_debug_implementations)] -pub struct Behaviour { +pub struct Behaviour +where + LR: LatestRate + Send + 'static + Debug, +{ pub quote: quote::Behaviour, pub spot_price: spot_price::Behaviour, pub execution_setup: execution_setup::Behaviour, @@ -72,7 +76,7 @@ pub struct Behaviour { impl Behaviour where - LR: LatestRate + Send + 'static, + LR: LatestRate + Send + 'static + Debug, { pub fn new( balance: monero::Amount, diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index 049c7af5..e5a49fe7 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -16,6 +16,7 @@ use rand::rngs::OsRng; use rust_decimal::Decimal; use std::collections::HashMap; use std::convert::Infallible; +use std::fmt::Debug; use std::sync::Arc; use tokio::sync::mpsc; use uuid::Uuid; @@ -31,7 +32,10 @@ type OutgoingTransferProof = BoxFuture<'static, Result<(PeerId, transfer_proof::Request, bmrng::Responder<()>)>>; #[allow(missing_debug_implementations)] -pub struct EventLoop { +pub struct EventLoop +where + LR: LatestRate + Send + 'static + Debug, +{ swarm: libp2p::Swarm>, env_config: Config, bitcoin_wallet: Arc, @@ -59,7 +63,7 @@ pub struct EventLoop { impl EventLoop where - LR: LatestRate + Send + 'static, + LR: LatestRate + Send + 'static + Debug, { #[allow(clippy::too_many_arguments)] pub fn new( diff --git a/swap/src/protocol/alice/spot_price.rs b/swap/src/protocol/alice/spot_price.rs index 6bf64e6f..e7aeec86 100644 --- a/swap/src/protocol/alice/spot_price.rs +++ b/swap/src/protocol/alice/spot_price.rs @@ -11,6 +11,7 @@ use libp2p::request_response::{ use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}; use libp2p::{NetworkBehaviour, PeerId}; use std::collections::VecDeque; +use std::fmt::Debug; use std::task::{Context, Poll}; pub struct OutEvent { @@ -22,7 +23,10 @@ pub struct OutEvent { #[derive(NetworkBehaviour)] #[behaviour(out_event = "OutEvent", poll_method = "poll", event_process = true)] #[allow(missing_debug_implementations)] -pub struct Behaviour { +pub struct Behaviour +where + LR: LatestRate + Send + 'static + Debug, +{ behaviour: spot_price::Behaviour, #[behaviour(ignore)] @@ -46,7 +50,7 @@ pub struct Behaviour { /// bubbled up to the parent behaviour. impl Behaviour where - LR: LatestRate + Send + 'static, + LR: LatestRate + Send + 'static + Debug, { pub fn new( balance: monero::Amount, @@ -78,11 +82,22 @@ where &mut self, peer: PeerId, channel: ResponseChannel, - error: spot_price::Error, + error: Error, ) { + match error { + Error::ResumeOnlyMode | Error::MaxBuyAmountExceeded { .. } => { + tracing::warn!(%peer, "Ignoring spot price request because: {}", error); + } + Error::BalanceTooLow { .. } + | Error::LatestRateFetchFailed(_) + | Error::SellQuoteCalculationFailed(_) => { + tracing::error!(%peer, "Ignoring spot price request because: {}", error); + } + } + if self .behaviour - .send_response(channel, spot_price::Response::Error(error)) + .send_response(channel, spot_price::Response::Error(error.into())) .is_err() { tracing::debug!(%peer, "Unable to send error response for spot price request"); @@ -105,7 +120,7 @@ where impl NetworkBehaviourEventProcess for Behaviour where - LR: LatestRate + Send + 'static, + LR: LatestRate + Send + 'static + Debug, { fn inject_event(&mut self, event: spot_price::OutEvent) { let (peer, message) = match event { @@ -135,15 +150,13 @@ where }; if self.resume_only { - tracing::warn!(%peer, "Ignoring spot price request from {} because ASB is running in resume-only mode", peer); - self.send_error_response(peer, channel, spot_price::Error::NoSwapsAccepted); + self.send_error_response(peer, channel, Error::ResumeOnlyMode); return; } let btc = request.btc; if btc > self.max_buy { - tracing::warn!(%peer, "Ignoring spot price request from {} because max muy amount exceeded", peer); - self.send_error_response(peer, channel, spot_price::Error::MaxBuyAmountExceeded { + self.send_error_response(peer, channel, Error::MaxBuyAmountExceeded { max: self.max_buy, buy: btc, }); @@ -153,16 +166,14 @@ where let rate = match self.latest_rate.latest_rate() { Ok(rate) => rate, Err(e) => { - tracing::error!(%peer, "Ignoring spot price request from {} because we encountered a problem with fetching the latest rate: {:#}", peer, e); - self.send_error_response(peer, channel, spot_price::Error::Other); + self.send_error_response(peer, channel, Error::LatestRateFetchFailed(e)); return; } }; let xmr = match rate.sell_quote(btc) { Ok(xmr) => xmr, Err(e) => { - tracing::error!(%peer, "Ignoring spot price request from {} because we encountered a problem with calculating the amount from rate: {:#}", peer, e); - self.send_error_response(peer, channel, spot_price::Error::Other); + self.send_error_response(peer, channel, Error::SellQuoteCalculationFailed(e)); return; } }; @@ -171,8 +182,7 @@ where let xmr_lock_fees = self.lock_fee; if xmr_balance < xmr + xmr_lock_fees { - tracing::error!(%peer, "Ignoring spot price request from {} because the XMR balance is too low to fulfill the swap: {}", peer, xmr_balance); - self.send_error_response(peer, channel, spot_price::Error::BalanceTooLow { buy: btc }); + self.send_error_response(peer, channel, Error::BalanceTooLow { buy: btc }); return; } @@ -197,3 +207,43 @@ impl From for alice::OutEvent { } } } + +impl From> for spot_price::Error +where + LR: LatestRate + Debug, +{ + fn from(error: Error) -> Self { + match error { + Error::ResumeOnlyMode => spot_price::Error::NoSwapsAccepted, + Error::MaxBuyAmountExceeded { max, buy } => { + spot_price::Error::MaxBuyAmountExceeded { max, buy } + } + Error::BalanceTooLow { buy } => spot_price::Error::BalanceTooLow { buy }, + Error::LatestRateFetchFailed(_) | Error::SellQuoteCalculationFailed(_) => { + spot_price::Error::Other + } + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum Error +where + LR: LatestRate + Debug, +{ + #[error("ASB is running in resume-only mode")] + ResumeOnlyMode, + #[error("Maximum buy {max} exceeded {buy}")] + MaxBuyAmountExceeded { + max: bitcoin::Amount, + buy: bitcoin::Amount, + }, + #[error("This seller's XMR balance is currently too low to fulfill the swap request to buy {buy}, please try again later")] + BalanceTooLow { buy: bitcoin::Amount }, + + #[error("Failed to fetch latest rate")] + LatestRateFetchFailed(LR::Error), + + #[error("Failed to calculate quote: {0}")] + SellQuoteCalculationFailed(anyhow::Error), +} diff --git a/swap/src/protocol/bob.rs b/swap/src/protocol/bob.rs index fbcc7dd1..292c2f40 100644 --- a/swap/src/protocol/bob.rs +++ b/swap/src/protocol/bob.rs @@ -16,6 +16,7 @@ pub mod cancel; pub mod event_loop; mod execution_setup; pub mod refund; +mod spot_price; pub mod state; pub mod swap; diff --git a/swap/src/protocol/bob/behaviour.rs b/swap/src/protocol/bob/behaviour.rs index 91a7c47b..6eab91fa 100644 --- a/swap/src/protocol/bob/behaviour.rs +++ b/swap/src/protocol/bob/behaviour.rs @@ -1,5 +1,6 @@ use crate::network::quote::BidQuote; use crate::network::{encrypted_signature, quote, redial, spot_price, transfer_proof}; +use crate::protocol::bob; use crate::protocol::bob::{execution_setup, State2}; use anyhow::{anyhow, Error, Result}; use libp2p::core::Multiaddr; @@ -71,7 +72,7 @@ impl Behaviour { pub fn new(alice: PeerId) -> Self { Self { quote: quote::bob(), - spot_price: spot_price::bob(), + spot_price: bob::spot_price::bob(), execution_setup: Default::default(), transfer_proof: transfer_proof::bob(), encrypted_signature: encrypted_signature::bob(), diff --git a/swap/src/protocol/bob/event_loop.rs b/swap/src/protocol/bob/event_loop.rs index 2045a94f..e32fa7e0 100644 --- a/swap/src/protocol/bob/event_loop.rs +++ b/swap/src/protocol/bob/event_loop.rs @@ -2,6 +2,7 @@ use crate::bitcoin::EncryptedSignature; use crate::network::quote::BidQuote; use crate::network::spot_price::Response; use crate::network::{encrypted_signature, spot_price}; +use crate::protocol::bob; use crate::protocol::bob::{Behaviour, OutEvent, State0, State2}; use crate::{bitcoin, monero}; use anyhow::{bail, Context, Result}; @@ -270,6 +271,7 @@ impl EventLoopHandle { match response { Response::Xmr(xmr) => Ok(xmr), Response::Error(error) => { + let error: bob::spot_price::Error = error.into(); bail!(error); } } diff --git a/swap/src/protocol/bob/spot_price.rs b/swap/src/protocol/bob/spot_price.rs new file mode 100644 index 00000000..b16f06f1 --- /dev/null +++ b/swap/src/protocol/bob/spot_price.rs @@ -0,0 +1,69 @@ +use crate::network::cbor_request_response::CborCodec; +use crate::network::spot_price; +use crate::network::spot_price::SpotPriceProtocol; +use crate::protocol::bob::OutEvent; +use libp2p::request_response::{ProtocolSupport, RequestResponseConfig}; +use libp2p::PeerId; + +const PROTOCOL: &str = spot_price::PROTOCOL; +type SpotPriceOutEvent = spot_price::OutEvent; + +/// Constructs a new instance of the `spot-price` behaviour to be used by Bob. +/// +/// Bob only supports outbound connections, i.e. requesting a spot price for a +/// given amount of BTC in XMR. +pub fn bob() -> spot_price::Behaviour { + spot_price::Behaviour::new( + CborCodec::default(), + vec![(SpotPriceProtocol, ProtocolSupport::Outbound)], + RequestResponseConfig::default(), + ) +} + +impl From<(PeerId, spot_price::Message)> for OutEvent { + fn from((peer, message): (PeerId, spot_price::Message)) -> Self { + match message { + spot_price::Message::Request { .. } => Self::unexpected_request(peer), + spot_price::Message::Response { + response, + request_id, + } => Self::SpotPriceReceived { + id: request_id, + response, + }, + } + } +} + +crate::impl_from_rr_event!(SpotPriceOutEvent, OutEvent, PROTOCOL); + +#[derive(Clone, Debug, thiserror::Error)] +pub enum Error { + #[error("Seller currently does not accept incoming swap requests, please try again later")] + NoSwapsAccepted, + #[error("Seller refused to buy {buy} because the maximum configured buy limit is {max}")] + MaxBuyAmountExceeded { + max: bitcoin::Amount, + buy: bitcoin::Amount, + }, + #[error("Seller's XMR balance is currently too low to fulfill the swap request to buy {buy}, please try again later")] + BalanceTooLow { buy: bitcoin::Amount }, + + /// To be used for errors that cannot be explained on the CLI side (e.g. + /// rate update problems on the seller side) + #[error("Seller encountered a problem, please try again later.")] + Other, +} + +impl From for Error { + fn from(error: spot_price::Error) -> Self { + match error { + spot_price::Error::NoSwapsAccepted => Error::NoSwapsAccepted, + spot_price::Error::MaxBuyAmountExceeded { max, buy } => { + Error::MaxBuyAmountExceeded { max, buy } + } + spot_price::Error::BalanceTooLow { buy } => Error::BalanceTooLow { buy }, + spot_price::Error::Other => Error::Other, + } + } +} From 03a0dc73cd7c1b5509bdb211ed843a42723f04b2 Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Thu, 6 May 2021 10:50:29 +1000 Subject: [PATCH 5/6] Alice's spot_price Behaviour reports back Error Instead of handling all errors on the inside spot_price errors are bubbled up (as `SwapRequestDeclined`). This allows us to test both Alice's and Bob's behaviour for all scenarios. --- swap/src/protocol/alice/behaviour.rs | 9 ++- swap/src/protocol/alice/event_loop.rs | 13 ++++ swap/src/protocol/alice/spot_price.rs | 105 +++++++++++++------------- 3 files changed, 70 insertions(+), 57 deletions(-) diff --git a/swap/src/protocol/alice/behaviour.rs b/swap/src/protocol/alice/behaviour.rs index edabe71b..08c87b45 100644 --- a/swap/src/protocol/alice/behaviour.rs +++ b/swap/src/protocol/alice/behaviour.rs @@ -6,11 +6,14 @@ use crate::protocol::alice::{execution_setup, spot_price, State3}; use anyhow::{anyhow, Error}; use libp2p::request_response::{RequestId, ResponseChannel}; use libp2p::{NetworkBehaviour, PeerId}; -use std::fmt::Debug; use uuid::Uuid; #[derive(Debug)] pub enum OutEvent { + SwapRequestDeclined { + peer: PeerId, + error: spot_price::Error, + }, ExecutionSetupStart { peer: PeerId, btc: bitcoin::Amount, @@ -65,7 +68,7 @@ impl OutEvent { #[allow(missing_debug_implementations)] pub struct Behaviour where - LR: LatestRate + Send + 'static + Debug, + LR: LatestRate + Send + 'static, { pub quote: quote::Behaviour, pub spot_price: spot_price::Behaviour, @@ -76,7 +79,7 @@ where impl Behaviour where - LR: LatestRate + Send + 'static + Debug, + LR: LatestRate + Send + 'static, { pub fn new( balance: monero::Amount, diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index e5a49fe7..e6b21304 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -3,6 +3,7 @@ use crate::database::Database; use crate::env::Config; use crate::network::quote::BidQuote; use crate::network::transfer_proof; +use crate::protocol::alice::spot_price::Error; use crate::protocol::alice::{AliceState, Behaviour, OutEvent, State0, State3, Swap}; use crate::{bitcoin, kraken, monero}; use anyhow::{Context, Result}; @@ -203,6 +204,18 @@ where self.swarm.behaviour_mut().execution_setup.run(peer, state0); } + SwarmEvent::Behaviour(OutEvent::SwapRequestDeclined { peer, error }) => { + match error { + Error::ResumeOnlyMode | Error::MaxBuyAmountExceeded { .. } => { + tracing::warn!(%peer, "Ignoring spot price request because: {}", error); + } + Error::BalanceTooLow { .. } + | Error::LatestRateFetchFailed(_) + | Error::SellQuoteCalculationFailed(_) => { + tracing::error!(%peer, "Ignoring spot price request because: {}", error); + } + } + } SwarmEvent::Behaviour(OutEvent::QuoteRequested { channel, peer }) => { // TODO: Move the spot-price update into dedicated update stream to decouple it from quote requests let current_balance = self.monero_wallet.get_balance().await; diff --git a/swap/src/protocol/alice/spot_price.rs b/swap/src/protocol/alice/spot_price.rs index e7aeec86..d960de28 100644 --- a/swap/src/protocol/alice/spot_price.rs +++ b/swap/src/protocol/alice/spot_price.rs @@ -14,10 +14,16 @@ use std::collections::VecDeque; use std::fmt::Debug; use std::task::{Context, Poll}; -pub struct OutEvent { - peer: PeerId, - btc: bitcoin::Amount, - xmr: monero::Amount, +pub enum OutEvent { + ExecutionSetupParams { + peer: PeerId, + btc: bitcoin::Amount, + xmr: monero::Amount, + }, + Error { + peer: PeerId, + error: Error, + }, } #[derive(NetworkBehaviour)] @@ -25,7 +31,7 @@ pub struct OutEvent { #[allow(missing_debug_implementations)] pub struct Behaviour where - LR: LatestRate + Send + 'static + Debug, + LR: LatestRate + Send + 'static, { behaviour: spot_price::Behaviour, @@ -50,7 +56,7 @@ where /// bubbled up to the parent behaviour. impl Behaviour where - LR: LatestRate + Send + 'static + Debug, + LR: LatestRate + Send + 'static, { pub fn new( balance: monero::Amount, @@ -78,30 +84,24 @@ where self.balance = balance; } - fn send_error_response( + fn decline( &mut self, peer: PeerId, channel: ResponseChannel, - error: Error, + error: Error, ) { - match error { - Error::ResumeOnlyMode | Error::MaxBuyAmountExceeded { .. } => { - tracing::warn!(%peer, "Ignoring spot price request because: {}", error); - } - Error::BalanceTooLow { .. } - | Error::LatestRateFetchFailed(_) - | Error::SellQuoteCalculationFailed(_) => { - tracing::error!(%peer, "Ignoring spot price request because: {}", error); - } - } - if self .behaviour - .send_response(channel, spot_price::Response::Error(error.into())) + .send_response( + channel, + spot_price::Response::Error(error.to_error_response()), + ) .is_err() { tracing::debug!(%peer, "Unable to send error response for spot price request"); } + + self.events.push_back(OutEvent::Error { peer, error }); } fn poll( @@ -120,7 +120,7 @@ where impl NetworkBehaviourEventProcess for Behaviour where - LR: LatestRate + Send + 'static + Debug, + LR: LatestRate + Send + 'static, { fn inject_event(&mut self, event: spot_price::OutEvent) { let (peer, message) = match event { @@ -150,13 +150,13 @@ where }; if self.resume_only { - self.send_error_response(peer, channel, Error::ResumeOnlyMode); + self.decline(peer, channel, Error::ResumeOnlyMode); return; } let btc = request.btc; if btc > self.max_buy { - self.send_error_response(peer, channel, Error::MaxBuyAmountExceeded { + self.decline(peer, channel, Error::MaxBuyAmountExceeded { max: self.max_buy, buy: btc, }); @@ -166,14 +166,14 @@ where let rate = match self.latest_rate.latest_rate() { Ok(rate) => rate, Err(e) => { - self.send_error_response(peer, channel, Error::LatestRateFetchFailed(e)); + self.decline(peer, channel, Error::LatestRateFetchFailed(Box::new(e))); return; } }; let xmr = match rate.sell_quote(btc) { Ok(xmr) => xmr, Err(e) => { - self.send_error_response(peer, channel, Error::SellQuoteCalculationFailed(e)); + self.decline(peer, channel, Error::SellQuoteCalculationFailed(e)); return; } }; @@ -182,7 +182,7 @@ where let xmr_lock_fees = self.lock_fee; if xmr_balance < xmr + xmr_lock_fees { - self.send_error_response(peer, channel, Error::BalanceTooLow { buy: btc }); + self.decline(peer, channel, Error::BalanceTooLow { buy: btc }); return; } @@ -194,43 +194,24 @@ where tracing::error!(%peer, "Failed to send spot price response of {} for {}", xmr, btc) } - self.events.push_back(OutEvent { peer, btc, xmr }); + self.events + .push_back(OutEvent::ExecutionSetupParams { peer, btc, xmr }); } } impl From for alice::OutEvent { fn from(event: OutEvent) -> Self { - Self::ExecutionSetupStart { - peer: event.peer, - btc: event.btc, - xmr: event.xmr, - } - } -} - -impl From> for spot_price::Error -where - LR: LatestRate + Debug, -{ - fn from(error: Error) -> Self { - match error { - Error::ResumeOnlyMode => spot_price::Error::NoSwapsAccepted, - Error::MaxBuyAmountExceeded { max, buy } => { - spot_price::Error::MaxBuyAmountExceeded { max, buy } - } - Error::BalanceTooLow { buy } => spot_price::Error::BalanceTooLow { buy }, - Error::LatestRateFetchFailed(_) | Error::SellQuoteCalculationFailed(_) => { - spot_price::Error::Other + match event { + OutEvent::ExecutionSetupParams { peer, btc, xmr } => { + Self::ExecutionSetupStart { peer, btc, xmr } } + OutEvent::Error { peer, error } => Self::SwapRequestDeclined { peer, error }, } } } #[derive(Debug, thiserror::Error)] -pub enum Error -where - LR: LatestRate + Debug, -{ +pub enum Error { #[error("ASB is running in resume-only mode")] ResumeOnlyMode, #[error("Maximum buy {max} exceeded {buy}")] @@ -242,8 +223,24 @@ where BalanceTooLow { buy: bitcoin::Amount }, #[error("Failed to fetch latest rate")] - LatestRateFetchFailed(LR::Error), + LatestRateFetchFailed(#[source] Box), #[error("Failed to calculate quote: {0}")] - SellQuoteCalculationFailed(anyhow::Error), + SellQuoteCalculationFailed(#[source] anyhow::Error), +} + +impl Error { + pub fn to_error_response(&self) -> spot_price::Error { + match self { + Error::ResumeOnlyMode => spot_price::Error::NoSwapsAccepted, + Error::MaxBuyAmountExceeded { max, buy } => spot_price::Error::MaxBuyAmountExceeded { + max: *max, + buy: *buy, + }, + Error::BalanceTooLow { buy } => spot_price::Error::BalanceTooLow { buy: *buy }, + Error::LatestRateFetchFailed(_) | Error::SellQuoteCalculationFailed(_) => { + spot_price::Error::Other + } + } + } } From 89b3d07eba29ceea3ae97bd2e67ce3070f2e8654 Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Thu, 6 May 2021 16:11:07 +1000 Subject: [PATCH 6/6] Network protocol tests for spot_price behaviour Each test spawns swarm for Alice and Bob that only contains the spot_price behaviours and uses a memory transport. Tests cover happy path (i.e. expected price is returned) and error scenarios. Implementation of `TestRate` on `LatestRate` allows testing rate fetch error and quote calculation error behaviour. Thanks to @thomaseizinger for ramping up the test framework for comit-rs in the past! --- swap/src/network.rs | 3 + swap/src/network/test.rs | 162 +++++++++++ swap/src/protocol/alice/spot_price.rs | 384 ++++++++++++++++++++++++++ swap/src/protocol/bob.rs | 2 +- swap/src/protocol/bob/spot_price.rs | 4 +- 5 files changed, 552 insertions(+), 3 deletions(-) create mode 100644 swap/src/network/test.rs diff --git a/swap/src/network.rs b/swap/src/network.rs index 59b2b46f..fb4606fb 100644 --- a/swap/src/network.rs +++ b/swap/src/network.rs @@ -10,3 +10,6 @@ pub mod swarm; pub mod tor_transport; pub mod transfer_proof; pub mod transport; + +#[cfg(any(test, feature = "test"))] +pub mod test; diff --git a/swap/src/network/test.rs b/swap/src/network/test.rs new file mode 100644 index 00000000..ea4b1a8d --- /dev/null +++ b/swap/src/network/test.rs @@ -0,0 +1,162 @@ +use futures::future; +use libp2p::core::muxing::StreamMuxerBox; +use libp2p::core::transport::memory::MemoryTransport; +use libp2p::core::upgrade::{SelectUpgrade, Version}; +use libp2p::core::{Executor, Multiaddr}; +use libp2p::mplex::MplexConfig; +use libp2p::noise::{self, NoiseConfig, X25519Spec}; +use libp2p::swarm::{ + IntoProtocolsHandler, NetworkBehaviour, ProtocolsHandler, SwarmBuilder, SwarmEvent, +}; +use libp2p::{identity, yamux, PeerId, Swarm, Transport}; +use std::fmt::Debug; +use std::future::Future; +use std::pin::Pin; +use std::time::Duration; +use tokio::time; + +/// An adaptor struct for libp2p that spawns futures into the current +/// thread-local runtime. +struct GlobalSpawnTokioExecutor; + +impl Executor for GlobalSpawnTokioExecutor { + fn exec(&self, future: Pin + Send>>) { + let _ = tokio::spawn(future); + } +} + +#[allow(missing_debug_implementations)] +pub struct Actor { + pub swarm: Swarm, + pub addr: Multiaddr, + pub peer_id: PeerId, +} + +pub async fn new_connected_swarm_pair(behaviour_fn: F) -> (Actor, Actor) +where + B: NetworkBehaviour, + F: Fn(PeerId, identity::Keypair) -> B + Clone, + <<::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent: Clone, +::OutEvent: Debug{ + let (swarm, addr, peer_id) = new_swarm(behaviour_fn.clone()); + let mut alice = Actor { + swarm, + addr, + peer_id, + }; + + let (swarm, addr, peer_id) = new_swarm(behaviour_fn); + let mut bob = Actor { + swarm, + addr, + peer_id, + }; + + connect(&mut alice.swarm, &mut bob.swarm).await; + + (alice, bob) +} + +pub fn new_swarm B>( + behaviour_fn: F, +) -> (Swarm, Multiaddr, PeerId) +where + B: NetworkBehaviour, +{ + let id_keys = identity::Keypair::generate_ed25519(); + let peer_id = PeerId::from(id_keys.public()); + + let dh_keys = noise::Keypair::::new() + .into_authentic(&id_keys) + .expect("failed to create dh_keys"); + let noise = NoiseConfig::xx(dh_keys).into_authenticated(); + + let transport = MemoryTransport::default() + .upgrade(Version::V1) + .authenticate(noise) + .multiplex(SelectUpgrade::new( + yamux::YamuxConfig::default(), + MplexConfig::new(), + )) + .timeout(Duration::from_secs(5)) + .map(|(peer, muxer), _| (peer, StreamMuxerBox::new(muxer))) + .boxed(); + + let mut swarm: Swarm = SwarmBuilder::new(transport, behaviour_fn(peer_id, id_keys), peer_id) + .executor(Box::new(GlobalSpawnTokioExecutor)) + .build(); + + let address_port = rand::random::(); + let addr = format!("/memory/{}", address_port) + .parse::() + .unwrap(); + + Swarm::listen_on(&mut swarm, addr.clone()).unwrap(); + + (swarm, addr, peer_id) +} + +pub async fn await_events_or_timeout( + alice_event: impl Future, + bob_event: impl Future, +) -> (A, B) { + time::timeout( + Duration::from_secs(10), + future::join(alice_event, bob_event), + ) + .await + .expect("network behaviours to emit an event within 10 seconds") +} + +/// Connects two swarms with each other. +/// +/// This assumes the transport that is in use can be used by Bob to connect to +/// the listen address that is emitted by Alice. In other words, they have to be +/// on the same network. The memory transport used by the above `new_swarm` +/// function fulfills this. +/// +/// We also assume that the swarms don't emit any behaviour events during the +/// connection phase. Any event emitted is considered a bug from this functions +/// PoV because they would be lost. +pub async fn connect(alice: &mut Swarm, bob: &mut Swarm) +where + BA: NetworkBehaviour, + BB: NetworkBehaviour, + ::OutEvent: Debug, + ::OutEvent: Debug, +{ + let mut alice_connected = false; + let mut bob_connected = false; + + while !alice_connected && !bob_connected { + let (alice_event, bob_event) = future::join(alice.next_event(), bob.next_event()).await; + + match alice_event { + SwarmEvent::ConnectionEstablished { .. } => { + alice_connected = true; + } + SwarmEvent::NewListenAddr(addr) => { + bob.dial_addr(addr).unwrap(); + } + SwarmEvent::Behaviour(event) => { + panic!( + "alice unexpectedly emitted a behaviour event during connection: {:?}", + event + ); + } + _ => {} + } + match bob_event { + SwarmEvent::ConnectionEstablished { .. } => { + bob_connected = true; + } + SwarmEvent::Behaviour(event) => { + panic!( + "bob unexpectedly emitted a behaviour event during connection: {:?}", + event + ); + } + _ => {} + } + } +} diff --git a/swap/src/protocol/alice/spot_price.rs b/swap/src/protocol/alice/spot_price.rs index d960de28..032d9de5 100644 --- a/swap/src/protocol/alice/spot_price.rs +++ b/swap/src/protocol/alice/spot_price.rs @@ -14,6 +14,7 @@ use std::collections::VecDeque; use std::fmt::Debug; use std::task::{Context, Poll}; +#[derive(Debug)] pub enum OutEvent { ExecutionSetupParams { peer: PeerId, @@ -244,3 +245,386 @@ impl Error { } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::asb::Rate; + use crate::monero; + use crate::network::test::{await_events_or_timeout, connect, new_swarm}; + use crate::protocol::{alice, bob}; + use anyhow::anyhow; + use libp2p::Swarm; + use rust_decimal::Decimal; + + impl Default for AliceBehaviourValues { + fn default() -> Self { + Self { + balance: monero::Amount::from_monero(1.0).unwrap(), + lock_fee: monero::Amount::ZERO, + max_buy: bitcoin::Amount::from_btc(0.01).unwrap(), + rate: TestRate::default(), // 0.01 + resume_only: false, + } + } + } + + #[tokio::test] + async fn given_alice_has_sufficient_balance_then_returns_price() { + let mut test = SpotPriceTest::setup(AliceBehaviourValues::default()).await; + + let btc_to_swap = bitcoin::Amount::from_btc(0.01).unwrap(); + let expected_xmr = monero::Amount::from_monero(1.0).unwrap(); + + let request = spot_price::Request { btc: btc_to_swap }; + + test.send_request(request); + test.assert_price((btc_to_swap, expected_xmr), expected_xmr) + .await; + } + + #[tokio::test] + async fn given_alice_has_insufficient_balance_then_returns_error() { + let mut test = SpotPriceTest::setup( + AliceBehaviourValues::default().with_balance(monero::Amount::ZERO), + ) + .await; + + let btc_to_swap = bitcoin::Amount::from_btc(0.01).unwrap(); + + let request = spot_price::Request { btc: btc_to_swap }; + + test.send_request(request); + test.assert_error( + alice::spot_price::Error::BalanceTooLow { buy: btc_to_swap }, + bob::spot_price::Error::BalanceTooLow { buy: btc_to_swap }, + ) + .await; + } + + #[tokio::test] + async fn given_alice_has_insufficient_balance_after_balance_update_then_returns_error() { + let mut test = SpotPriceTest::setup(AliceBehaviourValues::default()).await; + + let btc_to_swap = bitcoin::Amount::from_btc(0.01).unwrap(); + let expected_xmr = monero::Amount::from_monero(1.0).unwrap(); + + let request = spot_price::Request { btc: btc_to_swap }; + + test.send_request(request); + test.assert_price((btc_to_swap, expected_xmr), expected_xmr) + .await; + + test.alice_swarm + .behaviour_mut() + .update_balance(monero::Amount::ZERO); + + let request = spot_price::Request { btc: btc_to_swap }; + + test.send_request(request); + test.assert_error( + alice::spot_price::Error::BalanceTooLow { buy: btc_to_swap }, + bob::spot_price::Error::BalanceTooLow { buy: btc_to_swap }, + ) + .await; + } + + #[tokio::test] + async fn given_alice_has_insufficient_balance_because_of_lock_fee_then_returns_error() { + let mut test = SpotPriceTest::setup( + AliceBehaviourValues::default().with_lock_fee(monero::Amount::from_piconero(1)), + ) + .await; + + let btc_to_swap = bitcoin::Amount::from_btc(0.01).unwrap(); + + let request = spot_price::Request { btc: btc_to_swap }; + + test.send_request(request); + test.assert_error( + alice::spot_price::Error::BalanceTooLow { buy: btc_to_swap }, + bob::spot_price::Error::BalanceTooLow { buy: btc_to_swap }, + ) + .await; + } + + #[tokio::test] + async fn given_max_buy_exceeded_then_returns_error() { + let max_buy = bitcoin::Amount::from_btc(0.001).unwrap(); + + let mut test = + SpotPriceTest::setup(AliceBehaviourValues::default().with_max_buy(max_buy)).await; + + let btc_to_swap = bitcoin::Amount::from_btc(0.01).unwrap(); + + let request = spot_price::Request { btc: btc_to_swap }; + + test.send_request(request); + test.assert_error( + alice::spot_price::Error::MaxBuyAmountExceeded { + buy: btc_to_swap, + max: max_buy, + }, + bob::spot_price::Error::MaxBuyAmountExceeded { + buy: btc_to_swap, + max: max_buy, + }, + ) + .await; + } + + #[tokio::test] + async fn given_alice_in_resume_only_mode_then_returns_error() { + let mut test = + SpotPriceTest::setup(AliceBehaviourValues::default().with_resume_only(true)).await; + + let btc_to_swap = bitcoin::Amount::from_btc(0.01).unwrap(); + + let request = spot_price::Request { btc: btc_to_swap }; + + test.send_request(request); + test.assert_error( + alice::spot_price::Error::ResumeOnlyMode, + bob::spot_price::Error::NoSwapsAccepted, + ) + .await; + } + + #[tokio::test] + async fn given_rate_fetch_problem_then_returns_error() { + let mut test = + SpotPriceTest::setup(AliceBehaviourValues::default().with_rate(TestRate::error_rate())) + .await; + + let btc_to_swap = bitcoin::Amount::from_btc(0.01).unwrap(); + + let request = spot_price::Request { btc: btc_to_swap }; + + test.send_request(request); + test.assert_error( + alice::spot_price::Error::LatestRateFetchFailed(Box::new(TestRateError {})), + bob::spot_price::Error::Other, + ) + .await; + } + + #[tokio::test] + async fn given_rate_calculation_problem_then_returns_error() { + let mut test = SpotPriceTest::setup( + AliceBehaviourValues::default().with_rate(TestRate::from_rate_and_spread(0.0, 0)), + ) + .await; + + let btc_to_swap = bitcoin::Amount::from_btc(0.01).unwrap(); + + let request = spot_price::Request { btc: btc_to_swap }; + + test.send_request(request); + test.assert_error( + alice::spot_price::Error::SellQuoteCalculationFailed(anyhow!( + "Error text irrelevant, won't be checked here" + )), + bob::spot_price::Error::Other, + ) + .await; + } + + struct SpotPriceTest { + alice_swarm: Swarm>, + bob_swarm: Swarm, + + alice_peer_id: PeerId, + } + + impl SpotPriceTest { + pub async fn setup(values: AliceBehaviourValues) -> Self { + let (mut alice_swarm, _, alice_peer_id) = new_swarm(|_, _| { + Behaviour::new( + values.balance, + values.lock_fee, + values.max_buy, + values.rate.clone(), + values.resume_only, + ) + }); + let (mut bob_swarm, ..) = new_swarm(|_, _| bob::spot_price::bob()); + + connect(&mut alice_swarm, &mut bob_swarm).await; + + Self { + alice_swarm, + bob_swarm, + alice_peer_id, + } + } + + pub fn send_request(&mut self, spot_price_request: spot_price::Request) { + self.bob_swarm + .behaviour_mut() + .send_request(&self.alice_peer_id, spot_price_request); + } + + async fn assert_price( + &mut self, + alice_assert: (bitcoin::Amount, monero::Amount), + bob_assert: monero::Amount, + ) { + match await_events_or_timeout(self.alice_swarm.next(), self.bob_swarm.next()).await { + ( + alice::spot_price::OutEvent::ExecutionSetupParams { btc, xmr, .. }, + spot_price::OutEvent::Message { message, .. }, + ) => { + assert_eq!(alice_assert, (btc, xmr)); + + let response = match message { + RequestResponseMessage::Response { response, .. } => response, + _ => panic!("Unexpected message {:?} for Bob", message), + }; + + match response { + spot_price::Response::Xmr(xmr) => { + assert_eq!(bob_assert, xmr) + } + _ => panic!("Unexpected response {:?} for Bob", response), + } + } + (alice_event, bob_event) => panic!( + "Received unexpected event, alice emitted {:?} and bob emitted {:?}", + alice_event, bob_event + ), + } + } + + async fn assert_error( + &mut self, + alice_assert: alice::spot_price::Error, + bob_assert: bob::spot_price::Error, + ) { + match await_events_or_timeout(self.alice_swarm.next(), self.bob_swarm.next()).await { + ( + alice::spot_price::OutEvent::Error { error, .. }, + spot_price::OutEvent::Message { message, .. }, + ) => { + // TODO: Somehow make PartialEq work on Alice's spot_price::Error + match (alice_assert, error) { + ( + alice::spot_price::Error::BalanceTooLow { .. }, + alice::spot_price::Error::BalanceTooLow { .. }, + ) + | ( + alice::spot_price::Error::MaxBuyAmountExceeded { .. }, + alice::spot_price::Error::MaxBuyAmountExceeded { .. }, + ) + | ( + alice::spot_price::Error::LatestRateFetchFailed(_), + alice::spot_price::Error::LatestRateFetchFailed(_), + ) + | ( + alice::spot_price::Error::SellQuoteCalculationFailed(_), + alice::spot_price::Error::SellQuoteCalculationFailed(_), + ) + | ( + alice::spot_price::Error::ResumeOnlyMode, + alice::spot_price::Error::ResumeOnlyMode, + ) => {} + (alice_assert, error) => { + panic!("Expected: {:?} Actual: {:?}", alice_assert, error) + } + } + + let response = match message { + RequestResponseMessage::Response { response, .. } => response, + _ => panic!("Unexpected message {:?} for Bob", message), + }; + + match response { + spot_price::Response::Error(error) => { + assert_eq!(bob_assert, error.into()) + } + _ => panic!("Unexpected response {:?} for Bob", response), + } + } + (alice_event, bob_event) => panic!( + "Received unexpected event, alice emitted {:?} and bob emitted {:?}", + alice_event, bob_event + ), + } + } + } + + struct AliceBehaviourValues { + pub balance: monero::Amount, + pub lock_fee: monero::Amount, + pub max_buy: bitcoin::Amount, + pub rate: TestRate, // 0.01 + pub resume_only: bool, + } + + impl AliceBehaviourValues { + pub fn with_balance(mut self, balance: monero::Amount) -> AliceBehaviourValues { + self.balance = balance; + self + } + + pub fn with_lock_fee(mut self, lock_fee: monero::Amount) -> AliceBehaviourValues { + self.lock_fee = lock_fee; + self + } + + pub fn with_max_buy(mut self, max_buy: bitcoin::Amount) -> AliceBehaviourValues { + self.max_buy = max_buy; + self + } + + pub fn with_resume_only(mut self, resume_only: bool) -> AliceBehaviourValues { + self.resume_only = resume_only; + self + } + + pub fn with_rate(mut self, rate: TestRate) -> AliceBehaviourValues { + self.rate = rate; + self + } + } + + #[derive(Clone, Debug)] + pub enum TestRate { + Rate(Rate), + Err(TestRateError), + } + + impl TestRate { + pub const RATE: f64 = 0.01; + + pub fn from_rate_and_spread(rate: f64, spread: u64) -> Self { + let ask = bitcoin::Amount::from_btc(rate).expect("Static value should never fail"); + let spread = Decimal::from(spread); + Self::Rate(Rate::new(ask, spread)) + } + + pub fn error_rate() -> Self { + Self::Err(TestRateError {}) + } + } + + impl Default for TestRate { + fn default() -> Self { + TestRate::from_rate_and_spread(Self::RATE, 0) + } + } + + #[derive(Debug, Clone, thiserror::Error)] + #[error("Could not fetch rate")] + pub struct TestRateError {} + + impl LatestRate for TestRate { + type Error = TestRateError; + + fn latest_rate(&mut self) -> Result { + match self { + TestRate::Rate(rate) => Ok(*rate), + TestRate::Err(error) => Err(error.clone()), + } + } + } +} diff --git a/swap/src/protocol/bob.rs b/swap/src/protocol/bob.rs index 292c2f40..ae263f97 100644 --- a/swap/src/protocol/bob.rs +++ b/swap/src/protocol/bob.rs @@ -16,7 +16,7 @@ pub mod cancel; pub mod event_loop; mod execution_setup; pub mod refund; -mod spot_price; +pub mod spot_price; pub mod state; pub mod swap; diff --git a/swap/src/protocol/bob/spot_price.rs b/swap/src/protocol/bob/spot_price.rs index b16f06f1..e2a4cef7 100644 --- a/swap/src/protocol/bob/spot_price.rs +++ b/swap/src/protocol/bob/spot_price.rs @@ -6,7 +6,7 @@ use libp2p::request_response::{ProtocolSupport, RequestResponseConfig}; use libp2p::PeerId; const PROTOCOL: &str = spot_price::PROTOCOL; -type SpotPriceOutEvent = spot_price::OutEvent; +pub type SpotPriceOutEvent = spot_price::OutEvent; /// Constructs a new instance of the `spot-price` behaviour to be used by Bob. /// @@ -37,7 +37,7 @@ impl From<(PeerId, spot_price::Message)> for OutEvent { crate::impl_from_rr_event!(SpotPriceOutEvent, OutEvent, PROTOCOL); -#[derive(Clone, Debug, thiserror::Error)] +#[derive(Clone, Debug, thiserror::Error, PartialEq)] pub enum Error { #[error("Seller currently does not accept incoming swap requests, please try again later")] NoSwapsAccepted,