diff --git a/swap/src/bin/swap.rs b/swap/src/bin/swap.rs index 0cb557b7..71fff4f3 100644 --- a/swap/src/bin/swap.rs +++ b/swap/src/bin/swap.rs @@ -85,20 +85,22 @@ async fn main() -> Result<()> { init_monero_wallet(data_dir, monero_daemon_address, env_config).await?; let bitcoin_wallet = Arc::new(bitcoin_wallet); - let mut swarm = swarm::cli(&seed, seller_peer_id, tor_socks5_port).await?; + let mut swarm = swarm::cli( + &seed, + seller_peer_id, + tor_socks5_port, + env_config, + bitcoin_wallet.clone(), + ) + .await?; swarm .behaviour_mut() .add_address(seller_peer_id, seller_addr); tracing::debug!(peer_id = %swarm.local_peer_id(), "Network layer initialized"); - let (event_loop, mut event_loop_handle) = EventLoop::new( - swap_id, - swarm, - seller_peer_id, - bitcoin_wallet.clone(), - env_config, - )?; + let (event_loop, mut event_loop_handle) = + EventLoop::new(swap_id, swarm, seller_peer_id, env_config)?; let event_loop = tokio::spawn(event_loop.run()); let max_givable = || bitcoin_wallet.max_giveable(TxLock::script_size()); @@ -185,20 +187,22 @@ async fn main() -> Result<()> { let seller_peer_id = db.get_peer_id(swap_id)?; - let mut swarm = swarm::cli(&seed, seller_peer_id, tor_socks5_port).await?; + let mut swarm = swarm::cli( + &seed, + seller_peer_id, + tor_socks5_port, + env_config, + bitcoin_wallet.clone(), + ) + .await?; let our_peer_id = swarm.local_peer_id(); tracing::debug!(peer_id = %our_peer_id, "Initializing network module"); swarm .behaviour_mut() .add_address(seller_peer_id, seller_addr); - let (event_loop, event_loop_handle) = EventLoop::new( - swap_id, - swarm, - seller_peer_id, - bitcoin_wallet.clone(), - env_config, - )?; + let (event_loop, event_loop_handle) = + EventLoop::new(swap_id, swarm, seller_peer_id, env_config)?; let handle = tokio::spawn(event_loop.run()); let swap = Swap::from_db( diff --git a/swap/src/database/bob.rs b/swap/src/database/bob.rs index bbd5b6a0..b0258b00 100644 --- a/swap/src/database/bob.rs +++ b/swap/src/database/bob.rs @@ -46,7 +46,7 @@ impl From for Bob { fn from(bob_state: BobState) -> Self { match bob_state { BobState::Started { btc_amount } => Bob::Started { btc_amount }, - BobState::ExecutionSetupDone(state2) => Bob::ExecutionSetupDone { state2 }, + BobState::SwapSetupCompleted(state2) => Bob::ExecutionSetupDone { state2 }, BobState::BtcLocked(state3) => Bob::BtcLocked { state3 }, BobState::XmrLockProofReceived { state, @@ -78,7 +78,7 @@ impl From for BobState { fn from(db_state: Bob) -> Self { match db_state { Bob::Started { btc_amount } => BobState::Started { btc_amount }, - Bob::ExecutionSetupDone { state2 } => BobState::ExecutionSetupDone(state2), + Bob::ExecutionSetupDone { state2 } => BobState::SwapSetupCompleted(state2), Bob::BtcLocked { state3 } => BobState::BtcLocked(state3), Bob::XmrLockProofReceived { state, diff --git a/swap/src/network.rs b/swap/src/network.rs index 90fe06b6..1eea35e4 100644 --- a/swap/src/network.rs +++ b/swap/src/network.rs @@ -5,7 +5,6 @@ pub mod encrypted_signature; pub mod json_pull_codec; pub mod quote; pub mod redial; -pub mod spot_price; pub mod swap_setup; pub mod swarm; pub mod tor_transport; diff --git a/swap/src/network/spot_price.rs b/swap/src/network/spot_price.rs deleted file mode 100644 index 8268d146..00000000 --- a/swap/src/network/spot_price.rs +++ /dev/null @@ -1,138 +0,0 @@ -use crate::monero; -use crate::network::cbor_request_response::CborCodec; -use libp2p::core::ProtocolName; -use libp2p::request_response::{RequestResponse, RequestResponseEvent, RequestResponseMessage}; -use serde::{Deserialize, Serialize}; - -pub const PROTOCOL: &str = "/comit/xmr/btc/spot-price/1.0.0"; -pub type OutEvent = RequestResponseEvent; -pub type Message = RequestResponseMessage; - -pub type Behaviour = RequestResponse>; - -/// The spot price protocol allows parties to **initiate** a trade by requesting -/// a spot price. -/// -/// A spot price is binding for both parties, i.e. after the spot-price protocol -/// completes, both parties are expected to follow up with the `execution-setup` -/// protocol. -/// -/// If a party wishes to only inquire about the current price, they should use -/// the `quote` protocol instead. -#[derive(Debug, Clone, Copy, Default)] -pub struct SpotPriceProtocol; - -impl ProtocolName for SpotPriceProtocol { - fn protocol_name(&self) -> &[u8] { - PROTOCOL.as_bytes() - } -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct Request { - #[serde(with = "::bitcoin::util::amount::serde::as_sat")] - pub btc: bitcoin::Amount, - pub blockchain_network: BlockchainNetwork, -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub enum Response { - Xmr(monero::Amount), - Error(Error), -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub enum Error { - NoSwapsAccepted, - AmountBelowMinimum { - #[serde(with = "::bitcoin::util::amount::serde::as_sat")] - min: bitcoin::Amount, - #[serde(with = "::bitcoin::util::amount::serde::as_sat")] - buy: bitcoin::Amount, - }, - AmountAboveMaximum { - #[serde(with = "::bitcoin::util::amount::serde::as_sat")] - max: bitcoin::Amount, - #[serde(with = "::bitcoin::util::amount::serde::as_sat")] - buy: bitcoin::Amount, - }, - BalanceTooLow { - #[serde(with = "::bitcoin::util::amount::serde::as_sat")] - buy: bitcoin::Amount, - }, - BlockchainNetworkMismatch { - cli: BlockchainNetwork, - asb: BlockchainNetwork, - }, - /// To be used for errors that cannot be explained on the CLI side (e.g. - /// rate update problems on the seller side) - Other, -} - -#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq)] -pub struct BlockchainNetwork { - #[serde(with = "crate::bitcoin::network")] - pub bitcoin: bitcoin::Network, - #[serde(with = "crate::monero::network")] - pub monero: monero::Network, -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::monero; - - #[test] - fn snapshot_test_serialize() { - let amount = monero::Amount::from_piconero(100_000u64); - let xmr = r#"{"Xmr":100000}"#.to_string(); - let serialized = serde_json::to_string(&Response::Xmr(amount)).unwrap(); - assert_eq!(xmr, serialized); - - let error = r#"{"Error":"NoSwapsAccepted"}"#.to_string(); - let serialized = serde_json::to_string(&Response::Error(Error::NoSwapsAccepted)).unwrap(); - assert_eq!(error, serialized); - - let error = r#"{"Error":{"AmountBelowMinimum":{"min":0,"buy":0}}}"#.to_string(); - let serialized = serde_json::to_string(&Response::Error(Error::AmountBelowMinimum { - min: Default::default(), - buy: Default::default(), - })) - .unwrap(); - assert_eq!(error, serialized); - - let error = r#"{"Error":{"AmountAboveMaximum":{"max":0,"buy":0}}}"#.to_string(); - let serialized = serde_json::to_string(&Response::Error(Error::AmountAboveMaximum { - max: Default::default(), - buy: Default::default(), - })) - .unwrap(); - assert_eq!(error, serialized); - - let error = r#"{"Error":{"BalanceTooLow":{"buy":0}}}"#.to_string(); - let serialized = serde_json::to_string(&Response::Error(Error::BalanceTooLow { - buy: Default::default(), - })) - .unwrap(); - assert_eq!(error, serialized); - - let error = r#"{"Error":{"BlockchainNetworkMismatch":{"cli":{"bitcoin":"Mainnet","monero":"Mainnet"},"asb":{"bitcoin":"Testnet","monero":"Stagenet"}}}}"#.to_string(); - let serialized = - serde_json::to_string(&Response::Error(Error::BlockchainNetworkMismatch { - cli: BlockchainNetwork { - bitcoin: bitcoin::Network::Bitcoin, - monero: monero::Network::Mainnet, - }, - asb: BlockchainNetwork { - bitcoin: bitcoin::Network::Testnet, - monero: monero::Network::Stagenet, - }, - })) - .unwrap(); - assert_eq!(error, serialized); - - let error = r#"{"Error":"Other"}"#.to_string(); - let serialized = serde_json::to_string(&Response::Error(Error::Other)).unwrap(); - assert_eq!(error, serialized); - } -} diff --git a/swap/src/network/swap_setup.rs b/swap/src/network/swap_setup.rs index 33919fdc..705551e4 100644 --- a/swap/src/network/swap_setup.rs +++ b/swap/src/network/swap_setup.rs @@ -1,3 +1,4 @@ +use crate::monero; use libp2p::core::upgrade; use libp2p::swarm::NegotiatedSubstream; use serde::de::DeserializeOwned; @@ -49,7 +50,7 @@ pub struct SpotPriceRequest { #[derive(Serialize, Deserialize, Debug, Clone)] pub enum SpotPriceResponse { - Xmr(crate::monero::Amount), + Xmr(monero::Amount), Error(SpotPriceError), } diff --git a/swap/src/network/swarm.rs b/swap/src/network/swarm.rs index 6b0090ba..43d635d7 100644 --- a/swap/src/network/swarm.rs +++ b/swap/src/network/swarm.rs @@ -1,11 +1,12 @@ use crate::protocol::alice::event_loop::LatestRate; use crate::protocol::{alice, bob}; use crate::seed::Seed; -use crate::{asb, cli, env, tor}; +use crate::{asb, bitcoin, cli, env, tor}; use anyhow::Result; use libp2p::swarm::SwarmBuilder; use libp2p::{PeerId, Swarm}; use std::fmt::Debug; +use std::sync::Arc; #[allow(clippy::too_many_arguments)] pub fn asb( @@ -38,13 +39,15 @@ pub async fn cli( seed: &Seed, alice: PeerId, tor_socks5_port: u16, + env_config: env::Config, + bitcoin_wallet: Arc, ) -> Result> { let maybe_tor_socks5_port = match tor::Client::new(tor_socks5_port).assert_tor_running().await { Ok(()) => Some(tor_socks5_port), Err(_) => None, }; - let behaviour = bob::Behaviour::new(alice); + let behaviour = bob::Behaviour::new(alice, env_config, bitcoin_wallet); let identity = seed.derive_libp2p_identity(); let transport = cli::transport::new(&identity, maybe_tor_socks5_port)?; diff --git a/swap/src/protocol/bob.rs b/swap/src/protocol/bob.rs index ae263f97..4c93b73b 100644 --- a/swap/src/protocol/bob.rs +++ b/swap/src/protocol/bob.rs @@ -14,11 +14,10 @@ pub use self::swap::{run, run_until}; mod behaviour; pub mod cancel; pub mod event_loop; -mod execution_setup; pub mod refund; -pub mod spot_price; pub mod state; pub mod swap; +mod swap_setup; pub struct Swap { pub state: BobState, diff --git a/swap/src/protocol/bob/behaviour.rs b/swap/src/protocol/bob/behaviour.rs index 8f156e1b..951c84f6 100644 --- a/swap/src/protocol/bob/behaviour.rs +++ b/swap/src/protocol/bob/behaviour.rs @@ -1,12 +1,13 @@ use crate::network::quote::BidQuote; -use crate::network::{encrypted_signature, quote, redial, spot_price, transfer_proof}; -use crate::protocol::bob; -use crate::protocol::bob::{execution_setup, State2}; +use crate::network::{encrypted_signature, quote, redial, transfer_proof}; +use crate::protocol::bob::{swap_setup, State2}; +use crate::{bitcoin, env}; use anyhow::{anyhow, Error, Result}; use libp2p::core::Multiaddr; use libp2p::ping::{Ping, PingEvent}; use libp2p::request_response::{RequestId, ResponseChannel}; use libp2p::{NetworkBehaviour, PeerId}; +use std::sync::Arc; use std::time::Duration; #[derive(Debug)] @@ -15,11 +16,7 @@ pub enum OutEvent { id: RequestId, response: BidQuote, }, - SpotPriceReceived { - id: RequestId, - response: spot_price::Response, - }, - ExecutionSetupDone(Box>), + SwapSetupCompleted(Box>), TransferProofReceived { msg: Box, channel: ResponseChannel<()>, @@ -62,8 +59,7 @@ impl OutEvent { #[allow(missing_debug_implementations)] pub struct Behaviour { pub quote: quote::Behaviour, - pub spot_price: spot_price::Behaviour, - pub execution_setup: execution_setup::Behaviour, + pub swap_setup: swap_setup::Behaviour, pub transfer_proof: transfer_proof::Behaviour, pub encrypted_signature: encrypted_signature::Behaviour, pub redial: redial::Behaviour, @@ -75,11 +71,14 @@ pub struct Behaviour { } impl Behaviour { - pub fn new(alice: PeerId) -> Self { + pub fn new( + alice: PeerId, + env_config: env::Config, + bitcoin_wallet: Arc, + ) -> Self { Self { quote: quote::bob(), - spot_price: bob::spot_price::bob(), - execution_setup: Default::default(), + swap_setup: swap_setup::Behaviour::new(env_config, bitcoin_wallet), transfer_proof: transfer_proof::bob(), encrypted_signature: encrypted_signature::bob(), redial: redial::Behaviour::new(alice, Duration::from_secs(2)), @@ -90,7 +89,6 @@ impl Behaviour { /// Add a known address for the given peer pub fn add_address(&mut self, peer_id: PeerId, address: Multiaddr) { self.quote.add_address(&peer_id, address.clone()); - self.spot_price.add_address(&peer_id, address.clone()); self.transfer_proof.add_address(&peer_id, address.clone()); self.encrypted_signature.add_address(&peer_id, address); } diff --git a/swap/src/protocol/bob/cancel.rs b/swap/src/protocol/bob/cancel.rs index 9e667a5f..7723067c 100644 --- a/swap/src/protocol/bob/cancel.rs +++ b/swap/src/protocol/bob/cancel.rs @@ -26,7 +26,7 @@ pub async fn cancel( BobState::EncSigSent(state4) => state4.cancel(), BobState::CancelTimelockExpired(state6) => state6, BobState::Started { .. } - | BobState::ExecutionSetupDone(_) + | BobState::SwapSetupCompleted(_) | BobState::BtcRedeemed(_) | BobState::BtcCancelled(_) | BobState::BtcRefunded(_) diff --git a/swap/src/protocol/bob/event_loop.rs b/swap/src/protocol/bob/event_loop.rs index c8e39eee..742129a0 100644 --- a/swap/src/protocol/bob/event_loop.rs +++ b/swap/src/protocol/bob/event_loop.rs @@ -1,18 +1,16 @@ use crate::bitcoin::EncryptedSignature; +use crate::network::encrypted_signature; use crate::network::quote::BidQuote; -use crate::network::spot_price::{BlockchainNetwork, Response}; -use crate::network::{encrypted_signature, spot_price}; -use crate::protocol::bob; -use crate::protocol::bob::{Behaviour, OutEvent, State0, State2}; -use crate::{bitcoin, env, monero}; -use anyhow::{bail, Context, Result}; +use crate::protocol::bob::swap_setup::NewSwap; +use crate::protocol::bob::{Behaviour, OutEvent, State2}; +use crate::{env, monero}; +use anyhow::{Context, Result}; use futures::future::{BoxFuture, OptionFuture}; use futures::{FutureExt, StreamExt}; use libp2p::request_response::{RequestId, ResponseChannel}; use libp2p::swarm::SwarmEvent; use libp2p::{PeerId, Swarm}; use std::collections::HashMap; -use std::sync::Arc; use std::time::Duration; use uuid::Uuid; @@ -20,22 +18,19 @@ use uuid::Uuid; pub struct EventLoop { swap_id: Uuid, swarm: libp2p::Swarm, - bitcoin_wallet: Arc, alice_peer_id: PeerId, // these streams represents outgoing requests that we have to make quote_requests: bmrng::RequestReceiverStream<(), BidQuote>, - spot_price_requests: bmrng::RequestReceiverStream, encrypted_signatures: bmrng::RequestReceiverStream, - execution_setup_requests: bmrng::RequestReceiverStream>, + swap_setup_requests: bmrng::RequestReceiverStream>, // these represents requests that are currently in-flight. // once we get a response to a matching [`RequestId`], we will use the responder to relay the // response. - inflight_spot_price_requests: HashMap>, inflight_quote_requests: HashMap>, inflight_encrypted_signature_requests: HashMap>, - inflight_execution_setup: Option>>, + inflight_swap_setup: Option>>, /// The sender we will use to relay incoming transfer proofs. transfer_proof: bmrng::RequestSender, @@ -54,37 +49,31 @@ impl EventLoop { swap_id: Uuid, swarm: Swarm, alice_peer_id: PeerId, - bitcoin_wallet: Arc, env_config: env::Config, ) -> Result<(Self, EventLoopHandle)> { let execution_setup = bmrng::channel_with_timeout(1, Duration::from_secs(60)); let transfer_proof = bmrng::channel_with_timeout(1, Duration::from_secs(60)); let encrypted_signature = bmrng::channel_with_timeout(1, Duration::from_secs(60)); - let spot_price = bmrng::channel_with_timeout(1, Duration::from_secs(60)); let quote = bmrng::channel_with_timeout(1, Duration::from_secs(60)); let event_loop = EventLoop { swap_id, swarm, alice_peer_id, - bitcoin_wallet, - execution_setup_requests: execution_setup.1.into(), + swap_setup_requests: execution_setup.1.into(), transfer_proof: transfer_proof.0, encrypted_signatures: encrypted_signature.1.into(), - spot_price_requests: spot_price.1.into(), quote_requests: quote.1.into(), - inflight_spot_price_requests: HashMap::default(), inflight_quote_requests: HashMap::default(), - inflight_execution_setup: None, + inflight_swap_setup: None, inflight_encrypted_signature_requests: HashMap::default(), pending_transfer_proof: OptionFuture::from(None), }; let handle = EventLoopHandle { - execution_setup: execution_setup.0, + swap_setup: execution_setup.0, transfer_proof: transfer_proof.1, encrypted_signature: encrypted_signature.0, - spot_price: spot_price.0, quote: quote.0, env_config, }; @@ -106,18 +95,13 @@ impl EventLoop { tokio::select! { swarm_event = self.swarm.next_event().fuse() => { match swarm_event { - SwarmEvent::Behaviour(OutEvent::SpotPriceReceived { id, response }) => { - if let Some(responder) = self.inflight_spot_price_requests.remove(&id) { - let _ = responder.respond(response); - } - } SwarmEvent::Behaviour(OutEvent::QuoteReceived { id, response }) => { if let Some(responder) = self.inflight_quote_requests.remove(&id) { let _ = responder.respond(response); } } - SwarmEvent::Behaviour(OutEvent::ExecutionSetupDone(response)) => { - if let Some(responder) = self.inflight_execution_setup.take() { + SwarmEvent::Behaviour(OutEvent::SwapSetupCompleted(response)) => { + if let Some(responder) = self.inflight_swap_setup.take() { let _ = responder.respond(*response); } } @@ -197,17 +181,13 @@ impl EventLoop { // Handle to-be-sent requests for all our network protocols. // Use `self.is_connected_to_alice` as a guard to "buffer" requests until we are connected. - Some((request, responder)) = self.spot_price_requests.next().fuse(), if self.is_connected_to_alice() => { - let id = self.swarm.behaviour_mut().spot_price.send_request(&self.alice_peer_id, request); - self.inflight_spot_price_requests.insert(id, responder); - }, Some(((), responder)) = self.quote_requests.next().fuse(), if self.is_connected_to_alice() => { let id = self.swarm.behaviour_mut().quote.send_request(&self.alice_peer_id, ()); self.inflight_quote_requests.insert(id, responder); }, - Some((request, responder)) = self.execution_setup_requests.next().fuse(), if self.is_connected_to_alice() => { - self.swarm.behaviour_mut().execution_setup.run(self.alice_peer_id, request, self.bitcoin_wallet.clone()); - self.inflight_execution_setup = Some(responder); + Some((swap, responder)) = self.swap_setup_requests.next().fuse(), if self.is_connected_to_alice() => { + self.swarm.behaviour_mut().swap_setup.start(self.alice_peer_id, swap).await; + self.inflight_swap_setup = Some(responder); }, Some((tx_redeem_encsig, responder)) = self.encrypted_signatures.next().fuse(), if self.is_connected_to_alice() => { let request = encrypted_signature::Request { @@ -235,17 +215,16 @@ impl EventLoop { #[derive(Debug)] pub struct EventLoopHandle { - execution_setup: bmrng::RequestSender>, + swap_setup: bmrng::RequestSender>, transfer_proof: bmrng::RequestReceiver, encrypted_signature: bmrng::RequestSender, - spot_price: bmrng::RequestSender, quote: bmrng::RequestSender<(), BidQuote>, env_config: env::Config, } impl EventLoopHandle { - pub async fn execution_setup(&mut self, state0: State0) -> Result { - self.execution_setup.send_receive(state0).await? + pub async fn setup_swap(&mut self, swap: NewSwap) -> Result { + self.swap_setup.send_receive(swap).await? } pub async fn recv_transfer_proof(&mut self) -> Result { @@ -261,27 +240,6 @@ impl EventLoopHandle { Ok(transfer_proof) } - pub async fn request_spot_price(&mut self, btc: bitcoin::Amount) -> Result { - let response = self - .spot_price - .send_receive(spot_price::Request { - btc, - blockchain_network: BlockchainNetwork { - bitcoin: self.env_config.bitcoin_network, - monero: self.env_config.monero_network, - }, - }) - .await?; - - match response { - Response::Xmr(xmr) => Ok(xmr), - Response::Error(error) => { - let error: bob::spot_price::Error = error.into(); - bail!(error); - } - } - } - pub async fn request_quote(&mut self) -> Result { Ok(self.quote.send_receive(()).await?) } diff --git a/swap/src/protocol/bob/execution_setup.rs b/swap/src/protocol/bob/execution_setup.rs deleted file mode 100644 index 947af3bd..00000000 --- a/swap/src/protocol/bob/execution_setup.rs +++ /dev/null @@ -1,95 +0,0 @@ -use crate::network::cbor_request_response::BUF_SIZE; -use crate::protocol::bob::{State0, State2}; -use crate::protocol::{bob, Message1, Message3}; -use anyhow::{Context, Error, Result}; -use libp2p::PeerId; -use libp2p_async_await::BehaviourOutEvent; -use std::sync::Arc; -use std::time::Duration; - -#[derive(Debug)] -pub enum OutEvent { - Done(Result), -} - -impl From> for OutEvent { - fn from(event: BehaviourOutEvent<(), State2, Error>) -> Self { - match event { - BehaviourOutEvent::Outbound(_, Ok(State2)) => OutEvent::Done(Ok(State2)), - BehaviourOutEvent::Outbound(_, Err(e)) => OutEvent::Done(Err(e)), - BehaviourOutEvent::Inbound(..) => unreachable!("Bob only supports outbound"), - } - } -} - -#[derive(libp2p::NetworkBehaviour)] -#[behaviour(out_event = "OutEvent", event_process = false)] -pub struct Behaviour { - inner: libp2p_async_await::Behaviour<(), State2, anyhow::Error>, -} - -impl Default for Behaviour { - fn default() -> Self { - Self { - inner: libp2p_async_await::Behaviour::new(b"/comit/xmr/btc/execution_setup/1.0.0"), - } - } -} - -impl Behaviour { - pub fn run( - &mut self, - alice: PeerId, - state0: State0, - bitcoin_wallet: Arc, - ) { - self.inner.do_protocol_dialer(alice, move |mut substream| { - let protocol = async move { - tracing::debug!("Starting execution setup with {}", alice); - - substream - .write_message( - &serde_cbor::to_vec(&state0.next_message()) - .context("Failed to serialize message0")?, - ) - .await?; - - let message1 = - serde_cbor::from_slice::(&substream.read_message(BUF_SIZE).await?) - .context("Failed to deserialize message1")?; - let state1 = state0.receive(bitcoin_wallet.as_ref(), message1).await?; - - substream - .write_message( - &serde_cbor::to_vec(&state1.next_message()) - .context("Failed to serialize message2")?, - ) - .await?; - - let message3 = - serde_cbor::from_slice::(&substream.read_message(BUF_SIZE).await?) - .context("Failed to deserialize message3")?; - let state2 = state1.receive(message3)?; - - substream - .write_message( - &serde_cbor::to_vec(&state2.next_message()) - .context("Failed to serialize message4")?, - ) - .await?; - - Ok(state2) - }; - - async move { tokio::time::timeout(Duration::from_secs(60), protocol).await? } - }) - } -} - -impl From for bob::OutEvent { - fn from(event: OutEvent) -> Self { - match event { - OutEvent::Done(res) => Self::ExecutionSetupDone(Box::new(res)), - } - } -} diff --git a/swap/src/protocol/bob/refund.rs b/swap/src/protocol/bob/refund.rs index 2fe324ce..a769eda9 100644 --- a/swap/src/protocol/bob/refund.rs +++ b/swap/src/protocol/bob/refund.rs @@ -26,7 +26,7 @@ pub async fn refund( BobState::CancelTimelockExpired(state6) => state6, BobState::BtcCancelled(state6) => state6, BobState::Started { .. } - | BobState::ExecutionSetupDone(_) + | BobState::SwapSetupCompleted(_) | BobState::BtcRedeemed(_) | BobState::BtcRefunded(_) | BobState::XmrRedeemed { .. } diff --git a/swap/src/protocol/bob/spot_price.rs b/swap/src/protocol/bob/spot_price.rs deleted file mode 100644 index 072fdb2d..00000000 --- a/swap/src/protocol/bob/spot_price.rs +++ /dev/null @@ -1,86 +0,0 @@ -use crate::network::cbor_request_response::CborCodec; -use crate::network::spot_price; -use crate::network::spot_price::SpotPriceProtocol; -use crate::protocol::bob::OutEvent; -use libp2p::request_response::{ProtocolSupport, RequestResponseConfig}; -use libp2p::PeerId; - -const PROTOCOL: &str = spot_price::PROTOCOL; -pub type SpotPriceOutEvent = spot_price::OutEvent; - -/// Constructs a new instance of the `spot-price` behaviour to be used by Bob. -/// -/// Bob only supports outbound connections, i.e. requesting a spot price for a -/// given amount of BTC in XMR. -pub fn bob() -> spot_price::Behaviour { - spot_price::Behaviour::new( - CborCodec::default(), - vec![(SpotPriceProtocol, ProtocolSupport::Outbound)], - RequestResponseConfig::default(), - ) -} - -impl From<(PeerId, spot_price::Message)> for OutEvent { - fn from((peer, message): (PeerId, spot_price::Message)) -> Self { - match message { - spot_price::Message::Request { .. } => Self::unexpected_request(peer), - spot_price::Message::Response { - response, - request_id, - } => Self::SpotPriceReceived { - id: request_id, - response, - }, - } - } -} - -crate::impl_from_rr_event!(SpotPriceOutEvent, OutEvent, PROTOCOL); - -#[derive(Clone, Debug, thiserror::Error, PartialEq)] -pub enum Error { - #[error("Seller currently does not accept incoming swap requests, please try again later")] - NoSwapsAccepted, - #[error("Seller refused to buy {buy} because the minimum configured buy limit is {min}")] - AmountBelowMinimum { - min: bitcoin::Amount, - buy: bitcoin::Amount, - }, - #[error("Seller refused to buy {buy} because the maximum configured buy limit is {max}")] - AmountAboveMaximum { - max: bitcoin::Amount, - buy: bitcoin::Amount, - }, - #[error("Seller's XMR balance is currently too low to fulfill the swap request to buy {buy}, please try again later")] - BalanceTooLow { buy: bitcoin::Amount }, - - #[error("Seller blockchain network {asb:?} setup did not match your blockchain network setup {cli:?}")] - BlockchainNetworkMismatch { - cli: spot_price::BlockchainNetwork, - asb: spot_price::BlockchainNetwork, - }, - - /// To be used for errors that cannot be explained on the CLI side (e.g. - /// rate update problems on the seller side) - #[error("Seller encountered a problem, please try again later.")] - Other, -} - -impl From for Error { - fn from(error: spot_price::Error) -> Self { - match error { - spot_price::Error::NoSwapsAccepted => Error::NoSwapsAccepted, - spot_price::Error::AmountBelowMinimum { min, buy } => { - Error::AmountBelowMinimum { min, buy } - } - spot_price::Error::AmountAboveMaximum { max, buy } => { - Error::AmountAboveMaximum { max, buy } - } - spot_price::Error::BalanceTooLow { buy } => Error::BalanceTooLow { buy }, - spot_price::Error::BlockchainNetworkMismatch { cli, asb } => { - Error::BlockchainNetworkMismatch { cli, asb } - } - spot_price::Error::Other => Error::Other, - } - } -} diff --git a/swap/src/protocol/bob/state.rs b/swap/src/protocol/bob/state.rs index 94481fe0..e80634b6 100644 --- a/swap/src/protocol/bob/state.rs +++ b/swap/src/protocol/bob/state.rs @@ -26,7 +26,7 @@ pub enum BobState { Started { btc_amount: bitcoin::Amount, }, - ExecutionSetupDone(State2), + SwapSetupCompleted(State2), BtcLocked(State3), XmrLockProofReceived { state: State3, @@ -52,7 +52,7 @@ impl fmt::Display for BobState { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { BobState::Started { .. } => write!(f, "quote has been requested"), - BobState::ExecutionSetupDone(..) => write!(f, "execution setup done"), + BobState::SwapSetupCompleted(..) => write!(f, "execution setup done"), BobState::BtcLocked(..) => write!(f, "btc is locked"), BobState::XmrLockProofReceived { .. } => { write!(f, "XMR lock transaction transfer proof received") diff --git a/swap/src/protocol/bob/swap.rs b/swap/src/protocol/bob/swap.rs index 5cdf7165..0d8eefee 100644 --- a/swap/src/protocol/bob/swap.rs +++ b/swap/src/protocol/bob/swap.rs @@ -1,12 +1,11 @@ use crate::bitcoin::{ExpiredTimelocks, TxCancel, TxRefund}; use crate::database::Swap; -use crate::env::Config; use crate::protocol::bob; use crate::protocol::bob::event_loop::EventLoopHandle; use crate::protocol::bob::state::*; +use crate::protocol::bob::swap_setup::NewSwap; use crate::{bitcoin, monero}; use anyhow::{bail, Context, Result}; -use rand::rngs::OsRng; use tokio::select; use uuid::Uuid; @@ -38,7 +37,6 @@ pub async fn run_until( &mut swap.event_loop_handle, swap.bitcoin_wallet.as_ref(), swap.monero_wallet.as_ref(), - &swap.env_config, swap.receive_monero_address, ) .await?; @@ -58,7 +56,6 @@ async fn next_state( event_loop_handle: &mut EventLoopHandle, bitcoin_wallet: &bitcoin::Wallet, monero_wallet: &monero::Wallet, - env_config: &Config, receive_monero_address: monero::Address, ) -> Result { tracing::trace!(%state, "Advancing state"); @@ -73,20 +70,19 @@ async fn next_state( .estimate_fee(TxCancel::weight(), btc_amount) .await?; - let state2 = request_price_and_setup( - swap_id, - btc_amount, - event_loop_handle, - env_config, - bitcoin_refund_address, - tx_refund_fee, - tx_cancel_fee, - ) - .await?; + let state2 = event_loop_handle + .setup_swap(NewSwap { + swap_id, + btc: btc_amount, + tx_refund_fee, + tx_cancel_fee, + bitcoin_refund_address, + }) + .await?; - BobState::ExecutionSetupDone(state2) + BobState::SwapSetupCompleted(state2) } - BobState::ExecutionSetupDone(state2) => { + BobState::SwapSetupCompleted(state2) => { // Alice and Bob have exchanged info let (state3, tx_lock) = state2.lock_btc().await?; let signed_tx = bitcoin_wallet @@ -268,34 +264,3 @@ async fn next_state( BobState::XmrRedeemed { tx_lock_id } => BobState::XmrRedeemed { tx_lock_id }, }) } - -pub async fn request_price_and_setup( - swap_id: Uuid, - btc: bitcoin::Amount, - event_loop_handle: &mut EventLoopHandle, - env_config: &Config, - bitcoin_refund_address: bitcoin::Address, - tx_refund_fee: bitcoin::Amount, - tx_cancel_fee: bitcoin::Amount, -) -> Result { - let xmr = event_loop_handle.request_spot_price(btc).await?; - - tracing::info!(%btc, %xmr, "Spot price"); - - let state0 = State0::new( - swap_id, - &mut OsRng, - btc, - xmr, - env_config.bitcoin_cancel_timelock, - env_config.bitcoin_punish_timelock, - bitcoin_refund_address, - env_config.monero_finality_confirmations, - tx_refund_fee, - tx_cancel_fee, - ); - - let state2 = event_loop_handle.execution_setup(state0).await?; - - Ok(state2) -} diff --git a/swap/src/protocol/bob/swap_setup.rs b/swap/src/protocol/bob/swap_setup.rs new file mode 100644 index 00000000..a0015f58 --- /dev/null +++ b/swap/src/protocol/bob/swap_setup.rs @@ -0,0 +1,305 @@ +use crate::network::swap_setup::{ + protocol, read_cbor_message, write_cbor_message, BlockchainNetwork, SpotPriceError, + SpotPriceRequest, SpotPriceResponse, +}; +use crate::protocol::bob::State0; +use crate::protocol::{bob, Message1, Message3}; +use crate::{bitcoin, env, monero}; +use anyhow::Result; +use futures::future::{BoxFuture, OptionFuture}; +use futures::FutureExt; +use libp2p::core::connection::ConnectionId; +use libp2p::core::upgrade; +use libp2p::swarm::{ + KeepAlive, NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, + PollParameters, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, + SubstreamProtocol, +}; +use libp2p::{Multiaddr, PeerId}; +use std::collections::VecDeque; +use std::sync::Arc; +use std::task::{Context, Poll}; +use std::time::Duration; +use uuid::Uuid; +use void::Void; + +#[allow(missing_debug_implementations)] +pub struct Behaviour { + env_config: env::Config, + bitcoin_wallet: Arc, + new_swaps: VecDeque<(PeerId, NewSwap)>, + completed_swaps: VecDeque<(PeerId, Completed)>, +} + +impl Behaviour { + pub fn new(env_config: env::Config, bitcoin_wallet: Arc) -> Self { + Self { + env_config, + bitcoin_wallet, + new_swaps: VecDeque::default(), + completed_swaps: VecDeque::default(), + } + } + + pub async fn start(&mut self, alice: PeerId, swap: NewSwap) { + self.new_swaps.push_back((alice, swap)) + } +} + +impl From for bob::OutEvent { + fn from(completed: Completed) -> Self { + bob::OutEvent::SwapSetupCompleted(Box::new(completed.0)) + } +} + +impl NetworkBehaviour for Behaviour { + type ProtocolsHandler = Handler; + type OutEvent = Completed; + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + Handler::new(self.env_config, self.bitcoin_wallet.clone()) + } + + fn addresses_of_peer(&mut self, _: &PeerId) -> Vec { + todo!() + } + + fn inject_connected(&mut self, _: &PeerId) { + todo!() + } + + fn inject_disconnected(&mut self, _: &PeerId) { + todo!() + } + + fn inject_event(&mut self, peer: PeerId, _: ConnectionId, completed: Completed) { + self.completed_swaps.push_back((peer, completed)); + } + + fn poll( + &mut self, + _cx: &mut Context<'_>, + _params: &mut impl PollParameters, + ) -> Poll> { + if let Some((_, event)) = self.completed_swaps.pop_front() { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + } + + if let Some((peer, event)) = self.new_swaps.pop_front() { + return Poll::Ready(NetworkBehaviourAction::NotifyHandler { + peer_id: peer, + handler: NotifyHandler::Any, + event, + }); + } + + Poll::Pending + } +} + +type OutboundStream = BoxFuture<'static, Result>; + +pub struct Handler { + outbound_stream: OptionFuture, + env_config: env::Config, + timeout: Duration, + new_swaps: VecDeque, + bitcoin_wallet: Arc, +} + +impl Handler { + fn new(env_config: env::Config, bitcoin_wallet: Arc) -> Self { + Self { + env_config, + outbound_stream: OptionFuture::from(None), + timeout: Duration::from_secs(60), + new_swaps: VecDeque::default(), + bitcoin_wallet, + } + } +} + +#[derive(Debug)] +pub struct NewSwap { + pub swap_id: Uuid, + pub btc: bitcoin::Amount, + pub tx_refund_fee: bitcoin::Amount, + pub tx_cancel_fee: bitcoin::Amount, + pub bitcoin_refund_address: bitcoin::Address, +} + +pub struct Completed(Result); + +impl ProtocolsHandler for Handler { + type InEvent = NewSwap; + type OutEvent = Completed; + type Error = Void; + type InboundProtocol = upgrade::DeniedUpgrade; + type OutboundProtocol = protocol::SwapSetup; + type InboundOpenInfo = (); + type OutboundOpenInfo = NewSwap; + + fn listen_protocol(&self) -> SubstreamProtocol { + SubstreamProtocol::new(upgrade::DeniedUpgrade, ()) + } + + fn inject_fully_negotiated_inbound(&mut self, _: Void, _: Self::InboundOpenInfo) { + unreachable!("Bob does not support inbound substreams") + } + + fn inject_fully_negotiated_outbound( + &mut self, + mut substream: NegotiatedSubstream, + info: Self::OutboundOpenInfo, + ) { + let bitcoin_wallet = self.bitcoin_wallet.clone(); + let env_config = self.env_config; + + let protocol = tokio::time::timeout(self.timeout, async move { + write_cbor_message(&mut substream, SpotPriceRequest { + btc: info.btc, + blockchain_network: BlockchainNetwork { + bitcoin: env_config.bitcoin_network, + monero: env_config.monero_network, + }, + }) + .await?; + + let xmr = Result::from(read_cbor_message::(&mut substream).await?)?; + + let state0 = State0::new( + info.swap_id, + &mut rand::thread_rng(), + info.btc, + xmr, + env_config.bitcoin_cancel_timelock, + env_config.bitcoin_punish_timelock, + info.bitcoin_refund_address, + env_config.monero_finality_confirmations, + info.tx_refund_fee, + info.tx_cancel_fee, + ); + + write_cbor_message(&mut substream, state0.next_message()).await?; + let message1 = read_cbor_message::(&mut substream).await?; + let state1 = state0.receive(bitcoin_wallet.as_ref(), message1).await?; + + write_cbor_message(&mut substream, state1.next_message()).await?; + let message3 = read_cbor_message::(&mut substream).await?; + let state2 = state1.receive(message3)?; + + write_cbor_message(&mut substream, state2.next_message()).await?; + + Ok(state2) + }); + + let max_seconds = self.timeout.as_secs(); + self.outbound_stream = OptionFuture::from(Some( + async move { + protocol.await.map_err(|_| Error::Timeout { + seconds: max_seconds, + })? + } + .boxed(), + )); + } + + fn inject_event(&mut self, new_swap: Self::InEvent) { + self.new_swaps.push_back(new_swap); + } + + fn inject_dial_upgrade_error( + &mut self, + _: Self::OutboundOpenInfo, + _: ProtocolsHandlerUpgrErr, + ) { + } + + fn connection_keep_alive(&self) -> KeepAlive { + todo!() + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll< + ProtocolsHandlerEvent< + Self::OutboundProtocol, + Self::OutboundOpenInfo, + Self::OutEvent, + Self::Error, + >, + > { + if let Some(new_swap) = self.new_swaps.pop_front() { + return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new(protocol::new(), new_swap), + }); + } + + if let Some(result) = futures::ready!(self.outbound_stream.poll_unpin(cx)) { + return Poll::Ready(ProtocolsHandlerEvent::Custom(Completed(result))); + } + + Poll::Pending + } +} + +impl From for Result { + fn from(response: SpotPriceResponse) -> Self { + match response { + SpotPriceResponse::Xmr(amount) => Ok(amount), + SpotPriceResponse::Error(e) => Err(e.into()), + } + } +} + +#[derive(Clone, Debug, thiserror::Error, PartialEq)] +pub enum Error { + #[error("Seller currently does not accept incoming swap requests, please try again later")] + NoSwapsAccepted, + #[error("Seller refused to buy {buy} because the minimum configured buy limit is {min}")] + AmountBelowMinimum { + min: bitcoin::Amount, + buy: bitcoin::Amount, + }, + #[error("Seller refused to buy {buy} because the maximum configured buy limit is {max}")] + AmountAboveMaximum { + max: bitcoin::Amount, + buy: bitcoin::Amount, + }, + #[error("Seller's XMR balance is currently too low to fulfill the swap request to buy {buy}, please try again later")] + BalanceTooLow { buy: bitcoin::Amount }, + + #[error("Seller blockchain network {asb:?} setup did not match your blockchain network setup {cli:?}")] + BlockchainNetworkMismatch { + cli: BlockchainNetwork, + asb: BlockchainNetwork, + }, + + #[error("Failed to complete swap setup within {seconds}s")] + Timeout { seconds: u64 }, + + /// To be used for errors that cannot be explained on the CLI side (e.g. + /// rate update problems on the seller side) + #[error("Seller encountered a problem, please try again later.")] + Other, +} + +impl From for Error { + fn from(error: SpotPriceError) -> Self { + match error { + SpotPriceError::NoSwapsAccepted => Error::NoSwapsAccepted, + SpotPriceError::AmountBelowMinimum { min, buy } => { + Error::AmountBelowMinimum { min, buy } + } + SpotPriceError::AmountAboveMaximum { max, buy } => { + Error::AmountAboveMaximum { max, buy } + } + SpotPriceError::BalanceTooLow { buy } => Error::BalanceTooLow { buy }, + SpotPriceError::BlockchainNetworkMismatch { cli, asb } => { + Error::BlockchainNetworkMismatch { cli, asb } + } + SpotPriceError::Other => Error::Other, + } + } +} diff --git a/swap/tests/harness/mod.rs b/swap/tests/harness/mod.rs index ecba916b..052b1d4b 100644 --- a/swap/tests/harness/mod.rs +++ b/swap/tests/harness/mod.rs @@ -445,18 +445,19 @@ impl BobParams { ) -> Result<(bob::EventLoop, bob::EventLoopHandle)> { let tor_socks5_port = get_port() .expect("We don't care about Tor in the tests so we get a free port to disable it."); - let mut swarm = swarm::cli(&self.seed, self.alice_peer_id, tor_socks5_port).await?; + let mut swarm = swarm::cli( + &self.seed, + self.alice_peer_id, + tor_socks5_port, + self.env_config, + self.bitcoin_wallet.clone(), + ) + .await?; swarm .behaviour_mut() .add_address(self.alice_peer_id, self.alice_address.clone()); - bob::EventLoop::new( - swap_id, - swarm, - self.alice_peer_id, - self.bitcoin_wallet.clone(), - self.env_config, - ) + bob::EventLoop::new(swap_id, swarm, self.alice_peer_id, self.env_config) } }