diff --git a/swap/src/bin/asb.rs b/swap/src/bin/asb.rs index 80416503..5d099055 100644 --- a/swap/src/bin/asb.rs +++ b/swap/src/bin/asb.rs @@ -119,7 +119,17 @@ async fn main() -> Result<()> { } }; - let mut swarm = swarm::alice(&seed)?; + let current_balance = monero_wallet.get_balance().await?; + let lock_fee = monero_wallet.static_tx_fee_estimate(); + let kraken_rate = KrakenRate::new(ask_spread, kraken_price_updates); + let mut swarm = swarm::alice( + &seed, + current_balance, + lock_fee, + max_buy, + kraken_rate.clone(), + resume_only, + )?; for listen in config.network.listen { Swarm::listen_on(&mut swarm, listen.clone()) @@ -132,9 +142,8 @@ async fn main() -> Result<()> { Arc::new(bitcoin_wallet), Arc::new(monero_wallet), Arc::new(db), - KrakenRate::new(ask_spread, kraken_price_updates), + kraken_rate, max_buy, - resume_only, ) .unwrap(); diff --git a/swap/src/network/spot_price.rs b/swap/src/network/spot_price.rs index 24671481..04aaaf67 100644 --- a/swap/src/network/spot_price.rs +++ b/swap/src/network/spot_price.rs @@ -1,6 +1,6 @@ use crate::monero; use crate::network::cbor_request_response::CborCodec; -use crate::protocol::{alice, bob}; +use crate::protocol::bob; use libp2p::core::ProtocolName; use libp2p::request_response::{ ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent, @@ -10,7 +10,7 @@ use libp2p::PeerId; use serde::{Deserialize, Serialize}; const PROTOCOL: &str = "/comit/xmr/btc/spot-price/1.0.0"; -type OutEvent = RequestResponseEvent; +pub type OutEvent = RequestResponseEvent; type Message = RequestResponseMessage; pub type Behaviour = RequestResponse>; @@ -50,7 +50,7 @@ pub enum Error { #[error( "This seller currently does not accept incoming swap requests, please try again later" )] - MaintenanceMode, + NoSwapsAccepted, #[error("Seller refused to buy {buy} because the maximum configured buy limit is {max}")] MaxBuyAmountExceeded { #[serde(with = "::bitcoin::util::amount::serde::as_sat")] @@ -63,18 +63,11 @@ pub enum Error { #[serde(with = "::bitcoin::util::amount::serde::as_sat")] buy: bitcoin::Amount, }, -} -/// Constructs a new instance of the `spot-price` behaviour to be used by Alice. -/// -/// Alice only supports inbound connections, i.e. providing spot prices for BTC -/// in XMR. -pub fn alice() -> Behaviour { - Behaviour::new( - CborCodec::default(), - vec![(SpotPriceProtocol, ProtocolSupport::Inbound)], - RequestResponseConfig::default(), - ) + /// To be used for errors that cannot be explained on the CLI side (e.g. + /// rate update problems on the seller side) + #[error("The seller encountered a problem, please try again later.")] + Other, } /// Constructs a new instance of the `spot-price` behaviour to be used by Bob. @@ -89,22 +82,6 @@ pub fn bob() -> Behaviour { ) } -impl From<(PeerId, Message)> for alice::OutEvent { - fn from((peer, message): (PeerId, Message)) -> Self { - match message { - Message::Request { - request, channel, .. - } => Self::SpotPriceRequested { - request, - channel, - peer, - }, - Message::Response { .. } => Self::unexpected_response(peer), - } - } -} -crate::impl_from_rr_event!(OutEvent, alice::OutEvent, PROTOCOL); - impl From<(PeerId, Message)> for bob::OutEvent { fn from((peer, message): (PeerId, Message)) -> Self { match message { diff --git a/swap/src/network/swarm.rs b/swap/src/network/swarm.rs index ebcce5fa..75ea57af 100644 --- a/swap/src/network/swarm.rs +++ b/swap/src/network/swarm.rs @@ -1,13 +1,27 @@ use crate::network::transport; +use crate::protocol::alice::event_loop::LatestRate; use crate::protocol::{alice, bob}; use crate::seed::Seed; -use crate::tor; +use crate::{monero, tor}; use anyhow::Result; use libp2p::swarm::{NetworkBehaviour, SwarmBuilder}; use libp2p::{PeerId, Swarm}; -pub fn alice(seed: &Seed) -> Result> { - with_clear_net(seed, alice::Behaviour::default()) +pub fn alice( + seed: &Seed, + balance: monero::Amount, + lock_fee: monero::Amount, + max_buy: bitcoin::Amount, + latest_rate: LR, + resume_only: bool, +) -> Result>> +where + LR: LatestRate + Send + 'static, +{ + with_clear_net( + seed, + alice::Behaviour::new(balance, lock_fee, max_buy, latest_rate, resume_only), + ) } pub async fn bob( diff --git a/swap/src/protocol/alice.rs b/swap/src/protocol/alice.rs index 5909a701..2410e289 100644 --- a/swap/src/protocol/alice.rs +++ b/swap/src/protocol/alice.rs @@ -14,6 +14,7 @@ pub use self::swap::{run, run_until}; mod behaviour; pub mod event_loop; mod execution_setup; +mod spot_price; pub mod state; pub mod swap; diff --git a/swap/src/protocol/alice/behaviour.rs b/swap/src/protocol/alice/behaviour.rs index fb38403b..b12db8c9 100644 --- a/swap/src/protocol/alice/behaviour.rs +++ b/swap/src/protocol/alice/behaviour.rs @@ -1,6 +1,8 @@ +use crate::monero; use crate::network::quote::BidQuote; -use crate::network::{encrypted_signature, quote, spot_price, transfer_proof}; -use crate::protocol::alice::{execution_setup, State3}; +use crate::network::{encrypted_signature, quote, transfer_proof}; +use crate::protocol::alice::event_loop::LatestRate; +use crate::protocol::alice::{execution_setup, spot_price, State3}; use anyhow::{anyhow, Error}; use libp2p::request_response::{RequestId, ResponseChannel}; use libp2p::{NetworkBehaviour, PeerId}; @@ -8,10 +10,10 @@ use uuid::Uuid; #[derive(Debug)] pub enum OutEvent { - SpotPriceRequested { - request: spot_price::Request, - channel: ResponseChannel, + ExecutionSetupStart { peer: PeerId, + btc: bitcoin::Amount, + xmr: monero::Amount, }, QuoteRequested { channel: ResponseChannel, @@ -60,19 +62,34 @@ impl OutEvent { #[derive(NetworkBehaviour)] #[behaviour(out_event = "OutEvent", event_process = false)] #[allow(missing_debug_implementations)] -pub struct Behaviour { +pub struct Behaviour { pub quote: quote::Behaviour, - pub spot_price: spot_price::Behaviour, + pub spot_price: spot_price::Behaviour, pub execution_setup: execution_setup::Behaviour, pub transfer_proof: transfer_proof::Behaviour, pub encrypted_signature: encrypted_signature::Behaviour, } -impl Default for Behaviour { - fn default() -> Self { +impl Behaviour +where + LR: LatestRate + Send + 'static, +{ + pub fn new( + balance: monero::Amount, + lock_fee: monero::Amount, + max_buy: bitcoin::Amount, + latest_rate: LR, + resume_only: bool, + ) -> Self { Self { quote: quote::alice(), - spot_price: spot_price::alice(), + spot_price: spot_price::Behaviour::new( + balance, + lock_fee, + max_buy, + latest_rate, + resume_only, + ), execution_setup: Default::default(), transfer_proof: transfer_proof::alice(), encrypted_signature: encrypted_signature::alice(), diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index bf2b5ba0..049c7af5 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -2,7 +2,7 @@ use crate::asb::Rate; use crate::database::Database; use crate::env::Config; use crate::network::quote::BidQuote; -use crate::network::{spot_price, transfer_proof}; +use crate::network::transfer_proof; use crate::protocol::alice::{AliceState, Behaviour, OutEvent, State0, State3, Swap}; use crate::{bitcoin, kraken, monero}; use anyhow::{Context, Result}; @@ -31,17 +31,16 @@ type OutgoingTransferProof = BoxFuture<'static, Result<(PeerId, transfer_proof::Request, bmrng::Responder<()>)>>; #[allow(missing_debug_implementations)] -pub struct EventLoop { - swarm: libp2p::Swarm, +pub struct EventLoop { + swarm: libp2p::Swarm>, env_config: Config, bitcoin_wallet: Arc, monero_wallet: Arc, db: Arc, - latest_rate: RS, + latest_rate: LR, max_buy: bitcoin::Amount, swap_sender: mpsc::Sender, - resume_only: bool, /// Stores incoming [`EncryptedSignature`]s per swap. recv_encrypted_signature: HashMap>, @@ -60,18 +59,17 @@ pub struct EventLoop { impl EventLoop where - LR: LatestRate, + LR: LatestRate + Send + 'static, { #[allow(clippy::too_many_arguments)] pub fn new( - swarm: Swarm, + swarm: Swarm>, env_config: Config, bitcoin_wallet: Arc, monero_wallet: Arc, db: Arc, latest_rate: LR, max_buy: bitcoin::Amount, - resume_only: bool, ) -> Result<(Self, mpsc::Receiver)> { let swap_channel = MpscChannels::default(); @@ -83,7 +81,6 @@ where db, latest_rate, swap_sender: swap_channel.sender, - resume_only, max_buy, recv_encrypted_signature: Default::default(), inflight_encrypted_signatures: Default::default(), @@ -146,38 +143,8 @@ where tokio::select! { swarm_event = self.swarm.next_event() => { match swarm_event { - SwarmEvent::Behaviour(OutEvent::SpotPriceRequested { request, channel, peer }) => { - let btc = request.btc; - let xmr = match self.handle_spot_price_request(btc, self.monero_wallet.clone()).await { - Ok(xmr) => match xmr { - Ok(xmr) => xmr, - Err(e) => { - tracing::warn!(%peer, "Ignoring spot price request from {} because: {:#}", peer, e); - match self.swarm.behaviour_mut().spot_price.send_response(channel, spot_price::Response::Error(e)) { - Ok(_) => { - continue; - }, - Err(_) => { - tracing::debug!(%peer, "Failed to respond with error to spot price request"); - continue; - } - } - } - }, - Err(e) => { - tracing::error!(%peer, "Unrecoverable error while producing spot price for {}: {:#}", btc, e); - continue; - } - }; + SwarmEvent::Behaviour(OutEvent::ExecutionSetupStart { peer, btc, xmr }) => { - match self.swarm.behaviour_mut().spot_price.send_response(channel, spot_price::Response::Xmr(xmr)) { - Ok(_) => {}, - Err(_) => { - // if we can't respond, the peer probably just disconnected so it is not a huge deal, only log this on debug - tracing::debug!(%peer, "Failed to respond with spot price"); - continue; - } - } let tx_redeem_fee = self.bitcoin_wallet .estimate_fee(bitcoin::TxRedeem::weight(), btc) .await; @@ -195,7 +162,7 @@ where (redeem_address, punish_address) } _ => { - tracing::error!("Could not get new address."); + tracing::error!(%peer, "Failed to get new address during execution setup."); continue; } }; @@ -208,7 +175,7 @@ where (tx_redeem_fee, tx_punish_fee) } _ => { - tracing::error!("Could not calculate transaction fees."); + tracing::error!(%peer, "Failed to calculate transaction fees during execution setup."); continue; } }; @@ -233,6 +200,17 @@ where self.swarm.behaviour_mut().execution_setup.run(peer, state0); } SwarmEvent::Behaviour(OutEvent::QuoteRequested { channel, peer }) => { + // TODO: Move the spot-price update into dedicated update stream to decouple it from quote requests + let current_balance = self.monero_wallet.get_balance().await; + match current_balance { + Ok(balance) => { + self.swarm.behaviour_mut().spot_price.update_balance(balance); + } + Err(e) => { + tracing::error!("Failed to fetch Monero balance: {:#}", e); + } + } + let quote = match self.make_quote(self.max_buy).await { Ok(quote) => quote, Err(e) => { @@ -360,38 +338,6 @@ where } } - async fn handle_spot_price_request( - &mut self, - btc: bitcoin::Amount, - monero_wallet: Arc, - ) -> Result> { - if self.resume_only { - return Ok(Err(spot_price::Error::MaintenanceMode)); - } - - let rate = self - .latest_rate - .latest_rate() - .context("Failed to get latest rate")?; - - if btc > self.max_buy { - return Ok(Err(spot_price::Error::MaxBuyAmountExceeded { - buy: btc, - max: self.max_buy, - })); - } - - let xmr_balance = monero_wallet.get_balance().await?; - let xmr_lock_fees = monero_wallet.static_tx_fee_estimate(); - let xmr = rate.sell_quote(btc)?; - - if xmr_balance < xmr + xmr_lock_fees { - return Ok(Err(spot_price::Error::BalanceTooLow { buy: btc })); - } - - Ok(Ok(xmr)) - } - async fn make_quote(&mut self, max_buy: bitcoin::Amount) -> Result { let rate = self .latest_rate @@ -510,7 +456,7 @@ impl LatestRate for FixedRate { /// Produces [`Rate`]s based on [`PriceUpdate`]s from kraken and a configured /// spread. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct KrakenRate { ask_spread: Decimal, price_updates: kraken::PriceUpdates, diff --git a/swap/src/protocol/alice/spot_price.rs b/swap/src/protocol/alice/spot_price.rs new file mode 100644 index 00000000..6bf64e6f --- /dev/null +++ b/swap/src/protocol/alice/spot_price.rs @@ -0,0 +1,199 @@ +use crate::monero; +use crate::network::cbor_request_response::CborCodec; +use crate::network::spot_price; +use crate::network::spot_price::SpotPriceProtocol; +use crate::protocol::alice; +use crate::protocol::alice::event_loop::LatestRate; +use libp2p::request_response::{ + ProtocolSupport, RequestResponseConfig, RequestResponseEvent, RequestResponseMessage, + ResponseChannel, +}; +use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}; +use libp2p::{NetworkBehaviour, PeerId}; +use std::collections::VecDeque; +use std::task::{Context, Poll}; + +pub struct OutEvent { + peer: PeerId, + btc: bitcoin::Amount, + xmr: monero::Amount, +} + +#[derive(NetworkBehaviour)] +#[behaviour(out_event = "OutEvent", poll_method = "poll", event_process = true)] +#[allow(missing_debug_implementations)] +pub struct Behaviour { + behaviour: spot_price::Behaviour, + + #[behaviour(ignore)] + events: VecDeque, + + #[behaviour(ignore)] + balance: monero::Amount, + #[behaviour(ignore)] + lock_fee: monero::Amount, + #[behaviour(ignore)] + max_buy: bitcoin::Amount, + #[behaviour(ignore)] + latest_rate: LR, + #[behaviour(ignore)] + resume_only: bool, +} + +/// Behaviour that handles spot prices. +/// All the logic how to react to a spot price request is contained here, events +/// reporting the successful handling of a spot price request or a failure are +/// bubbled up to the parent behaviour. +impl Behaviour +where + LR: LatestRate + Send + 'static, +{ + pub fn new( + balance: monero::Amount, + lock_fee: monero::Amount, + max_buy: bitcoin::Amount, + latest_rate: LR, + resume_only: bool, + ) -> Self { + Self { + behaviour: spot_price::Behaviour::new( + CborCodec::default(), + vec![(SpotPriceProtocol, ProtocolSupport::Inbound)], + RequestResponseConfig::default(), + ), + events: Default::default(), + balance, + lock_fee, + max_buy, + latest_rate, + resume_only, + } + } + + pub fn update_balance(&mut self, balance: monero::Amount) { + self.balance = balance; + } + + fn send_error_response( + &mut self, + peer: PeerId, + channel: ResponseChannel, + error: spot_price::Error, + ) { + if self + .behaviour + .send_response(channel, spot_price::Response::Error(error)) + .is_err() + { + tracing::debug!(%peer, "Unable to send error response for spot price request"); + } + } + + fn poll( + &mut self, + _cx: &mut Context<'_>, + _params: &mut impl PollParameters, + ) -> Poll> { + if let Some(event) = self.events.pop_front() { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + } + + // We trust in libp2p to poll us. + Poll::Pending + } +} + +impl NetworkBehaviourEventProcess for Behaviour +where + LR: LatestRate + Send + 'static, +{ + fn inject_event(&mut self, event: spot_price::OutEvent) { + let (peer, message) = match event { + RequestResponseEvent::Message { peer, message } => (peer, message), + RequestResponseEvent::OutboundFailure { peer, error, .. } => { + tracing::error!(%peer, "Failure sending spot price response: {:#}", error); + return; + } + RequestResponseEvent::InboundFailure { peer, error, .. } => { + tracing::warn!(%peer, "Inbound failure when handling spot price request: {:#}", error); + return; + } + RequestResponseEvent::ResponseSent { peer, .. } => { + tracing::debug!(%peer, "Spot price response sent"); + return; + } + }; + + let (request, channel) = match message { + RequestResponseMessage::Request { + request, channel, .. + } => (request, channel), + RequestResponseMessage::Response { .. } => { + tracing::error!("Unexpected message"); + return; + } + }; + + if self.resume_only { + tracing::warn!(%peer, "Ignoring spot price request from {} because ASB is running in resume-only mode", peer); + self.send_error_response(peer, channel, spot_price::Error::NoSwapsAccepted); + return; + } + + let btc = request.btc; + if btc > self.max_buy { + tracing::warn!(%peer, "Ignoring spot price request from {} because max muy amount exceeded", peer); + self.send_error_response(peer, channel, spot_price::Error::MaxBuyAmountExceeded { + max: self.max_buy, + buy: btc, + }); + return; + } + + let rate = match self.latest_rate.latest_rate() { + Ok(rate) => rate, + Err(e) => { + tracing::error!(%peer, "Ignoring spot price request from {} because we encountered a problem with fetching the latest rate: {:#}", peer, e); + self.send_error_response(peer, channel, spot_price::Error::Other); + return; + } + }; + let xmr = match rate.sell_quote(btc) { + Ok(xmr) => xmr, + Err(e) => { + tracing::error!(%peer, "Ignoring spot price request from {} because we encountered a problem with calculating the amount from rate: {:#}", peer, e); + self.send_error_response(peer, channel, spot_price::Error::Other); + return; + } + }; + + let xmr_balance = self.balance; + let xmr_lock_fees = self.lock_fee; + + if xmr_balance < xmr + xmr_lock_fees { + tracing::error!(%peer, "Ignoring spot price request from {} because the XMR balance is too low to fulfill the swap: {}", peer, xmr_balance); + self.send_error_response(peer, channel, spot_price::Error::BalanceTooLow { buy: btc }); + return; + } + + if self + .behaviour + .send_response(channel, spot_price::Response::Xmr(xmr)) + .is_err() + { + tracing::error!(%peer, "Failed to send spot price response of {} for {}", xmr, btc) + } + + self.events.push_back(OutEvent { peer, btc, xmr }); + } +} + +impl From for alice::OutEvent { + fn from(event: OutEvent) -> Self { + Self::ExecutionSetupStart { + peer: event.peer, + btc: event.btc, + xmr: event.xmr, + } + } +} diff --git a/swap/tests/harness/mod.rs b/swap/tests/harness/mod.rs index de8bd925..c70d979d 100644 --- a/swap/tests/harness/mod.rs +++ b/swap/tests/harness/mod.rs @@ -90,7 +90,8 @@ where env_config, alice_bitcoin_wallet.clone(), alice_monero_wallet.clone(), - ); + ) + .await; let bob_seed = Seed::random().unwrap(); let bob_starting_balances = StartingBalances::new(btc_amount * 10, monero::Amount::ZERO, None); @@ -213,7 +214,7 @@ pub async fn init_electrs_container( Ok(docker) } -fn start_alice( +async fn start_alice( seed: &Seed, db_path: PathBuf, listen_address: Multiaddr, @@ -223,7 +224,21 @@ fn start_alice( ) -> (AliceApplicationHandle, Receiver) { let db = Arc::new(Database::open(db_path.as_path()).unwrap()); - let mut swarm = swarm::alice(&seed).unwrap(); + let current_balance = monero_wallet.get_balance().await.unwrap(); + let lock_fee = monero_wallet.static_tx_fee_estimate(); + let max_buy = bitcoin::Amount::from_sat(u64::MAX); + let latest_rate = FixedRate::default(); + let resume_only = false; + + let mut swarm = swarm::alice( + &seed, + current_balance, + lock_fee, + max_buy, + latest_rate, + resume_only, + ) + .unwrap(); swarm.listen_on(listen_address).unwrap(); let (event_loop, swap_handle) = alice::EventLoop::new( @@ -234,7 +249,6 @@ fn start_alice( db, FixedRate::default(), bitcoin::Amount::ONE_BTC, - false, ) .unwrap(); @@ -497,7 +511,8 @@ impl TestContext { self.env_config, self.alice_bitcoin_wallet.clone(), self.alice_monero_wallet.clone(), - ); + ) + .await; self.alice_handle = alice_handle; self.alice_swap_handle = alice_swap_handle;