diff --git a/swap/src/bin/asb.rs b/swap/src/bin/asb.rs index ef590dd1..63b4c459 100644 --- a/swap/src/bin/asb.rs +++ b/swap/src/bin/asb.rs @@ -144,13 +144,9 @@ async fn main() -> Result<()> { } }; - let current_balance = monero_wallet.get_balance().await?; - let lock_fee = monero_wallet.static_tx_fee_estimate(); let kraken_rate = KrakenRate::new(config.maker.ask_spread, kraken_price_updates); let mut swarm = swarm::asb( &seed, - current_balance, - lock_fee, config.maker.min_buy_btc, config.maker.max_buy_btc, kraken_rate.clone(), diff --git a/swap/src/network/swarm.rs b/swap/src/network/swarm.rs index 37e9f219..6b0090ba 100644 --- a/swap/src/network/swarm.rs +++ b/swap/src/network/swarm.rs @@ -1,7 +1,7 @@ use crate::protocol::alice::event_loop::LatestRate; use crate::protocol::{alice, bob}; use crate::seed::Seed; -use crate::{asb, cli, env, monero, tor}; +use crate::{asb, cli, env, tor}; use anyhow::Result; use libp2p::swarm::SwarmBuilder; use libp2p::{PeerId, Swarm}; @@ -10,8 +10,6 @@ use std::fmt::Debug; #[allow(clippy::too_many_arguments)] pub fn asb( seed: &Seed, - balance: monero::Amount, - lock_fee: monero::Amount, min_buy: bitcoin::Amount, max_buy: bitcoin::Amount, latest_rate: LR, @@ -19,17 +17,9 @@ pub fn asb( env_config: env::Config, ) -> Result>> where - LR: LatestRate + Send + 'static + Debug, + LR: LatestRate + Send + 'static + Debug + Clone, { - let behaviour = alice::Behaviour::new( - balance, - lock_fee, - min_buy, - max_buy, - latest_rate, - resume_only, - env_config, - ); + let behaviour = alice::Behaviour::new(min_buy, max_buy, latest_rate, resume_only, env_config); let identity = seed.derive_libp2p_identity(); let transport = asb::transport::new(&identity)?; diff --git a/swap/src/protocol/alice.rs b/swap/src/protocol/alice.rs index 2c7fdd13..d65ff6c8 100644 --- a/swap/src/protocol/alice.rs +++ b/swap/src/protocol/alice.rs @@ -21,7 +21,6 @@ pub use self::swap::{run, run_until}; mod behaviour; pub mod event_loop; -mod execution_setup; mod recovery; pub mod state; pub mod swap; diff --git a/swap/src/protocol/alice/behaviour.rs b/swap/src/protocol/alice/behaviour.rs index b0507472..bde934d8 100644 --- a/swap/src/protocol/alice/behaviour.rs +++ b/swap/src/protocol/alice/behaviour.rs @@ -4,18 +4,17 @@ use libp2p::request_response::{RequestId, ResponseChannel}; use libp2p::{NetworkBehaviour, PeerId}; use uuid::Uuid; +use crate::env; use crate::network::quote::BidQuote; use crate::network::{encrypted_signature, quote, transfer_proof}; use crate::protocol::alice::event_loop::LatestRate; use crate::protocol::alice::swap_setup::WalletSnapshot; -use crate::protocol::alice::{execution_setup, swap_setup, State3}; -use crate::{env, monero}; -use tokio::sync::oneshot; +use crate::protocol::alice::{swap_setup, State3}; #[derive(Debug)] pub enum OutEvent { SwapSetupInitiated { - send_wallet_snapshot: oneshot::Sender, + send_wallet_snapshot: bmrng::RequestReceiver, }, SwapSetupCompleted { peer_id: PeerId, @@ -73,8 +72,7 @@ where LR: LatestRate + Send + 'static, { pub quote: quote::Behaviour, - pub spot_price: swap_setup::Behaviour, - pub execution_setup: execution_setup::Behaviour, + pub swap_setup: swap_setup::Behaviour, pub transfer_proof: transfer_proof::Behaviour, pub encrypted_signature: encrypted_signature::Behaviour, @@ -89,8 +87,6 @@ where LR: LatestRate + Send + 'static, { pub fn new( - balance: monero::Amount, - lock_fee: monero::Amount, min_buy: bitcoin::Amount, max_buy: bitcoin::Amount, latest_rate: LR, @@ -99,16 +95,13 @@ where ) -> Self { Self { quote: quote::alice(), - spot_price: swap_setup::Behaviour::new( - balance, - lock_fee, + swap_setup: swap_setup::Behaviour::new( min_buy, max_buy, env_config, latest_rate, resume_only, ), - execution_setup: Default::default(), transfer_proof: transfer_proof::alice(), encrypted_signature: encrypted_signature::alice(), ping: Ping::default(), diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index 475fa54f..65e813a7 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -4,7 +4,7 @@ use crate::env::Config; use crate::network::quote::BidQuote; use crate::network::transfer_proof; use crate::protocol::alice::swap_setup::WalletSnapshot; -use crate::protocol::alice::{AliceState, Behaviour, OutEvent, State0, State3, Swap}; +use crate::protocol::alice::{AliceState, Behaviour, OutEvent, State3, Swap}; use crate::{bitcoin, kraken, monero}; use anyhow::{Context, Result}; use futures::future; @@ -13,7 +13,6 @@ use futures::stream::{FuturesUnordered, StreamExt}; use libp2p::request_response::{RequestId, ResponseChannel}; use libp2p::swarm::SwarmEvent; use libp2p::{PeerId, Swarm}; -use rand::rngs::OsRng; use rust_decimal::Decimal; use std::collections::HashMap; use std::convert::Infallible; @@ -35,7 +34,7 @@ type OutgoingTransferProof = #[allow(missing_debug_implementations)] pub struct EventLoop where - LR: LatestRate + Send + 'static + Debug, + LR: LatestRate + Send + 'static + Debug + Clone, { swarm: libp2p::Swarm>, env_config: Config, @@ -65,7 +64,7 @@ where impl EventLoop where - LR: LatestRate + Send + 'static + Debug, + LR: LatestRate + Send + 'static + Debug + Clone, { #[allow(clippy::too_many_arguments)] pub fn new( @@ -151,21 +150,23 @@ where tokio::select! { swarm_event = self.swarm.next_event() => { match swarm_event { - SwarmEvent::Behaviour(OutEvent::SwapSetupInitiated { send_wallet_snapshot }) => { + SwarmEvent::Behaviour(OutEvent::SwapSetupInitiated { mut send_wallet_snapshot }) => { - let wallet_snapshot = match WalletSnapshot::capture(&self.bitcoin_wallet, &self.monero_wallet).await { + let (btc, responder) = send_wallet_snapshot.recv().await.expect("TODO: handle error"); + + let wallet_snapshot = match WalletSnapshot::capture(&self.bitcoin_wallet, &self.monero_wallet, btc).await { Ok(wallet_snapshot) => wallet_snapshot, Err(error) => { - tracing::error!("Swap request will be ignored because we were unable to create wallet snapshot for swap: {:#}", error) + tracing::error!("Swap request will be ignored because we were unable to create wallet snapshot for swap: {:#}", error); + continue; } }; - match send_wallet_snapshot.send().await { - Ok() - } + // Ignore result, we should never hit this because the receiver will alive as long as the connection is. + let _ = responder.respond(wallet_snapshot); } SwarmEvent::Behaviour(OutEvent::SwapSetupCompleted{peer_id, swap_id, state3}) => { - let _ = self.handle_execution_setup_done(bob_peer_id, swap_id, *state3).await; + let _ = self.handle_execution_setup_done(peer_id, swap_id, *state3).await; } SwarmEvent::Behaviour(OutEvent::SwapDeclined { peer, error }) => { tracing::warn!(%peer, "Ignoring spot price request because: {}", error); @@ -175,7 +176,8 @@ where let current_balance = self.monero_wallet.get_balance().await; match current_balance { Ok(balance) => { - self.swarm.behaviour_mut().spot_price.update_balance(balance); + + // FIXME self.swarm.behaviour_mut().spot_price.update_balance(balance); } Err(e) => { tracing::error!("Failed to fetch Monero balance: {:#}", e); diff --git a/swap/src/protocol/alice/execution_setup.rs b/swap/src/protocol/alice/execution_setup.rs deleted file mode 100644 index 2be13327..00000000 --- a/swap/src/protocol/alice/execution_setup.rs +++ /dev/null @@ -1,76 +0,0 @@ -use crate::network::cbor_request_response::BUF_SIZE; -use crate::protocol::alice::{State0, State3}; -use crate::protocol::{alice, Message0, Message2, Message4}; -use anyhow::{Context, Error}; -use libp2p::PeerId; -use libp2p_async_await::BehaviourOutEvent; -use std::time::Duration; -use uuid::Uuid; - -#[derive(Debug)] -pub enum OutEvent { - Done { - bob_peer_id: PeerId, - swap_id: Uuid, - state3: State3, - }, - Failure { - peer: PeerId, - error: Error, - }, -} - -impl From> for OutEvent { - fn from(event: BehaviourOutEvent<(PeerId, (Uuid, State3)), (), Error>) -> Self { - match event { - BehaviourOutEvent::Inbound(_, Ok((bob_peer_id, (swap_id, state3)))) => OutEvent::Done { - bob_peer_id, - swap_id, - state3, - }, - BehaviourOutEvent::Inbound(peer, Err(e)) => OutEvent::Failure { peer, error: e }, - BehaviourOutEvent::Outbound(..) => unreachable!("Alice only supports inbound"), - } - } -} - -#[derive(libp2p::NetworkBehaviour)] -#[behaviour(out_event = "OutEvent", event_process = false)] -pub struct Behaviour { - inner: libp2p_async_await::Behaviour<(PeerId, (Uuid, State3)), (), anyhow::Error>, -} - -impl Default for Behaviour { - fn default() -> Self { - Self { - inner: libp2p_async_await::Behaviour::new(b"/comit/xmr/btc/execution_setup/1.0.0"), - } - } -} - -impl Behaviour { - pub fn run(&mut self, bob: PeerId, state0: State0) { - self.inner.do_protocol_listener(bob, move |mut substream| { - let protocol = async move { Ok((bob, (swap_id, state3))) }; - - async move { tokio::time::timeout(Duration::from_secs(60), protocol).await? } - }); - } -} - -impl From for alice::OutEvent { - fn from(event: OutEvent) -> Self { - match event { - OutEvent::Done { - bob_peer_id, - state3, - swap_id, - } => Self::SwapSetupCompleted { - peer_id: bob_peer_id, - state3: Box::new(state3), - swap_id, - }, - OutEvent::Failure { peer, error } => Self::Failure { peer, error }, - } - } -} diff --git a/swap/src/protocol/alice/swap_setup.rs b/swap/src/protocol/alice/swap_setup.rs index ba36ac80..3cd25955 100644 --- a/swap/src/protocol/alice/swap_setup.rs +++ b/swap/src/protocol/alice/swap_setup.rs @@ -1,39 +1,31 @@ -use anyhow::{Result, Context}; +use crate::protocol::alice::event_loop::LatestRate; +use crate::protocol::alice::{State0, State3}; +use crate::protocol::{alice, Message0, Message2, Message4}; +use crate::{bitcoin, env, monero}; +use anyhow::{Context as _, Result}; +use futures::future::{BoxFuture, OptionFuture}; +use futures::FutureExt; +use libp2p::core::connection::ConnectionId; +use libp2p::core::upgrade::{from_fn, FromFnUpgrade}; +use libp2p::core::{upgrade, Endpoint}; +use libp2p::swarm::{ + KeepAlive, NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction, PollParameters, + ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol, +}; +use libp2p::{Multiaddr, PeerId}; +use serde::de::DeserializeOwned; +use serde::{Deserialize, Serialize}; use std::collections::VecDeque; use std::fmt::Debug; use std::future; use std::task::{Context, Poll}; - -use futures::future::{BoxFuture, OptionFuture}; -use libp2p::{Multiaddr, NetworkBehaviour, PeerId}; -use libp2p::core::connection::ConnectionId; -use libp2p::core::{Endpoint, upgrade}; -use libp2p::core::upgrade::from_fn; -use libp2p::core::upgrade::FromFnUpgrade; -use libp2p::request_response::{ - ProtocolSupport, RequestResponseConfig, RequestResponseEvent, RequestResponseMessage, - ResponseChannel, -}; -use libp2p::swarm::{IntoProtocolsHandler, KeepAlive, NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol}; -use libp2p::swarm::protocols_handler::{InboundUpgradeSend, OutboundUpgradeSend}; use uuid::Uuid; use void::Void; -use crate::{env, monero}; -use crate::network::cbor_request_response::CborCodec; -use crate::network::spot_price; -use crate::network::spot_price::{BlockchainNetwork, SpotPriceProtocol}; -use crate::protocol::{alice, bob, Message0, Message2, Message4}; -use crate::protocol::alice::event_loop::LatestRate; -use crate::protocol::alice::{State3, State0}; -use futures::FutureExt; -use tokio::sync::oneshot; -use serde::{Deserialize, Serialize}; - #[derive(Debug)] pub enum OutEvent { Initiated { - send_wallet_snapshot: oneshot::Sender + send_wallet_snapshot: bmrng::RequestReceiver, }, Completed { bob_peer_id: PeerId, @@ -43,40 +35,82 @@ pub enum OutEvent { Error, // TODO be more descriptive } +#[derive(Debug)] pub struct WalletSnapshot { balance: monero::Amount, lock_fee: monero::Amount, - // TODO: Consider using the same address for punish and redeem (they are mutually exclusive, so effectively the address will only be used once) + // TODO: Consider using the same address for punish and redeem (they are mutually exclusive, so + // effectively the address will only be used once) redeem_address: bitcoin::Address, punish_address: bitcoin::Address, redeem_fee: bitcoin::Amount, - refund_fee: bitcoin::Amount, + punish_fee: bitcoin::Amount, } impl WalletSnapshot { - pub async fn capture(bitcoin_wallet: &bitcoin::Wallet, monero_wallet: &monero::Wallet) -> Result { + pub async fn capture( + bitcoin_wallet: &bitcoin::Wallet, + monero_wallet: &monero::Wallet, + transfer_amount: bitcoin::Amount, + ) -> Result { + let balance = monero_wallet.get_balance().await?; + let redeem_address = bitcoin_wallet.new_address().await?; + let punish_address = bitcoin_wallet.new_address().await?; + let redeem_fee = bitcoin_wallet + .estimate_fee(bitcoin::TxRedeem::weight(), transfer_amount) + .await?; + let punish_fee = bitcoin_wallet + .estimate_fee(bitcoin::TxPunish::weight(), transfer_amount) + .await?; + Ok(Self { - balance: monero_wallet.get_balance().await?, + balance, lock_fee: monero::MONERO_FEE, - redeem_address: bitcoin_wallet.new_address().await?, - punish_address: bitcoin_wallet.new_address().await?, - redeem_fee: bitcoin_wallet - .estimate_fee(bitcoin::TxRedeem::weight(), btc) - .await, - refund_fee: bitcoin_wallet - .estimate_fee(bitcoin::TxPunish::weight(), btc) - .await + redeem_address, + punish_address, + redeem_fee, + punish_fee, }) } } +impl From for alice::OutEvent { + fn from(event: OutEvent) -> Self { + match event { + OutEvent::Initiated { + send_wallet_snapshot, + } => alice::OutEvent::SwapSetupInitiated { + send_wallet_snapshot, + }, + OutEvent::Completed { + bob_peer_id, + swap_id, + state3, + } => alice::OutEvent::SwapSetupCompleted { + peer_id: bob_peer_id, + swap_id, + state3: Box::new(state3), + }, + OutEvent::Error => alice::OutEvent::Failure { + peer: todo!(), + error: todo!(), + }, + } + } +} + +#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq)] +pub struct BlockchainNetwork { + #[serde(with = "crate::bitcoin::network")] + pub bitcoin: bitcoin::Network, + #[serde(with = "crate::monero::network")] + pub monero: monero::Network, +} + #[allow(missing_debug_implementations)] -pub struct Behaviour -where - LR: LatestRate + Send + 'static, -{ +pub struct Behaviour { events: VecDeque, min_buy: bitcoin::Amount, max_buy: bitcoin::Amount, @@ -86,10 +120,7 @@ where resume_only: bool, } -impl Behaviour -where - LR: LatestRate + Send + 'static, -{ +impl Behaviour { pub fn new( min_buy: bitcoin::Amount, max_buy: bitcoin::Amount, @@ -108,12 +139,21 @@ where } } -impl NetworkBehaviour for Behaviour { - type ProtocolsHandler = Handler; +impl NetworkBehaviour for Behaviour +where + LR: LatestRate + Send + 'static + Clone, +{ + type ProtocolsHandler = Handler; type OutEvent = OutEvent; fn new_handler(&mut self) -> Self::ProtocolsHandler { - Handler::default() + Handler::new( + self.min_buy, + self.max_buy, + self.env_config, + self.latest_rate.clone(), + self.resume_only, + ) } fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { @@ -128,11 +168,15 @@ impl NetworkBehaviour for Behaviour { todo!() } - fn inject_event(&mut self, peer_id: PeerId, connection: ConnectionId, event: _) { - todo!() + fn inject_event(&mut self, peer_id: PeerId, connection: ConnectionId, event: HandlerOutEvent) { + todo!("bubble up events to network behaviour") } - fn poll(&mut self, cx: &mut Context<'_>, params: &mut impl PollParameters) -> Poll> { + fn poll( + &mut self, + cx: &mut Context<'_>, + params: &mut impl PollParameters, + ) -> Poll> { todo!() } } @@ -144,6 +188,12 @@ pub struct SpotPriceRequest { pub blockchain_network: BlockchainNetwork, } +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum SpotPriceResponse { + Xmr(monero::Amount), + Error(SpotPriceError), +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub enum SpotPriceError { NoSwapsAccepted, @@ -175,8 +225,8 @@ pub enum SpotPriceError { // TODO: This is bob only. // enum OutboundState { // PendingOpen( -// // TODO: put data in here we pass in when we want to kick of swap setup, just bitcoin amount? -// ), +// // TODO: put data in here we pass in when we want to kick of swap +// setup, just bitcoin amount? ), // PendingNegotiate, // Executing(BoxFuture<'static, anyhow::Result<(Uuid, bob::State3)>>) // } @@ -184,25 +234,56 @@ pub enum SpotPriceError { // TODO: Don't just use anyhow::Error type InboundStream = BoxFuture<'static, anyhow::Result<(Uuid, alice::State3)>>; -struct Handler { +pub struct Handler { inbound_stream: OptionFuture, events: VecDeque, - resume_only: bool + + min_buy: bitcoin::Amount, + max_buy: bitcoin::Amount, + env_config: env::Config, + + latest_rate: LR, + resume_only: bool, } -enum HandlerOutEvent { - Initiated(oneshot::Sender), - Completed(anyhow::Result<(Uuid, alice::State3)>) +impl Handler { + fn new( + min_buy: bitcoin::Amount, + max_buy: bitcoin::Amount, + env_config: env::Config, + latest_rate: LR, + resume_only: bool, + ) -> Self { + Self { + inbound_stream: OptionFuture::from(None), + events: Default::default(), + min_buy, + max_buy, + env_config, + latest_rate, + resume_only, + } + } } +pub enum HandlerOutEvent { + Initiated(bmrng::RequestReceiver), + Completed(anyhow::Result<(Uuid, alice::State3)>), +} + +pub enum HandlerInEvent {} + pub const BUF_SIZE: usize = 1024 * 1024; -impl ProtocolsHandler for Handler { - type InEvent = (); +impl ProtocolsHandler for Handler +where + LR: LatestRate + Send + 'static, +{ + type InEvent = HandlerInEvent; type OutEvent = HandlerOutEvent; - type Error = (); + type Error = Void; type InboundProtocol = protocol::SwapSetup; - type OutboundProtocol = (); + type OutboundProtocol = upgrade::DeniedUpgrade; type InboundOpenInfo = (); type OutboundOpenInfo = (); @@ -210,115 +291,137 @@ impl ProtocolsHandler for Handler { SubstreamProtocol::new(protocol::new(), todo!("pass data down to handler")) } - fn inject_fully_negotiated_inbound(&mut self, mut protocol: NegotiatedSubstream, _: Self::InboundOpenInfo) { - let (sender, receiver) = oneshot::channel(); + fn inject_fully_negotiated_inbound( + &mut self, + mut substream: NegotiatedSubstream, + _: Self::InboundOpenInfo, + ) { + let (sender, receiver) = bmrng::channel_with_timeout::( + 1, + todo!("decide on timeout"), + ); let resume_only = self.resume_only; - - self.inbound_stream = OptionFuture::from(Some(async move { - let request = read_cbor_message::(&mut protocol).await?; - let wallet_snapshot = receiver.await?; // TODO Put a timeout on this + let min_buy = self.min_buy; + let max_buy = self.max_buy; + let latest_rate = self.latest_rate.latest_rate(); + let env_config = self.env_config; - async { - if resume_only { - return Err(Error::ResumeOnlyMode) + // TODO: Put a timeout on the whole future + self.inbound_stream = OptionFuture::from(Some( + async move { + let request = read_cbor_message::(&mut substream).await?; + let wallet_snapshot = sender.send_receive(request.btc).await?; + + // wrap all of these into another future so we can `return` from all the + // different blocks + let validate = async { + if resume_only { + return Err(Error::ResumeOnlyMode); + }; + + let blockchain_network = BlockchainNetwork { + bitcoin: env_config.bitcoin_network, + monero: env_config.monero_network, + }; + + if request.blockchain_network != blockchain_network { + return Err(Error::BlockchainNetworkMismatch { + cli: request.blockchain_network, + asb: blockchain_network, + }); + } + + let btc = request.btc; + + if btc < min_buy { + return Err(Error::AmountBelowMinimum { + min: min_buy, + buy: btc, + }); + } + + if btc > max_buy { + return Err(Error::AmountAboveMaximum { + max: max_buy, + buy: btc, + }); + } + + let rate = + latest_rate.map_err(|e| Error::LatestRateFetchFailed(Box::new(e)))?; + let xmr = rate + .sell_quote(btc) + .map_err(|e| Error::SellQuoteCalculationFailed(e))?; + + if wallet_snapshot.balance < xmr + wallet_snapshot.lock_fee { + return Err(Error::BalanceTooLow { + balance: wallet_snapshot.balance, + buy: btc, + }); + } + + Ok(xmr) }; + let xmr = match validate.await { + Ok(xmr) => { + write_cbor_message(&mut substream, SpotPriceResponse::Xmr(xmr)).await?; + xmr + } + Err(e) => { + write_cbor_message( + &mut substream, + SpotPriceResponse::Error(e.to_error_response()), + ) + .await?; + return Err(e.into()); + } + }; + + let state0 = State0::new( + request.btc, + xmr, + env_config, + wallet_snapshot.redeem_address, + wallet_snapshot.punish_address, + wallet_snapshot.redeem_fee, + wallet_snapshot.punish_fee, + &mut rand::thread_rng(), + )?; + + let message0 = read_cbor_message::(&mut substream) + .await + .context("Failed to deserialize message0")?; + let (swap_id, state1) = state0.receive(message0)?; + + write_cbor_message(&mut substream, state1.next_message()).await?; + + let message2 = read_cbor_message::(&mut substream) + .await + .context("Failed to deserialize message2")?; + let state2 = state1 + .receive(message2) + .context("Failed to receive Message2")?; + + write_cbor_message(&mut substream, state2.next_message()).await?; + + let message4 = read_cbor_message::(&mut substream) + .await + .context("Failed to deserialize message4")?; + let state3 = state2 + .receive(message4) + .context("Failed to receive Message4")?; + + Ok((swap_id, state3)) } + .boxed(), + )); - - let blockchain_network = BlockchainNetwork { - bitcoin: self.env_config.bitcoin_network, - monero: self.env_config.monero_network, - }; - - if request.blockchain_network != blockchain_network { - self.decline(peer, channel, Error::BlockchainNetworkMismatch { - cli: request.blockchain_network, - asb: blockchain_network, - }); - return; - } - - - - let btc = request.btc; - - if btc < self.min_buy { - self.decline(peer, channel, Error::AmountBelowMinimum { - min: self.min_buy, - buy: btc, - }); - return; - } - - if btc > self.max_buy { - self.decline(peer, channel, Error::AmountAboveMaximum { - max: self.max_buy, - buy: btc, - }); - return; - } - - let rate = match self.latest_rate.latest_rate() { - Ok(rate) => rate, - Err(e) => { - self.decline(peer, channel, Error::LatestRateFetchFailed(Box::new(e))); - return; - } - }; - let xmr = match rate.sell_quote(btc) { - Ok(xmr) => xmr, - Err(e) => { - self.decline(peer, channel, Error::SellQuoteCalculationFailed(e)); - return; - } - }; - - let xmr_balance = self.balance; - let xmr_lock_fees = self.lock_fee; - - if xmr_balance < xmr + xmr_lock_fees { - self.decline(peer, channel, Error::BalanceTooLow { - balance: xmr_balance, - buy: btc, - }); - return; - } - - if self - .behaviour - .send_response(channel, spot_price::Response::Xmr(xmr)) - .is_err() - { - tracing::error!(%peer, "Failed to send spot price response of {} for {}", xmr, btc) - } - - let state0 = State0::new(spot_price_request.btc, todo!(), todo!(), todo!(), todo!(), todo!(), todo!(), todo!())?; - - let message0 = read_cbor_message::(&mut protocol).context("Failed to deserialize message0")?; - let (swap_id, state1) = state0.receive(message0)?; - - write_cbor_message(&mut protocol, state1.next_message()).await?; - - let message2 = read_cbor_message::(&mut protocol).context("Failed to deserialize message2")?; - let state2 = state1 - .receive(message2) - .context("Failed to receive Message2")?; - - write_cbor_message(&mut protocol, state2.next_message()).await?; - - let message4 = read_cbor_message::(&mut protocol).context("Failed to deserialize message4")?; - let state3 = state2 - .receive(message4) - .context("Failed to receive Message4")?; - - Ok((swap_id, state3)) - }.boxed())); - self.events.push_back(HandlerOutEvent::Initiated(sender)); + self.events.push_back(HandlerOutEvent::Initiated(receiver)); } - fn inject_fully_negotiated_outbound(&mut self, protocol: NegotiatedSubstream, info: Self::OutboundOpenInfo) { + fn inject_fully_negotiated_outbound(&mut self, protocol: Void, info: Self::OutboundOpenInfo) { unreachable!("we don't support outbound") } @@ -326,7 +429,11 @@ impl ProtocolsHandler for Handler { todo!() } - fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<_>) { + fn inject_dial_upgrade_error( + &mut self, + info: Self::OutboundOpenInfo, + error: ProtocolsHandlerUpgrErr, + ) { todo!() } @@ -334,31 +441,44 @@ impl ProtocolsHandler for Handler { todo!() } - fn poll(&mut self, cx: &mut Context<'_>) -> Poll> { - let event = futures::ready!(self.inbound_stream.poll(cx)); - - Poll::Ready(ProtocolsHandlerEvent::Custom(HandlerOutEvent::Completed(event))) + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll< + ProtocolsHandlerEvent< + Self::OutboundProtocol, + Self::OutboundOpenInfo, + Self::OutEvent, + Self::Error, + >, + > { + if let Some(result) = futures::ready!(self.inbound_stream.poll_unpin(cx)) { + return Poll::Ready(ProtocolsHandlerEvent::Custom(HandlerOutEvent::Completed( + result, + ))); + } + + Poll::Pending } } -async fn read_cbor_message(substream: &mut NegotiatedSubstream) -> Result where T: Deserialize { +async fn read_cbor_message(substream: &mut NegotiatedSubstream) -> Result +where + T: DeserializeOwned, +{ let bytes = upgrade::read_one(substream, BUF_SIZE).await?; let mut de = serde_cbor::Deserializer::from_slice(&bytes); - let message = T::deserialize(de)?; - + let message = T::deserialize(&mut de)?; + Ok(message) } -async fn write_cbor_message(substream: &mut NegotiatedSubstream, message: T) -> Result<()> where T: Serialize { +async fn write_cbor_message(substream: &mut NegotiatedSubstream, message: T) -> Result<()> +where + T: Serialize, +{ let bytes = serde_cbor::to_vec(&message)?; upgrade::write_one(substream, &bytes).await?; - - Ok(()) -} - -async fn write_error_message(substream: &mut NegotiatedSubstream, message: impl Into) -> Result<()> { - let bytes = serde_cbor::to_vec(&message.into())?; - upgrade::write_one(substream, &bytes).await?; Ok(()) } @@ -369,10 +489,12 @@ mod protocol { pub fn new() -> SwapSetup { from_fn( b"/comit/xmr/btc/swap_setup/1.0.0", - Box::new(|socket, endpoint| future::ready(match endpoint { - Endpoint::Listener => Ok(socket), - Endpoint::Dialer => todo!("return error") - })), + Box::new(|socket, endpoint| { + future::ready(match endpoint { + Endpoint::Listener => Ok(socket), + Endpoint::Dialer => todo!("return error"), + }) + }), ) } @@ -380,15 +502,13 @@ mod protocol { &'static [u8], Box< dyn Fn( - NegotiatedSubstream, - Endpoint, - ) - -> future::Ready> - + Send - + 'static, + NegotiatedSubstream, + Endpoint, + ) -> future::Ready> + + Send + + 'static, >, >; - } #[derive(Debug, thiserror::Error)] @@ -411,563 +531,37 @@ pub enum Error { buy: bitcoin::Amount, }, #[error("Failed to fetch latest rate")] - LatestRateFetchFailed(#[source] Box), + LatestRateFetchFailed(#[source] Box), #[error("Failed to calculate quote: {0}")] SellQuoteCalculationFailed(#[source] anyhow::Error), #[error("Blockchain networks did not match, we are on {asb:?}, but request from {cli:?}")] BlockchainNetworkMismatch { - cli: spot_price::BlockchainNetwork, - asb: spot_price::BlockchainNetwork, + cli: BlockchainNetwork, + asb: BlockchainNetwork, }, } impl Error { - pub fn to_error_response(&self) -> spot_price::Error { + pub fn to_error_response(&self) -> SpotPriceError { match self { - Error::ResumeOnlyMode => spot_price::Error::NoSwapsAccepted, - Error::AmountBelowMinimum { min, buy } => spot_price::Error::AmountBelowMinimum { + Error::ResumeOnlyMode => SpotPriceError::NoSwapsAccepted, + Error::AmountBelowMinimum { min, buy } => SpotPriceError::AmountBelowMinimum { min: *min, buy: *buy, }, - Error::AmountAboveMaximum { max, buy } => spot_price::Error::AmountAboveMaximum { + Error::AmountAboveMaximum { max, buy } => SpotPriceError::AmountAboveMaximum { max: *max, buy: *buy, }, - Error::BalanceTooLow { buy, .. } => spot_price::Error::BalanceTooLow { buy: *buy }, + Error::BalanceTooLow { buy, .. } => SpotPriceError::BalanceTooLow { buy: *buy }, Error::BlockchainNetworkMismatch { cli, asb } => { - spot_price::Error::BlockchainNetworkMismatch { + SpotPriceError::BlockchainNetworkMismatch { cli: *cli, asb: *asb, } } Error::LatestRateFetchFailed(_) | Error::SellQuoteCalculationFailed(_) => { - spot_price::Error::Other - } - } - } -} - -#[cfg(test)] -mod tests { - use anyhow::anyhow; - use libp2p::Swarm; - use rust_decimal::Decimal; - - use crate::{monero, network}; - use crate::asb::Rate; - use crate::env::GetConfig; - use crate::network::test::{await_events_or_timeout, connect, new_swarm}; - use crate::protocol::{alice, bob}; - - use super::*; - - impl Default for AliceBehaviourValues { - fn default() -> Self { - Self { - balance: monero::Amount::from_monero(1.0).unwrap(), - lock_fee: monero::Amount::ZERO, - min_buy: bitcoin::Amount::from_btc(0.001).unwrap(), - max_buy: bitcoin::Amount::from_btc(0.01).unwrap(), - rate: TestRate::default(), // 0.01 - resume_only: false, - env_config: env::Testnet::get_config(), - } - } - } - - #[tokio::test] - async fn given_alice_has_sufficient_balance_then_returns_price() { - let mut test = SpotPriceTest::setup(AliceBehaviourValues::default()).await; - - let btc_to_swap = bitcoin::Amount::from_btc(0.01).unwrap(); - let expected_xmr = monero::Amount::from_monero(1.0).unwrap(); - - test.construct_and_send_request(btc_to_swap); - test.assert_price((btc_to_swap, expected_xmr), expected_xmr) - .await; - } - - #[tokio::test] - async fn given_alice_has_insufficient_balance_then_returns_error() { - let mut test = SpotPriceTest::setup( - AliceBehaviourValues::default().with_balance(monero::Amount::ZERO), - ) - .await; - - let btc_to_swap = bitcoin::Amount::from_btc(0.01).unwrap(); - - test.construct_and_send_request(btc_to_swap); - test.assert_error( - alice::swap_setup::Error::BalanceTooLow { - balance: monero::Amount::ZERO, - buy: btc_to_swap, - }, - bob::spot_price::Error::BalanceTooLow { buy: btc_to_swap }, - ) - .await; - } - - #[tokio::test] - async fn given_alice_has_insufficient_balance_after_balance_update_then_returns_error() { - let mut test = SpotPriceTest::setup(AliceBehaviourValues::default()).await; - - let btc_to_swap = bitcoin::Amount::from_btc(0.01).unwrap(); - let expected_xmr = monero::Amount::from_monero(1.0).unwrap(); - - test.construct_and_send_request(btc_to_swap); - test.assert_price((btc_to_swap, expected_xmr), expected_xmr) - .await; - - test.alice_swarm - .behaviour_mut() - .update_balance(monero::Amount::ZERO); - - test.construct_and_send_request(btc_to_swap); - test.assert_error( - alice::swap_setup::Error::BalanceTooLow { - balance: monero::Amount::ZERO, - buy: btc_to_swap, - }, - bob::spot_price::Error::BalanceTooLow { buy: btc_to_swap }, - ) - .await; - } - - #[tokio::test] - async fn given_alice_has_insufficient_balance_because_of_lock_fee_then_returns_error() { - let balance = monero::Amount::from_monero(1.0).unwrap(); - - let mut test = SpotPriceTest::setup( - AliceBehaviourValues::default() - .with_balance(balance) - .with_lock_fee(monero::Amount::from_piconero(1)), - ) - .await; - - let btc_to_swap = bitcoin::Amount::from_btc(0.01).unwrap(); - test.construct_and_send_request(btc_to_swap); - test.assert_error( - alice::swap_setup::Error::BalanceTooLow { - balance, - buy: btc_to_swap, - }, - bob::spot_price::Error::BalanceTooLow { buy: btc_to_swap }, - ) - .await; - } - - #[tokio::test] - async fn given_below_min_buy_then_returns_error() { - let min_buy = bitcoin::Amount::from_btc(0.001).unwrap(); - - let mut test = - SpotPriceTest::setup(AliceBehaviourValues::default().with_min_buy(min_buy)).await; - - let btc_to_swap = bitcoin::Amount::from_btc(0.0001).unwrap(); - test.construct_and_send_request(btc_to_swap); - test.assert_error( - alice::swap_setup::Error::AmountBelowMinimum { - buy: btc_to_swap, - min: min_buy, - }, - bob::spot_price::Error::AmountBelowMinimum { - buy: btc_to_swap, - min: min_buy, - }, - ) - .await; - } - - #[tokio::test] - async fn given_above_max_buy_then_returns_error() { - let max_buy = bitcoin::Amount::from_btc(0.001).unwrap(); - - let mut test = - SpotPriceTest::setup(AliceBehaviourValues::default().with_max_buy(max_buy)).await; - - let btc_to_swap = bitcoin::Amount::from_btc(0.01).unwrap(); - - test.construct_and_send_request(btc_to_swap); - test.assert_error( - alice::swap_setup::Error::AmountAboveMaximum { - buy: btc_to_swap, - max: max_buy, - }, - bob::spot_price::Error::AmountAboveMaximum { - buy: btc_to_swap, - max: max_buy, - }, - ) - .await; - } - - #[tokio::test] - async fn given_alice_in_resume_only_mode_then_returns_error() { - let mut test = - SpotPriceTest::setup(AliceBehaviourValues::default().with_resume_only(true)).await; - - let btc_to_swap = bitcoin::Amount::from_btc(0.01).unwrap(); - test.construct_and_send_request(btc_to_swap); - test.assert_error( - alice::swap_setup::Error::ResumeOnlyMode, - bob::spot_price::Error::NoSwapsAccepted, - ) - .await; - } - - #[tokio::test] - async fn given_rate_fetch_problem_then_returns_error() { - let mut test = - SpotPriceTest::setup(AliceBehaviourValues::default().with_rate(TestRate::error_rate())) - .await; - - let btc_to_swap = bitcoin::Amount::from_btc(0.01).unwrap(); - test.construct_and_send_request(btc_to_swap); - test.assert_error( - alice::swap_setup::Error::LatestRateFetchFailed(Box::new(TestRateError {})), - bob::spot_price::Error::Other, - ) - .await; - } - - #[tokio::test] - async fn given_rate_calculation_problem_then_returns_error() { - let mut test = SpotPriceTest::setup( - AliceBehaviourValues::default().with_rate(TestRate::from_rate_and_spread(0.0, 0)), - ) - .await; - - let btc_to_swap = bitcoin::Amount::from_btc(0.01).unwrap(); - - test.construct_and_send_request(btc_to_swap); - test.assert_error( - alice::swap_setup::Error::SellQuoteCalculationFailed(anyhow!( - "Error text irrelevant, won't be checked here" - )), - bob::spot_price::Error::Other, - ) - .await; - } - - #[tokio::test] - async fn given_alice_mainnnet_bob_testnet_then_network_mismatch_error() { - let mut test = SpotPriceTest::setup( - AliceBehaviourValues::default().with_env_config(env::Mainnet::get_config()), - ) - .await; - - let btc_to_swap = bitcoin::Amount::from_btc(0.01).unwrap(); - test.construct_and_send_request(btc_to_swap); - test.assert_error( - alice::swap_setup::Error::BlockchainNetworkMismatch { - cli: BlockchainNetwork { - bitcoin: bitcoin::Network::Testnet, - monero: monero::Network::Stagenet, - }, - asb: BlockchainNetwork { - bitcoin: bitcoin::Network::Bitcoin, - monero: monero::Network::Mainnet, - }, - }, - bob::spot_price::Error::BlockchainNetworkMismatch { - cli: BlockchainNetwork { - bitcoin: bitcoin::Network::Testnet, - monero: monero::Network::Stagenet, - }, - asb: BlockchainNetwork { - bitcoin: bitcoin::Network::Bitcoin, - monero: monero::Network::Mainnet, - }, - }, - ) - .await; - } - - #[tokio::test] - async fn given_alice_testnet_bob_mainnet_then_network_mismatch_error() { - let mut test = SpotPriceTest::setup(AliceBehaviourValues::default()).await; - - let btc_to_swap = bitcoin::Amount::from_btc(0.01).unwrap(); - let request = spot_price::Request { - btc: btc_to_swap, - blockchain_network: BlockchainNetwork { - bitcoin: bitcoin::Network::Bitcoin, - monero: monero::Network::Mainnet, - }, - }; - - test.send_request(request); - test.assert_error( - alice::swap_setup::Error::BlockchainNetworkMismatch { - cli: BlockchainNetwork { - bitcoin: bitcoin::Network::Bitcoin, - monero: monero::Network::Mainnet, - }, - asb: BlockchainNetwork { - bitcoin: bitcoin::Network::Testnet, - monero: monero::Network::Stagenet, - }, - }, - bob::spot_price::Error::BlockchainNetworkMismatch { - cli: BlockchainNetwork { - bitcoin: bitcoin::Network::Bitcoin, - monero: monero::Network::Mainnet, - }, - asb: BlockchainNetwork { - bitcoin: bitcoin::Network::Testnet, - monero: monero::Network::Stagenet, - }, - }, - ) - .await; - } - - struct SpotPriceTest { - alice_swarm: Swarm>, - bob_swarm: Swarm, - - alice_peer_id: PeerId, - } - - impl SpotPriceTest { - pub async fn setup(values: AliceBehaviourValues) -> Self { - let (mut alice_swarm, _, alice_peer_id) = new_swarm(|_, _| { - Behaviour::new( - values.balance, - values.lock_fee, - values.min_buy, - values.max_buy, - values.env_config, - values.rate.clone(), - values.resume_only, - ) - }); - let (mut bob_swarm, ..) = new_swarm(|_, _| bob::spot_price::bob()); - - connect(&mut alice_swarm, &mut bob_swarm).await; - - Self { - alice_swarm, - bob_swarm, - alice_peer_id, - } - } - - pub fn construct_and_send_request(&mut self, btc_to_swap: bitcoin::Amount) { - let request = spot_price::Request { - btc: btc_to_swap, - blockchain_network: BlockchainNetwork { - bitcoin: bitcoin::Network::Testnet, - monero: monero::Network::Stagenet, - }, - }; - self.send_request(request); - } - - pub fn send_request(&mut self, spot_price_request: spot_price::Request) { - self.bob_swarm - .behaviour_mut() - .send_request(&self.alice_peer_id, spot_price_request); - } - - async fn assert_price( - &mut self, - alice_assert: (bitcoin::Amount, monero::Amount), - bob_assert: monero::Amount, - ) { - match await_events_or_timeout(self.alice_swarm.next(), self.bob_swarm.next()).await { - ( - alice::swap_setup::OutEvent::ExecutionSetupParams { btc, xmr, .. }, - spot_price::OutEvent::Message { message, .. }, - ) => { - assert_eq!(alice_assert, (btc, xmr)); - - let response = match message { - RequestResponseMessage::Response { response, .. } => response, - _ => panic!("Unexpected message {:?} for Bob", message), - }; - - match response { - spot_price::Response::Xmr(xmr) => { - assert_eq!(bob_assert, xmr) - } - _ => panic!("Unexpected response {:?} for Bob", response), - } - } - (alice_event, bob_event) => panic!( - "Received unexpected event, alice emitted {:?} and bob emitted {:?}", - alice_event, bob_event - ), - } - } - - async fn assert_error( - &mut self, - alice_assert: alice::swap_setup::Error, - bob_assert: bob::spot_price::Error, - ) { - match await_events_or_timeout(self.alice_swarm.next(), self.bob_swarm.next()).await { - ( - alice::swap_setup::OutEvent::Error { error, .. }, - spot_price::OutEvent::Message { message, .. }, - ) => { - // TODO: Somehow make PartialEq work on Alice's spot_price::Error - match (alice_assert, error) { - ( - alice::swap_setup::Error::BalanceTooLow { - balance: balance1, - buy: buy1, - }, - alice::swap_setup::Error::BalanceTooLow { - balance: balance2, - buy: buy2, - }, - ) => { - assert_eq!(balance1, balance2); - assert_eq!(buy1, buy2); - } - ( - alice::swap_setup::Error::BlockchainNetworkMismatch { - cli: cli1, - asb: asb1, - }, - alice::swap_setup::Error::BlockchainNetworkMismatch { - cli: cli2, - asb: asb2, - }, - ) => { - assert_eq!(cli1, cli2); - assert_eq!(asb1, asb2); - } - ( - alice::swap_setup::Error::AmountBelowMinimum { .. }, - alice::swap_setup::Error::AmountBelowMinimum { .. }, - ) - | ( - alice::swap_setup::Error::AmountAboveMaximum { .. }, - alice::swap_setup::Error::AmountAboveMaximum { .. }, - ) - | ( - alice::swap_setup::Error::LatestRateFetchFailed(_), - alice::swap_setup::Error::LatestRateFetchFailed(_), - ) - | ( - alice::swap_setup::Error::SellQuoteCalculationFailed(_), - alice::swap_setup::Error::SellQuoteCalculationFailed(_), - ) - | ( - alice::swap_setup::Error::ResumeOnlyMode, - alice::swap_setup::Error::ResumeOnlyMode, - ) => {} - (alice_assert, error) => { - panic!("Expected: {:?} Actual: {:?}", alice_assert, error) - } - } - - let response = match message { - RequestResponseMessage::Response { response, .. } => response, - _ => panic!("Unexpected message {:?} for Bob", message), - }; - - match response { - spot_price::Response::Error(error) => { - assert_eq!(bob_assert, error.into()) - } - _ => panic!("Unexpected response {:?} for Bob", response), - } - } - (alice_event, bob_event) => panic!( - "Received unexpected event, alice emitted {:?} and bob emitted {:?}", - alice_event, bob_event - ), - } - } - } - - struct AliceBehaviourValues { - pub balance: monero::Amount, - pub lock_fee: monero::Amount, - pub min_buy: bitcoin::Amount, - pub max_buy: bitcoin::Amount, - pub rate: TestRate, // 0.01 - pub resume_only: bool, - pub env_config: env::Config, - } - - impl AliceBehaviourValues { - pub fn with_balance(mut self, balance: monero::Amount) -> AliceBehaviourValues { - self.balance = balance; - self - } - - pub fn with_lock_fee(mut self, lock_fee: monero::Amount) -> AliceBehaviourValues { - self.lock_fee = lock_fee; - self - } - - pub fn with_min_buy(mut self, min_buy: bitcoin::Amount) -> AliceBehaviourValues { - self.min_buy = min_buy; - self - } - - pub fn with_max_buy(mut self, max_buy: bitcoin::Amount) -> AliceBehaviourValues { - self.max_buy = max_buy; - self - } - - pub fn with_resume_only(mut self, resume_only: bool) -> AliceBehaviourValues { - self.resume_only = resume_only; - self - } - - pub fn with_rate(mut self, rate: TestRate) -> AliceBehaviourValues { - self.rate = rate; - self - } - - pub fn with_env_config(mut self, env_config: env::Config) -> AliceBehaviourValues { - self.env_config = env_config; - self - } - } - - #[derive(Clone, Debug)] - pub enum TestRate { - Rate(Rate), - Err(TestRateError), - } - - impl TestRate { - pub const RATE: f64 = 0.01; - - pub fn from_rate_and_spread(rate: f64, spread: u64) -> Self { - let ask = bitcoin::Amount::from_btc(rate).expect("Static value should never fail"); - let spread = Decimal::from(spread); - Self::Rate(Rate::new(ask, spread)) - } - - pub fn error_rate() -> Self { - Self::Err(TestRateError {}) - } - } - - impl Default for TestRate { - fn default() -> Self { - TestRate::from_rate_and_spread(Self::RATE, 0) - } - } - - #[derive(Debug, Clone, thiserror::Error)] - #[error("Could not fetch rate")] - pub struct TestRateError {} - - impl LatestRate for TestRate { - type Error = TestRateError; - - fn latest_rate(&mut self) -> Result { - match self { - TestRate::Rate(rate) => Ok(*rate), - TestRate::Err(error) => Err(error.clone()), + SpotPriceError::Other } } } diff --git a/swap/tests/harness/mod.rs b/swap/tests/harness/mod.rs index 3d266bd5..ecba916b 100644 --- a/swap/tests/harness/mod.rs +++ b/swap/tests/harness/mod.rs @@ -224,8 +224,6 @@ async fn start_alice( ) -> (AliceApplicationHandle, Receiver) { let db = Arc::new(Database::open(db_path.as_path()).unwrap()); - let current_balance = monero_wallet.get_balance().await.unwrap(); - let lock_fee = monero_wallet.static_tx_fee_estimate(); let min_buy = bitcoin::Amount::from_sat(u64::MIN); let max_buy = bitcoin::Amount::from_sat(u64::MAX); let latest_rate = FixedRate::default(); @@ -233,8 +231,6 @@ async fn start_alice( let mut swarm = swarm::asb( &seed, - current_balance, - lock_fee, min_buy, max_buy, latest_rate,