From 52f648e1de9c08c34d39523b99e5c6d94a934796 Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Tue, 4 May 2021 14:57:50 +1000 Subject: [PATCH] Alice's spot price logic into dedicated behaviour Move Alice's spot price logic into a dedicated network behaviour that handles all the logic. The new behaviour encapsulates the complete state necessary for spot price request decision making. The network behaviour cannot handle asynchronous calls, thus the balance is managed inside the spot price and has to updated regularly from the outside to ensure the spot price balance check has up to date data. At the moment the balance is updated upon an incoming quote requests. Code that is relevant for both ASB and CLI remains in the `network::spot_price` module (e.g. `network::spot_price::Error`). --- swap/src/bin/asb.rs | 15 +- swap/src/network/spot_price.rs | 37 +---- swap/src/network/swarm.rs | 20 ++- swap/src/protocol/alice.rs | 1 + swap/src/protocol/alice/behaviour.rs | 37 +++-- swap/src/protocol/alice/event_loop.rs | 96 +++---------- swap/src/protocol/alice/spot_price.rs | 199 ++++++++++++++++++++++++++ swap/tests/harness/mod.rs | 25 +++- 8 files changed, 304 insertions(+), 126 deletions(-) create mode 100644 swap/src/protocol/alice/spot_price.rs diff --git a/swap/src/bin/asb.rs b/swap/src/bin/asb.rs index 80416503..5d099055 100644 --- a/swap/src/bin/asb.rs +++ b/swap/src/bin/asb.rs @@ -119,7 +119,17 @@ async fn main() -> Result<()> { } }; - let mut swarm = swarm::alice(&seed)?; + let current_balance = monero_wallet.get_balance().await?; + let lock_fee = monero_wallet.static_tx_fee_estimate(); + let kraken_rate = KrakenRate::new(ask_spread, kraken_price_updates); + let mut swarm = swarm::alice( + &seed, + current_balance, + lock_fee, + max_buy, + kraken_rate.clone(), + resume_only, + )?; for listen in config.network.listen { Swarm::listen_on(&mut swarm, listen.clone()) @@ -132,9 +142,8 @@ async fn main() -> Result<()> { Arc::new(bitcoin_wallet), Arc::new(monero_wallet), Arc::new(db), - KrakenRate::new(ask_spread, kraken_price_updates), + kraken_rate, max_buy, - resume_only, ) .unwrap(); diff --git a/swap/src/network/spot_price.rs b/swap/src/network/spot_price.rs index 24671481..04aaaf67 100644 --- a/swap/src/network/spot_price.rs +++ b/swap/src/network/spot_price.rs @@ -1,6 +1,6 @@ use crate::monero; use crate::network::cbor_request_response::CborCodec; -use crate::protocol::{alice, bob}; +use crate::protocol::bob; use libp2p::core::ProtocolName; use libp2p::request_response::{ ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent, @@ -10,7 +10,7 @@ use libp2p::PeerId; use serde::{Deserialize, Serialize}; const PROTOCOL: &str = "/comit/xmr/btc/spot-price/1.0.0"; -type OutEvent = RequestResponseEvent; +pub type OutEvent = RequestResponseEvent; type Message = RequestResponseMessage; pub type Behaviour = RequestResponse>; @@ -50,7 +50,7 @@ pub enum Error { #[error( "This seller currently does not accept incoming swap requests, please try again later" )] - MaintenanceMode, + NoSwapsAccepted, #[error("Seller refused to buy {buy} because the maximum configured buy limit is {max}")] MaxBuyAmountExceeded { #[serde(with = "::bitcoin::util::amount::serde::as_sat")] @@ -63,18 +63,11 @@ pub enum Error { #[serde(with = "::bitcoin::util::amount::serde::as_sat")] buy: bitcoin::Amount, }, -} -/// Constructs a new instance of the `spot-price` behaviour to be used by Alice. -/// -/// Alice only supports inbound connections, i.e. providing spot prices for BTC -/// in XMR. -pub fn alice() -> Behaviour { - Behaviour::new( - CborCodec::default(), - vec![(SpotPriceProtocol, ProtocolSupport::Inbound)], - RequestResponseConfig::default(), - ) + /// To be used for errors that cannot be explained on the CLI side (e.g. + /// rate update problems on the seller side) + #[error("The seller encountered a problem, please try again later.")] + Other, } /// Constructs a new instance of the `spot-price` behaviour to be used by Bob. @@ -89,22 +82,6 @@ pub fn bob() -> Behaviour { ) } -impl From<(PeerId, Message)> for alice::OutEvent { - fn from((peer, message): (PeerId, Message)) -> Self { - match message { - Message::Request { - request, channel, .. - } => Self::SpotPriceRequested { - request, - channel, - peer, - }, - Message::Response { .. } => Self::unexpected_response(peer), - } - } -} -crate::impl_from_rr_event!(OutEvent, alice::OutEvent, PROTOCOL); - impl From<(PeerId, Message)> for bob::OutEvent { fn from((peer, message): (PeerId, Message)) -> Self { match message { diff --git a/swap/src/network/swarm.rs b/swap/src/network/swarm.rs index ebcce5fa..75ea57af 100644 --- a/swap/src/network/swarm.rs +++ b/swap/src/network/swarm.rs @@ -1,13 +1,27 @@ use crate::network::transport; +use crate::protocol::alice::event_loop::LatestRate; use crate::protocol::{alice, bob}; use crate::seed::Seed; -use crate::tor; +use crate::{monero, tor}; use anyhow::Result; use libp2p::swarm::{NetworkBehaviour, SwarmBuilder}; use libp2p::{PeerId, Swarm}; -pub fn alice(seed: &Seed) -> Result> { - with_clear_net(seed, alice::Behaviour::default()) +pub fn alice( + seed: &Seed, + balance: monero::Amount, + lock_fee: monero::Amount, + max_buy: bitcoin::Amount, + latest_rate: LR, + resume_only: bool, +) -> Result>> +where + LR: LatestRate + Send + 'static, +{ + with_clear_net( + seed, + alice::Behaviour::new(balance, lock_fee, max_buy, latest_rate, resume_only), + ) } pub async fn bob( diff --git a/swap/src/protocol/alice.rs b/swap/src/protocol/alice.rs index 5909a701..2410e289 100644 --- a/swap/src/protocol/alice.rs +++ b/swap/src/protocol/alice.rs @@ -14,6 +14,7 @@ pub use self::swap::{run, run_until}; mod behaviour; pub mod event_loop; mod execution_setup; +mod spot_price; pub mod state; pub mod swap; diff --git a/swap/src/protocol/alice/behaviour.rs b/swap/src/protocol/alice/behaviour.rs index fb38403b..b12db8c9 100644 --- a/swap/src/protocol/alice/behaviour.rs +++ b/swap/src/protocol/alice/behaviour.rs @@ -1,6 +1,8 @@ +use crate::monero; use crate::network::quote::BidQuote; -use crate::network::{encrypted_signature, quote, spot_price, transfer_proof}; -use crate::protocol::alice::{execution_setup, State3}; +use crate::network::{encrypted_signature, quote, transfer_proof}; +use crate::protocol::alice::event_loop::LatestRate; +use crate::protocol::alice::{execution_setup, spot_price, State3}; use anyhow::{anyhow, Error}; use libp2p::request_response::{RequestId, ResponseChannel}; use libp2p::{NetworkBehaviour, PeerId}; @@ -8,10 +10,10 @@ use uuid::Uuid; #[derive(Debug)] pub enum OutEvent { - SpotPriceRequested { - request: spot_price::Request, - channel: ResponseChannel, + ExecutionSetupStart { peer: PeerId, + btc: bitcoin::Amount, + xmr: monero::Amount, }, QuoteRequested { channel: ResponseChannel, @@ -60,19 +62,34 @@ impl OutEvent { #[derive(NetworkBehaviour)] #[behaviour(out_event = "OutEvent", event_process = false)] #[allow(missing_debug_implementations)] -pub struct Behaviour { +pub struct Behaviour { pub quote: quote::Behaviour, - pub spot_price: spot_price::Behaviour, + pub spot_price: spot_price::Behaviour, pub execution_setup: execution_setup::Behaviour, pub transfer_proof: transfer_proof::Behaviour, pub encrypted_signature: encrypted_signature::Behaviour, } -impl Default for Behaviour { - fn default() -> Self { +impl Behaviour +where + LR: LatestRate + Send + 'static, +{ + pub fn new( + balance: monero::Amount, + lock_fee: monero::Amount, + max_buy: bitcoin::Amount, + latest_rate: LR, + resume_only: bool, + ) -> Self { Self { quote: quote::alice(), - spot_price: spot_price::alice(), + spot_price: spot_price::Behaviour::new( + balance, + lock_fee, + max_buy, + latest_rate, + resume_only, + ), execution_setup: Default::default(), transfer_proof: transfer_proof::alice(), encrypted_signature: encrypted_signature::alice(), diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index bf2b5ba0..049c7af5 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -2,7 +2,7 @@ use crate::asb::Rate; use crate::database::Database; use crate::env::Config; use crate::network::quote::BidQuote; -use crate::network::{spot_price, transfer_proof}; +use crate::network::transfer_proof; use crate::protocol::alice::{AliceState, Behaviour, OutEvent, State0, State3, Swap}; use crate::{bitcoin, kraken, monero}; use anyhow::{Context, Result}; @@ -31,17 +31,16 @@ type OutgoingTransferProof = BoxFuture<'static, Result<(PeerId, transfer_proof::Request, bmrng::Responder<()>)>>; #[allow(missing_debug_implementations)] -pub struct EventLoop { - swarm: libp2p::Swarm, +pub struct EventLoop { + swarm: libp2p::Swarm>, env_config: Config, bitcoin_wallet: Arc, monero_wallet: Arc, db: Arc, - latest_rate: RS, + latest_rate: LR, max_buy: bitcoin::Amount, swap_sender: mpsc::Sender, - resume_only: bool, /// Stores incoming [`EncryptedSignature`]s per swap. recv_encrypted_signature: HashMap>, @@ -60,18 +59,17 @@ pub struct EventLoop { impl EventLoop where - LR: LatestRate, + LR: LatestRate + Send + 'static, { #[allow(clippy::too_many_arguments)] pub fn new( - swarm: Swarm, + swarm: Swarm>, env_config: Config, bitcoin_wallet: Arc, monero_wallet: Arc, db: Arc, latest_rate: LR, max_buy: bitcoin::Amount, - resume_only: bool, ) -> Result<(Self, mpsc::Receiver)> { let swap_channel = MpscChannels::default(); @@ -83,7 +81,6 @@ where db, latest_rate, swap_sender: swap_channel.sender, - resume_only, max_buy, recv_encrypted_signature: Default::default(), inflight_encrypted_signatures: Default::default(), @@ -146,38 +143,8 @@ where tokio::select! { swarm_event = self.swarm.next_event() => { match swarm_event { - SwarmEvent::Behaviour(OutEvent::SpotPriceRequested { request, channel, peer }) => { - let btc = request.btc; - let xmr = match self.handle_spot_price_request(btc, self.monero_wallet.clone()).await { - Ok(xmr) => match xmr { - Ok(xmr) => xmr, - Err(e) => { - tracing::warn!(%peer, "Ignoring spot price request from {} because: {:#}", peer, e); - match self.swarm.behaviour_mut().spot_price.send_response(channel, spot_price::Response::Error(e)) { - Ok(_) => { - continue; - }, - Err(_) => { - tracing::debug!(%peer, "Failed to respond with error to spot price request"); - continue; - } - } - } - }, - Err(e) => { - tracing::error!(%peer, "Unrecoverable error while producing spot price for {}: {:#}", btc, e); - continue; - } - }; + SwarmEvent::Behaviour(OutEvent::ExecutionSetupStart { peer, btc, xmr }) => { - match self.swarm.behaviour_mut().spot_price.send_response(channel, spot_price::Response::Xmr(xmr)) { - Ok(_) => {}, - Err(_) => { - // if we can't respond, the peer probably just disconnected so it is not a huge deal, only log this on debug - tracing::debug!(%peer, "Failed to respond with spot price"); - continue; - } - } let tx_redeem_fee = self.bitcoin_wallet .estimate_fee(bitcoin::TxRedeem::weight(), btc) .await; @@ -195,7 +162,7 @@ where (redeem_address, punish_address) } _ => { - tracing::error!("Could not get new address."); + tracing::error!(%peer, "Failed to get new address during execution setup."); continue; } }; @@ -208,7 +175,7 @@ where (tx_redeem_fee, tx_punish_fee) } _ => { - tracing::error!("Could not calculate transaction fees."); + tracing::error!(%peer, "Failed to calculate transaction fees during execution setup."); continue; } }; @@ -233,6 +200,17 @@ where self.swarm.behaviour_mut().execution_setup.run(peer, state0); } SwarmEvent::Behaviour(OutEvent::QuoteRequested { channel, peer }) => { + // TODO: Move the spot-price update into dedicated update stream to decouple it from quote requests + let current_balance = self.monero_wallet.get_balance().await; + match current_balance { + Ok(balance) => { + self.swarm.behaviour_mut().spot_price.update_balance(balance); + } + Err(e) => { + tracing::error!("Failed to fetch Monero balance: {:#}", e); + } + } + let quote = match self.make_quote(self.max_buy).await { Ok(quote) => quote, Err(e) => { @@ -360,38 +338,6 @@ where } } - async fn handle_spot_price_request( - &mut self, - btc: bitcoin::Amount, - monero_wallet: Arc, - ) -> Result> { - if self.resume_only { - return Ok(Err(spot_price::Error::MaintenanceMode)); - } - - let rate = self - .latest_rate - .latest_rate() - .context("Failed to get latest rate")?; - - if btc > self.max_buy { - return Ok(Err(spot_price::Error::MaxBuyAmountExceeded { - buy: btc, - max: self.max_buy, - })); - } - - let xmr_balance = monero_wallet.get_balance().await?; - let xmr_lock_fees = monero_wallet.static_tx_fee_estimate(); - let xmr = rate.sell_quote(btc)?; - - if xmr_balance < xmr + xmr_lock_fees { - return Ok(Err(spot_price::Error::BalanceTooLow { buy: btc })); - } - - Ok(Ok(xmr)) - } - async fn make_quote(&mut self, max_buy: bitcoin::Amount) -> Result { let rate = self .latest_rate @@ -510,7 +456,7 @@ impl LatestRate for FixedRate { /// Produces [`Rate`]s based on [`PriceUpdate`]s from kraken and a configured /// spread. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct KrakenRate { ask_spread: Decimal, price_updates: kraken::PriceUpdates, diff --git a/swap/src/protocol/alice/spot_price.rs b/swap/src/protocol/alice/spot_price.rs new file mode 100644 index 00000000..6bf64e6f --- /dev/null +++ b/swap/src/protocol/alice/spot_price.rs @@ -0,0 +1,199 @@ +use crate::monero; +use crate::network::cbor_request_response::CborCodec; +use crate::network::spot_price; +use crate::network::spot_price::SpotPriceProtocol; +use crate::protocol::alice; +use crate::protocol::alice::event_loop::LatestRate; +use libp2p::request_response::{ + ProtocolSupport, RequestResponseConfig, RequestResponseEvent, RequestResponseMessage, + ResponseChannel, +}; +use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}; +use libp2p::{NetworkBehaviour, PeerId}; +use std::collections::VecDeque; +use std::task::{Context, Poll}; + +pub struct OutEvent { + peer: PeerId, + btc: bitcoin::Amount, + xmr: monero::Amount, +} + +#[derive(NetworkBehaviour)] +#[behaviour(out_event = "OutEvent", poll_method = "poll", event_process = true)] +#[allow(missing_debug_implementations)] +pub struct Behaviour { + behaviour: spot_price::Behaviour, + + #[behaviour(ignore)] + events: VecDeque, + + #[behaviour(ignore)] + balance: monero::Amount, + #[behaviour(ignore)] + lock_fee: monero::Amount, + #[behaviour(ignore)] + max_buy: bitcoin::Amount, + #[behaviour(ignore)] + latest_rate: LR, + #[behaviour(ignore)] + resume_only: bool, +} + +/// Behaviour that handles spot prices. +/// All the logic how to react to a spot price request is contained here, events +/// reporting the successful handling of a spot price request or a failure are +/// bubbled up to the parent behaviour. +impl Behaviour +where + LR: LatestRate + Send + 'static, +{ + pub fn new( + balance: monero::Amount, + lock_fee: monero::Amount, + max_buy: bitcoin::Amount, + latest_rate: LR, + resume_only: bool, + ) -> Self { + Self { + behaviour: spot_price::Behaviour::new( + CborCodec::default(), + vec![(SpotPriceProtocol, ProtocolSupport::Inbound)], + RequestResponseConfig::default(), + ), + events: Default::default(), + balance, + lock_fee, + max_buy, + latest_rate, + resume_only, + } + } + + pub fn update_balance(&mut self, balance: monero::Amount) { + self.balance = balance; + } + + fn send_error_response( + &mut self, + peer: PeerId, + channel: ResponseChannel, + error: spot_price::Error, + ) { + if self + .behaviour + .send_response(channel, spot_price::Response::Error(error)) + .is_err() + { + tracing::debug!(%peer, "Unable to send error response for spot price request"); + } + } + + fn poll( + &mut self, + _cx: &mut Context<'_>, + _params: &mut impl PollParameters, + ) -> Poll> { + if let Some(event) = self.events.pop_front() { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + } + + // We trust in libp2p to poll us. + Poll::Pending + } +} + +impl NetworkBehaviourEventProcess for Behaviour +where + LR: LatestRate + Send + 'static, +{ + fn inject_event(&mut self, event: spot_price::OutEvent) { + let (peer, message) = match event { + RequestResponseEvent::Message { peer, message } => (peer, message), + RequestResponseEvent::OutboundFailure { peer, error, .. } => { + tracing::error!(%peer, "Failure sending spot price response: {:#}", error); + return; + } + RequestResponseEvent::InboundFailure { peer, error, .. } => { + tracing::warn!(%peer, "Inbound failure when handling spot price request: {:#}", error); + return; + } + RequestResponseEvent::ResponseSent { peer, .. } => { + tracing::debug!(%peer, "Spot price response sent"); + return; + } + }; + + let (request, channel) = match message { + RequestResponseMessage::Request { + request, channel, .. + } => (request, channel), + RequestResponseMessage::Response { .. } => { + tracing::error!("Unexpected message"); + return; + } + }; + + if self.resume_only { + tracing::warn!(%peer, "Ignoring spot price request from {} because ASB is running in resume-only mode", peer); + self.send_error_response(peer, channel, spot_price::Error::NoSwapsAccepted); + return; + } + + let btc = request.btc; + if btc > self.max_buy { + tracing::warn!(%peer, "Ignoring spot price request from {} because max muy amount exceeded", peer); + self.send_error_response(peer, channel, spot_price::Error::MaxBuyAmountExceeded { + max: self.max_buy, + buy: btc, + }); + return; + } + + let rate = match self.latest_rate.latest_rate() { + Ok(rate) => rate, + Err(e) => { + tracing::error!(%peer, "Ignoring spot price request from {} because we encountered a problem with fetching the latest rate: {:#}", peer, e); + self.send_error_response(peer, channel, spot_price::Error::Other); + return; + } + }; + let xmr = match rate.sell_quote(btc) { + Ok(xmr) => xmr, + Err(e) => { + tracing::error!(%peer, "Ignoring spot price request from {} because we encountered a problem with calculating the amount from rate: {:#}", peer, e); + self.send_error_response(peer, channel, spot_price::Error::Other); + return; + } + }; + + let xmr_balance = self.balance; + let xmr_lock_fees = self.lock_fee; + + if xmr_balance < xmr + xmr_lock_fees { + tracing::error!(%peer, "Ignoring spot price request from {} because the XMR balance is too low to fulfill the swap: {}", peer, xmr_balance); + self.send_error_response(peer, channel, spot_price::Error::BalanceTooLow { buy: btc }); + return; + } + + if self + .behaviour + .send_response(channel, spot_price::Response::Xmr(xmr)) + .is_err() + { + tracing::error!(%peer, "Failed to send spot price response of {} for {}", xmr, btc) + } + + self.events.push_back(OutEvent { peer, btc, xmr }); + } +} + +impl From for alice::OutEvent { + fn from(event: OutEvent) -> Self { + Self::ExecutionSetupStart { + peer: event.peer, + btc: event.btc, + xmr: event.xmr, + } + } +} diff --git a/swap/tests/harness/mod.rs b/swap/tests/harness/mod.rs index de8bd925..c70d979d 100644 --- a/swap/tests/harness/mod.rs +++ b/swap/tests/harness/mod.rs @@ -90,7 +90,8 @@ where env_config, alice_bitcoin_wallet.clone(), alice_monero_wallet.clone(), - ); + ) + .await; let bob_seed = Seed::random().unwrap(); let bob_starting_balances = StartingBalances::new(btc_amount * 10, monero::Amount::ZERO, None); @@ -213,7 +214,7 @@ pub async fn init_electrs_container( Ok(docker) } -fn start_alice( +async fn start_alice( seed: &Seed, db_path: PathBuf, listen_address: Multiaddr, @@ -223,7 +224,21 @@ fn start_alice( ) -> (AliceApplicationHandle, Receiver) { let db = Arc::new(Database::open(db_path.as_path()).unwrap()); - let mut swarm = swarm::alice(&seed).unwrap(); + let current_balance = monero_wallet.get_balance().await.unwrap(); + let lock_fee = monero_wallet.static_tx_fee_estimate(); + let max_buy = bitcoin::Amount::from_sat(u64::MAX); + let latest_rate = FixedRate::default(); + let resume_only = false; + + let mut swarm = swarm::alice( + &seed, + current_balance, + lock_fee, + max_buy, + latest_rate, + resume_only, + ) + .unwrap(); swarm.listen_on(listen_address).unwrap(); let (event_loop, swap_handle) = alice::EventLoop::new( @@ -234,7 +249,6 @@ fn start_alice( db, FixedRate::default(), bitcoin::Amount::ONE_BTC, - false, ) .unwrap(); @@ -497,7 +511,8 @@ impl TestContext { self.env_config, self.alice_bitcoin_wallet.clone(), self.alice_monero_wallet.clone(), - ); + ) + .await; self.alice_handle = alice_handle; self.alice_swap_handle = alice_swap_handle;