diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8e557eff..51d1a674 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -116,7 +116,10 @@ jobs: bob_refunds_using_cancel_and_refund_command_timelock_not_expired_force, punish, alice_punishes_after_restart_punish_timelock_expired, - alice_refunds_after_restart_bob_refunded + alice_refunds_after_restart_bob_refunded, + ensure_same_swap_id, + concurrent_bobs_after_xmr_lock_proof_sent, + concurrent_bobs_before_xmr_lock_proof_sent ] runs-on: ubuntu-latest steps: diff --git a/CHANGELOG.md b/CHANGELOG.md index 4b986773..fe44ac8e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- Allow multiple concurrent swaps with the same peer on the ASB. + This is a breaking change because the swap ID is now agreed upon between CLI and ASB during swap setup. + Resuming swaps started prior to this change can result in unexpected behaviour. + ## [0.4.0] - 2021-04-06 ### Changed diff --git a/bors.toml b/bors.toml index 6d83fcd2..04973080 100644 --- a/bors.toml +++ b/bors.toml @@ -15,5 +15,8 @@ status = [ "docker_tests (bob_refunds_using_cancel_and_refund_command_timelock_not_expired)", "docker_tests (punish)", "docker_tests (alice_punishes_after_restart_punish_timelock_expired)", - "docker_tests (alice_refunds_after_restart_bob_refunded)" + "docker_tests (alice_refunds_after_restart_bob_refunded)", + "docker_tests (ensure_same_swap_id)", + "docker_tests (concurrent_bobs_after_xmr_lock_proof_sent)", + "docker_tests (concurrent_bobs_before_xmr_lock_proof_sent)" ] diff --git a/monero-harness/src/lib.rs b/monero-harness/src/lib.rs index 93a1579d..48615b92 100644 --- a/monero-harness/src/lib.rs +++ b/monero-harness/src/lib.rs @@ -98,7 +98,7 @@ impl<'c> Monero { Ok(wallet) } - pub async fn init(&self, wallet_amount: Vec<(&str, u64)>) -> Result<()> { + pub async fn init_miner(&self) -> Result<()> { let miner_wallet = self.wallet("miner")?; let miner_address = miner_wallet.address().await?.address; @@ -108,17 +108,34 @@ impl<'c> Monero { tracing::info!("Generated {:?} blocks", res.blocks.len()); miner_wallet.refresh().await?; - for (wallet, amount) in wallet_amount.iter() { - if *amount > 0 { - let wallet = self.wallet(wallet)?; - let address = wallet.address().await?.address; - miner_wallet.transfer(&address, *amount).await?; + Ok(()) + } + + pub async fn init_wallet(&self, name: &str, amount_in_outputs: Vec) -> Result<()> { + let miner_wallet = self.wallet("miner")?; + let miner_address = miner_wallet.address().await?.address; + let monerod = &self.monerod; + + let wallet = self.wallet(name)?; + let address = wallet.address().await?.address; + + for amount in amount_in_outputs { + if amount > 0 { + miner_wallet.transfer(&address, amount).await?; tracing::info!("Funded {} wallet with {}", wallet.name, amount); monerod.client().generate_blocks(10, &miner_address).await?; wallet.refresh().await?; } } + Ok(()) + } + + pub async fn start_miner(&self) -> Result<()> { + let miner_wallet = self.wallet("miner")?; + let miner_address = miner_wallet.address().await?.address; + let monerod = &self.monerod; + monerod.start_miner(&miner_address).await?; tracing::info!("Waiting for miner wallet to catch up..."); @@ -130,6 +147,13 @@ impl<'c> Monero { Ok(()) } + + pub async fn init_and_start_miner(&self) -> Result<()> { + self.init_miner().await?; + self.start_miner().await?; + + Ok(()) + } } fn random_prefix() -> String { diff --git a/monero-harness/tests/monerod.rs b/monero-harness/tests/monerod.rs index 40901b50..257e888d 100644 --- a/monero-harness/tests/monerod.rs +++ b/monero-harness/tests/monerod.rs @@ -14,7 +14,7 @@ async fn init_miner_and_mine_to_miner_address() { let tc = Cli::default(); let (monero, _monerod_container) = Monero::new(&tc, vec![]).await.unwrap(); - monero.init(vec![]).await.unwrap(); + monero.init_and_start_miner().await.unwrap(); let monerod = monero.monerod(); let miner_wallet = monero.wallet("miner").unwrap(); diff --git a/monero-harness/tests/wallet.rs b/monero-harness/tests/wallet.rs index 15fb3698..a3f4e52e 100644 --- a/monero-harness/tests/wallet.rs +++ b/monero-harness/tests/wallet.rs @@ -22,11 +22,10 @@ async fn fund_transfer_and_check_tx_key() { let alice_wallet = monero.wallet("alice").unwrap(); let bob_wallet = monero.wallet("bob").unwrap(); - // fund alice - monero - .init(vec![("alice", fund_alice), ("bob", fund_bob)]) - .await - .unwrap(); + monero.init_miner().await.unwrap(); + monero.init_wallet("alice", vec![fund_alice]).await.unwrap(); + monero.init_wallet("bob", vec![fund_bob]).await.unwrap(); + monero.start_miner().await.unwrap(); // check alice balance let got_alice_balance = alice_wallet.balance().await.unwrap(); diff --git a/swap/src/bin/swap.rs b/swap/src/bin/swap.rs index ed74a4c0..c881c21a 100644 --- a/swap/src/bin/swap.rs +++ b/swap/src/bin/swap.rs @@ -107,8 +107,9 @@ async fn main() -> Result<()> { let mut swarm = swarm::new::(&seed)?; swarm.add_address(alice_peer_id, alice_multiaddr); + let swap_id = Uuid::new_v4(); let (event_loop, mut event_loop_handle) = - EventLoop::new(swarm, alice_peer_id, bitcoin_wallet.clone())?; + EventLoop::new(swap_id, swarm, alice_peer_id, bitcoin_wallet.clone())?; let event_loop = tokio::spawn(event_loop.run()); let send_bitcoin = determine_btc_to_swap( @@ -128,7 +129,6 @@ async fn main() -> Result<()> { ) .await?; - let swap_id = Uuid::new_v4(); db.insert_peer_id(swap_id, alice_peer_id).await?; let swap = Builder::new( @@ -190,7 +190,7 @@ async fn main() -> Result<()> { swarm.add_address(alice_peer_id, alice_multiaddr); let (event_loop, event_loop_handle) = - EventLoop::new(swarm, alice_peer_id, bitcoin_wallet.clone())?; + EventLoop::new(swap_id, swarm, alice_peer_id, bitcoin_wallet.clone())?; let handle = tokio::spawn(event_loop.run()); let swap = Builder::new( diff --git a/swap/src/network/encrypted_signature.rs b/swap/src/network/encrypted_signature.rs index 1f5d94f9..3930e3eb 100644 --- a/swap/src/network/encrypted_signature.rs +++ b/swap/src/network/encrypted_signature.rs @@ -5,6 +5,7 @@ use libp2p::request_response::{ RequestResponseMessage, }; use serde::{Deserialize, Serialize}; +use uuid::Uuid; pub type OutEvent = RequestResponseEvent; @@ -19,6 +20,7 @@ impl ProtocolName for EncryptedSignatureProtocol { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct Request { + pub swap_id: Uuid, pub tx_redeem_encsig: crate::bitcoin::EncryptedSignature, } diff --git a/swap/src/network/transfer_proof.rs b/swap/src/network/transfer_proof.rs index 3cf83adb..05d7ae7d 100644 --- a/swap/src/network/transfer_proof.rs +++ b/swap/src/network/transfer_proof.rs @@ -6,6 +6,7 @@ use libp2p::request_response::{ RequestResponseMessage, }; use serde::{Deserialize, Serialize}; +use uuid::Uuid; pub type OutEvent = RequestResponseEvent; @@ -20,6 +21,7 @@ impl ProtocolName for TransferProofProtocol { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct Request { + pub swap_id: Uuid, pub tx_lock_proof: monero::TransferProof, } diff --git a/swap/src/protocol.rs b/swap/src/protocol.rs index fb3a35be..58b616b3 100644 --- a/swap/src/protocol.rs +++ b/swap/src/protocol.rs @@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize}; use sha2::Sha256; use sigma_fun::ext::dl_secp256k1_ed25519_eq::{CrossCurveDLEQ, CrossCurveDLEQProof}; use sigma_fun::HashTranscript; +use uuid::Uuid; pub mod alice; pub mod bob; @@ -18,14 +19,9 @@ pub static CROSS_CURVE_PROOF_SYSTEM: Lazy< ) }); -#[derive(Debug, Copy, Clone)] -pub struct StartingBalances { - pub xmr: monero::Amount, - pub btc: bitcoin::Amount, -} - #[derive(Clone, Debug, Serialize, Deserialize)] pub struct Message0 { + swap_id: Uuid, B: bitcoin::PublicKey, S_b_monero: monero::PublicKey, S_b_bitcoin: bitcoin::PublicKey, diff --git a/swap/src/protocol/alice/behaviour.rs b/swap/src/protocol/alice/behaviour.rs index 9ecaf49d..d5d59c3d 100644 --- a/swap/src/protocol/alice/behaviour.rs +++ b/swap/src/protocol/alice/behaviour.rs @@ -6,6 +6,7 @@ use libp2p::request_response::{ RequestId, RequestResponseEvent, RequestResponseMessage, ResponseChannel, }; use libp2p::{NetworkBehaviour, PeerId}; +use uuid::Uuid; #[derive(Debug)] pub enum OutEvent { @@ -20,6 +21,7 @@ pub enum OutEvent { }, ExecutionSetupDone { bob_peer_id: PeerId, + swap_id: Uuid, state3: Box, }, TransferProofAcknowledged { @@ -157,9 +159,11 @@ impl From for OutEvent { match event { Done { bob_peer_id, + swap_id, state3, } => OutEvent::ExecutionSetupDone { bob_peer_id, + swap_id, state3: Box::new(state3), }, Failure { peer, error } => OutEvent::Failure { peer, error }, diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index cfe20f14..930a435c 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -3,7 +3,7 @@ use crate::database::Database; use crate::env::Config; use crate::monero::BalanceTooLow; use crate::network::quote::BidQuote; -use crate::network::{encrypted_signature, spot_price, transfer_proof}; +use crate::network::{spot_price, transfer_proof}; use crate::protocol::alice::{AliceState, Behaviour, OutEvent, State0, State3, Swap}; use crate::{bitcoin, kraken, monero}; use anyhow::{bail, Context, Result}; @@ -43,9 +43,8 @@ pub struct EventLoop { swap_sender: mpsc::Sender, - /// Stores a sender per peer for incoming [`EncryptedSignature`]s. - recv_encrypted_signature: - HashMap>, + /// Stores incoming [`EncryptedSignature`]s per swap. + recv_encrypted_signature: HashMap>, inflight_encrypted_signatures: FuturesUnordered>>, send_transfer_proof: FuturesUnordered, @@ -120,7 +119,7 @@ where } }; - let handle = self.new_handle(peer_id); + let handle = self.new_handle(peer_id, swap_id); let swap = Swap { event_loop_handle: handle, @@ -186,25 +185,26 @@ where tracing::debug!(%peer, "Failed to respond with quote"); } } - SwarmEvent::Behaviour(OutEvent::ExecutionSetupDone{bob_peer_id, state3}) => { - let _ = self.handle_execution_setup_done(bob_peer_id, *state3).await; + SwarmEvent::Behaviour(OutEvent::ExecutionSetupDone{bob_peer_id, swap_id, state3}) => { + let _ = self.handle_execution_setup_done(bob_peer_id, swap_id, *state3).await; } SwarmEvent::Behaviour(OutEvent::TransferProofAcknowledged { peer, id }) => { - tracing::trace!(%peer, "Bob acknowledged transfer proof"); + tracing::debug!(%peer, "Bob acknowledged transfer proof"); if let Some(responder) = self.inflight_transfer_proofs.remove(&id) { let _ = responder.respond(()); } } SwarmEvent::Behaviour(OutEvent::EncryptedSignatureReceived{ msg, channel, peer }) => { - let sender = match self.recv_encrypted_signature.remove(&peer) { + let sender = match self.recv_encrypted_signature.remove(&msg.swap_id) { Some(sender) => sender, None => { + // TODO: Don't just drop encsig if we currently don't have a running swap for it, save in db tracing::warn!(%peer, "No sender for encrypted signature, maybe already handled?"); continue; } }; - let mut responder = match sender.send(*msg).await { + let mut responder = match sender.send(msg.tx_redeem_encsig).await { Ok(responder) => responder, Err(_) => { tracing::warn!(%peer, "Failed to relay encrypted signature to swap"); @@ -262,8 +262,8 @@ where let id = self.swarm.transfer_proof.send_request(&peer, transfer_proof); self.inflight_transfer_proofs.insert(id, responder); }, - Some(Err(_)) => { - tracing::debug!("A swap stopped without sending a transfer proof"); + Some(Err(e)) => { + tracing::debug!("A swap stopped without sending a transfer proof: {:#}", e); } None => { unreachable!("stream of transfer proof receivers must never terminate") @@ -319,9 +319,13 @@ where }) } - async fn handle_execution_setup_done(&mut self, bob_peer_id: PeerId, state3: State3) { - let swap_id = Uuid::new_v4(); - let handle = self.new_handle(bob_peer_id); + async fn handle_execution_setup_done( + &mut self, + bob_peer_id: PeerId, + swap_id: Uuid, + state3: State3, + ) { + let handle = self.new_handle(bob_peer_id, swap_id); let initial_state = AliceState::Started { state3: Box::new(state3), @@ -337,7 +341,7 @@ where swap_id, }; - // TODO: Consider adding separate components for start/rsume of swaps + // TODO: Consider adding separate components for start/resume of swaps // swaps save peer id so we can resume match self.db.insert_peer_id(swap_id, bob_peer_id).await { @@ -354,17 +358,24 @@ where /// Create a new [`EventLoopHandle`] that is scoped for communication with /// the given peer. - fn new_handle(&mut self, peer: PeerId) -> EventLoopHandle { + fn new_handle(&mut self, peer: PeerId, swap_id: Uuid) -> EventLoopHandle { // we deliberately don't put timeouts on these channels because the swap always // races these futures against a timelock + let (transfer_proof_sender, mut transfer_proof_receiver) = bmrng::channel(1); let encrypted_signature = bmrng::channel(1); self.recv_encrypted_signature - .insert(peer, encrypted_signature.0); + .insert(swap_id, encrypted_signature.0); + self.send_transfer_proof.push( async move { - let (request, responder) = transfer_proof_receiver.recv().await?; + let (transfer_proof, responder) = transfer_proof_receiver.recv().await?; + + let request = transfer_proof::Request { + swap_id, + tx_lock_proof: transfer_proof, + }; Ok((peer, request, responder)) } @@ -442,13 +453,13 @@ impl LatestRate for KrakenRate { #[derive(Debug)] pub struct EventLoopHandle { - recv_encrypted_signature: Option>, - send_transfer_proof: Option>, + recv_encrypted_signature: Option>, + send_transfer_proof: Option>, } impl EventLoopHandle { pub async fn recv_encrypted_signature(&mut self) -> Result { - let (request, responder) = self + let (tx_redeem_encsig, responder) = self .recv_encrypted_signature .take() .context("Encrypted signature was already received")? @@ -459,14 +470,14 @@ impl EventLoopHandle { .respond(()) .context("Failed to acknowledge receipt of encrypted signature")?; - Ok(request.tx_redeem_encsig) + Ok(tx_redeem_encsig) } pub async fn send_transfer_proof(&mut self, msg: monero::TransferProof) -> Result<()> { self.send_transfer_proof .take() .context("Transfer proof was already sent")? - .send_receive(transfer_proof::Request { tx_lock_proof: msg }) + .send_receive(msg) .await .context("Failed to send transfer proof")?; diff --git a/swap/src/protocol/alice/execution_setup.rs b/swap/src/protocol/alice/execution_setup.rs index a4b04aa0..4c576b0b 100644 --- a/swap/src/protocol/alice/execution_setup.rs +++ b/swap/src/protocol/alice/execution_setup.rs @@ -4,18 +4,27 @@ use crate::protocol::{Message0, Message2, Message4}; use anyhow::{Context, Error}; use libp2p::PeerId; use libp2p_async_await::BehaviourOutEvent; +use uuid::Uuid; #[derive(Debug)] pub enum OutEvent { - Done { bob_peer_id: PeerId, state3: State3 }, - Failure { peer: PeerId, error: Error }, + Done { + bob_peer_id: PeerId, + swap_id: Uuid, + state3: State3, + }, + Failure { + peer: PeerId, + error: Error, + }, } -impl From> for OutEvent { - fn from(event: BehaviourOutEvent<(PeerId, State3), (), Error>) -> Self { +impl From> for OutEvent { + fn from(event: BehaviourOutEvent<(PeerId, (Uuid, State3)), (), Error>) -> Self { match event { - BehaviourOutEvent::Inbound(_, Ok((bob_peer_id, state3))) => OutEvent::Done { + BehaviourOutEvent::Inbound(_, Ok((bob_peer_id, (swap_id, state3)))) => OutEvent::Done { bob_peer_id, + swap_id, state3, }, BehaviourOutEvent::Inbound(peer, Err(e)) => OutEvent::Failure { peer, error: e }, @@ -27,7 +36,7 @@ impl From> for OutEvent { #[derive(libp2p::NetworkBehaviour)] #[behaviour(out_event = "OutEvent", event_process = false)] pub struct Behaviour { - inner: libp2p_async_await::Behaviour<(PeerId, State3), (), anyhow::Error>, + inner: libp2p_async_await::Behaviour<(PeerId, (Uuid, State3)), (), anyhow::Error>, } impl Default for Behaviour { @@ -45,7 +54,7 @@ impl Behaviour { let message0 = serde_cbor::from_slice::(&substream.read_message(BUF_SIZE).await?) .context("Failed to deserialize message0")?; - let state1 = state0.receive(message0)?; + let (swap_id, state1) = state0.receive(message0)?; substream .write_message( @@ -73,7 +82,7 @@ impl Behaviour { .context("Failed to deserialize message4")?; let state3 = state2.receive(message4)?; - Ok((bob, state3)) + Ok((bob, (swap_id, state3))) }) } } diff --git a/swap/src/protocol/alice/state.rs b/swap/src/protocol/alice/state.rs index 4a6194fb..86810e5b 100644 --- a/swap/src/protocol/alice/state.rs +++ b/swap/src/protocol/alice/state.rs @@ -13,6 +13,7 @@ use rand::{CryptoRng, RngCore}; use serde::{Deserialize, Serialize}; use sigma_fun::ext::dl_secp256k1_ed25519_eq::CrossCurveDLEQProof; use std::fmt; +use uuid::Uuid; #[derive(Debug)] pub enum AliceState { @@ -146,7 +147,7 @@ impl State0 { }) } - pub fn receive(self, msg: Message0) -> Result { + pub fn receive(self, msg: Message0) -> Result<(Uuid, State1)> { let valid = CROSS_CURVE_PROOF_SYSTEM.verify( &msg.dleq_proof_s_b, ( @@ -164,7 +165,7 @@ impl State0 { let v = self.v_a + msg.v_b; - Ok(State1 { + Ok((msg.swap_id, State1 { a: self.a, B: msg.B, s_a: self.s_a, @@ -182,7 +183,7 @@ impl State0 { refund_address: msg.refund_address, redeem_address: self.redeem_address, punish_address: self.punish_address, - }) + })) } } diff --git a/swap/src/protocol/alice/swap.rs b/swap/src/protocol/alice/swap.rs index 2f2b1c86..da8012e5 100644 --- a/swap/src/protocol/alice/swap.rs +++ b/swap/src/protocol/alice/swap.rs @@ -5,7 +5,6 @@ use crate::env::Config; use crate::protocol::alice; use crate::protocol::alice::event_loop::EventLoopHandle; use crate::protocol::alice::AliceState; -use crate::protocol::alice::AliceState::XmrLockTransferProofSent; use crate::{bitcoin, database, monero}; use anyhow::{bail, Context, Result}; use tokio::select; @@ -16,7 +15,7 @@ pub async fn run(swap: alice::Swap) -> Result { run_until(swap, |_| false).await } -#[tracing::instrument(name = "swap", skip(swap,exit_early), fields(id = %swap.swap_id))] +#[tracing::instrument(name = "swap", skip(swap,exit_early), fields(id = %swap.swap_id), err)] pub async fn run_until( mut swap: alice::Swap, exit_early: fn(&AliceState) -> bool, @@ -127,7 +126,7 @@ async fn next_state( result = event_loop_handle.send_transfer_proof(transfer_proof.clone()) => { result?; - XmrLockTransferProofSent { + AliceState::XmrLockTransferProofSent { monero_wallet_restore_blockheight, transfer_proof, state3, diff --git a/swap/src/protocol/bob/event_loop.rs b/swap/src/protocol/bob/event_loop.rs index 02d539ea..35daa041 100644 --- a/swap/src/protocol/bob/event_loop.rs +++ b/swap/src/protocol/bob/event_loop.rs @@ -1,6 +1,6 @@ use crate::bitcoin::EncryptedSignature; use crate::network::quote::BidQuote; -use crate::network::{encrypted_signature, spot_price, transfer_proof}; +use crate::network::{encrypted_signature, spot_price}; use crate::protocol::bob::{Behaviour, OutEvent, State0, State2}; use crate::{bitcoin, monero}; use anyhow::{Context, Result}; @@ -12,9 +12,11 @@ use libp2p::{PeerId, Swarm}; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; +use uuid::Uuid; #[allow(missing_debug_implementations)] pub struct EventLoop { + swap_id: Uuid, swarm: libp2p::Swarm, bitcoin_wallet: Arc, alice_peer_id: PeerId, @@ -22,7 +24,7 @@ pub struct EventLoop { // these streams represents outgoing requests that we have to make quote_requests: bmrng::RequestReceiverStream<(), BidQuote>, spot_price_requests: bmrng::RequestReceiverStream, - encrypted_signature_requests: bmrng::RequestReceiverStream, + encrypted_signatures: bmrng::RequestReceiverStream, execution_setup_requests: bmrng::RequestReceiverStream>, // these represents requests that are currently in-flight. @@ -34,7 +36,7 @@ pub struct EventLoop { inflight_execution_setup: Option>>, /// The sender we will use to relay incoming transfer proofs. - transfer_proof: bmrng::RequestSender, + transfer_proof: bmrng::RequestSender, /// The future representing the successful handling of an incoming transfer /// proof. /// @@ -47,6 +49,7 @@ pub struct EventLoop { impl EventLoop { pub fn new( + swap_id: Uuid, swarm: Swarm, alice_peer_id: PeerId, bitcoin_wallet: Arc, @@ -58,12 +61,13 @@ impl EventLoop { let quote = bmrng::channel_with_timeout(1, Duration::from_secs(30)); let event_loop = EventLoop { + swap_id, swarm, alice_peer_id, bitcoin_wallet, execution_setup_requests: execution_setup.1.into(), transfer_proof: transfer_proof.0, - encrypted_signature_requests: encrypted_signature.1.into(), + encrypted_signatures: encrypted_signature.1.into(), spot_price_requests: spot_price.1.into(), quote_requests: quote.1.into(), inflight_spot_price_requests: HashMap::default(), @@ -108,10 +112,20 @@ impl EventLoop { } } SwarmEvent::Behaviour(OutEvent::TransferProofReceived { msg, channel }) => { - let mut responder = match self.transfer_proof.send(*msg).await { + if msg.swap_id != self.swap_id { + + // TODO: Save unexpected transfer proofs in the database and check for messages in the database when handling swaps + tracing::warn!("Received unexpected transfer proof for swap {} while running swap {}. This transfer proof will be ignored.", msg.swap_id, self.swap_id); + + // When receiving a transfer proof that is unexpected we still have to acknowledge that it was received + let _ = self.swarm.transfer_proof.send_response(channel, ()); + continue; + } + + let mut responder = match self.transfer_proof.send(msg.tx_lock_proof).await { Ok(responder) => responder, - Err(_) => { - tracing::warn!("Failed to pass on transfer proof"); + Err(e) => { + tracing::warn!("Failed to pass on transfer proof: {:#}", e); continue; } }; @@ -180,7 +194,12 @@ impl EventLoop { self.swarm.execution_setup.run(self.alice_peer_id, request, self.bitcoin_wallet.clone()); self.inflight_execution_setup = Some(responder); }, - Some((request, responder)) = self.encrypted_signature_requests.next().fuse(), if self.is_connected_to_alice() => { + Some((tx_redeem_encsig, responder)) = self.encrypted_signatures.next().fuse(), if self.is_connected_to_alice() => { + let request = encrypted_signature::Request { + swap_id: self.swap_id, + tx_redeem_encsig + }; + let id = self.swarm.encrypted_signature.send_request(&self.alice_peer_id, request); self.inflight_encrypted_signature_requests.insert(id, responder); }, @@ -202,8 +221,8 @@ impl EventLoop { #[derive(Debug)] pub struct EventLoopHandle { execution_setup: bmrng::RequestSender>, - transfer_proof: bmrng::RequestReceiver, - encrypted_signature: bmrng::RequestSender, + transfer_proof: bmrng::RequestReceiver, + encrypted_signature: bmrng::RequestSender, spot_price: bmrng::RequestSender, quote: bmrng::RequestSender<(), BidQuote>, } @@ -213,8 +232,8 @@ impl EventLoopHandle { self.execution_setup.send_receive(state0).await? } - pub async fn recv_transfer_proof(&mut self) -> Result { - let (request, responder) = self + pub async fn recv_transfer_proof(&mut self) -> Result { + let (transfer_proof, responder) = self .transfer_proof .recv() .await @@ -223,7 +242,7 @@ impl EventLoopHandle { .respond(()) .context("Failed to acknowledge receipt of transfer proof")?; - Ok(request) + Ok(transfer_proof) } pub async fn request_spot_price(&mut self, btc: bitcoin::Amount) -> Result { @@ -244,7 +263,7 @@ impl EventLoopHandle { ) -> Result<()> { Ok(self .encrypted_signature - .send_receive(encrypted_signature::Request { tx_redeem_encsig }) + .send_receive(tx_redeem_encsig) .await?) } } diff --git a/swap/src/protocol/bob/state.rs b/swap/src/protocol/bob/state.rs index 512b6a25..df184f6e 100644 --- a/swap/src/protocol/bob/state.rs +++ b/swap/src/protocol/bob/state.rs @@ -17,6 +17,7 @@ use serde::{Deserialize, Serialize}; use sha2::Sha256; use sigma_fun::ext::dl_secp256k1_ed25519_eq::CrossCurveDLEQProof; use std::fmt; +use uuid::Uuid; #[derive(Debug, Clone)] pub enum BobState { @@ -69,6 +70,7 @@ impl fmt::Display for BobState { #[derive(Clone, Debug, PartialEq)] pub struct State0 { + swap_id: Uuid, b: bitcoin::SecretKey, s_b: monero::Scalar, S_b_monero: monero::PublicKey, @@ -84,7 +86,9 @@ pub struct State0 { } impl State0 { + #[allow(clippy::too_many_arguments)] pub fn new( + swap_id: Uuid, rng: &mut R, btc: bitcoin::Amount, xmr: monero::Amount, @@ -101,6 +105,7 @@ impl State0 { let (dleq_proof_s_b, (S_b_bitcoin, S_b_monero)) = CROSS_CURVE_PROOF_SYSTEM.prove(&s_b, rng); Self { + swap_id, b, s_b, v_b, @@ -120,6 +125,7 @@ impl State0 { pub fn next_message(&self) -> Message0 { Message0 { + swap_id: self.swap_id, B: self.b.public(), S_b_monero: self.S_b_monero, S_b_bitcoin: self.S_b_bitcoin, diff --git a/swap/src/protocol/bob/swap.rs b/swap/src/protocol/bob/swap.rs index e926c12d..5a4f1a61 100644 --- a/swap/src/protocol/bob/swap.rs +++ b/swap/src/protocol/bob/swap.rs @@ -8,6 +8,7 @@ use crate::{bitcoin, monero}; use anyhow::{bail, Context, Result}; use rand::rngs::OsRng; use tokio::select; +use uuid::Uuid; pub fn is_complete(state: &BobState) -> bool { matches!( @@ -32,6 +33,7 @@ pub async fn run_until( while !is_target_state(¤t_state) { current_state = next_state( + swap.swap_id, current_state, &mut swap.event_loop_handle, swap.bitcoin_wallet.as_ref(), @@ -51,6 +53,7 @@ pub async fn run_until( } async fn next_state( + swap_id: Uuid, state: BobState, event_loop_handle: &mut EventLoopHandle, bitcoin_wallet: &bitcoin::Wallet, @@ -65,6 +68,7 @@ async fn next_state( let bitcoin_refund_address = bitcoin_wallet.new_address().await?; let state2 = request_price_and_setup( + swap_id, btc_amount, event_loop_handle, env_config, @@ -103,7 +107,7 @@ async fn next_state( select! { transfer_proof = transfer_proof_watcher => { - let transfer_proof = transfer_proof?.tx_lock_proof; + let transfer_proof = transfer_proof?; tracing::info!(txid = %transfer_proof.tx_hash(), "Alice locked Monero"); @@ -244,6 +248,7 @@ async fn next_state( } pub async fn request_price_and_setup( + swap_id: Uuid, btc: bitcoin::Amount, event_loop_handle: &mut EventLoopHandle, env_config: &Config, @@ -254,6 +259,7 @@ pub async fn request_price_and_setup( tracing::info!("Spot price for {} is {}", btc, xmr); let state0 = State0::new( + swap_id, &mut OsRng, btc, xmr, diff --git a/swap/tests/alice_punishes_after_restart_punish_timelock_expired.rs b/swap/tests/alice_punishes_after_restart_punish_timelock_expired.rs index 5ede52a3..0173b1d0 100644 --- a/swap/tests/alice_punishes_after_restart_punish_timelock_expired.rs +++ b/swap/tests/alice_punishes_after_restart_punish_timelock_expired.rs @@ -13,6 +13,7 @@ use swap::protocol::{alice, bob}; async fn alice_punishes_after_restart_if_punish_timelock_expired() { harness::setup_test(FastPunishConfig, |mut ctx| async move { let (bob_swap, bob_join_handle) = ctx.bob_swap().await; + let bob_swap_id = bob_swap.swap_id; let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked)); let alice_swap = ctx.alice_next_swap().await; @@ -33,11 +34,7 @@ async fn alice_punishes_after_restart_if_punish_timelock_expired() { .wait_until_confirmed_with(state3.punish_timelock) .await?; } else { - panic!( - "\ - Alice in unexpected state {}", - alice_state - ); + panic!("Alice in unexpected state {}", alice_state); } ctx.restart_alice().await; @@ -49,7 +46,9 @@ async fn alice_punishes_after_restart_if_punish_timelock_expired() { // Restart Bob after Alice punished to ensure Bob transitions to // punished and does not run indefinitely - let (bob_swap, _) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; + let (bob_swap, _) = ctx + .stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id) + .await; assert!(matches!(bob_swap.state, BobState::BtcLocked { .. })); let bob_state = bob::run(bob_swap).await?; diff --git a/swap/tests/bob_refunds_using_cancel_and_refund_command.rs b/swap/tests/bob_refunds_using_cancel_and_refund_command.rs index a12d5724..71a902a8 100644 --- a/swap/tests/bob_refunds_using_cancel_and_refund_command.rs +++ b/swap/tests/bob_refunds_using_cancel_and_refund_command.rs @@ -9,6 +9,7 @@ use swap::protocol::{alice, bob}; async fn given_bob_manually_refunds_after_btc_locked_bob_refunds() { harness::setup_test(FastCancelConfig, |mut ctx| async move { let (bob_swap, bob_join_handle) = ctx.bob_swap().await; + let bob_swap_id = bob_swap.swap_id; let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked)); let alice_swap = ctx.alice_next_swap().await; @@ -17,7 +18,9 @@ async fn given_bob_manually_refunds_after_btc_locked_bob_refunds() { let bob_state = bob_swap.await??; assert!(matches!(bob_state, BobState::BtcLocked { .. })); - let (bob_swap, bob_join_handle) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; + let (bob_swap, bob_join_handle) = ctx + .stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id) + .await; // Ensure cancel timelock is expired if let BobState::BtcLocked(state3) = bob_swap.state.clone() { @@ -43,7 +46,9 @@ async fn given_bob_manually_refunds_after_btc_locked_bob_refunds() { .await??; assert!(matches!(state, BobState::BtcCancelled { .. })); - let (bob_swap, bob_join_handle) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; + let (bob_swap, bob_join_handle) = ctx + .stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id) + .await; assert!(matches!(bob_swap.state, BobState::BtcCancelled { .. })); // Bob manually refunds diff --git a/swap/tests/bob_refunds_using_cancel_and_refund_command_timelock_not_expired.rs b/swap/tests/bob_refunds_using_cancel_and_refund_command_timelock_not_expired.rs index 13793009..49d73a58 100644 --- a/swap/tests/bob_refunds_using_cancel_and_refund_command_timelock_not_expired.rs +++ b/swap/tests/bob_refunds_using_cancel_and_refund_command_timelock_not_expired.rs @@ -10,6 +10,7 @@ use swap::protocol::{alice, bob}; async fn given_bob_manually_cancels_when_timelock_not_expired_errors() { harness::setup_test(SlowCancelConfig, |mut ctx| async move { let (bob_swap, bob_join_handle) = ctx.bob_swap().await; + let bob_swap_id = bob_swap.swap_id; let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked)); let alice_swap = ctx.alice_next_swap().await; @@ -18,7 +19,9 @@ async fn given_bob_manually_cancels_when_timelock_not_expired_errors() { let bob_state = bob_swap.await??; assert!(matches!(bob_state, BobState::BtcLocked { .. })); - let (bob_swap, bob_join_handle) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; + let (bob_swap, bob_join_handle) = ctx + .stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id) + .await; assert!(matches!(bob_swap.state, BobState::BtcLocked { .. })); // Bob tries but fails to manually cancel @@ -35,7 +38,9 @@ async fn given_bob_manually_cancels_when_timelock_not_expired_errors() { assert!(matches!(result, Error::CancelTimelockNotExpiredYet)); - let (bob_swap, bob_join_handle) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; + let (bob_swap, bob_join_handle) = ctx + .stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id) + .await; assert!(matches!(bob_swap.state, BobState::BtcLocked { .. })); // Bob tries but fails to manually refund @@ -50,7 +55,9 @@ async fn given_bob_manually_cancels_when_timelock_not_expired_errors() { .err() .unwrap(); - let (bob_swap, _) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; + let (bob_swap, _) = ctx + .stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id) + .await; assert!(matches!(bob_swap.state, BobState::BtcLocked { .. })); Ok(()) diff --git a/swap/tests/bob_refunds_using_cancel_and_refund_command_timelock_not_expired_force.rs b/swap/tests/bob_refunds_using_cancel_and_refund_command_timelock_not_expired_force.rs index 8a9d7f66..057db955 100644 --- a/swap/tests/bob_refunds_using_cancel_and_refund_command_timelock_not_expired_force.rs +++ b/swap/tests/bob_refunds_using_cancel_and_refund_command_timelock_not_expired_force.rs @@ -9,6 +9,7 @@ use swap::protocol::{alice, bob}; async fn given_bob_manually_forces_cancel_when_timelock_not_expired_errors() { harness::setup_test(SlowCancelConfig, |mut ctx| async move { let (bob_swap, bob_join_handle) = ctx.bob_swap().await; + let bob_swap_id = bob_swap.swap_id; let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked)); let alice_swap = ctx.alice_next_swap().await; @@ -17,7 +18,9 @@ async fn given_bob_manually_forces_cancel_when_timelock_not_expired_errors() { let bob_state = bob_swap.await??; assert!(matches!(bob_state, BobState::BtcLocked { .. })); - let (bob_swap, bob_join_handle) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; + let (bob_swap, bob_join_handle) = ctx + .stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id) + .await; assert!(matches!(bob_swap.state, BobState::BtcLocked { .. })); // Bob forces a cancel that will fail @@ -33,7 +36,9 @@ async fn given_bob_manually_forces_cancel_when_timelock_not_expired_errors() { assert!(is_error); - let (bob_swap, bob_join_handle) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; + let (bob_swap, bob_join_handle) = ctx + .stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id) + .await; assert!(matches!(bob_swap.state, BobState::BtcLocked { .. })); // Bob forces a refund that will fail @@ -48,7 +53,9 @@ async fn given_bob_manually_forces_cancel_when_timelock_not_expired_errors() { .is_err(); assert!(is_error); - let (bob_swap, _) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; + let (bob_swap, _) = ctx + .stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id) + .await; assert!(matches!(bob_swap.state, BobState::BtcLocked { .. })); Ok(()) diff --git a/swap/tests/concurrent_bobs_after_xmr_lock_proof_sent.rs b/swap/tests/concurrent_bobs_after_xmr_lock_proof_sent.rs new file mode 100644 index 00000000..11ae81b9 --- /dev/null +++ b/swap/tests/concurrent_bobs_after_xmr_lock_proof_sent.rs @@ -0,0 +1,60 @@ +pub mod harness; + +use harness::bob_run_until::is_xmr_locked; +use harness::SlowCancelConfig; +use swap::protocol::alice::AliceState; +use swap::protocol::bob::BobState; +use swap::protocol::{alice, bob}; + +#[tokio::test] +async fn concurrent_bobs_after_xmr_lock_proof_sent() { + harness::setup_test(SlowCancelConfig, |mut ctx| async move { + let (bob_swap_1, bob_join_handle_1) = ctx.bob_swap().await; + + let swap_id = bob_swap_1.swap_id; + + let bob_swap_1 = tokio::spawn(bob::run_until(bob_swap_1, is_xmr_locked)); + + let alice_swap_1 = ctx.alice_next_swap().await; + let alice_swap_1 = tokio::spawn(alice::run(alice_swap_1)); + + let bob_state_1 = bob_swap_1.await??; + assert!(matches!(bob_state_1, BobState::XmrLocked { .. })); + + // make sure bob_swap_1's event loop is gone + bob_join_handle_1.abort(); + + let (bob_swap_2, bob_join_handle_2) = ctx.bob_swap().await; + let bob_swap_2 = tokio::spawn(bob::run(bob_swap_2)); + + let alice_swap_2 = ctx.alice_next_swap().await; + let alice_swap_2 = tokio::spawn(alice::run(alice_swap_2)); + + // The 2nd swap should ALWAYS finish successfully in this + // scenario + + let bob_state_2 = bob_swap_2.await??; + assert!(matches!(bob_state_2, BobState::XmrRedeemed { .. })); + + let alice_state_2 = alice_swap_2.await??; + assert!(matches!(alice_state_2, AliceState::BtcRedeemed { .. })); + + let (bob_swap_1, _) = ctx + .stop_and_resume_bob_from_db(bob_join_handle_2, swap_id) + .await; + assert!(matches!(bob_swap_1.state, BobState::XmrLocked { .. })); + + // The 1st (paused) swap ALWAYS finishes successfully in this + // scenario, because it is ensured that Bob already received the + // transfer proof. + + let bob_state_1 = bob::run(bob_swap_1).await?; + assert!(matches!(bob_state_1, BobState::XmrRedeemed { .. })); + + let alice_state_1 = alice_swap_1.await??; + assert!(matches!(alice_state_1, AliceState::BtcRedeemed { .. })); + + Ok(()) + }) + .await; +} diff --git a/swap/tests/concurrent_bobs_before_xmr_lock_proof_sent.rs b/swap/tests/concurrent_bobs_before_xmr_lock_proof_sent.rs new file mode 100644 index 00000000..ed3efafe --- /dev/null +++ b/swap/tests/concurrent_bobs_before_xmr_lock_proof_sent.rs @@ -0,0 +1,61 @@ +pub mod harness; + +use harness::bob_run_until::is_btc_locked; +use harness::SlowCancelConfig; +use swap::protocol::alice::AliceState; +use swap::protocol::bob::BobState; +use swap::protocol::{alice, bob}; + +#[tokio::test] +async fn concurrent_bobs_before_xmr_lock_proof_sent() { + harness::setup_test(SlowCancelConfig, |mut ctx| async move { + let (bob_swap_1, bob_join_handle_1) = ctx.bob_swap().await; + + let swap_id = bob_swap_1.swap_id; + + let bob_swap_1 = tokio::spawn(bob::run_until(bob_swap_1, is_btc_locked)); + + let alice_swap_1 = ctx.alice_next_swap().await; + let alice_swap_1 = tokio::spawn(alice::run(alice_swap_1)); + + let bob_state_1 = bob_swap_1.await??; + assert!(matches!(bob_state_1, BobState::BtcLocked(_))); + + // make sure bob_swap_1's event loop is gone + bob_join_handle_1.abort(); + + let (bob_swap_2, bob_join_handle_2) = ctx.bob_swap().await; + let bob_swap_2 = tokio::spawn(bob::run(bob_swap_2)); + + let alice_swap_2 = ctx.alice_next_swap().await; + let alice_swap_2 = tokio::spawn(alice::run(alice_swap_2)); + + // The 2nd swap ALWAYS finish successfully in this + // scenario, but will receive an "unwanted" transfer proof that is ignored in + // the event loop. + + let bob_state_2 = bob_swap_2.await??; + assert!(matches!(bob_state_2, BobState::XmrRedeemed { .. })); + + let alice_state_2 = alice_swap_2.await??; + assert!(matches!(alice_state_2, AliceState::BtcRedeemed { .. })); + + let (bob_swap_1, _) = ctx + .stop_and_resume_bob_from_db(bob_join_handle_2, swap_id) + .await; + assert!(matches!(bob_state_1, BobState::BtcLocked(_))); + + // The 1st (paused) swap is expected to refund, because the transfer + // proof is delivered to the wrong swap, and we currently don't store it in the + // database for the other swap. + + let bob_state_1 = bob::run(bob_swap_1).await?; + assert!(matches!(bob_state_1, BobState::BtcRefunded { .. })); + + let alice_state_1 = alice_swap_1.await??; + assert!(matches!(alice_state_1, AliceState::XmrRefunded { .. })); + + Ok(()) + }) + .await; +} diff --git a/swap/tests/ensure_same_swap_id.rs b/swap/tests/ensure_same_swap_id.rs new file mode 100644 index 00000000..1dbc7046 --- /dev/null +++ b/swap/tests/ensure_same_swap_id.rs @@ -0,0 +1,21 @@ +pub mod harness; + +use harness::SlowCancelConfig; +use swap::protocol::bob; + +#[tokio::test] +async fn ensure_same_swap_id_for_alice_and_bob() { + harness::setup_test(SlowCancelConfig, |mut ctx| async move { + let (bob_swap, _) = ctx.bob_swap().await; + let bob_swap_id = bob_swap.swap_id; + let _ = tokio::spawn(bob::run(bob_swap)); + + // once Bob's swap is spawned we can retrieve Alice's swap and assert on the + // swap ID + let alice_swap = ctx.alice_next_swap().await; + assert_eq!(alice_swap.swap_id, bob_swap_id); + + Ok(()) + }) + .await; +} diff --git a/swap/tests/happy_path_restart_bob_after_xmr_locked.rs b/swap/tests/happy_path_restart_bob_after_xmr_locked.rs index 9967c117..8e18f5ed 100644 --- a/swap/tests/happy_path_restart_bob_after_xmr_locked.rs +++ b/swap/tests/happy_path_restart_bob_after_xmr_locked.rs @@ -9,6 +9,7 @@ use swap::protocol::{alice, bob}; async fn given_bob_restarts_after_xmr_is_locked_resume_swap() { harness::setup_test(SlowCancelConfig, |mut ctx| async move { let (bob_swap, bob_join_handle) = ctx.bob_swap().await; + let bob_swap_id = bob_swap.swap_id; let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_xmr_locked)); let alice_swap = ctx.alice_next_swap().await; @@ -18,7 +19,9 @@ async fn given_bob_restarts_after_xmr_is_locked_resume_swap() { assert!(matches!(bob_state, BobState::XmrLocked { .. })); - let (bob_swap, _) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; + let (bob_swap, _) = ctx + .stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id) + .await; assert!(matches!(bob_swap.state, BobState::XmrLocked { .. })); let bob_state = bob::run(bob_swap).await?; diff --git a/swap/tests/happy_path_restart_bob_before_xmr_locked.rs b/swap/tests/happy_path_restart_bob_before_xmr_locked.rs index 9967c117..8e18f5ed 100644 --- a/swap/tests/happy_path_restart_bob_before_xmr_locked.rs +++ b/swap/tests/happy_path_restart_bob_before_xmr_locked.rs @@ -9,6 +9,7 @@ use swap::protocol::{alice, bob}; async fn given_bob_restarts_after_xmr_is_locked_resume_swap() { harness::setup_test(SlowCancelConfig, |mut ctx| async move { let (bob_swap, bob_join_handle) = ctx.bob_swap().await; + let bob_swap_id = bob_swap.swap_id; let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_xmr_locked)); let alice_swap = ctx.alice_next_swap().await; @@ -18,7 +19,9 @@ async fn given_bob_restarts_after_xmr_is_locked_resume_swap() { assert!(matches!(bob_state, BobState::XmrLocked { .. })); - let (bob_swap, _) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; + let (bob_swap, _) = ctx + .stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id) + .await; assert!(matches!(bob_swap.state, BobState::XmrLocked { .. })); let bob_state = bob::run(bob_swap).await?; diff --git a/swap/tests/harness/mod.rs b/swap/tests/harness/mod.rs index 8229d314..bdbf3bd9 100644 --- a/swap/tests/harness/mod.rs +++ b/swap/tests/harness/mod.rs @@ -43,13 +43,52 @@ const BITCOIN_TEST_WALLET_NAME: &str = "testwallet"; #[derive(Debug, Clone)] pub struct StartingBalances { pub xmr: monero::Amount, + pub xmr_outputs: Vec, pub btc: bitcoin::Amount, } +impl StartingBalances { + /// If monero_outputs is specified the monero balance will be: + /// monero_outputs * new_xmr = self_xmr + pub fn new(btc: bitcoin::Amount, xmr: monero::Amount, monero_outputs: Option) -> Self { + match monero_outputs { + None => { + if xmr == monero::Amount::ZERO { + return Self { + xmr, + xmr_outputs: vec![], + btc, + }; + } + + Self { + xmr, + xmr_outputs: vec![xmr], + btc, + } + } + Some(outputs) => { + let mut xmr_outputs = Vec::new(); + let mut sum_xmr = monero::Amount::ZERO; + + for _ in 0..outputs { + xmr_outputs.push(xmr); + sum_xmr = sum_xmr + xmr; + } + + Self { + xmr: sum_xmr, + xmr_outputs, + btc, + } + } + } + } +} + struct BobParams { seed: Seed, db_path: PathBuf, - swap_id: Uuid, bitcoin_wallet: Arc, monero_wallet: Arc, alice_address: Multiaddr, @@ -58,12 +97,16 @@ struct BobParams { } impl BobParams { - pub async fn builder(&self, event_loop_handle: bob::EventLoopHandle) -> Result { + pub async fn builder( + &self, + event_loop_handle: bob::EventLoopHandle, + swap_id: Uuid, + ) -> Result { let receive_address = self.monero_wallet.get_main_address(); Ok(bob::Builder::new( Database::open(&self.db_path.clone().as_path()).unwrap(), - self.swap_id, + swap_id, self.bitcoin_wallet.clone(), self.monero_wallet.clone(), self.env_config, @@ -72,11 +115,16 @@ impl BobParams { )) } - pub fn new_eventloop(&self) -> Result<(bob::EventLoop, bob::EventLoopHandle)> { + pub fn new_eventloop(&self, swap_id: Uuid) -> Result<(bob::EventLoop, bob::EventLoopHandle)> { let mut swarm = swarm::new::(&self.seed)?; swarm.add_address(self.alice_peer_id, self.alice_address.clone()); - bob::EventLoop::new(swarm, self.alice_peer_id, self.bitcoin_wallet.clone()) + bob::EventLoop::new( + swap_id, + swarm, + self.alice_peer_id, + self.bitcoin_wallet.clone(), + ) } } @@ -139,24 +187,28 @@ impl TestContext { } pub async fn alice_next_swap(&mut self) -> alice::Swap { - timeout(Duration::from_secs(10), self.alice_swap_handle.recv()) + timeout(Duration::from_secs(20), self.alice_swap_handle.recv()) .await - .expect("No Alice swap within 10 seconds, aborting because this test is waiting for a swap forever...") + .expect("No Alice swap within 20 seconds, aborting because this test is likely waiting for a swap forever...") .unwrap() } pub async fn bob_swap(&mut self) -> (bob::Swap, BobApplicationHandle) { - let (event_loop, event_loop_handle) = self.bob_params.new_eventloop().unwrap(); + let swap_id = Uuid::new_v4(); + let (event_loop, event_loop_handle) = self.bob_params.new_eventloop(swap_id).unwrap(); let swap = self .bob_params - .builder(event_loop_handle) + .builder(event_loop_handle, swap_id) .await .unwrap() .with_init_params(self.btc_amount) .build() .unwrap(); + // ensure the wallet is up to date for concurrent swap tests + swap.bitcoin_wallet.sync().await.unwrap(); + let join_handle = tokio::spawn(event_loop.run()); (swap, BobApplicationHandle(join_handle)) @@ -165,14 +217,15 @@ impl TestContext { pub async fn stop_and_resume_bob_from_db( &mut self, join_handle: BobApplicationHandle, + swap_id: Uuid, ) -> (bob::Swap, BobApplicationHandle) { join_handle.abort(); - let (event_loop, event_loop_handle) = self.bob_params.new_eventloop().unwrap(); + let (event_loop, event_loop_handle) = self.bob_params.new_eventloop(swap_id).unwrap(); let swap = self .bob_params - .builder(event_loop_handle) + .builder(event_loop_handle, swap_id) .await .unwrap() .build() @@ -489,14 +542,13 @@ where let env_config = C::get_config(); let (monero, containers) = harness::init_containers(&cli).await; + monero.init_miner().await.unwrap(); let btc_amount = bitcoin::Amount::from_sat(1_000_000); let xmr_amount = monero::Amount::from_monero(btc_amount.as_btc() / FixedRate::RATE).unwrap(); - let alice_starting_balances = StartingBalances { - xmr: xmr_amount * 10, - btc: bitcoin::Amount::ZERO, - }; + let alice_starting_balances = + StartingBalances::new(bitcoin::Amount::ZERO, xmr_amount, Some(10)); let electrs_rpc_port = containers .electrs @@ -532,10 +584,7 @@ where ); let bob_seed = Seed::random().unwrap(); - let bob_starting_balances = StartingBalances { - xmr: monero::Amount::ZERO, - btc: btc_amount * 10, - }; + let bob_starting_balances = StartingBalances::new(btc_amount * 10, monero::Amount::ZERO, None); let (bob_bitcoin_wallet, bob_monero_wallet) = init_test_wallets( MONERO_WALLET_NAME_BOB, @@ -552,7 +601,6 @@ where let bob_params = BobParams { seed: Seed::random().unwrap(), db_path: tempdir().unwrap().path().to_path_buf(), - swap_id: Uuid::new_v4(), bitcoin_wallet: bob_bitcoin_wallet.clone(), monero_wallet: bob_monero_wallet.clone(), alice_address: alice_listen_address.clone(), @@ -560,6 +608,8 @@ where env_config, }; + monero.start_miner().await.unwrap(); + let test = TestContext { env_config, btc_amount, @@ -774,7 +824,14 @@ async fn init_test_wallets( env_config: Config, ) -> (Arc, Arc) { monero - .init(vec![(name, starting_balances.xmr.as_piconero())]) + .init_wallet( + name, + starting_balances + .xmr_outputs + .into_iter() + .map(|amount| amount.as_piconero()) + .collect(), + ) .await .unwrap(); diff --git a/swap/tests/punish.rs b/swap/tests/punish.rs index f058dc79..bf053f5b 100644 --- a/swap/tests/punish.rs +++ b/swap/tests/punish.rs @@ -11,6 +11,7 @@ use swap::protocol::{alice, bob}; async fn alice_punishes_if_bob_never_acts_after_fund() { harness::setup_test(FastPunishConfig, |mut ctx| async move { let (bob_swap, bob_join_handle) = ctx.bob_swap().await; + let bob_swap_id = bob_swap.swap_id; let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked)); let alice_swap = ctx.alice_next_swap().await; @@ -24,7 +25,9 @@ async fn alice_punishes_if_bob_never_acts_after_fund() { // Restart Bob after Alice punished to ensure Bob transitions to // punished and does not run indefinitely - let (bob_swap, _) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; + let (bob_swap, _) = ctx + .stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id) + .await; assert!(matches!(bob_swap.state, BobState::BtcLocked { .. })); let bob_state = bob::run(bob_swap).await?;