diff --git a/swap/src/alice/execution.rs b/swap/src/alice/execution.rs index d9f5efe4..43ffbcc7 100644 --- a/swap/src/alice/execution.rs +++ b/swap/src/alice/execution.rs @@ -1,5 +1,5 @@ use crate::{ - alice::swarm_driver::SwarmDriver, bitcoin, monero, network::request_response::AliceToBob, + alice::swarm_driver::SwarmDriverHandle, bitcoin, monero, network::request_response::AliceToBob, SwapAmounts, PUNISH_TIMELOCK, REFUND_TIMELOCK, }; use anyhow::{bail, Context, Result}; @@ -31,7 +31,7 @@ pub async fn negotiate( a: bitcoin::SecretKey, s_a: cross_curve_dleq::Scalar, v_a: monero::PrivateViewKey, - swarm: &mut SwarmDriver, + swarm: &mut SwarmDriverHandle, bitcoin_wallet: Arc, config: Config, ) -> Result<(ResponseChannel, State3)> { @@ -52,7 +52,7 @@ pub async fn negotiate( ); } - swarm.send_amounts(event.channel, amounts); + swarm.send_amounts(event.channel, amounts).await?; let redeem_address = bitcoin_wallet.as_ref().new_address().await?; let punish_address = redeem_address.clone(); @@ -70,7 +70,7 @@ pub async fn negotiate( ); // TODO(Franck): Understand why this is needed. - swarm.swarm.set_state0(state0.clone()); + // swarm.swarm.set_state0(state0.clone()); let bob_message0 = timeout(config.bob_time_to_act, swarm.recv_message0()).await??; @@ -80,7 +80,7 @@ pub async fn negotiate( let state2 = state1.receive(bob_message1); - swarm.send_message1(channel, state2.next_message()); + swarm.send_message1(channel, state2.next_message()).await?; let (bob_message2, channel) = timeout(config.bob_time_to_act, swarm.recv_message2()).await??; @@ -117,7 +117,7 @@ pub async fn lock_xmr( channel: ResponseChannel, amounts: SwapAmounts, state3: State3, - swarm: &mut SwarmDriver, + swarm: &mut SwarmDriverHandle, monero_wallet: Arc, ) -> Result<()> where @@ -136,15 +136,17 @@ where // TODO(Franck): Wait for Monero to be confirmed once - swarm.send_message2(channel, alice::Message2 { - tx_lock_proof: transfer_proof, - }); + swarm + .send_message2(channel, alice::Message2 { + tx_lock_proof: transfer_proof, + }) + .await?; Ok(()) } pub async fn wait_for_bitcoin_encrypted_signature( - swarm: &mut SwarmDriver, + swarm: &mut SwarmDriverHandle, timeout_duration: Duration, ) -> Result { let msg3 = timeout(timeout_duration, swarm.recv_message3()) diff --git a/swap/src/alice/swap.rs b/swap/src/alice/swap.rs index 66299c5d..ca5d84dd 100644 --- a/swap/src/alice/swap.rs +++ b/swap/src/alice/swap.rs @@ -8,7 +8,7 @@ use crate::{ publish_bitcoin_redeem_transaction, publish_cancel_transaction, wait_for_bitcoin_encrypted_signature, wait_for_bitcoin_refund, wait_for_locked_bitcoin, }, - swarm_driver::SwarmDriver, + swarm_driver::SwarmDriverHandle, }, bitcoin, bitcoin::EncryptedSignature, @@ -109,11 +109,11 @@ impl fmt::Display for AliceState { pub async fn swap( state: AliceState, - swarm: SwarmDriver, + swarm: SwarmDriverHandle, bitcoin_wallet: Arc, monero_wallet: Arc, config: Config, -) -> Result<(AliceState, SwarmDriver)> { +) -> Result<(AliceState, SwarmDriverHandle)> { run_until( state, is_complete, @@ -147,11 +147,11 @@ pub fn is_xmr_locked(state: &AliceState) -> bool { pub async fn run_until( state: AliceState, is_target_state: fn(&AliceState) -> bool, - mut swarm: SwarmDriver, + mut swarm: SwarmDriverHandle, bitcoin_wallet: Arc, monero_wallet: Arc, config: Config, -) -> Result<(AliceState, SwarmDriver)> { +) -> Result<(AliceState, SwarmDriverHandle)> { info!("Current state:{}", state); if is_target_state(&state) { Ok((state, swarm)) diff --git a/swap/src/alice/swarm_driver.rs b/swap/src/alice/swarm_driver.rs index b0947920..65365aaa 100644 --- a/swap/src/alice/swarm_driver.rs +++ b/swap/src/alice/swarm_driver.rs @@ -6,7 +6,6 @@ use crate::{ use anyhow::{Context, Result}; use libp2p::{core::Multiaddr, request_response::ResponseChannel, PeerId, Swarm}; use tokio::sync::mpsc::{Receiver, Sender}; -use tracing::info; use xmr_btc::{alice, bob}; pub struct Channels { @@ -27,18 +26,107 @@ impl Default for Channels { } } +pub struct SwarmDriverHandle { + pub msg0: Receiver, + pub msg1: Receiver<(bob::Message1, ResponseChannel)>, + pub msg2: Receiver<(bob::Message2, ResponseChannel)>, + pub msg3: Receiver, + pub request: Receiver, + pub conn_established: Receiver, + pub send_amounts: Sender<(ResponseChannel, SwapAmounts)>, + pub send_msg1: Sender<(ResponseChannel, alice::Message1)>, + pub send_msg2: Sender<(ResponseChannel, alice::Message2)>, +} + +impl SwarmDriverHandle { + pub async fn recv_conn_established(&mut self) -> Result { + self.conn_established + .recv() + .await + .ok_or_else(|| anyhow::Error::msg("Failed to receive connection established from Bob")) + } + + pub async fn recv_message0(&mut self) -> Result { + self.msg0 + .recv() + .await + .ok_or_else(|| anyhow::Error::msg("Failed to receive message 0 from Bob")) + } + + pub async fn recv_message1(&mut self) -> Result<(bob::Message1, ResponseChannel)> { + self.msg1 + .recv() + .await + .ok_or_else(|| anyhow::Error::msg("Failed to receive message 1 from Bob")) + } + + pub async fn recv_message2(&mut self) -> Result<(bob::Message2, ResponseChannel)> { + self.msg2 + .recv() + .await + .ok_or_else(|| anyhow::Error::msg("Failed o receive message 2 from Bob")) + } + + pub async fn recv_message3(&mut self) -> Result { + self.msg3.recv().await.ok_or_else(|| { + anyhow::Error::msg("Failed to receive Bitcoin encrypted signature from Bob") + }) + } + + pub async fn recv_request(&mut self) -> Result { + self.request + .recv() + .await + .ok_or_else(|| anyhow::Error::msg("Failed to receive amounts request from Bob")) + } + + pub async fn send_amounts( + &mut self, + channel: ResponseChannel, + amounts: SwapAmounts, + ) -> Result<()> { + let _ = self.send_amounts.send((channel, amounts)).await?; + Ok(()) + } + + pub async fn send_message1( + &mut self, + channel: ResponseChannel, + msg: alice::Message1, + ) -> Result<()> { + let _ = self.send_msg1.send((channel, msg)).await?; + Ok(()) + } + + pub async fn send_message2( + &mut self, + channel: ResponseChannel, + msg: alice::Message2, + ) -> Result<()> { + let _ = self.send_msg2.send((channel, msg)).await?; + Ok(()) + } +} + pub struct SwarmDriver { pub swarm: libp2p::Swarm, - pub msg0: Channels, - pub msg1: Channels<(bob::Message1, ResponseChannel)>, - pub msg2: Channels<(bob::Message2, ResponseChannel)>, - pub msg3: Channels, - pub request: Channels, - pub conn_established: Channels, + pub msg0: Sender, + pub msg1: Sender<(bob::Message1, ResponseChannel)>, + pub msg2: Sender<(bob::Message2, ResponseChannel)>, + pub msg3: Sender, + pub request: Sender, + pub conn_established: Sender, + pub send_amounts: Receiver<(ResponseChannel, SwapAmounts)>, + pub send_msg1: Receiver<(ResponseChannel, alice::Message1)>, + pub send_msg2: Receiver<(ResponseChannel, alice::Message2)>, } impl SwarmDriver { - pub fn new(transport: SwapTransport, behaviour: Behaviour, listen: Multiaddr) -> Result { + pub fn new( + transport: SwapTransport, + behaviour: Behaviour, + listen: Multiaddr, + ) -> Result<(Self, SwarmDriverHandle)> { let local_peer_id = behaviour.peer_id(); let mut swarm = libp2p::swarm::SwarmBuilder::new(transport, behaviour, local_peer_id) @@ -50,99 +138,78 @@ impl SwarmDriver { Swarm::listen_on(&mut swarm, listen.clone()) .with_context(|| format!("Address is not supported: {:#}", listen))?; - Ok(SwarmDriver { + let msg0 = Channels::new(); + let msg1 = Channels::new(); + let msg2 = Channels::new(); + let msg3 = Channels::new(); + let request = Channels::new(); + let conn_established = Channels::new(); + let send_amounts = Channels::new(); + let send_msg1 = Channels::new(); + let send_msg2 = Channels::new(); + + let driver = SwarmDriver { swarm, - msg0: Channels::new(), - msg1: Channels::new(), - msg2: Channels::new(), - msg3: Channels::new(), - request: Channels::new(), - conn_established: Channels::new(), - }) + msg0: msg0.sender, + msg1: msg1.sender, + msg2: msg2.sender, + msg3: msg3.sender, + request: request.sender, + conn_established: conn_established.sender, + send_amounts: send_amounts.receiver, + send_msg1: send_msg1.receiver, + send_msg2: send_msg2.receiver, + }; + + let handle = SwarmDriverHandle { + msg0: msg0.receiver, + msg1: msg1.receiver, + msg2: msg2.receiver, + msg3: msg3.receiver, + request: request.receiver, + conn_established: conn_established.receiver, + send_amounts: send_amounts.sender, + send_msg1: send_msg1.sender, + send_msg2: send_msg2.sender, + }; + + Ok((driver, handle)) } - pub async fn poll_swarm(mut self) { + pub async fn run(&mut self) { loop { match self.swarm.next().await { OutEvent::ConnectionEstablished(alice) => { - let _ = self.conn_established.sender.send(alice).await; + let _ = self.conn_established.send(alice).await; } OutEvent::Message0(msg) => { - let _ = self.msg0.sender.send(msg).await; + let _ = self.msg0.send(msg).await; } OutEvent::Message1 { msg, channel } => { - let _ = self.msg1.sender.send((msg, channel)).await; + let _ = self.msg1.send((msg, channel)).await; } OutEvent::Message2 { msg, channel } => { - let _ = self.msg2.sender.send((msg, channel)).await; + let _ = self.msg2.send((msg, channel)).await; } OutEvent::Message3(msg) => { - let _ = self.msg3.sender.send(msg).await; + let _ = self.msg3.send(msg).await; } OutEvent::Request(event) => { - let _ = self.request.sender.send(event).await; + let _ = self.request.send(event).await; } }; + + if let Ok((channel, amounts)) = self.send_amounts.try_recv() { + self.swarm.send_amounts(channel, amounts); + } + + if let Ok((channel, msg)) = self.send_msg1.try_recv() { + self.swarm.send_message1(channel, msg); + } + + if let Ok((channel, msg)) = self.send_msg2.try_recv() { + self.swarm.send_message2(channel, msg); + } } } - - pub fn send_amounts(&mut self, channel: ResponseChannel, amounts: SwapAmounts) { - let msg = AliceToBob::Amounts(amounts); - self.swarm.amounts.send(channel, msg); - info!("Sent amounts response"); - } - - pub fn send_message1(&mut self, channel: ResponseChannel, msg: alice::Message1) { - self.swarm.send_message1(channel, msg); - } - - pub fn send_message2(&mut self, channel: ResponseChannel, msg: alice::Message2) { - self.swarm.send_message2(channel, msg); - } - - pub async fn recv_conn_established(&mut self) -> Result { - self.conn_established - .receiver - .recv() - .await - .ok_or_else(|| anyhow::Error::msg("Failed to receive connection established from Bob")) - } - - pub async fn recv_message0(&mut self) -> Result { - self.msg0 - .receiver - .recv() - .await - .ok_or_else(|| anyhow::Error::msg("Failed to receive message 0 from Bob")) - } - - pub async fn recv_message1(&mut self) -> Result<(bob::Message1, ResponseChannel)> { - self.msg1 - .receiver - .recv() - .await - .ok_or_else(|| anyhow::Error::msg("Failed to receive message 1 from Bob")) - } - - pub async fn recv_message2(&mut self) -> Result<(bob::Message2, ResponseChannel)> { - self.msg2 - .receiver - .recv() - .await - .ok_or_else(|| anyhow::Error::msg("Failed o receive message 2 from Bob")) - } - - pub async fn recv_message3(&mut self) -> Result { - self.msg3.receiver.recv().await.ok_or_else(|| { - anyhow::Error::msg("Failed to receive Bitcoin encrypted signature from Bob") - }) - } - - pub async fn recv_request(&mut self) -> Result { - self.request - .receiver - .recv() - .await - .ok_or_else(|| anyhow::Error::msg("Failed to receive amounts request from Bob")) - } } diff --git a/swap/src/bob/execution.rs b/swap/src/bob/execution.rs index 8d0babe7..3c0e6adb 100644 --- a/swap/src/bob/execution.rs +++ b/swap/src/bob/execution.rs @@ -1,4 +1,4 @@ -use crate::{bob::swarm_driver::SwarmDriver, SwapAmounts}; +use crate::{bob::swarm_driver::SwarmDriverHandle, SwapAmounts}; use anyhow::Result; use libp2p::core::Multiaddr; use rand::{CryptoRng, RngCore}; @@ -7,8 +7,8 @@ use xmr_btc::bob::State2; pub async fn negotiate( state0: xmr_btc::bob::State0, - _amounts: SwapAmounts, - swarm: &mut SwarmDriver, + amounts: SwapAmounts, + swarm: &mut SwarmDriverHandle, addr: Multiaddr, mut rng: R, bitcoin_wallet: Arc, @@ -16,22 +16,27 @@ pub async fn negotiate( where R: RngCore + CryptoRng + Send, { - swarm.dial_alice(addr)?; + swarm.dial_alice(addr).await?; let alice = swarm.recv_conn_established().await?; - swarm.request_amounts(alice.clone()); - swarm.recv_amounts().await?; + swarm.request_amounts(alice.clone(), amounts.btc).await?; - swarm.send_message0(alice.clone(), state0.next_message(&mut rng)); + swarm + .send_message0(alice.clone(), state0.next_message(&mut rng)) + .await?; let msg0 = swarm.recv_message0().await?; let state1 = state0.receive(bitcoin_wallet.as_ref(), msg0).await?; - swarm.send_message1(alice.clone(), state1.next_message()); + swarm + .send_message1(alice.clone(), state1.next_message()) + .await?; let msg1 = swarm.recv_message1().await?; let state2 = state1.receive(msg1)?; - swarm.send_message2(alice.clone(), state2.next_message()); + swarm + .send_message2(alice.clone(), state2.next_message()) + .await?; Ok(state2) } diff --git a/swap/src/bob/swap.rs b/swap/src/bob/swap.rs index f87bc106..a517ca94 100644 --- a/swap/src/bob/swap.rs +++ b/swap/src/bob/swap.rs @@ -1,5 +1,5 @@ use crate::{ - bob::{execution::negotiate, swarm_driver::SwarmDriver}, + bob::{execution::negotiate, swarm_driver::SwarmDriverHandle}, storage::Database, SwapAmounts, }; @@ -53,7 +53,7 @@ impl fmt::Display for BobState { pub async fn swap( state: BobState, - swarm: SwarmDriver, + swarm: SwarmDriverHandle, db: Database, bitcoin_wallet: Arc, monero_wallet: Arc, @@ -100,7 +100,7 @@ pub fn is_xmr_locked(state: &BobState) -> bool { pub async fn run_until( state: BobState, is_target_state: fn(&BobState) -> bool, - mut swarm: SwarmDriver, + mut swarm: SwarmDriverHandle, db: Database, bitcoin_wallet: Arc, monero_wallet: Arc, @@ -187,7 +187,9 @@ where // What if Alice fails to receive this? Should we always resend? // todo: If we cannot dial Alice we should go to EncSigSent. Maybe dialing // should happen in this arm? - swarm.send_message3(alice_peer_id.clone(), tx_redeem_encsig); + swarm + .send_message3(alice_peer_id.clone(), tx_redeem_encsig) + .await?; run_until( BobState::EncSigSent(state, alice_peer_id), diff --git a/swap/src/bob/swarm_driver.rs b/swap/src/bob/swarm_driver.rs index 69b42c36..70946892 100644 --- a/swap/src/bob/swarm_driver.rs +++ b/swap/src/bob/swarm_driver.rs @@ -1,7 +1,6 @@ use crate::{ bob::{Behaviour, OutEvent}, network::{transport::SwapTransport, TokioExecutor}, - SwapAmounts, }; use anyhow::Result; use libp2p::{core::Multiaddr, PeerId}; @@ -27,17 +26,106 @@ impl Default for Channels { } } +pub struct SwarmDriverHandle { + pub msg0: Receiver, + pub msg1: Receiver, + pub msg2: Receiver, + pub request_amounts: Sender<(PeerId, ::bitcoin::Amount)>, + pub conn_established: Receiver, + pub dial_alice: Sender, + pub send_msg0: Sender<(PeerId, bob::Message0)>, + pub send_msg1: Sender<(PeerId, bob::Message1)>, + pub send_msg2: Sender<(PeerId, bob::Message2)>, + pub send_msg3: Sender<(PeerId, EncryptedSignature)>, +} + +impl SwarmDriverHandle { + pub async fn recv_conn_established(&mut self) -> Result { + self.conn_established + .recv() + .await + .ok_or_else(|| anyhow::Error::msg("Failed to receive connection established from Bob")) + } + + pub async fn recv_message0(&mut self) -> Result { + self.msg0 + .recv() + .await + .ok_or_else(|| anyhow::Error::msg("Failed to receive message 0 from Bob")) + } + + pub async fn recv_message1(&mut self) -> Result { + self.msg1 + .recv() + .await + .ok_or_else(|| anyhow::Error::msg("Failed to receive message 1 from Bob")) + } + + pub async fn recv_message2(&mut self) -> Result { + self.msg2 + .recv() + .await + .ok_or_else(|| anyhow::Error::msg("Failed o receive message 2 from Bob")) + } + + pub async fn dial_alice(&mut self, addr: Multiaddr) -> Result<()> { + let _ = self.dial_alice.send(addr).await?; + Ok(()) + } + + pub async fn request_amounts( + &mut self, + peer_id: PeerId, + btc_amount: ::bitcoin::Amount, + ) -> Result<()> { + let _ = self.request_amounts.send((peer_id, btc_amount)).await?; + Ok(()) + } + + pub async fn send_message0(&mut self, peer_id: PeerId, msg: bob::Message0) -> Result<()> { + let _ = self.send_msg0.send((peer_id, msg)).await?; + Ok(()) + } + + pub async fn send_message1(&mut self, peer_id: PeerId, msg: bob::Message1) -> Result<()> { + let _ = self.send_msg1.send((peer_id, msg)).await?; + Ok(()) + } + + pub async fn send_message2(&mut self, peer_id: PeerId, msg: bob::Message2) -> Result<()> { + let _ = self.send_msg2.send((peer_id, msg)).await?; + Ok(()) + } + + pub async fn send_message3( + &mut self, + peer_id: PeerId, + tx_redeem_encsig: EncryptedSignature, + ) -> Result<()> { + let _ = self.send_msg3.send((peer_id, tx_redeem_encsig)).await?; + Ok(()) + } +} + pub struct SwarmDriver { pub swarm: libp2p::Swarm, - pub amounts: Channels, - pub msg0: Channels, - pub msg1: Channels, - pub msg2: Channels, - pub conn_established: Channels, + pub msg0: Sender, + pub msg1: Sender, + pub msg2: Sender, + pub conn_established: Sender, + pub request_amounts: Receiver<(PeerId, ::bitcoin::Amount)>, + pub dial_alice: Receiver, + pub send_msg0: Receiver<(PeerId, bob::Message0)>, + pub send_msg1: Receiver<(PeerId, bob::Message1)>, + pub send_msg2: Receiver<(PeerId, bob::Message2)>, + pub send_msg3: Receiver<(PeerId, EncryptedSignature)>, } impl SwarmDriver { - pub fn new(transport: SwapTransport, behaviour: Behaviour) -> Self { + pub fn new( + transport: SwapTransport, + behaviour: Behaviour, + ) -> Result<(Self, SwarmDriverHandle)> { let local_peer_id = behaviour.peer_id(); let swarm = libp2p::swarm::SwarmBuilder::new(transport, behaviour, local_peer_id) @@ -46,100 +134,89 @@ impl SwarmDriver { })) .build(); - SwarmDriver { + let amounts = Channels::new(); + let msg0 = Channels::new(); + let msg1 = Channels::new(); + let msg2 = Channels::new(); + let conn_established = Channels::new(); + let dial_alice = Channels::new(); + let send_msg0 = Channels::new(); + let send_msg1 = Channels::new(); + let send_msg2 = Channels::new(); + let send_msg3 = Channels::new(); + + let driver = SwarmDriver { swarm, - amounts: Channels::new(), - msg0: Channels::new(), - msg1: Channels::new(), - msg2: Channels::new(), - conn_established: Channels::new(), - } + request_amounts: amounts.receiver, + msg0: msg0.sender, + msg1: msg1.sender, + msg2: msg2.sender, + conn_established: conn_established.sender, + dial_alice: dial_alice.receiver, + send_msg0: send_msg0.receiver, + send_msg1: send_msg1.receiver, + send_msg2: send_msg2.receiver, + send_msg3: send_msg3.receiver, + }; + + let handle = SwarmDriverHandle { + request_amounts: amounts.sender, + msg0: msg0.receiver, + msg1: msg1.receiver, + msg2: msg2.receiver, + conn_established: conn_established.receiver, + dial_alice: dial_alice.sender, + send_msg0: send_msg0.sender, + send_msg1: send_msg1.sender, + send_msg2: send_msg2.sender, + send_msg3: send_msg3.sender, + }; + + Ok((driver, handle)) } - pub async fn poll_swarm(mut self) { + pub async fn run(mut self) { loop { match self.swarm.next().await { OutEvent::ConnectionEstablished(alice) => { - let _ = self.conn_established.sender.send(alice).await; - } - OutEvent::Amounts(amounts) => { - let _ = self.amounts.sender.send(amounts).await; + let _ = self.conn_established.send(alice).await; } + OutEvent::Amounts(_amounts) => info!("Amounts received from Alice"), OutEvent::Message0(msg) => { - let _ = self.msg0.sender.send(msg).await; + let _ = self.msg0.send(msg).await; } OutEvent::Message1(msg) => { - let _ = self.msg1.sender.send(msg).await; + let _ = self.msg1.send(msg).await; } OutEvent::Message2(msg) => { - let _ = self.msg2.sender.send(msg).await; + let _ = self.msg2.send(msg).await; } OutEvent::Message3 => info!("Alice acknowledged message 3 received"), }; + + if let Ok(addr) = self.dial_alice.try_recv() { + libp2p::Swarm::dial_addr(&mut self.swarm, addr).expect("Could not dial alice"); + } + + if let Ok((peer_id, btc_amount)) = self.request_amounts.try_recv() { + self.swarm.request_amounts(peer_id, btc_amount.as_sat()); + } + + if let Ok((peer_id, msg)) = self.send_msg0.try_recv() { + self.swarm.send_message0(peer_id, msg); + } + + if let Ok((peer_id, msg)) = self.send_msg1.try_recv() { + self.swarm.send_message1(peer_id, msg); + } + + if let Ok((peer_id, msg)) = self.send_msg2.try_recv() { + self.swarm.send_message2(peer_id, msg); + } + + if let Ok((peer_id, tx_redeem_encsig)) = self.send_msg3.try_recv() { + self.swarm.send_message3(peer_id, tx_redeem_encsig); + } } } - - // todo: Remove this - pub fn request_amounts(&mut self, alice_peer_id: PeerId) { - self.swarm.request_amounts(alice_peer_id, 0); - } - - pub fn dial_alice(&mut self, addr: Multiaddr) -> Result<()> { - let _ = libp2p::Swarm::dial_addr(&mut self.swarm, addr)?; - Ok(()) - } - - pub fn send_message0(&mut self, peer_id: PeerId, msg: bob::Message0) { - self.swarm.send_message0(peer_id, msg); - } - - pub fn send_message1(&mut self, peer_id: PeerId, msg: bob::Message1) { - self.swarm.send_message1(peer_id, msg); - } - - pub fn send_message2(&mut self, peer_id: PeerId, msg: bob::Message2) { - self.swarm.send_message2(peer_id, msg); - } - - pub fn send_message3(&mut self, peer_id: PeerId, tx_redeem_encsig: EncryptedSignature) { - self.swarm.send_message3(peer_id, tx_redeem_encsig); - } - - pub async fn recv_conn_established(&mut self) -> Result { - self.conn_established.receiver.recv().await.ok_or_else(|| { - anyhow::Error::msg("Failed to receive connection established from Alice") - }) - } - - pub async fn recv_amounts(&mut self) -> Result { - self.amounts - .receiver - .recv() - .await - .ok_or_else(|| anyhow::Error::msg("Failed to receive amounts from Alice")) - } - - pub async fn recv_message0(&mut self) -> Result { - self.msg0 - .receiver - .recv() - .await - .ok_or_else(|| anyhow::Error::msg("Failed to receive message 0 from Alice")) - } - - pub async fn recv_message1(&mut self) -> Result { - self.msg1 - .receiver - .recv() - .await - .ok_or_else(|| anyhow::Error::msg("Failed to receive message 1 from Alice")) - } - - pub async fn recv_message2(&mut self) -> Result { - self.msg2 - .receiver - .recv() - .await - .ok_or_else(|| anyhow::Error::msg("Failed to receive message 2 from Alice")) - } } diff --git a/swap/tests/e2e.rs b/swap/tests/e2e.rs index 13a402c4..f6299268 100644 --- a/swap/tests/e2e.rs +++ b/swap/tests/e2e.rs @@ -46,7 +46,14 @@ async fn happy_path() { .parse() .expect("failed to parse Alice's address"); - let (alice_state, alice_swarm, alice_btc_wallet, alice_xmr_wallet, alice_peer_id) = init_alice( + let ( + alice_state, + mut alice_swarm, + alice_swarm_handle, + alice_btc_wallet, + alice_xmr_wallet, + alice_peer_id, + ) = init_alice( &bitcoind, &monero, btc_to_swap, @@ -57,29 +64,32 @@ async fn happy_path() { ) .await; - let (bob_state, bob_swarm, bob_btc_wallet, bob_xmr_wallet, bob_db) = init_bob( - alice_multiaddr, - alice_peer_id, - &bitcoind, - &monero, - btc_to_swap, - btc_bob, - xmr_to_swap, - xmr_bob, - ) - .await; + let (bob_state, bob_swarm_driver, bob_swarm_handle, bob_btc_wallet, bob_xmr_wallet, bob_db) = + init_bob( + alice_multiaddr, + alice_peer_id, + &bitcoind, + &monero, + btc_to_swap, + btc_bob, + xmr_to_swap, + xmr_bob, + ) + .await; - let alice_swap = alice::swap::swap( + let alice_swap_fut = alice::swap::swap( alice_state, - alice_swarm, + alice_swarm_handle, alice_btc_wallet.clone(), alice_xmr_wallet.clone(), Config::regtest(), ); - let bob_swap = bob::swap::swap( + let _alice_swarm_fut = tokio::spawn(async move { alice_swarm.run().await }); + + let bob_swap_fut = bob::swap::swap( bob_state, - bob_swarm, + bob_swarm_handle, bob_db, bob_btc_wallet.clone(), bob_xmr_wallet.clone(), @@ -87,7 +97,9 @@ async fn happy_path() { Uuid::new_v4(), ); - try_join(alice_swap, bob_swap).await.unwrap(); + let _bob_swarm_fut = tokio::spawn(async move { bob_swarm_driver.run().await }); + + try_join(alice_swap_fut, bob_swap_fut).await.unwrap(); let btc_alice_final = alice_btc_wallet.as_ref().balance().await.unwrap(); let btc_bob_final = bob_btc_wallet.as_ref().balance().await.unwrap(); @@ -138,7 +150,14 @@ async fn alice_punishes_if_bob_never_acts_after_fund() { .parse() .expect("failed to parse Alice's address"); - let (alice_state, alice_swarm, alice_btc_wallet, alice_xmr_wallet, alice_peer_id) = init_alice( + let ( + alice_state, + mut alice_swarm, + alice_swarm_handle, + alice_btc_wallet, + alice_xmr_wallet, + alice_peer_id, + ) = init_alice( &bitcoind, &monero, btc_to_swap, @@ -149,22 +168,23 @@ async fn alice_punishes_if_bob_never_acts_after_fund() { ) .await; - let (bob_state, bob_swarm, bob_btc_wallet, bob_xmr_wallet, bob_db) = init_bob( - alice_multiaddr, - alice_peer_id, - &bitcoind, - &monero, - btc_to_swap, - bob_btc_starting_balance, - xmr_to_swap, - bob_xmr_starting_balance, - ) - .await; + let (bob_state, bob_swarm_driver, bob_swarm_handle, bob_btc_wallet, bob_xmr_wallet, bob_db) = + init_bob( + alice_multiaddr, + alice_peer_id, + &bitcoind, + &monero, + btc_to_swap, + bob_btc_starting_balance, + xmr_to_swap, + bob_xmr_starting_balance, + ) + .await; let bob_xmr_locked_fut = bob::swap::run_until( bob_state, bob::swap::is_xmr_locked, - bob_swarm, + bob_swarm_handle, bob_db, bob_btc_wallet.clone(), bob_xmr_wallet.clone(), @@ -172,14 +192,18 @@ async fn alice_punishes_if_bob_never_acts_after_fund() { Uuid::new_v4(), ); + let _bob_swarm_fut = tokio::spawn(async move { bob_swarm_driver.run().await }); + let alice_fut = alice::swap::swap( alice_state, - alice_swarm, + alice_swarm_handle, alice_btc_wallet.clone(), alice_xmr_wallet.clone(), Config::regtest(), ); + let _alice_swarm_fut = tokio::spawn(async move { alice_swarm.run().await }); + // Wait until alice has locked xmr and bob h as locked btc let ((alice_state, _), _bob_state) = try_join(alice_fut, bob_xmr_locked_fut).await.unwrap(); @@ -200,6 +224,7 @@ async fn init_alice( ) -> ( AliceState, alice::swarm_driver::SwarmDriver, + alice::swarm_driver::SwarmDriverHandle, Arc, Arc, PeerId, @@ -240,13 +265,13 @@ async fn init_alice( } }; - let alice_swarm = - alice::swarm_driver::SwarmDriver::new(alice_transport, alice_behaviour, listen) - .expect("Could not init alice"); + let (swarm_driver, handle) = + alice::swarm_driver::SwarmDriver::new(alice_transport, alice_behaviour, listen).unwrap(); ( alice_state, - alice_swarm, + swarm_driver, + handle, alice_btc_wallet, alice_xmr_wallet, alice_peer_id, @@ -266,6 +291,7 @@ async fn init_bob( ) -> ( BobState, bob::swarm_driver::SwarmDriver, + bob::swarm_driver::SwarmDriverHandle, Arc, Arc, Database, @@ -316,7 +342,15 @@ async fn init_bob( addr: alice_multiaddr, }; - let bob_swarm = bob::swarm_driver::SwarmDriver::new(bob_transport, bob_behaviour); + let (swarm_driver, swarm_handle) = + bob::swarm_driver::SwarmDriver::new(bob_transport, bob_behaviour).unwrap(); - (bob_state, bob_swarm, bob_btc_wallet, bob_xmr_wallet, bob_db) + ( + bob_state, + swarm_driver, + swarm_handle, + bob_btc_wallet, + bob_xmr_wallet, + bob_db, + ) }