From 6e6dc320b438a760b420f4896a1a00348b98ded0 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Tue, 9 Feb 2021 11:15:55 +1100 Subject: [PATCH] Alice event loop now handles the creation of new swaps --- bors.toml | 3 + swap/src/database/alice.rs | 18 +- swap/src/network/transport.rs | 4 +- swap/src/protocol/alice.rs | 132 ++++++------- swap/src/protocol/alice/behaviour.rs | 31 +++- swap/src/protocol/alice/event_loop.rs | 175 +++++++++++++----- swap/src/protocol/alice/execution_setup.rs | 15 +- swap/src/protocol/alice/state.rs | 5 - swap/src/protocol/alice/swap.rs | 26 +-- swap/src/protocol/alice/swap_response.rs | 4 +- swap/src/protocol/bob.rs | 2 +- ...refunds_using_cancel_and_refund_command.rs | 13 +- ..._and_refund_command_timelock_not_exired.rs | 6 +- ...efund_command_timelock_not_exired_force.rs | 6 +- swap/tests/happy_path.rs | 11 +- .../happy_path_restart_bob_before_comm.rs | 9 +- swap/tests/testutils/mod.rs | 117 ++++++------ 17 files changed, 306 insertions(+), 271 deletions(-) diff --git a/bors.toml b/bors.toml index 27bf2b72..8d33e57c 100644 --- a/bors.toml +++ b/bors.toml @@ -4,4 +4,7 @@ status = [ "build_test (x86_64-apple-darwin)", "docker_tests (happy_path)", "docker_tests (happy_path_restart_bob_before_comm)", + "docker_tests (bob_refunds_using_cancel_and_refund_command.rs)", + "docker_tests (bob_refunds_using_cancel_and_refund_command_timelock_not_exired_force.rs)", + "docker_tests (bob_refunds_using_cancel_and_refund_command_timelock_not_exired.rs)" ] diff --git a/swap/src/database/alice.rs b/swap/src/database/alice.rs index 9a9c4383..3eadc70d 100644 --- a/swap/src/database/alice.rs +++ b/swap/src/database/alice.rs @@ -14,10 +14,6 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] pub enum Alice { Started { - amounts: SwapAmounts, - state0: alice::State0, - }, - Negotiated { state3: alice::State3, #[serde(with = "crate::serde_peer_id")] bob_peer_id: PeerId, @@ -54,11 +50,11 @@ pub enum AliceEndState { impl From<&AliceState> for Alice { fn from(alice_state: &AliceState) -> Self { match alice_state { - AliceState::Negotiated { + AliceState::Started { state3, bob_peer_id, .. - } => Alice::Negotiated { + } => Alice::Started { state3: state3.as_ref().clone(), bob_peer_id: *bob_peer_id, }, @@ -93,10 +89,6 @@ impl From<&AliceState> for Alice { } AliceState::BtcPunished => Alice::Done(AliceEndState::BtcPunished), AliceState::SafelyAborted => Alice::Done(AliceEndState::SafelyAborted), - AliceState::Started { amounts, state0 } => Alice::Started { - amounts: *amounts, - state0: state0.clone(), - }, } } } @@ -104,11 +96,10 @@ impl From<&AliceState> for Alice { impl From for AliceState { fn from(db_state: Alice) -> Self { match db_state { - Alice::Started { amounts, state0 } => AliceState::Started { amounts, state0 }, - Alice::Negotiated { + Alice::Started { state3, bob_peer_id, - } => AliceState::Negotiated { + } => AliceState::Started { bob_peer_id, amounts: SwapAmounts { btc: state3.btc, @@ -186,7 +177,6 @@ impl Display for Alice { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Alice::Started { .. } => write!(f, "Started"), - Alice::Negotiated { .. } => f.write_str("Negotiated"), Alice::BtcLocked { .. } => f.write_str("Bitcoin locked"), Alice::XmrLocked(_) => f.write_str("Monero locked"), Alice::CancelTimelockExpired(_) => f.write_str("Cancel timelock is expired"), diff --git a/swap/src/network/transport.rs b/swap/src/network/transport.rs index a8ba1be3..09bd8e9b 100644 --- a/swap/src/network/transport.rs +++ b/swap/src/network/transport.rs @@ -18,10 +18,10 @@ use libp2p::{ /// - DNS name resolution /// - authentication via noise /// - multiplexing via yamux or mplex -pub fn build(id_keys: identity::Keypair) -> Result { +pub fn build(id_keys: &identity::Keypair) -> Result { use libp2p::tcp::TokioTcpConfig; - let dh_keys = noise::Keypair::::new().into_authentic(&id_keys)?; + let dh_keys = noise::Keypair::::new().into_authentic(id_keys)?; let noise = NoiseConfig::xx(dh_keys).into_authenticated(); let tcp = TokioTcpConfig::new().nodelay(true); diff --git a/swap/src/protocol/alice.rs b/swap/src/protocol/alice.rs index e83329ff..2c9d86f3 100644 --- a/swap/src/protocol/alice.rs +++ b/swap/src/protocol/alice.rs @@ -2,15 +2,15 @@ //! Alice holds XMR and wishes receive BTC. use crate::{ bitcoin, database, database::Database, execution_params::ExecutionParams, monero, - network::Seed as NetworkSeed, protocol::SwapAmounts, seed::Seed, + protocol::SwapAmounts, }; use anyhow::{bail, Result}; -use libp2p::{core::Multiaddr, identity::Keypair, PeerId}; -use rand::rngs::OsRng; +use libp2p::{core::Multiaddr, PeerId}; use std::sync::Arc; use uuid::Uuid; pub use self::{ + behaviour::{Behaviour, OutEvent}, event_loop::{EventLoop, EventLoopHandle}, execution_setup::Message1, state::*, @@ -37,16 +37,15 @@ pub struct Swap { pub monero_wallet: Arc, pub execution_params: ExecutionParams, pub swap_id: Uuid, - pub db: Database, + pub db: Arc, } pub struct Builder { swap_id: Uuid, - identity: Keypair, peer_id: PeerId, - db: Database, + db: Arc, execution_params: ExecutionParams, - + event_loop_handle: EventLoopHandle, listen_address: Multiaddr, bitcoin_wallet: Arc, @@ -57,29 +56,31 @@ pub struct Builder { enum InitParams { None, - New { swap_amounts: SwapAmounts }, + New { + swap_amounts: SwapAmounts, + bob_peer_id: PeerId, + state3: Box, + }, } impl Builder { + #[allow(clippy::too_many_arguments)] pub fn new( - seed: Seed, + self_peer_id: PeerId, execution_params: ExecutionParams, swap_id: Uuid, bitcoin_wallet: Arc, monero_wallet: Arc, - db: Database, + db: Arc, listen_address: Multiaddr, + event_loop_handle: EventLoopHandle, ) -> Self { - let network_seed = NetworkSeed::new(seed); - let identity = network_seed.derive_libp2p_identity(); - let peer_id = PeerId::from(identity.public()); - Self { swap_id, - identity, - peer_id, + peer_id: self_peer_id, db, execution_params, + event_loop_handle, listen_address, bitcoin_wallet, monero_wallet, @@ -87,35 +88,44 @@ impl Builder { } } - pub fn with_init_params(self, swap_amounts: SwapAmounts) -> Self { + pub fn with_init_params( + self, + swap_amounts: SwapAmounts, + bob_peer_id: PeerId, + state3: State3, + ) -> Self { Self { - init_params: InitParams::New { swap_amounts }, + init_params: InitParams::New { + swap_amounts, + bob_peer_id, + state3: Box::new(state3), + }, ..self } } - pub async fn build(self) -> Result<(Swap, EventLoop)> { + pub async fn build(self) -> Result { match self.init_params { - InitParams::New { swap_amounts } => { - let initial_state = self - .make_initial_state(swap_amounts.btc, swap_amounts.xmr) - .await?; + InitParams::New { + swap_amounts, + bob_peer_id, + ref state3, + } => { + let initial_state = AliceState::Started { + amounts: swap_amounts, + state3: state3.clone(), + bob_peer_id, + }; - let (event_loop, event_loop_handle) = - EventLoop::new(self.identity.clone(), self.listen_address(), self.peer_id)?; - - Ok(( - Swap { - event_loop_handle, - bitcoin_wallet: self.bitcoin_wallet, - monero_wallet: self.monero_wallet, - execution_params: self.execution_params, - db: self.db, - state: initial_state, - swap_id: self.swap_id, - }, - event_loop, - )) + Ok(Swap { + event_loop_handle: self.event_loop_handle, + bitcoin_wallet: self.bitcoin_wallet, + monero_wallet: self.monero_wallet, + execution_params: self.execution_params, + db: self.db, + state: initial_state, + swap_id: self.swap_id, + }) } InitParams::None => { let resume_state = @@ -128,21 +138,15 @@ impl Builder { ) }; - let (event_loop, event_loop_handle) = - EventLoop::new(self.identity.clone(), self.listen_address(), self.peer_id)?; - - Ok(( - Swap { - state: resume_state, - event_loop_handle, - bitcoin_wallet: self.bitcoin_wallet, - monero_wallet: self.monero_wallet, - execution_params: self.execution_params, - swap_id: self.swap_id, - db: self.db, - }, - event_loop, - )) + Ok(Swap { + state: resume_state, + event_loop_handle: self.event_loop_handle, + bitcoin_wallet: self.bitcoin_wallet, + monero_wallet: self.monero_wallet, + execution_params: self.execution_params, + swap_id: self.swap_id, + db: self.db, + }) } } } @@ -154,26 +158,4 @@ impl Builder { pub fn listen_address(&self) -> Multiaddr { self.listen_address.clone() } - - async fn make_initial_state( - &self, - btc_to_swap: bitcoin::Amount, - xmr_to_swap: monero::Amount, - ) -> Result { - let amounts = SwapAmounts { - btc: btc_to_swap, - xmr: xmr_to_swap, - }; - - let state0 = State0::new( - amounts.btc, - amounts.xmr, - self.execution_params, - self.bitcoin_wallet.as_ref(), - &mut OsRng, - ) - .await?; - - Ok(AliceState::Started { amounts, state0 }) - } } diff --git a/swap/src/protocol/alice/behaviour.rs b/swap/src/protocol/alice/behaviour.rs index 14e46000..4e978f35 100644 --- a/swap/src/protocol/alice/behaviour.rs +++ b/swap/src/protocol/alice/behaviour.rs @@ -1,7 +1,6 @@ use crate::{ network::{peer_tracker, peer_tracker::PeerTracker}, protocol::{ - alice, alice::{ encrypted_signature, execution_setup, swap_response, transfer_proof, State0, State3, SwapResponse, TransferProof, @@ -19,8 +18,12 @@ pub enum OutEvent { SwapRequest { msg: SwapRequest, channel: ResponseChannel, + bob_peer_id: PeerId, + }, + ExecutionSetupDone { + bob_peer_id: PeerId, + state3: Box, }, - ExecutionSetupDone(Box), TransferProofAcknowledged, EncryptedSignature { msg: Box, @@ -40,11 +43,19 @@ impl From for OutEvent { } } -impl From for OutEvent { - fn from(event: alice::OutEvent) -> Self { - use crate::protocol::alice::OutEvent::*; +impl From for OutEvent { + fn from(event: swap_response::OutEvent) -> Self { + use crate::protocol::alice::swap_response::OutEvent::*; match event { - MsgReceived { msg, channel } => OutEvent::SwapRequest { msg, channel }, + MsgReceived { + msg, + channel, + bob_peer_id, + } => OutEvent::SwapRequest { + msg, + channel, + bob_peer_id, + }, ResponseSent => OutEvent::ResponseSent, Failure(err) => OutEvent::Failure(err.context("Swap Request/Response failure")), } @@ -55,7 +66,13 @@ impl From for OutEvent { fn from(event: execution_setup::OutEvent) -> Self { use crate::protocol::alice::execution_setup::OutEvent::*; match event { - Done(state3) => OutEvent::ExecutionSetupDone(Box::new(state3)), + Done { + bob_peer_id, + state3, + } => OutEvent::ExecutionSetupDone { + bob_peer_id, + state3: Box::new(state3), + }, Failure(err) => OutEvent::Failure(err), } } diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index bcc4dc32..02bd17b3 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -1,19 +1,29 @@ use crate::{ + bitcoin, + database::Database, + execution_params::ExecutionParams, + monero, network, network::{transport, TokioExecutor}, protocol::{ - alice::{ - behaviour::{Behaviour, OutEvent}, - State3, SwapResponse, TransferProof, - }, + alice, + alice::{Behaviour, Builder, OutEvent, State0, State3, SwapResponse, TransferProof}, bob::{EncryptedSignature, SwapRequest}, + SwapAmounts, }, + seed::Seed, }; -use anyhow::{Context, Result}; +use anyhow::{anyhow, Context, Result}; use libp2p::{ core::Multiaddr, futures::FutureExt, request_response::ResponseChannel, PeerId, Swarm, }; +use rand::rngs::OsRng; +use std::{collections::HashMap, sync::Arc}; use tokio::sync::{broadcast, mpsc}; use tracing::{debug, error, trace}; +use uuid::Uuid; + +// TODO: Use dynamic +const RATE: u32 = 100; #[allow(missing_debug_implementations)] pub struct MpscChannels { @@ -34,7 +44,6 @@ where T: Clone, { sender: broadcast::Sender, - receiver: broadcast::Receiver, } impl Default for BroadcastChannels @@ -42,8 +51,8 @@ where T: Clone, { fn default() -> Self { - let (sender, receiver) = broadcast::channel(100); - BroadcastChannels { sender, receiver } + let (sender, _receiver) = broadcast::channel(100); + BroadcastChannels { sender } } } @@ -70,21 +79,37 @@ impl EventLoopHandle { #[allow(missing_debug_implementations)] pub struct EventLoop { swarm: libp2p::Swarm, + peer_id: PeerId, + execution_params: ExecutionParams, + bitcoin_wallet: Arc, + monero_wallet: Arc, + db: Arc, + listen_address: Multiaddr, + + // Amounts agreed upon for swaps currently in the execution setup phase + // Note: We can do one execution setup per peer at a given time. + swap_amounts: HashMap, + recv_encrypted_signature: broadcast::Sender, send_transfer_proof: mpsc::Receiver<(PeerId, TransferProof)>, - // Only used to clone further handles - handle: EventLoopHandle, + // Only used to produce new handles + send_transfer_proof_sender: mpsc::Sender<(PeerId, TransferProof)>, } impl EventLoop { pub fn new( - identity: libp2p::identity::Keypair, - listen: Multiaddr, - peer_id: PeerId, - ) -> Result<(Self, EventLoopHandle)> { + listen_address: Multiaddr, + seed: Seed, + execution_params: ExecutionParams, + bitcoin_wallet: Arc, + monero_wallet: Arc, + db: Arc, + ) -> Result { + let identity = network::Seed::new(seed).derive_libp2p_identity(); let behaviour = Behaviour::default(); - let transport = transport::build(identity)?; + let transport = transport::build(&identity)?; + let peer_id = PeerId::from(identity.public()); let mut swarm = libp2p::swarm::SwarmBuilder::new(transport, behaviour, peer_id) .executor(Box::new(TokioExecutor { @@ -92,39 +117,38 @@ impl EventLoop { })) .build(); - Swarm::listen_on(&mut swarm, listen.clone()) - .with_context(|| format!("Address is not supported: {:#}", listen))?; + Swarm::listen_on(&mut swarm, listen_address.clone()) + .with_context(|| format!("Address is not supported: {:#}", listen_address))?; let recv_encrypted_signature = BroadcastChannels::default(); let send_transfer_proof = MpscChannels::default(); - let handle_clone = EventLoopHandle { - recv_encrypted_signature: recv_encrypted_signature.sender.subscribe(), - send_transfer_proof: send_transfer_proof.sender.clone(), - }; - - let driver = EventLoop { + Ok(EventLoop { swarm, + peer_id, + execution_params, + bitcoin_wallet, + monero_wallet, + db, + listen_address, + swap_amounts: Default::default(), recv_encrypted_signature: recv_encrypted_signature.sender, send_transfer_proof: send_transfer_proof.receiver, - handle: handle_clone, - }; - - let handle = EventLoopHandle { - recv_encrypted_signature: recv_encrypted_signature.receiver, - send_transfer_proof: send_transfer_proof.sender, - }; - - Ok((driver, handle)) + send_transfer_proof_sender: send_transfer_proof.sender, + }) } - pub fn clone_handle(&self) -> EventLoopHandle { + pub fn new_handle(&self) -> EventLoopHandle { EventLoopHandle { recv_encrypted_signature: self.recv_encrypted_signature.subscribe(), - send_transfer_proof: self.handle.send_transfer_proof.clone(), + send_transfer_proof: self.send_transfer_proof_sender.clone(), } } + pub fn peer_id(&self) -> PeerId { + self.peer_id + } + pub async fn run(&mut self) { loop { tokio::select! { @@ -133,11 +157,11 @@ impl EventLoop { OutEvent::ConnectionEstablished(alice) => { debug!("Connection Established with {}", alice); } - OutEvent::SwapRequest { msg, channel } => { - let _ = self.handle_swap_request(msg, channel).await; + OutEvent::SwapRequest { msg, channel, bob_peer_id } => { + let _ = self.handle_swap_request(msg, channel, bob_peer_id).await; } - OutEvent::ExecutionSetupDone(state3) => { - let _ = self.handle_execution_setup_done(*state3).await; + OutEvent::ExecutionSetupDone{bob_peer_id, state3} => { + let _ = self.handle_execution_setup_done(bob_peer_id, *state3).await; } OutEvent::TransferProofAcknowledged => { trace!("Bob acknowledged transfer proof"); @@ -165,11 +189,76 @@ impl EventLoop { } async fn handle_swap_request( - &self, - _msg: SwapRequest, - _channel: ResponseChannel, - ) { + &mut self, + swap_request: SwapRequest, + channel: ResponseChannel, + bob_peer_id: PeerId, + ) -> Result<()> { + // 1. Check if acceptable request + // 2. Send response + + let btc_amount = swap_request.btc_amount; + let xmr_amount = btc_amount.as_btc() * RATE as f64; + let xmr_amount = monero::Amount::from_monero(xmr_amount)?; + let swap_response = SwapResponse { xmr_amount }; + + self.swarm + .send_swap_response(channel, swap_response) + .context("Failed to send swap response")?; + + // 3. Start setup execution + + let state0 = State0::new( + btc_amount, + xmr_amount, + self.execution_params, + self.bitcoin_wallet.as_ref(), + &mut OsRng, + ) + .await?; + + // if a node restart during execution setup, the swap is aborted (safely). + self.swap_amounts.insert(bob_peer_id, SwapAmounts { + btc: btc_amount, + xmr: xmr_amount, + }); + + self.swarm.start_execution_setup(bob_peer_id, state0); + // Continues once the execution setup protocol is done + Ok(()) } - async fn handle_execution_setup_done(&self, _state3: State3) {} + async fn handle_execution_setup_done( + &mut self, + bob_peer_id: PeerId, + state3: State3, + ) -> Result<()> { + let swap_id = Uuid::new_v4(); + let handle = self.new_handle(); + + let swap_amounts = self.swap_amounts.remove(&bob_peer_id).ok_or_else(|| { + anyhow!( + "execution setup done for an unknown peer id: {}, node restarted in between?", + bob_peer_id + ) + })?; + + let swap = Builder::new( + self.peer_id, + self.execution_params, + swap_id, + self.bitcoin_wallet.clone(), + self.monero_wallet.clone(), + self.db.clone(), + self.listen_address.clone(), + handle, + ) + .with_init_params(swap_amounts, bob_peer_id, state3) + .build() + .await?; + + tokio::spawn(async move { alice::run(swap).await }); + + Ok(()) + } } diff --git a/swap/src/protocol/alice/execution_setup.rs b/swap/src/protocol/alice/execution_setup.rs index e05964a2..af1e2350 100644 --- a/swap/src/protocol/alice/execution_setup.rs +++ b/swap/src/protocol/alice/execution_setup.rs @@ -32,14 +32,17 @@ pub struct Message3 { #[derive(Debug)] pub enum OutEvent { - Done(State3), + Done { bob_peer_id: PeerId, state3: State3 }, Failure(Error), } -impl From> for OutEvent { - fn from(event: BehaviourOutEvent) -> Self { +impl From> for OutEvent { + fn from(event: BehaviourOutEvent<(PeerId, State3), (), Error>) -> Self { match event { - BehaviourOutEvent::Inbound(_, Ok(State3)) => OutEvent::Done(State3), + BehaviourOutEvent::Inbound(_, Ok((bob_peer_id, state3))) => OutEvent::Done { + bob_peer_id, + state3, + }, BehaviourOutEvent::Inbound(_, Err(e)) => OutEvent::Failure(e), BehaviourOutEvent::Outbound(..) => unreachable!("Alice only supports inbound"), } @@ -49,7 +52,7 @@ impl From> for OutEvent { #[derive(libp2p::NetworkBehaviour)] #[behaviour(out_event = "OutEvent", event_process = false)] pub struct Behaviour { - inner: libp2p_async_await::Behaviour, + inner: libp2p_async_await::Behaviour<(PeerId, State3), (), anyhow::Error>, } impl Default for Behaviour { @@ -93,7 +96,7 @@ impl Behaviour { .context("failed to deserialize message4")?; let state3 = state2.receive(message4)?; - Ok(state3) + Ok((bob, state3)) }) } } diff --git a/swap/src/protocol/alice/state.rs b/swap/src/protocol/alice/state.rs index 7127ba36..9905a069 100644 --- a/swap/src/protocol/alice/state.rs +++ b/swap/src/protocol/alice/state.rs @@ -25,10 +25,6 @@ use std::fmt; #[derive(Debug)] pub enum AliceState { Started { - amounts: SwapAmounts, - state0: State0, - }, - Negotiated { bob_peer_id: PeerId, amounts: SwapAmounts, state3: Box, @@ -70,7 +66,6 @@ impl fmt::Display for AliceState { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { AliceState::Started { .. } => write!(f, "started"), - AliceState::Negotiated { .. } => write!(f, "negotiated"), AliceState::BtcLocked { .. } => write!(f, "btc is locked"), AliceState::XmrLocked { .. } => write!(f, "xmr is locked"), AliceState::EncSigLearned { .. } => write!(f, "encrypted signature is learned"), diff --git a/swap/src/protocol/alice/swap.rs b/swap/src/protocol/alice/swap.rs index d01df0c4..4bd6ef6a 100644 --- a/swap/src/protocol/alice/swap.rs +++ b/swap/src/protocol/alice/swap.rs @@ -84,36 +84,14 @@ async fn run_until_internal( monero_wallet: Arc, execution_params: ExecutionParams, swap_id: Uuid, - db: Database, + db: Arc, ) -> Result { info!("Current state:{}", state); if is_target_state(&state) { Ok(state) } else { match state { - AliceState::Started { amounts, state0 } => { - let state = AliceState::Negotiated { - bob_peer_id: todo!(), - amounts, - state3: todo!(), - }; - - let db_state = (&state).into(); - db.insert_latest_state(swap_id, database::Swap::Alice(db_state)) - .await?; - run_until_internal( - state, - is_target_state, - event_loop_handle, - bitcoin_wallet, - monero_wallet, - execution_params, - swap_id, - db, - ) - .await - } - AliceState::Negotiated { + AliceState::Started { state3, bob_peer_id, amounts, diff --git a/swap/src/protocol/alice/swap_response.rs b/swap/src/protocol/alice/swap_response.rs index 69138642..551d3b11 100644 --- a/swap/src/protocol/alice/swap_response.rs +++ b/swap/src/protocol/alice/swap_response.rs @@ -9,7 +9,7 @@ use libp2p::{ ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent, RequestResponseMessage, ResponseChannel, }, - NetworkBehaviour, + NetworkBehaviour, PeerId, }; use serde::{Deserialize, Serialize}; use std::time::Duration; @@ -20,6 +20,7 @@ pub enum OutEvent { MsgReceived { msg: SwapRequest, channel: ResponseChannel, + bob_peer_id: PeerId, }, ResponseSent, Failure(Error), @@ -45,6 +46,7 @@ impl From> for OutEvent { OutEvent::MsgReceived { msg: request, channel, + bob_peer_id: peer, } } RequestResponseEvent::Message { diff --git a/swap/src/protocol/bob.rs b/swap/src/protocol/bob.rs index 4807d16e..d3010801 100644 --- a/swap/src/protocol/bob.rs +++ b/swap/src/protocol/bob.rs @@ -168,7 +168,7 @@ impl Builder { &self, ) -> Result<(bob::event_loop::EventLoop, bob::event_loop::EventLoopHandle)> { let bob_behaviour = bob::Behaviour::default(); - let bob_transport = build(self.identity.clone())?; + let bob_transport = build(&self.identity)?; bob::event_loop::EventLoop::new( bob_transport, diff --git a/swap/tests/bob_refunds_using_cancel_and_refund_command.rs b/swap/tests/bob_refunds_using_cancel_and_refund_command.rs index 9b64d826..43007177 100644 --- a/swap/tests/bob_refunds_using_cancel_and_refund_command.rs +++ b/swap/tests/bob_refunds_using_cancel_and_refund_command.rs @@ -1,17 +1,13 @@ pub mod testutils; -use swap::protocol::{alice, bob, bob::BobState}; +use swap::protocol::{bob, bob::BobState}; use testutils::{bob_run_until::is_btc_locked, FastCancelConfig}; #[tokio::test] async fn given_bob_manually_refunds_after_btc_locked_bob_refunds() { testutils::setup_test(FastCancelConfig, |mut ctx| async move { - let (alice_swap, _) = ctx.new_swap_as_alice().await; let (bob_swap, bob_join_handle) = ctx.new_swap_as_bob().await; - let alice_handle = alice::run(alice_swap); - let alice_swap_handle = tokio::spawn(alice_handle); - let bob_state = bob::run_until(bob_swap, is_btc_locked).await.unwrap(); assert!(matches!(bob_state, BobState::BtcLocked { .. })); @@ -29,6 +25,7 @@ async fn given_bob_manually_refunds_after_btc_locked_bob_refunds() { } // Bob manually cancels + bob_join_handle.abort(); let (_, state) = bob::cancel( bob_swap.swap_id, bob_swap.state, @@ -41,10 +38,11 @@ async fn given_bob_manually_refunds_after_btc_locked_bob_refunds() { .unwrap(); assert!(matches!(state, BobState::BtcCancelled { .. })); - let (bob_swap, _) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; + let (bob_swap, bob_join_handle) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; assert!(matches!(bob_swap.state, BobState::BtcCancelled { .. })); // Bob manually refunds + bob_join_handle.abort(); let bob_state = bob::refund( bob_swap.swap_id, bob_swap.state, @@ -58,9 +56,6 @@ async fn given_bob_manually_refunds_after_btc_locked_bob_refunds() { .unwrap(); ctx.assert_bob_refunded(bob_state).await; - - let alice_state = alice_swap_handle.await.unwrap().unwrap(); - ctx.assert_alice_refunded(alice_state).await; }) .await; } diff --git a/swap/tests/bob_refunds_using_cancel_and_refund_command_timelock_not_exired.rs b/swap/tests/bob_refunds_using_cancel_and_refund_command_timelock_not_exired.rs index 07b69590..9e8a79aa 100644 --- a/swap/tests/bob_refunds_using_cancel_and_refund_command_timelock_not_exired.rs +++ b/swap/tests/bob_refunds_using_cancel_and_refund_command_timelock_not_exired.rs @@ -1,18 +1,14 @@ pub mod testutils; use bob::cancel::CancelError; -use swap::protocol::{alice, bob, bob::BobState}; +use swap::protocol::{bob, bob::BobState}; use testutils::{bob_run_until::is_btc_locked, SlowCancelConfig}; #[tokio::test] async fn given_bob_manually_cancels_when_timelock_not_expired_errors() { testutils::setup_test(SlowCancelConfig, |mut ctx| async move { - let (alice_swap, _) = ctx.new_swap_as_alice().await; let (bob_swap, bob_join_handle) = ctx.new_swap_as_bob().await; - let alice_handle = alice::run(alice_swap); - tokio::spawn(alice_handle); - let bob_state = bob::run_until(bob_swap, is_btc_locked).await.unwrap(); assert!(matches!(bob_state, BobState::BtcLocked { .. })); diff --git a/swap/tests/bob_refunds_using_cancel_and_refund_command_timelock_not_exired_force.rs b/swap/tests/bob_refunds_using_cancel_and_refund_command_timelock_not_exired_force.rs index 098f8752..fd3465d7 100644 --- a/swap/tests/bob_refunds_using_cancel_and_refund_command_timelock_not_exired_force.rs +++ b/swap/tests/bob_refunds_using_cancel_and_refund_command_timelock_not_exired_force.rs @@ -1,17 +1,13 @@ pub mod testutils; -use swap::protocol::{alice, bob, bob::BobState}; +use swap::protocol::{bob, bob::BobState}; use testutils::{bob_run_until::is_btc_locked, SlowCancelConfig}; #[tokio::test] async fn given_bob_manually_forces_cancel_when_timelock_not_expired_errors() { testutils::setup_test(SlowCancelConfig, |mut ctx| async move { - let (alice_swap, _) = ctx.new_swap_as_alice().await; let (bob_swap, bob_join_handle) = ctx.new_swap_as_bob().await; - let alice_handle = alice::run(alice_swap); - tokio::spawn(alice_handle); - let bob_state = bob::run_until(bob_swap, is_btc_locked).await.unwrap(); assert!(matches!(bob_state, BobState::BtcLocked { .. })); diff --git a/swap/tests/happy_path.rs b/swap/tests/happy_path.rs index 548aee86..8ba5e903 100644 --- a/swap/tests/happy_path.rs +++ b/swap/tests/happy_path.rs @@ -1,23 +1,18 @@ pub mod testutils; -use swap::protocol::{alice, bob}; +use swap::protocol::bob; use testutils::SlowCancelConfig; -use tokio::join; /// Run the following tests with RUST_MIN_STACK=10000000 #[tokio::test] async fn happy_path() { testutils::setup_test(SlowCancelConfig, |mut ctx| async move { - let (alice_swap, _) = ctx.new_swap_as_alice().await; let (bob_swap, _) = ctx.new_swap_as_bob().await; - let alice = alice::run(alice_swap); - let bob = bob::run(bob_swap); + let bob_state = bob::run(bob_swap).await; - let (alice_state, bob_state) = join!(alice, bob); - - ctx.assert_alice_redeemed(alice_state.unwrap()).await; + ctx.assert_alice_redeemed().await; ctx.assert_bob_redeemed(bob_state.unwrap()).await; }) .await; diff --git a/swap/tests/happy_path_restart_bob_before_comm.rs b/swap/tests/happy_path_restart_bob_before_comm.rs index 7c72adf9..6235b0ec 100644 --- a/swap/tests/happy_path_restart_bob_before_comm.rs +++ b/swap/tests/happy_path_restart_bob_before_comm.rs @@ -1,17 +1,13 @@ pub mod testutils; -use swap::protocol::{alice, bob, bob::BobState}; +use swap::protocol::{bob, bob::BobState}; use testutils::{bob_run_until::is_xmr_locked, SlowCancelConfig}; #[tokio::test] async fn given_bob_restarts_after_xmr_is_locked_resume_swap() { testutils::setup_test(SlowCancelConfig, |mut ctx| async move { - let (alice_swap, _) = ctx.new_swap_as_alice().await; let (bob_swap, bob_join_handle) = ctx.new_swap_as_bob().await; - let alice_handle = alice::run(alice_swap); - let alice_swap_handle = tokio::spawn(alice_handle); - let bob_state = bob::run_until(bob_swap, is_xmr_locked).await.unwrap(); assert!(matches!(bob_state, BobState::XmrLocked { .. })); @@ -23,8 +19,7 @@ async fn given_bob_restarts_after_xmr_is_locked_resume_swap() { ctx.assert_bob_redeemed(bob_state).await; - let alice_state = alice_swap_handle.await.unwrap(); - ctx.assert_alice_redeemed(alice_state.unwrap()).await; + ctx.assert_alice_redeemed().await; }) .await; } diff --git a/swap/tests/testutils/mod.rs b/swap/tests/testutils/mod.rs index 91a7949f..bec889af 100644 --- a/swap/tests/testutils/mod.rs +++ b/swap/tests/testutils/mod.rs @@ -8,6 +8,7 @@ use std::{path::PathBuf, sync::Arc}; use swap::{ bitcoin, bitcoin::Timelock, + database::{Database, Swap}, execution_params, execution_params::{ExecutionParams, GetExecutionParams}, monero, @@ -30,31 +31,10 @@ pub struct StartingBalances { struct AliceParams { seed: Seed, execution_params: ExecutionParams, - swap_id: Uuid, - bitcoin_wallet: Arc, - monero_wallet: Arc, - db_path: PathBuf, + db: Arc, listen_address: Multiaddr, } -impl AliceParams { - pub fn builder(&self) -> alice::Builder { - alice::Builder::new( - self.seed, - self.execution_params, - self.swap_id, - self.bitcoin_wallet.clone(), - self.monero_wallet.clone(), - self.db_path.clone(), - self.listen_address.clone(), - ) - } - - fn peer_id(&self) -> PeerId { - self.builder().peer_id() - } -} - #[derive(Debug, Clone)] struct BobParams { seed: Seed, @@ -84,6 +64,12 @@ impl BobParams { pub struct BobEventLoopJoinHandle(JoinHandle<()>); +impl BobEventLoopJoinHandle { + pub fn abort(&self) { + self.0.abort() + } +} + pub struct AliceEventLoopJoinHandle(JoinHandle<()>); pub struct TestContext { @@ -101,20 +87,6 @@ pub struct TestContext { } impl TestContext { - pub async fn new_swap_as_alice(&mut self) -> (alice::Swap, AliceEventLoopJoinHandle) { - let (swap, mut event_loop) = self - .alice_params - .builder() - .with_init_params(self.swap_amounts) - .build() - .await - .unwrap(); - - let join_handle = tokio::spawn(async move { event_loop.run().await }); - - (swap, AliceEventLoopJoinHandle(join_handle)) - } - pub async fn new_swap_as_bob(&mut self) -> (bob::Swap, BobEventLoopJoinHandle) { let (swap, event_loop) = self .bob_params @@ -129,24 +101,11 @@ impl TestContext { (swap, BobEventLoopJoinHandle(join_handle)) } - pub async fn stop_and_resume_alice_from_db( - &mut self, - join_handle: AliceEventLoopJoinHandle, - ) -> alice::Swap { - join_handle.0.abort(); - - let (swap, mut event_loop) = self.alice_params.builder().build().await.unwrap(); - - tokio::spawn(async move { event_loop.run().await }); - - swap - } - pub async fn stop_and_resume_bob_from_db( &mut self, join_handle: BobEventLoopJoinHandle, ) -> (bob::Swap, BobEventLoopJoinHandle) { - join_handle.0.abort(); + join_handle.abort(); let (swap, event_loop) = self.bob_params.builder().build().await.unwrap(); @@ -155,7 +114,17 @@ impl TestContext { (swap, BobEventLoopJoinHandle(join_handle)) } - pub async fn assert_alice_redeemed(&self, state: AliceState) { + pub async fn assert_alice_redeemed(&self) { + let mut states = self.alice_params.db.all().unwrap(); + + assert_eq!(states.len(), 1, "Expected only one swap in Alice's db"); + + let (_swap_id, state) = states.pop().unwrap(); + let state = match state { + Swap::Alice(state) => state.into(), + Swap::Bob(_) => panic!("Bob state in Alice db is unexpected"), + }; + assert!(matches!(state, AliceState::BtcRedeemed)); let btc_balance_after_swap = self.alice_bitcoin_wallet.as_ref().balance().await.unwrap(); @@ -174,8 +143,22 @@ impl TestContext { assert!(xmr_balance_after_swap <= self.alice_starting_balances.xmr - self.swap_amounts.xmr); } - pub async fn assert_alice_refunded(&self, state: AliceState) { - assert!(matches!(state, AliceState::XmrRefunded)); + pub async fn assert_alice_refunded(&self) { + let mut states = self.alice_params.db.all().unwrap(); + + assert_eq!(states.len(), 1, "Expected only one swap in Alice's db"); + + let (_swap_id, state) = states.pop().unwrap(); + let state = match state { + Swap::Alice(state) => state.into(), + Swap::Bob(_) => panic!("Bob state in Alice db is unexpected"), + }; + + assert!( + matches!(state, AliceState::XmrRefunded), + "Alice state is not XmrRefunded: {}", + state + ); let btc_balance_after_swap = self.alice_bitcoin_wallet.as_ref().balance().await.unwrap(); assert_eq!(btc_balance_after_swap, self.alice_starting_balances.btc); @@ -342,13 +325,13 @@ where ) .await; + let db_path = tempdir().unwrap(); + let alice_db = Arc::new(Database::open(db_path.path()).unwrap()); + let alice_params = AliceParams { seed: Seed::random().unwrap(), execution_params, - swap_id: Uuid::new_v4(), - bitcoin_wallet: alice_bitcoin_wallet.clone(), - monero_wallet: alice_monero_wallet.clone(), - db_path: tempdir().unwrap().path().to_path_buf(), + db: alice_db.clone(), listen_address, }; @@ -365,6 +348,22 @@ where ) .await; + let mut alice_event_loop = alice::EventLoop::new( + alice_params.listen_address.clone(), + alice_params.seed, + alice_params.execution_params, + alice_bitcoin_wallet.clone(), + alice_monero_wallet.clone(), + alice_db, + ) + .unwrap(); + + let alice_peer_id = alice_event_loop.peer_id(); + + tokio::spawn(async move { + alice_event_loop.run().await; + }); + let bob_params = BobParams { seed: Seed::random().unwrap(), db_path: tempdir().unwrap().path().to_path_buf(), @@ -372,7 +371,7 @@ where bitcoin_wallet: bob_bitcoin_wallet.clone(), monero_wallet: bob_monero_wallet.clone(), alice_address: alice_params.listen_address.clone(), - alice_peer_id: alice_params.peer_id(), + alice_peer_id, execution_params, }; @@ -388,7 +387,7 @@ where bob_monero_wallet, }; - testfn(test).await + testfn(test).await; } async fn init_containers(cli: &Cli) -> (Monero, Containers<'_>) {