From c976358c374184c62ea3e08b0d55637f2b0a9899 Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Thu, 8 Apr 2021 18:56:26 +1000 Subject: [PATCH] Multiple swaps with the same peer - Swap-id is exchanged during execution setup. CLI (Bob) sends the swap-id to be used in his first message. - Transfer poof and encryption signature messages include the swap-id so it can be properly associated with the correct swap. - ASB: Encryption signatures are associated with swaps by swap-id, not peer-id. - ASB: Transfer proofs are still associated to peer-ids (because they have to be sent to the respective peer), but the ASB can buffer multiple - CLI: Incoming transfer proofs are checked for matching swap-id. If a transfer proof with a different swap-id than the current executing swap is received it will be ignored. We can change this to saving into the database. Includes concurrent swap tests with the same Bob. - One test that pauses and starts an additional swap after the transfer proof was received. Results in both swaps being redeemed after resuming the first swap. - One test that pauses and starts an additional swap before the transfer proof is sent (just after BTC locked). Results in the second swap redeeming and the first swap being refunded (because the transfer proof on Bob's side is lost). Once we store transfer proofs that we receive during executing a different swap into the database both swaps should redeem. Note that the monero harness was adapted to allow creating wallets with multiple outputs, which is needed for Alice. --- .github/workflows/ci.yml | 5 +- CHANGELOG.md | 6 ++ bors.toml | 5 +- monero-harness/src/lib.rs | 36 +++++-- monero-harness/tests/monerod.rs | 2 +- monero-harness/tests/wallet.rs | 9 +- swap/src/bin/swap.rs | 6 +- swap/src/network/encrypted_signature.rs | 2 + swap/src/network/transfer_proof.rs | 2 + swap/src/protocol.rs | 8 +- swap/src/protocol/alice/behaviour.rs | 4 + swap/src/protocol/alice/event_loop.rs | 59 ++++++----- swap/src/protocol/alice/execution_setup.rs | 25 +++-- swap/src/protocol/alice/state.rs | 7 +- swap/src/protocol/alice/swap.rs | 5 +- swap/src/protocol/bob/event_loop.rs | 47 ++++++--- swap/src/protocol/bob/state.rs | 6 ++ swap/src/protocol/bob/swap.rs | 8 +- ...s_after_restart_punish_timelock_expired.rs | 11 +-- ...refunds_using_cancel_and_refund_command.rs | 9 +- ...and_refund_command_timelock_not_expired.rs | 13 ++- ...fund_command_timelock_not_expired_force.rs | 13 ++- ...ncurrent_bobs_after_xmr_lock_proof_sent.rs | 60 +++++++++++ ...current_bobs_before_xmr_lock_proof_sent.rs | 61 ++++++++++++ swap/tests/ensure_same_swap_id.rs | 21 ++++ ...happy_path_restart_bob_after_xmr_locked.rs | 5 +- ...appy_path_restart_bob_before_xmr_locked.rs | 5 +- swap/tests/harness/mod.rs | 99 +++++++++++++++---- swap/tests/punish.rs | 5 +- 29 files changed, 430 insertions(+), 114 deletions(-) create mode 100644 swap/tests/concurrent_bobs_after_xmr_lock_proof_sent.rs create mode 100644 swap/tests/concurrent_bobs_before_xmr_lock_proof_sent.rs create mode 100644 swap/tests/ensure_same_swap_id.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8e557eff..51d1a674 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -116,7 +116,10 @@ jobs: bob_refunds_using_cancel_and_refund_command_timelock_not_expired_force, punish, alice_punishes_after_restart_punish_timelock_expired, - alice_refunds_after_restart_bob_refunded + alice_refunds_after_restart_bob_refunded, + ensure_same_swap_id, + concurrent_bobs_after_xmr_lock_proof_sent, + concurrent_bobs_before_xmr_lock_proof_sent ] runs-on: ubuntu-latest steps: diff --git a/CHANGELOG.md b/CHANGELOG.md index 4b986773..fe44ac8e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- Allow multiple concurrent swaps with the same peer on the ASB. + This is a breaking change because the swap ID is now agreed upon between CLI and ASB during swap setup. + Resuming swaps started prior to this change can result in unexpected behaviour. + ## [0.4.0] - 2021-04-06 ### Changed diff --git a/bors.toml b/bors.toml index 6d83fcd2..04973080 100644 --- a/bors.toml +++ b/bors.toml @@ -15,5 +15,8 @@ status = [ "docker_tests (bob_refunds_using_cancel_and_refund_command_timelock_not_expired)", "docker_tests (punish)", "docker_tests (alice_punishes_after_restart_punish_timelock_expired)", - "docker_tests (alice_refunds_after_restart_bob_refunded)" + "docker_tests (alice_refunds_after_restart_bob_refunded)", + "docker_tests (ensure_same_swap_id)", + "docker_tests (concurrent_bobs_after_xmr_lock_proof_sent)", + "docker_tests (concurrent_bobs_before_xmr_lock_proof_sent)" ] diff --git a/monero-harness/src/lib.rs b/monero-harness/src/lib.rs index 93a1579d..48615b92 100644 --- a/monero-harness/src/lib.rs +++ b/monero-harness/src/lib.rs @@ -98,7 +98,7 @@ impl<'c> Monero { Ok(wallet) } - pub async fn init(&self, wallet_amount: Vec<(&str, u64)>) -> Result<()> { + pub async fn init_miner(&self) -> Result<()> { let miner_wallet = self.wallet("miner")?; let miner_address = miner_wallet.address().await?.address; @@ -108,17 +108,34 @@ impl<'c> Monero { tracing::info!("Generated {:?} blocks", res.blocks.len()); miner_wallet.refresh().await?; - for (wallet, amount) in wallet_amount.iter() { - if *amount > 0 { - let wallet = self.wallet(wallet)?; - let address = wallet.address().await?.address; - miner_wallet.transfer(&address, *amount).await?; + Ok(()) + } + + pub async fn init_wallet(&self, name: &str, amount_in_outputs: Vec) -> Result<()> { + let miner_wallet = self.wallet("miner")?; + let miner_address = miner_wallet.address().await?.address; + let monerod = &self.monerod; + + let wallet = self.wallet(name)?; + let address = wallet.address().await?.address; + + for amount in amount_in_outputs { + if amount > 0 { + miner_wallet.transfer(&address, amount).await?; tracing::info!("Funded {} wallet with {}", wallet.name, amount); monerod.client().generate_blocks(10, &miner_address).await?; wallet.refresh().await?; } } + Ok(()) + } + + pub async fn start_miner(&self) -> Result<()> { + let miner_wallet = self.wallet("miner")?; + let miner_address = miner_wallet.address().await?.address; + let monerod = &self.monerod; + monerod.start_miner(&miner_address).await?; tracing::info!("Waiting for miner wallet to catch up..."); @@ -130,6 +147,13 @@ impl<'c> Monero { Ok(()) } + + pub async fn init_and_start_miner(&self) -> Result<()> { + self.init_miner().await?; + self.start_miner().await?; + + Ok(()) + } } fn random_prefix() -> String { diff --git a/monero-harness/tests/monerod.rs b/monero-harness/tests/monerod.rs index 40901b50..257e888d 100644 --- a/monero-harness/tests/monerod.rs +++ b/monero-harness/tests/monerod.rs @@ -14,7 +14,7 @@ async fn init_miner_and_mine_to_miner_address() { let tc = Cli::default(); let (monero, _monerod_container) = Monero::new(&tc, vec![]).await.unwrap(); - monero.init(vec![]).await.unwrap(); + monero.init_and_start_miner().await.unwrap(); let monerod = monero.monerod(); let miner_wallet = monero.wallet("miner").unwrap(); diff --git a/monero-harness/tests/wallet.rs b/monero-harness/tests/wallet.rs index 15fb3698..a3f4e52e 100644 --- a/monero-harness/tests/wallet.rs +++ b/monero-harness/tests/wallet.rs @@ -22,11 +22,10 @@ async fn fund_transfer_and_check_tx_key() { let alice_wallet = monero.wallet("alice").unwrap(); let bob_wallet = monero.wallet("bob").unwrap(); - // fund alice - monero - .init(vec![("alice", fund_alice), ("bob", fund_bob)]) - .await - .unwrap(); + monero.init_miner().await.unwrap(); + monero.init_wallet("alice", vec![fund_alice]).await.unwrap(); + monero.init_wallet("bob", vec![fund_bob]).await.unwrap(); + monero.start_miner().await.unwrap(); // check alice balance let got_alice_balance = alice_wallet.balance().await.unwrap(); diff --git a/swap/src/bin/swap.rs b/swap/src/bin/swap.rs index ed74a4c0..c881c21a 100644 --- a/swap/src/bin/swap.rs +++ b/swap/src/bin/swap.rs @@ -107,8 +107,9 @@ async fn main() -> Result<()> { let mut swarm = swarm::new::(&seed)?; swarm.add_address(alice_peer_id, alice_multiaddr); + let swap_id = Uuid::new_v4(); let (event_loop, mut event_loop_handle) = - EventLoop::new(swarm, alice_peer_id, bitcoin_wallet.clone())?; + EventLoop::new(swap_id, swarm, alice_peer_id, bitcoin_wallet.clone())?; let event_loop = tokio::spawn(event_loop.run()); let send_bitcoin = determine_btc_to_swap( @@ -128,7 +129,6 @@ async fn main() -> Result<()> { ) .await?; - let swap_id = Uuid::new_v4(); db.insert_peer_id(swap_id, alice_peer_id).await?; let swap = Builder::new( @@ -190,7 +190,7 @@ async fn main() -> Result<()> { swarm.add_address(alice_peer_id, alice_multiaddr); let (event_loop, event_loop_handle) = - EventLoop::new(swarm, alice_peer_id, bitcoin_wallet.clone())?; + EventLoop::new(swap_id, swarm, alice_peer_id, bitcoin_wallet.clone())?; let handle = tokio::spawn(event_loop.run()); let swap = Builder::new( diff --git a/swap/src/network/encrypted_signature.rs b/swap/src/network/encrypted_signature.rs index 1f5d94f9..3930e3eb 100644 --- a/swap/src/network/encrypted_signature.rs +++ b/swap/src/network/encrypted_signature.rs @@ -5,6 +5,7 @@ use libp2p::request_response::{ RequestResponseMessage, }; use serde::{Deserialize, Serialize}; +use uuid::Uuid; pub type OutEvent = RequestResponseEvent; @@ -19,6 +20,7 @@ impl ProtocolName for EncryptedSignatureProtocol { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct Request { + pub swap_id: Uuid, pub tx_redeem_encsig: crate::bitcoin::EncryptedSignature, } diff --git a/swap/src/network/transfer_proof.rs b/swap/src/network/transfer_proof.rs index 3cf83adb..05d7ae7d 100644 --- a/swap/src/network/transfer_proof.rs +++ b/swap/src/network/transfer_proof.rs @@ -6,6 +6,7 @@ use libp2p::request_response::{ RequestResponseMessage, }; use serde::{Deserialize, Serialize}; +use uuid::Uuid; pub type OutEvent = RequestResponseEvent; @@ -20,6 +21,7 @@ impl ProtocolName for TransferProofProtocol { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct Request { + pub swap_id: Uuid, pub tx_lock_proof: monero::TransferProof, } diff --git a/swap/src/protocol.rs b/swap/src/protocol.rs index fb3a35be..58b616b3 100644 --- a/swap/src/protocol.rs +++ b/swap/src/protocol.rs @@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize}; use sha2::Sha256; use sigma_fun::ext::dl_secp256k1_ed25519_eq::{CrossCurveDLEQ, CrossCurveDLEQProof}; use sigma_fun::HashTranscript; +use uuid::Uuid; pub mod alice; pub mod bob; @@ -18,14 +19,9 @@ pub static CROSS_CURVE_PROOF_SYSTEM: Lazy< ) }); -#[derive(Debug, Copy, Clone)] -pub struct StartingBalances { - pub xmr: monero::Amount, - pub btc: bitcoin::Amount, -} - #[derive(Clone, Debug, Serialize, Deserialize)] pub struct Message0 { + swap_id: Uuid, B: bitcoin::PublicKey, S_b_monero: monero::PublicKey, S_b_bitcoin: bitcoin::PublicKey, diff --git a/swap/src/protocol/alice/behaviour.rs b/swap/src/protocol/alice/behaviour.rs index 9ecaf49d..d5d59c3d 100644 --- a/swap/src/protocol/alice/behaviour.rs +++ b/swap/src/protocol/alice/behaviour.rs @@ -6,6 +6,7 @@ use libp2p::request_response::{ RequestId, RequestResponseEvent, RequestResponseMessage, ResponseChannel, }; use libp2p::{NetworkBehaviour, PeerId}; +use uuid::Uuid; #[derive(Debug)] pub enum OutEvent { @@ -20,6 +21,7 @@ pub enum OutEvent { }, ExecutionSetupDone { bob_peer_id: PeerId, + swap_id: Uuid, state3: Box, }, TransferProofAcknowledged { @@ -157,9 +159,11 @@ impl From for OutEvent { match event { Done { bob_peer_id, + swap_id, state3, } => OutEvent::ExecutionSetupDone { bob_peer_id, + swap_id, state3: Box::new(state3), }, Failure { peer, error } => OutEvent::Failure { peer, error }, diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index cfe20f14..930a435c 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -3,7 +3,7 @@ use crate::database::Database; use crate::env::Config; use crate::monero::BalanceTooLow; use crate::network::quote::BidQuote; -use crate::network::{encrypted_signature, spot_price, transfer_proof}; +use crate::network::{spot_price, transfer_proof}; use crate::protocol::alice::{AliceState, Behaviour, OutEvent, State0, State3, Swap}; use crate::{bitcoin, kraken, monero}; use anyhow::{bail, Context, Result}; @@ -43,9 +43,8 @@ pub struct EventLoop { swap_sender: mpsc::Sender, - /// Stores a sender per peer for incoming [`EncryptedSignature`]s. - recv_encrypted_signature: - HashMap>, + /// Stores incoming [`EncryptedSignature`]s per swap. + recv_encrypted_signature: HashMap>, inflight_encrypted_signatures: FuturesUnordered>>, send_transfer_proof: FuturesUnordered, @@ -120,7 +119,7 @@ where } }; - let handle = self.new_handle(peer_id); + let handle = self.new_handle(peer_id, swap_id); let swap = Swap { event_loop_handle: handle, @@ -186,25 +185,26 @@ where tracing::debug!(%peer, "Failed to respond with quote"); } } - SwarmEvent::Behaviour(OutEvent::ExecutionSetupDone{bob_peer_id, state3}) => { - let _ = self.handle_execution_setup_done(bob_peer_id, *state3).await; + SwarmEvent::Behaviour(OutEvent::ExecutionSetupDone{bob_peer_id, swap_id, state3}) => { + let _ = self.handle_execution_setup_done(bob_peer_id, swap_id, *state3).await; } SwarmEvent::Behaviour(OutEvent::TransferProofAcknowledged { peer, id }) => { - tracing::trace!(%peer, "Bob acknowledged transfer proof"); + tracing::debug!(%peer, "Bob acknowledged transfer proof"); if let Some(responder) = self.inflight_transfer_proofs.remove(&id) { let _ = responder.respond(()); } } SwarmEvent::Behaviour(OutEvent::EncryptedSignatureReceived{ msg, channel, peer }) => { - let sender = match self.recv_encrypted_signature.remove(&peer) { + let sender = match self.recv_encrypted_signature.remove(&msg.swap_id) { Some(sender) => sender, None => { + // TODO: Don't just drop encsig if we currently don't have a running swap for it, save in db tracing::warn!(%peer, "No sender for encrypted signature, maybe already handled?"); continue; } }; - let mut responder = match sender.send(*msg).await { + let mut responder = match sender.send(msg.tx_redeem_encsig).await { Ok(responder) => responder, Err(_) => { tracing::warn!(%peer, "Failed to relay encrypted signature to swap"); @@ -262,8 +262,8 @@ where let id = self.swarm.transfer_proof.send_request(&peer, transfer_proof); self.inflight_transfer_proofs.insert(id, responder); }, - Some(Err(_)) => { - tracing::debug!("A swap stopped without sending a transfer proof"); + Some(Err(e)) => { + tracing::debug!("A swap stopped without sending a transfer proof: {:#}", e); } None => { unreachable!("stream of transfer proof receivers must never terminate") @@ -319,9 +319,13 @@ where }) } - async fn handle_execution_setup_done(&mut self, bob_peer_id: PeerId, state3: State3) { - let swap_id = Uuid::new_v4(); - let handle = self.new_handle(bob_peer_id); + async fn handle_execution_setup_done( + &mut self, + bob_peer_id: PeerId, + swap_id: Uuid, + state3: State3, + ) { + let handle = self.new_handle(bob_peer_id, swap_id); let initial_state = AliceState::Started { state3: Box::new(state3), @@ -337,7 +341,7 @@ where swap_id, }; - // TODO: Consider adding separate components for start/rsume of swaps + // TODO: Consider adding separate components for start/resume of swaps // swaps save peer id so we can resume match self.db.insert_peer_id(swap_id, bob_peer_id).await { @@ -354,17 +358,24 @@ where /// Create a new [`EventLoopHandle`] that is scoped for communication with /// the given peer. - fn new_handle(&mut self, peer: PeerId) -> EventLoopHandle { + fn new_handle(&mut self, peer: PeerId, swap_id: Uuid) -> EventLoopHandle { // we deliberately don't put timeouts on these channels because the swap always // races these futures against a timelock + let (transfer_proof_sender, mut transfer_proof_receiver) = bmrng::channel(1); let encrypted_signature = bmrng::channel(1); self.recv_encrypted_signature - .insert(peer, encrypted_signature.0); + .insert(swap_id, encrypted_signature.0); + self.send_transfer_proof.push( async move { - let (request, responder) = transfer_proof_receiver.recv().await?; + let (transfer_proof, responder) = transfer_proof_receiver.recv().await?; + + let request = transfer_proof::Request { + swap_id, + tx_lock_proof: transfer_proof, + }; Ok((peer, request, responder)) } @@ -442,13 +453,13 @@ impl LatestRate for KrakenRate { #[derive(Debug)] pub struct EventLoopHandle { - recv_encrypted_signature: Option>, - send_transfer_proof: Option>, + recv_encrypted_signature: Option>, + send_transfer_proof: Option>, } impl EventLoopHandle { pub async fn recv_encrypted_signature(&mut self) -> Result { - let (request, responder) = self + let (tx_redeem_encsig, responder) = self .recv_encrypted_signature .take() .context("Encrypted signature was already received")? @@ -459,14 +470,14 @@ impl EventLoopHandle { .respond(()) .context("Failed to acknowledge receipt of encrypted signature")?; - Ok(request.tx_redeem_encsig) + Ok(tx_redeem_encsig) } pub async fn send_transfer_proof(&mut self, msg: monero::TransferProof) -> Result<()> { self.send_transfer_proof .take() .context("Transfer proof was already sent")? - .send_receive(transfer_proof::Request { tx_lock_proof: msg }) + .send_receive(msg) .await .context("Failed to send transfer proof")?; diff --git a/swap/src/protocol/alice/execution_setup.rs b/swap/src/protocol/alice/execution_setup.rs index a4b04aa0..4c576b0b 100644 --- a/swap/src/protocol/alice/execution_setup.rs +++ b/swap/src/protocol/alice/execution_setup.rs @@ -4,18 +4,27 @@ use crate::protocol::{Message0, Message2, Message4}; use anyhow::{Context, Error}; use libp2p::PeerId; use libp2p_async_await::BehaviourOutEvent; +use uuid::Uuid; #[derive(Debug)] pub enum OutEvent { - Done { bob_peer_id: PeerId, state3: State3 }, - Failure { peer: PeerId, error: Error }, + Done { + bob_peer_id: PeerId, + swap_id: Uuid, + state3: State3, + }, + Failure { + peer: PeerId, + error: Error, + }, } -impl From> for OutEvent { - fn from(event: BehaviourOutEvent<(PeerId, State3), (), Error>) -> Self { +impl From> for OutEvent { + fn from(event: BehaviourOutEvent<(PeerId, (Uuid, State3)), (), Error>) -> Self { match event { - BehaviourOutEvent::Inbound(_, Ok((bob_peer_id, state3))) => OutEvent::Done { + BehaviourOutEvent::Inbound(_, Ok((bob_peer_id, (swap_id, state3)))) => OutEvent::Done { bob_peer_id, + swap_id, state3, }, BehaviourOutEvent::Inbound(peer, Err(e)) => OutEvent::Failure { peer, error: e }, @@ -27,7 +36,7 @@ impl From> for OutEvent { #[derive(libp2p::NetworkBehaviour)] #[behaviour(out_event = "OutEvent", event_process = false)] pub struct Behaviour { - inner: libp2p_async_await::Behaviour<(PeerId, State3), (), anyhow::Error>, + inner: libp2p_async_await::Behaviour<(PeerId, (Uuid, State3)), (), anyhow::Error>, } impl Default for Behaviour { @@ -45,7 +54,7 @@ impl Behaviour { let message0 = serde_cbor::from_slice::(&substream.read_message(BUF_SIZE).await?) .context("Failed to deserialize message0")?; - let state1 = state0.receive(message0)?; + let (swap_id, state1) = state0.receive(message0)?; substream .write_message( @@ -73,7 +82,7 @@ impl Behaviour { .context("Failed to deserialize message4")?; let state3 = state2.receive(message4)?; - Ok((bob, state3)) + Ok((bob, (swap_id, state3))) }) } } diff --git a/swap/src/protocol/alice/state.rs b/swap/src/protocol/alice/state.rs index 4a6194fb..86810e5b 100644 --- a/swap/src/protocol/alice/state.rs +++ b/swap/src/protocol/alice/state.rs @@ -13,6 +13,7 @@ use rand::{CryptoRng, RngCore}; use serde::{Deserialize, Serialize}; use sigma_fun::ext::dl_secp256k1_ed25519_eq::CrossCurveDLEQProof; use std::fmt; +use uuid::Uuid; #[derive(Debug)] pub enum AliceState { @@ -146,7 +147,7 @@ impl State0 { }) } - pub fn receive(self, msg: Message0) -> Result { + pub fn receive(self, msg: Message0) -> Result<(Uuid, State1)> { let valid = CROSS_CURVE_PROOF_SYSTEM.verify( &msg.dleq_proof_s_b, ( @@ -164,7 +165,7 @@ impl State0 { let v = self.v_a + msg.v_b; - Ok(State1 { + Ok((msg.swap_id, State1 { a: self.a, B: msg.B, s_a: self.s_a, @@ -182,7 +183,7 @@ impl State0 { refund_address: msg.refund_address, redeem_address: self.redeem_address, punish_address: self.punish_address, - }) + })) } } diff --git a/swap/src/protocol/alice/swap.rs b/swap/src/protocol/alice/swap.rs index 2f2b1c86..da8012e5 100644 --- a/swap/src/protocol/alice/swap.rs +++ b/swap/src/protocol/alice/swap.rs @@ -5,7 +5,6 @@ use crate::env::Config; use crate::protocol::alice; use crate::protocol::alice::event_loop::EventLoopHandle; use crate::protocol::alice::AliceState; -use crate::protocol::alice::AliceState::XmrLockTransferProofSent; use crate::{bitcoin, database, monero}; use anyhow::{bail, Context, Result}; use tokio::select; @@ -16,7 +15,7 @@ pub async fn run(swap: alice::Swap) -> Result { run_until(swap, |_| false).await } -#[tracing::instrument(name = "swap", skip(swap,exit_early), fields(id = %swap.swap_id))] +#[tracing::instrument(name = "swap", skip(swap,exit_early), fields(id = %swap.swap_id), err)] pub async fn run_until( mut swap: alice::Swap, exit_early: fn(&AliceState) -> bool, @@ -127,7 +126,7 @@ async fn next_state( result = event_loop_handle.send_transfer_proof(transfer_proof.clone()) => { result?; - XmrLockTransferProofSent { + AliceState::XmrLockTransferProofSent { monero_wallet_restore_blockheight, transfer_proof, state3, diff --git a/swap/src/protocol/bob/event_loop.rs b/swap/src/protocol/bob/event_loop.rs index 02d539ea..35daa041 100644 --- a/swap/src/protocol/bob/event_loop.rs +++ b/swap/src/protocol/bob/event_loop.rs @@ -1,6 +1,6 @@ use crate::bitcoin::EncryptedSignature; use crate::network::quote::BidQuote; -use crate::network::{encrypted_signature, spot_price, transfer_proof}; +use crate::network::{encrypted_signature, spot_price}; use crate::protocol::bob::{Behaviour, OutEvent, State0, State2}; use crate::{bitcoin, monero}; use anyhow::{Context, Result}; @@ -12,9 +12,11 @@ use libp2p::{PeerId, Swarm}; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; +use uuid::Uuid; #[allow(missing_debug_implementations)] pub struct EventLoop { + swap_id: Uuid, swarm: libp2p::Swarm, bitcoin_wallet: Arc, alice_peer_id: PeerId, @@ -22,7 +24,7 @@ pub struct EventLoop { // these streams represents outgoing requests that we have to make quote_requests: bmrng::RequestReceiverStream<(), BidQuote>, spot_price_requests: bmrng::RequestReceiverStream, - encrypted_signature_requests: bmrng::RequestReceiverStream, + encrypted_signatures: bmrng::RequestReceiverStream, execution_setup_requests: bmrng::RequestReceiverStream>, // these represents requests that are currently in-flight. @@ -34,7 +36,7 @@ pub struct EventLoop { inflight_execution_setup: Option>>, /// The sender we will use to relay incoming transfer proofs. - transfer_proof: bmrng::RequestSender, + transfer_proof: bmrng::RequestSender, /// The future representing the successful handling of an incoming transfer /// proof. /// @@ -47,6 +49,7 @@ pub struct EventLoop { impl EventLoop { pub fn new( + swap_id: Uuid, swarm: Swarm, alice_peer_id: PeerId, bitcoin_wallet: Arc, @@ -58,12 +61,13 @@ impl EventLoop { let quote = bmrng::channel_with_timeout(1, Duration::from_secs(30)); let event_loop = EventLoop { + swap_id, swarm, alice_peer_id, bitcoin_wallet, execution_setup_requests: execution_setup.1.into(), transfer_proof: transfer_proof.0, - encrypted_signature_requests: encrypted_signature.1.into(), + encrypted_signatures: encrypted_signature.1.into(), spot_price_requests: spot_price.1.into(), quote_requests: quote.1.into(), inflight_spot_price_requests: HashMap::default(), @@ -108,10 +112,20 @@ impl EventLoop { } } SwarmEvent::Behaviour(OutEvent::TransferProofReceived { msg, channel }) => { - let mut responder = match self.transfer_proof.send(*msg).await { + if msg.swap_id != self.swap_id { + + // TODO: Save unexpected transfer proofs in the database and check for messages in the database when handling swaps + tracing::warn!("Received unexpected transfer proof for swap {} while running swap {}. This transfer proof will be ignored.", msg.swap_id, self.swap_id); + + // When receiving a transfer proof that is unexpected we still have to acknowledge that it was received + let _ = self.swarm.transfer_proof.send_response(channel, ()); + continue; + } + + let mut responder = match self.transfer_proof.send(msg.tx_lock_proof).await { Ok(responder) => responder, - Err(_) => { - tracing::warn!("Failed to pass on transfer proof"); + Err(e) => { + tracing::warn!("Failed to pass on transfer proof: {:#}", e); continue; } }; @@ -180,7 +194,12 @@ impl EventLoop { self.swarm.execution_setup.run(self.alice_peer_id, request, self.bitcoin_wallet.clone()); self.inflight_execution_setup = Some(responder); }, - Some((request, responder)) = self.encrypted_signature_requests.next().fuse(), if self.is_connected_to_alice() => { + Some((tx_redeem_encsig, responder)) = self.encrypted_signatures.next().fuse(), if self.is_connected_to_alice() => { + let request = encrypted_signature::Request { + swap_id: self.swap_id, + tx_redeem_encsig + }; + let id = self.swarm.encrypted_signature.send_request(&self.alice_peer_id, request); self.inflight_encrypted_signature_requests.insert(id, responder); }, @@ -202,8 +221,8 @@ impl EventLoop { #[derive(Debug)] pub struct EventLoopHandle { execution_setup: bmrng::RequestSender>, - transfer_proof: bmrng::RequestReceiver, - encrypted_signature: bmrng::RequestSender, + transfer_proof: bmrng::RequestReceiver, + encrypted_signature: bmrng::RequestSender, spot_price: bmrng::RequestSender, quote: bmrng::RequestSender<(), BidQuote>, } @@ -213,8 +232,8 @@ impl EventLoopHandle { self.execution_setup.send_receive(state0).await? } - pub async fn recv_transfer_proof(&mut self) -> Result { - let (request, responder) = self + pub async fn recv_transfer_proof(&mut self) -> Result { + let (transfer_proof, responder) = self .transfer_proof .recv() .await @@ -223,7 +242,7 @@ impl EventLoopHandle { .respond(()) .context("Failed to acknowledge receipt of transfer proof")?; - Ok(request) + Ok(transfer_proof) } pub async fn request_spot_price(&mut self, btc: bitcoin::Amount) -> Result { @@ -244,7 +263,7 @@ impl EventLoopHandle { ) -> Result<()> { Ok(self .encrypted_signature - .send_receive(encrypted_signature::Request { tx_redeem_encsig }) + .send_receive(tx_redeem_encsig) .await?) } } diff --git a/swap/src/protocol/bob/state.rs b/swap/src/protocol/bob/state.rs index 512b6a25..df184f6e 100644 --- a/swap/src/protocol/bob/state.rs +++ b/swap/src/protocol/bob/state.rs @@ -17,6 +17,7 @@ use serde::{Deserialize, Serialize}; use sha2::Sha256; use sigma_fun::ext::dl_secp256k1_ed25519_eq::CrossCurveDLEQProof; use std::fmt; +use uuid::Uuid; #[derive(Debug, Clone)] pub enum BobState { @@ -69,6 +70,7 @@ impl fmt::Display for BobState { #[derive(Clone, Debug, PartialEq)] pub struct State0 { + swap_id: Uuid, b: bitcoin::SecretKey, s_b: monero::Scalar, S_b_monero: monero::PublicKey, @@ -84,7 +86,9 @@ pub struct State0 { } impl State0 { + #[allow(clippy::too_many_arguments)] pub fn new( + swap_id: Uuid, rng: &mut R, btc: bitcoin::Amount, xmr: monero::Amount, @@ -101,6 +105,7 @@ impl State0 { let (dleq_proof_s_b, (S_b_bitcoin, S_b_monero)) = CROSS_CURVE_PROOF_SYSTEM.prove(&s_b, rng); Self { + swap_id, b, s_b, v_b, @@ -120,6 +125,7 @@ impl State0 { pub fn next_message(&self) -> Message0 { Message0 { + swap_id: self.swap_id, B: self.b.public(), S_b_monero: self.S_b_monero, S_b_bitcoin: self.S_b_bitcoin, diff --git a/swap/src/protocol/bob/swap.rs b/swap/src/protocol/bob/swap.rs index e926c12d..5a4f1a61 100644 --- a/swap/src/protocol/bob/swap.rs +++ b/swap/src/protocol/bob/swap.rs @@ -8,6 +8,7 @@ use crate::{bitcoin, monero}; use anyhow::{bail, Context, Result}; use rand::rngs::OsRng; use tokio::select; +use uuid::Uuid; pub fn is_complete(state: &BobState) -> bool { matches!( @@ -32,6 +33,7 @@ pub async fn run_until( while !is_target_state(¤t_state) { current_state = next_state( + swap.swap_id, current_state, &mut swap.event_loop_handle, swap.bitcoin_wallet.as_ref(), @@ -51,6 +53,7 @@ pub async fn run_until( } async fn next_state( + swap_id: Uuid, state: BobState, event_loop_handle: &mut EventLoopHandle, bitcoin_wallet: &bitcoin::Wallet, @@ -65,6 +68,7 @@ async fn next_state( let bitcoin_refund_address = bitcoin_wallet.new_address().await?; let state2 = request_price_and_setup( + swap_id, btc_amount, event_loop_handle, env_config, @@ -103,7 +107,7 @@ async fn next_state( select! { transfer_proof = transfer_proof_watcher => { - let transfer_proof = transfer_proof?.tx_lock_proof; + let transfer_proof = transfer_proof?; tracing::info!(txid = %transfer_proof.tx_hash(), "Alice locked Monero"); @@ -244,6 +248,7 @@ async fn next_state( } pub async fn request_price_and_setup( + swap_id: Uuid, btc: bitcoin::Amount, event_loop_handle: &mut EventLoopHandle, env_config: &Config, @@ -254,6 +259,7 @@ pub async fn request_price_and_setup( tracing::info!("Spot price for {} is {}", btc, xmr); let state0 = State0::new( + swap_id, &mut OsRng, btc, xmr, diff --git a/swap/tests/alice_punishes_after_restart_punish_timelock_expired.rs b/swap/tests/alice_punishes_after_restart_punish_timelock_expired.rs index 5ede52a3..0173b1d0 100644 --- a/swap/tests/alice_punishes_after_restart_punish_timelock_expired.rs +++ b/swap/tests/alice_punishes_after_restart_punish_timelock_expired.rs @@ -13,6 +13,7 @@ use swap::protocol::{alice, bob}; async fn alice_punishes_after_restart_if_punish_timelock_expired() { harness::setup_test(FastPunishConfig, |mut ctx| async move { let (bob_swap, bob_join_handle) = ctx.bob_swap().await; + let bob_swap_id = bob_swap.swap_id; let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked)); let alice_swap = ctx.alice_next_swap().await; @@ -33,11 +34,7 @@ async fn alice_punishes_after_restart_if_punish_timelock_expired() { .wait_until_confirmed_with(state3.punish_timelock) .await?; } else { - panic!( - "\ - Alice in unexpected state {}", - alice_state - ); + panic!("Alice in unexpected state {}", alice_state); } ctx.restart_alice().await; @@ -49,7 +46,9 @@ async fn alice_punishes_after_restart_if_punish_timelock_expired() { // Restart Bob after Alice punished to ensure Bob transitions to // punished and does not run indefinitely - let (bob_swap, _) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; + let (bob_swap, _) = ctx + .stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id) + .await; assert!(matches!(bob_swap.state, BobState::BtcLocked { .. })); let bob_state = bob::run(bob_swap).await?; diff --git a/swap/tests/bob_refunds_using_cancel_and_refund_command.rs b/swap/tests/bob_refunds_using_cancel_and_refund_command.rs index a12d5724..71a902a8 100644 --- a/swap/tests/bob_refunds_using_cancel_and_refund_command.rs +++ b/swap/tests/bob_refunds_using_cancel_and_refund_command.rs @@ -9,6 +9,7 @@ use swap::protocol::{alice, bob}; async fn given_bob_manually_refunds_after_btc_locked_bob_refunds() { harness::setup_test(FastCancelConfig, |mut ctx| async move { let (bob_swap, bob_join_handle) = ctx.bob_swap().await; + let bob_swap_id = bob_swap.swap_id; let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked)); let alice_swap = ctx.alice_next_swap().await; @@ -17,7 +18,9 @@ async fn given_bob_manually_refunds_after_btc_locked_bob_refunds() { let bob_state = bob_swap.await??; assert!(matches!(bob_state, BobState::BtcLocked { .. })); - let (bob_swap, bob_join_handle) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; + let (bob_swap, bob_join_handle) = ctx + .stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id) + .await; // Ensure cancel timelock is expired if let BobState::BtcLocked(state3) = bob_swap.state.clone() { @@ -43,7 +46,9 @@ async fn given_bob_manually_refunds_after_btc_locked_bob_refunds() { .await??; assert!(matches!(state, BobState::BtcCancelled { .. })); - let (bob_swap, bob_join_handle) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; + let (bob_swap, bob_join_handle) = ctx + .stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id) + .await; assert!(matches!(bob_swap.state, BobState::BtcCancelled { .. })); // Bob manually refunds diff --git a/swap/tests/bob_refunds_using_cancel_and_refund_command_timelock_not_expired.rs b/swap/tests/bob_refunds_using_cancel_and_refund_command_timelock_not_expired.rs index 13793009..49d73a58 100644 --- a/swap/tests/bob_refunds_using_cancel_and_refund_command_timelock_not_expired.rs +++ b/swap/tests/bob_refunds_using_cancel_and_refund_command_timelock_not_expired.rs @@ -10,6 +10,7 @@ use swap::protocol::{alice, bob}; async fn given_bob_manually_cancels_when_timelock_not_expired_errors() { harness::setup_test(SlowCancelConfig, |mut ctx| async move { let (bob_swap, bob_join_handle) = ctx.bob_swap().await; + let bob_swap_id = bob_swap.swap_id; let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked)); let alice_swap = ctx.alice_next_swap().await; @@ -18,7 +19,9 @@ async fn given_bob_manually_cancels_when_timelock_not_expired_errors() { let bob_state = bob_swap.await??; assert!(matches!(bob_state, BobState::BtcLocked { .. })); - let (bob_swap, bob_join_handle) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; + let (bob_swap, bob_join_handle) = ctx + .stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id) + .await; assert!(matches!(bob_swap.state, BobState::BtcLocked { .. })); // Bob tries but fails to manually cancel @@ -35,7 +38,9 @@ async fn given_bob_manually_cancels_when_timelock_not_expired_errors() { assert!(matches!(result, Error::CancelTimelockNotExpiredYet)); - let (bob_swap, bob_join_handle) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; + let (bob_swap, bob_join_handle) = ctx + .stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id) + .await; assert!(matches!(bob_swap.state, BobState::BtcLocked { .. })); // Bob tries but fails to manually refund @@ -50,7 +55,9 @@ async fn given_bob_manually_cancels_when_timelock_not_expired_errors() { .err() .unwrap(); - let (bob_swap, _) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; + let (bob_swap, _) = ctx + .stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id) + .await; assert!(matches!(bob_swap.state, BobState::BtcLocked { .. })); Ok(()) diff --git a/swap/tests/bob_refunds_using_cancel_and_refund_command_timelock_not_expired_force.rs b/swap/tests/bob_refunds_using_cancel_and_refund_command_timelock_not_expired_force.rs index 8a9d7f66..057db955 100644 --- a/swap/tests/bob_refunds_using_cancel_and_refund_command_timelock_not_expired_force.rs +++ b/swap/tests/bob_refunds_using_cancel_and_refund_command_timelock_not_expired_force.rs @@ -9,6 +9,7 @@ use swap::protocol::{alice, bob}; async fn given_bob_manually_forces_cancel_when_timelock_not_expired_errors() { harness::setup_test(SlowCancelConfig, |mut ctx| async move { let (bob_swap, bob_join_handle) = ctx.bob_swap().await; + let bob_swap_id = bob_swap.swap_id; let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked)); let alice_swap = ctx.alice_next_swap().await; @@ -17,7 +18,9 @@ async fn given_bob_manually_forces_cancel_when_timelock_not_expired_errors() { let bob_state = bob_swap.await??; assert!(matches!(bob_state, BobState::BtcLocked { .. })); - let (bob_swap, bob_join_handle) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; + let (bob_swap, bob_join_handle) = ctx + .stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id) + .await; assert!(matches!(bob_swap.state, BobState::BtcLocked { .. })); // Bob forces a cancel that will fail @@ -33,7 +36,9 @@ async fn given_bob_manually_forces_cancel_when_timelock_not_expired_errors() { assert!(is_error); - let (bob_swap, bob_join_handle) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; + let (bob_swap, bob_join_handle) = ctx + .stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id) + .await; assert!(matches!(bob_swap.state, BobState::BtcLocked { .. })); // Bob forces a refund that will fail @@ -48,7 +53,9 @@ async fn given_bob_manually_forces_cancel_when_timelock_not_expired_errors() { .is_err(); assert!(is_error); - let (bob_swap, _) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; + let (bob_swap, _) = ctx + .stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id) + .await; assert!(matches!(bob_swap.state, BobState::BtcLocked { .. })); Ok(()) diff --git a/swap/tests/concurrent_bobs_after_xmr_lock_proof_sent.rs b/swap/tests/concurrent_bobs_after_xmr_lock_proof_sent.rs new file mode 100644 index 00000000..11ae81b9 --- /dev/null +++ b/swap/tests/concurrent_bobs_after_xmr_lock_proof_sent.rs @@ -0,0 +1,60 @@ +pub mod harness; + +use harness::bob_run_until::is_xmr_locked; +use harness::SlowCancelConfig; +use swap::protocol::alice::AliceState; +use swap::protocol::bob::BobState; +use swap::protocol::{alice, bob}; + +#[tokio::test] +async fn concurrent_bobs_after_xmr_lock_proof_sent() { + harness::setup_test(SlowCancelConfig, |mut ctx| async move { + let (bob_swap_1, bob_join_handle_1) = ctx.bob_swap().await; + + let swap_id = bob_swap_1.swap_id; + + let bob_swap_1 = tokio::spawn(bob::run_until(bob_swap_1, is_xmr_locked)); + + let alice_swap_1 = ctx.alice_next_swap().await; + let alice_swap_1 = tokio::spawn(alice::run(alice_swap_1)); + + let bob_state_1 = bob_swap_1.await??; + assert!(matches!(bob_state_1, BobState::XmrLocked { .. })); + + // make sure bob_swap_1's event loop is gone + bob_join_handle_1.abort(); + + let (bob_swap_2, bob_join_handle_2) = ctx.bob_swap().await; + let bob_swap_2 = tokio::spawn(bob::run(bob_swap_2)); + + let alice_swap_2 = ctx.alice_next_swap().await; + let alice_swap_2 = tokio::spawn(alice::run(alice_swap_2)); + + // The 2nd swap should ALWAYS finish successfully in this + // scenario + + let bob_state_2 = bob_swap_2.await??; + assert!(matches!(bob_state_2, BobState::XmrRedeemed { .. })); + + let alice_state_2 = alice_swap_2.await??; + assert!(matches!(alice_state_2, AliceState::BtcRedeemed { .. })); + + let (bob_swap_1, _) = ctx + .stop_and_resume_bob_from_db(bob_join_handle_2, swap_id) + .await; + assert!(matches!(bob_swap_1.state, BobState::XmrLocked { .. })); + + // The 1st (paused) swap ALWAYS finishes successfully in this + // scenario, because it is ensured that Bob already received the + // transfer proof. + + let bob_state_1 = bob::run(bob_swap_1).await?; + assert!(matches!(bob_state_1, BobState::XmrRedeemed { .. })); + + let alice_state_1 = alice_swap_1.await??; + assert!(matches!(alice_state_1, AliceState::BtcRedeemed { .. })); + + Ok(()) + }) + .await; +} diff --git a/swap/tests/concurrent_bobs_before_xmr_lock_proof_sent.rs b/swap/tests/concurrent_bobs_before_xmr_lock_proof_sent.rs new file mode 100644 index 00000000..ed3efafe --- /dev/null +++ b/swap/tests/concurrent_bobs_before_xmr_lock_proof_sent.rs @@ -0,0 +1,61 @@ +pub mod harness; + +use harness::bob_run_until::is_btc_locked; +use harness::SlowCancelConfig; +use swap::protocol::alice::AliceState; +use swap::protocol::bob::BobState; +use swap::protocol::{alice, bob}; + +#[tokio::test] +async fn concurrent_bobs_before_xmr_lock_proof_sent() { + harness::setup_test(SlowCancelConfig, |mut ctx| async move { + let (bob_swap_1, bob_join_handle_1) = ctx.bob_swap().await; + + let swap_id = bob_swap_1.swap_id; + + let bob_swap_1 = tokio::spawn(bob::run_until(bob_swap_1, is_btc_locked)); + + let alice_swap_1 = ctx.alice_next_swap().await; + let alice_swap_1 = tokio::spawn(alice::run(alice_swap_1)); + + let bob_state_1 = bob_swap_1.await??; + assert!(matches!(bob_state_1, BobState::BtcLocked(_))); + + // make sure bob_swap_1's event loop is gone + bob_join_handle_1.abort(); + + let (bob_swap_2, bob_join_handle_2) = ctx.bob_swap().await; + let bob_swap_2 = tokio::spawn(bob::run(bob_swap_2)); + + let alice_swap_2 = ctx.alice_next_swap().await; + let alice_swap_2 = tokio::spawn(alice::run(alice_swap_2)); + + // The 2nd swap ALWAYS finish successfully in this + // scenario, but will receive an "unwanted" transfer proof that is ignored in + // the event loop. + + let bob_state_2 = bob_swap_2.await??; + assert!(matches!(bob_state_2, BobState::XmrRedeemed { .. })); + + let alice_state_2 = alice_swap_2.await??; + assert!(matches!(alice_state_2, AliceState::BtcRedeemed { .. })); + + let (bob_swap_1, _) = ctx + .stop_and_resume_bob_from_db(bob_join_handle_2, swap_id) + .await; + assert!(matches!(bob_state_1, BobState::BtcLocked(_))); + + // The 1st (paused) swap is expected to refund, because the transfer + // proof is delivered to the wrong swap, and we currently don't store it in the + // database for the other swap. + + let bob_state_1 = bob::run(bob_swap_1).await?; + assert!(matches!(bob_state_1, BobState::BtcRefunded { .. })); + + let alice_state_1 = alice_swap_1.await??; + assert!(matches!(alice_state_1, AliceState::XmrRefunded { .. })); + + Ok(()) + }) + .await; +} diff --git a/swap/tests/ensure_same_swap_id.rs b/swap/tests/ensure_same_swap_id.rs new file mode 100644 index 00000000..1dbc7046 --- /dev/null +++ b/swap/tests/ensure_same_swap_id.rs @@ -0,0 +1,21 @@ +pub mod harness; + +use harness::SlowCancelConfig; +use swap::protocol::bob; + +#[tokio::test] +async fn ensure_same_swap_id_for_alice_and_bob() { + harness::setup_test(SlowCancelConfig, |mut ctx| async move { + let (bob_swap, _) = ctx.bob_swap().await; + let bob_swap_id = bob_swap.swap_id; + let _ = tokio::spawn(bob::run(bob_swap)); + + // once Bob's swap is spawned we can retrieve Alice's swap and assert on the + // swap ID + let alice_swap = ctx.alice_next_swap().await; + assert_eq!(alice_swap.swap_id, bob_swap_id); + + Ok(()) + }) + .await; +} diff --git a/swap/tests/happy_path_restart_bob_after_xmr_locked.rs b/swap/tests/happy_path_restart_bob_after_xmr_locked.rs index 9967c117..8e18f5ed 100644 --- a/swap/tests/happy_path_restart_bob_after_xmr_locked.rs +++ b/swap/tests/happy_path_restart_bob_after_xmr_locked.rs @@ -9,6 +9,7 @@ use swap::protocol::{alice, bob}; async fn given_bob_restarts_after_xmr_is_locked_resume_swap() { harness::setup_test(SlowCancelConfig, |mut ctx| async move { let (bob_swap, bob_join_handle) = ctx.bob_swap().await; + let bob_swap_id = bob_swap.swap_id; let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_xmr_locked)); let alice_swap = ctx.alice_next_swap().await; @@ -18,7 +19,9 @@ async fn given_bob_restarts_after_xmr_is_locked_resume_swap() { assert!(matches!(bob_state, BobState::XmrLocked { .. })); - let (bob_swap, _) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; + let (bob_swap, _) = ctx + .stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id) + .await; assert!(matches!(bob_swap.state, BobState::XmrLocked { .. })); let bob_state = bob::run(bob_swap).await?; diff --git a/swap/tests/happy_path_restart_bob_before_xmr_locked.rs b/swap/tests/happy_path_restart_bob_before_xmr_locked.rs index 9967c117..8e18f5ed 100644 --- a/swap/tests/happy_path_restart_bob_before_xmr_locked.rs +++ b/swap/tests/happy_path_restart_bob_before_xmr_locked.rs @@ -9,6 +9,7 @@ use swap::protocol::{alice, bob}; async fn given_bob_restarts_after_xmr_is_locked_resume_swap() { harness::setup_test(SlowCancelConfig, |mut ctx| async move { let (bob_swap, bob_join_handle) = ctx.bob_swap().await; + let bob_swap_id = bob_swap.swap_id; let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_xmr_locked)); let alice_swap = ctx.alice_next_swap().await; @@ -18,7 +19,9 @@ async fn given_bob_restarts_after_xmr_is_locked_resume_swap() { assert!(matches!(bob_state, BobState::XmrLocked { .. })); - let (bob_swap, _) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; + let (bob_swap, _) = ctx + .stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id) + .await; assert!(matches!(bob_swap.state, BobState::XmrLocked { .. })); let bob_state = bob::run(bob_swap).await?; diff --git a/swap/tests/harness/mod.rs b/swap/tests/harness/mod.rs index 8229d314..bdbf3bd9 100644 --- a/swap/tests/harness/mod.rs +++ b/swap/tests/harness/mod.rs @@ -43,13 +43,52 @@ const BITCOIN_TEST_WALLET_NAME: &str = "testwallet"; #[derive(Debug, Clone)] pub struct StartingBalances { pub xmr: monero::Amount, + pub xmr_outputs: Vec, pub btc: bitcoin::Amount, } +impl StartingBalances { + /// If monero_outputs is specified the monero balance will be: + /// monero_outputs * new_xmr = self_xmr + pub fn new(btc: bitcoin::Amount, xmr: monero::Amount, monero_outputs: Option) -> Self { + match monero_outputs { + None => { + if xmr == monero::Amount::ZERO { + return Self { + xmr, + xmr_outputs: vec![], + btc, + }; + } + + Self { + xmr, + xmr_outputs: vec![xmr], + btc, + } + } + Some(outputs) => { + let mut xmr_outputs = Vec::new(); + let mut sum_xmr = monero::Amount::ZERO; + + for _ in 0..outputs { + xmr_outputs.push(xmr); + sum_xmr = sum_xmr + xmr; + } + + Self { + xmr: sum_xmr, + xmr_outputs, + btc, + } + } + } + } +} + struct BobParams { seed: Seed, db_path: PathBuf, - swap_id: Uuid, bitcoin_wallet: Arc, monero_wallet: Arc, alice_address: Multiaddr, @@ -58,12 +97,16 @@ struct BobParams { } impl BobParams { - pub async fn builder(&self, event_loop_handle: bob::EventLoopHandle) -> Result { + pub async fn builder( + &self, + event_loop_handle: bob::EventLoopHandle, + swap_id: Uuid, + ) -> Result { let receive_address = self.monero_wallet.get_main_address(); Ok(bob::Builder::new( Database::open(&self.db_path.clone().as_path()).unwrap(), - self.swap_id, + swap_id, self.bitcoin_wallet.clone(), self.monero_wallet.clone(), self.env_config, @@ -72,11 +115,16 @@ impl BobParams { )) } - pub fn new_eventloop(&self) -> Result<(bob::EventLoop, bob::EventLoopHandle)> { + pub fn new_eventloop(&self, swap_id: Uuid) -> Result<(bob::EventLoop, bob::EventLoopHandle)> { let mut swarm = swarm::new::(&self.seed)?; swarm.add_address(self.alice_peer_id, self.alice_address.clone()); - bob::EventLoop::new(swarm, self.alice_peer_id, self.bitcoin_wallet.clone()) + bob::EventLoop::new( + swap_id, + swarm, + self.alice_peer_id, + self.bitcoin_wallet.clone(), + ) } } @@ -139,24 +187,28 @@ impl TestContext { } pub async fn alice_next_swap(&mut self) -> alice::Swap { - timeout(Duration::from_secs(10), self.alice_swap_handle.recv()) + timeout(Duration::from_secs(20), self.alice_swap_handle.recv()) .await - .expect("No Alice swap within 10 seconds, aborting because this test is waiting for a swap forever...") + .expect("No Alice swap within 20 seconds, aborting because this test is likely waiting for a swap forever...") .unwrap() } pub async fn bob_swap(&mut self) -> (bob::Swap, BobApplicationHandle) { - let (event_loop, event_loop_handle) = self.bob_params.new_eventloop().unwrap(); + let swap_id = Uuid::new_v4(); + let (event_loop, event_loop_handle) = self.bob_params.new_eventloop(swap_id).unwrap(); let swap = self .bob_params - .builder(event_loop_handle) + .builder(event_loop_handle, swap_id) .await .unwrap() .with_init_params(self.btc_amount) .build() .unwrap(); + // ensure the wallet is up to date for concurrent swap tests + swap.bitcoin_wallet.sync().await.unwrap(); + let join_handle = tokio::spawn(event_loop.run()); (swap, BobApplicationHandle(join_handle)) @@ -165,14 +217,15 @@ impl TestContext { pub async fn stop_and_resume_bob_from_db( &mut self, join_handle: BobApplicationHandle, + swap_id: Uuid, ) -> (bob::Swap, BobApplicationHandle) { join_handle.abort(); - let (event_loop, event_loop_handle) = self.bob_params.new_eventloop().unwrap(); + let (event_loop, event_loop_handle) = self.bob_params.new_eventloop(swap_id).unwrap(); let swap = self .bob_params - .builder(event_loop_handle) + .builder(event_loop_handle, swap_id) .await .unwrap() .build() @@ -489,14 +542,13 @@ where let env_config = C::get_config(); let (monero, containers) = harness::init_containers(&cli).await; + monero.init_miner().await.unwrap(); let btc_amount = bitcoin::Amount::from_sat(1_000_000); let xmr_amount = monero::Amount::from_monero(btc_amount.as_btc() / FixedRate::RATE).unwrap(); - let alice_starting_balances = StartingBalances { - xmr: xmr_amount * 10, - btc: bitcoin::Amount::ZERO, - }; + let alice_starting_balances = + StartingBalances::new(bitcoin::Amount::ZERO, xmr_amount, Some(10)); let electrs_rpc_port = containers .electrs @@ -532,10 +584,7 @@ where ); let bob_seed = Seed::random().unwrap(); - let bob_starting_balances = StartingBalances { - xmr: monero::Amount::ZERO, - btc: btc_amount * 10, - }; + let bob_starting_balances = StartingBalances::new(btc_amount * 10, monero::Amount::ZERO, None); let (bob_bitcoin_wallet, bob_monero_wallet) = init_test_wallets( MONERO_WALLET_NAME_BOB, @@ -552,7 +601,6 @@ where let bob_params = BobParams { seed: Seed::random().unwrap(), db_path: tempdir().unwrap().path().to_path_buf(), - swap_id: Uuid::new_v4(), bitcoin_wallet: bob_bitcoin_wallet.clone(), monero_wallet: bob_monero_wallet.clone(), alice_address: alice_listen_address.clone(), @@ -560,6 +608,8 @@ where env_config, }; + monero.start_miner().await.unwrap(); + let test = TestContext { env_config, btc_amount, @@ -774,7 +824,14 @@ async fn init_test_wallets( env_config: Config, ) -> (Arc, Arc) { monero - .init(vec![(name, starting_balances.xmr.as_piconero())]) + .init_wallet( + name, + starting_balances + .xmr_outputs + .into_iter() + .map(|amount| amount.as_piconero()) + .collect(), + ) .await .unwrap(); diff --git a/swap/tests/punish.rs b/swap/tests/punish.rs index f058dc79..bf053f5b 100644 --- a/swap/tests/punish.rs +++ b/swap/tests/punish.rs @@ -11,6 +11,7 @@ use swap::protocol::{alice, bob}; async fn alice_punishes_if_bob_never_acts_after_fund() { harness::setup_test(FastPunishConfig, |mut ctx| async move { let (bob_swap, bob_join_handle) = ctx.bob_swap().await; + let bob_swap_id = bob_swap.swap_id; let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked)); let alice_swap = ctx.alice_next_swap().await; @@ -24,7 +25,9 @@ async fn alice_punishes_if_bob_never_acts_after_fund() { // Restart Bob after Alice punished to ensure Bob transitions to // punished and does not run indefinitely - let (bob_swap, _) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; + let (bob_swap, _) = ctx + .stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id) + .await; assert!(matches!(bob_swap.state, BobState::BtcLocked { .. })); let bob_state = bob::run(bob_swap).await?;