diff --git a/swap/src/bin/asb.rs b/swap/src/bin/asb.rs index 5574d1a5..da9a3f0e 100644 --- a/swap/src/bin/asb.rs +++ b/swap/src/bin/asb.rs @@ -27,7 +27,7 @@ use swap::database::Database; use swap::execution_params::{ExecutionParams, GetExecutionParams}; use swap::fs::default_config_path; use swap::monero::Amount; -use swap::protocol::alice::EventLoop; +use swap::protocol::alice::{run, EventLoop}; use swap::seed::Seed; use swap::trace::init_tracing; use swap::{bitcoin, execution_params, kraken, monero}; @@ -95,7 +95,7 @@ async fn main() -> Result<()> { let kraken_rate_updates = kraken::connect()?; - let (event_loop, _) = EventLoop::new( + let (event_loop, mut swap_receiver) = EventLoop::new( config.network.listen, seed, execution_params, @@ -107,6 +107,22 @@ async fn main() -> Result<()> { ) .unwrap(); + tokio::spawn(async move { + while let Some(swap) = swap_receiver.recv().await { + tokio::spawn(async move { + let swap_id = swap.swap_id; + match run(swap).await { + Ok(state) => { + tracing::debug!(%swap_id, "Swap finished with state {}", state) + } + Err(e) => { + tracing::error!(%swap_id, "Swap failed with {:#}", e) + } + } + }); + } + }); + info!("Our peer id is {}", event_loop.peer_id()); event_loop.run().await; diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index ad8b7c1f..84da64d9 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -4,20 +4,17 @@ use crate::execution_params::ExecutionParams; use crate::monero::BalanceTooLow; use crate::network::quote::BidQuote; use crate::network::{spot_price, transport, TokioExecutor}; -use crate::protocol::alice; use crate::protocol::alice::{AliceState, Behaviour, OutEvent, State3, Swap, TransferProof}; use crate::protocol::bob::EncryptedSignature; use crate::seed::Seed; use crate::{bitcoin, kraken, monero}; use anyhow::{bail, Context, Result}; -use futures::future::RemoteHandle; use libp2p::core::Multiaddr; use libp2p::futures::FutureExt; use libp2p::{PeerId, Swarm}; use rand::rngs::OsRng; use std::convert::Infallible; use std::sync::Arc; -use tokio::sync::mpsc::error::SendError; use tokio::sync::{broadcast, mpsc}; use tracing::{debug, error, trace}; use uuid::Uuid; @@ -39,7 +36,7 @@ pub struct EventLoop { // Only used to produce new handles send_transfer_proof_sender: mpsc::Sender<(PeerId, TransferProof)>, - swap_handle_sender: mpsc::Sender>>, + swap_sender: mpsc::Sender, } #[derive(Debug)] @@ -62,7 +59,7 @@ where db: Arc, latest_rate: LR, max_buy: bitcoin::Amount, - ) -> Result<(Self, mpsc::Receiver>>)> { + ) -> Result<(Self, mpsc::Receiver)> { let identity = seed.derive_libp2p_identity(); let behaviour = Behaviour::default(); let transport = transport::build(&identity)?; @@ -79,7 +76,7 @@ where let recv_encrypted_signature = BroadcastChannels::default(); let send_transfer_proof = MpscChannels::default(); - let swap_handle = MpscChannels::default(); + let swap_channel = MpscChannels::default(); let event_loop = EventLoop { swarm, @@ -92,10 +89,10 @@ where recv_encrypted_signature: recv_encrypted_signature.sender, send_transfer_proof: send_transfer_proof.receiver, send_transfer_proof_sender: send_transfer_proof.sender, - swap_handle_sender: swap_handle.sender, + swap_sender: swap_channel.sender, max_buy, }; - Ok((event_loop, swap_handle.receiver)) + Ok((event_loop, swap_channel.receiver)) } pub fn new_handle(&self) -> EventLoopHandle { @@ -231,11 +228,7 @@ where }) } - async fn handle_execution_setup_done( - &mut self, - bob_peer_id: PeerId, - state3: State3, - ) -> Result<()> { + async fn handle_execution_setup_done(&mut self, bob_peer_id: PeerId, state3: State3) { let swap_id = Uuid::new_v4(); let handle = self.new_handle(); @@ -254,18 +247,9 @@ where swap_id, }; - let (swap, swap_handle) = alice::run(swap).remote_handle(); - tokio::spawn(swap); - - // For testing purposes the handle is currently sent via a channel so we can - // await it. If a remote handle is dropped, the future of the swap is - // also stopped. If we error upon sending the handle through the channel - // we have to call forget to detach the handle from the swap future. - if let Err(SendError(handle)) = self.swap_handle_sender.send(swap_handle).await { - handle.forget(); + if let Err(error) = self.swap_sender.send(swap).await { + tracing::warn!(%swap_id, "Swap cannot be spawned: {}", error); } - - Ok(()) } } diff --git a/swap/tests/bob_refunds_using_cancel_and_refund_command.rs b/swap/tests/bob_refunds_using_cancel_and_refund_command.rs index 1f464adb..240482bc 100644 --- a/swap/tests/bob_refunds_using_cancel_and_refund_command.rs +++ b/swap/tests/bob_refunds_using_cancel_and_refund_command.rs @@ -1,17 +1,20 @@ pub mod testutils; -use swap::protocol::bob; use swap::protocol::bob::BobState; +use swap::protocol::{alice, bob}; use testutils::bob_run_until::is_btc_locked; use testutils::FastCancelConfig; #[tokio::test] async fn given_bob_manually_refunds_after_btc_locked_bob_refunds() { testutils::setup_test(FastCancelConfig, |mut ctx| async move { - let (bob_swap, bob_join_handle) = ctx.new_swap_as_bob().await; + let (bob_swap, bob_join_handle) = ctx.bob_swap().await; + let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked)); - let bob_state = bob::run_until(bob_swap, is_btc_locked).await.unwrap(); + let alice_swap = ctx.alice_next_swap().await; + let _ = tokio::spawn(alice::run(alice_swap)); + let bob_state = bob_swap.await??; assert!(matches!(bob_state, BobState::BtcLocked { .. })); let (bob_swap, bob_join_handle) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; @@ -20,8 +23,7 @@ async fn given_bob_manually_refunds_after_btc_locked_bob_refunds() { if let BobState::BtcLocked(state3) = bob_swap.state.clone() { state3 .wait_for_cancel_timelock_to_expire(bob_swap.bitcoin_wallet.as_ref()) - .await - .unwrap(); + .await?; } else { panic!("Bob in unexpected state {}", bob_swap.state); } @@ -35,9 +37,7 @@ async fn given_bob_manually_refunds_after_btc_locked_bob_refunds() { bob_swap.db, false, ) - .await - .unwrap() - .unwrap(); + .await??; assert!(matches!(state, BobState::BtcCancelled { .. })); let (bob_swap, bob_join_handle) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; @@ -53,11 +53,11 @@ async fn given_bob_manually_refunds_after_btc_locked_bob_refunds() { bob_swap.db, false, ) - .await - .unwrap() - .unwrap(); + .await??; ctx.assert_bob_refunded(bob_state).await; + + Ok(()) }) - .await; + .await } diff --git a/swap/tests/bob_refunds_using_cancel_and_refund_command_timelock_not_expired.rs b/swap/tests/bob_refunds_using_cancel_and_refund_command_timelock_not_expired.rs index d5f23aad..80256941 100644 --- a/swap/tests/bob_refunds_using_cancel_and_refund_command_timelock_not_expired.rs +++ b/swap/tests/bob_refunds_using_cancel_and_refund_command_timelock_not_expired.rs @@ -1,17 +1,21 @@ pub mod testutils; use bob::cancel::Error; -use swap::protocol::bob; use swap::protocol::bob::BobState; +use swap::protocol::{alice, bob}; use testutils::bob_run_until::is_btc_locked; use testutils::SlowCancelConfig; #[tokio::test] async fn given_bob_manually_cancels_when_timelock_not_expired_errors() { testutils::setup_test(SlowCancelConfig, |mut ctx| async move { - let (bob_swap, bob_join_handle) = ctx.new_swap_as_bob().await; + let (bob_swap, bob_join_handle) = ctx.bob_swap().await; + let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked)); - let bob_state = bob::run_until(bob_swap, is_btc_locked).await.unwrap(); + let alice_swap = ctx.alice_next_swap().await; + let _ = tokio::spawn(alice::run(alice_swap)); + + let bob_state = bob_swap.await??; assert!(matches!(bob_state, BobState::BtcLocked { .. })); let (bob_swap, bob_join_handle) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; @@ -25,8 +29,7 @@ async fn given_bob_manually_cancels_when_timelock_not_expired_errors() { bob_swap.db, false, ) - .await - .unwrap() + .await? .err() .unwrap(); @@ -44,13 +47,14 @@ async fn given_bob_manually_cancels_when_timelock_not_expired_errors() { bob_swap.db, false, ) - .await - .unwrap() + .await? .err() .unwrap(); let (bob_swap, _) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; assert!(matches!(bob_swap.state, BobState::BtcLocked { .. })); + + Ok(()) }) .await; } diff --git a/swap/tests/bob_refunds_using_cancel_and_refund_command_timelock_not_expired_force.rs b/swap/tests/bob_refunds_using_cancel_and_refund_command_timelock_not_expired_force.rs index 9ae6bf38..6f659b9e 100644 --- a/swap/tests/bob_refunds_using_cancel_and_refund_command_timelock_not_expired_force.rs +++ b/swap/tests/bob_refunds_using_cancel_and_refund_command_timelock_not_expired_force.rs @@ -1,16 +1,20 @@ pub mod testutils; -use swap::protocol::bob; use swap::protocol::bob::BobState; +use swap::protocol::{alice, bob}; use testutils::bob_run_until::is_btc_locked; use testutils::SlowCancelConfig; #[tokio::test] async fn given_bob_manually_forces_cancel_when_timelock_not_expired_errors() { testutils::setup_test(SlowCancelConfig, |mut ctx| async move { - let (bob_swap, bob_join_handle) = ctx.new_swap_as_bob().await; + let (bob_swap, bob_join_handle) = ctx.bob_swap().await; + let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked)); - let bob_state = bob::run_until(bob_swap, is_btc_locked).await.unwrap(); + let alice_swap = ctx.alice_next_swap().await; + let _ = tokio::spawn(alice::run(alice_swap)); + + let bob_state = bob_swap.await??; assert!(matches!(bob_state, BobState::BtcLocked { .. })); let (bob_swap, bob_join_handle) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; @@ -47,6 +51,8 @@ async fn given_bob_manually_forces_cancel_when_timelock_not_expired_errors() { assert!(is_error); let (bob_swap, _) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; assert!(matches!(bob_swap.state, BobState::BtcLocked { .. })); + + Ok(()) }) .await; } diff --git a/swap/tests/happy_path.rs b/swap/tests/happy_path.rs index 8ba5e903..fb665e73 100644 --- a/swap/tests/happy_path.rs +++ b/swap/tests/happy_path.rs @@ -1,19 +1,26 @@ pub mod testutils; -use swap::protocol::bob; +use swap::protocol::{alice, bob}; use testutils::SlowCancelConfig; +use tokio::join; /// Run the following tests with RUST_MIN_STACK=10000000 #[tokio::test] async fn happy_path() { testutils::setup_test(SlowCancelConfig, |mut ctx| async move { - let (bob_swap, _) = ctx.new_swap_as_bob().await; + let (bob_swap, _) = ctx.bob_swap().await; + let bob_swap = tokio::spawn(bob::run(bob_swap)); - let bob_state = bob::run(bob_swap).await; + let alice_swap = ctx.alice_next_swap().await; + let alice_swap = tokio::spawn(alice::run(alice_swap)); - ctx.assert_alice_redeemed().await; - ctx.assert_bob_redeemed(bob_state.unwrap()).await; + let (bob_state, alice_state) = join!(bob_swap, alice_swap); + + ctx.assert_alice_redeemed(alice_state??).await; + ctx.assert_bob_redeemed(bob_state??).await; + + Ok(()) }) .await; } diff --git a/swap/tests/happy_path_restart_bob_before_comm.rs b/swap/tests/happy_path_restart_bob_before_comm.rs index 49b9c73b..6abba569 100644 --- a/swap/tests/happy_path_restart_bob_before_comm.rs +++ b/swap/tests/happy_path_restart_bob_before_comm.rs @@ -1,27 +1,34 @@ pub mod testutils; -use swap::protocol::bob; use swap::protocol::bob::BobState; +use swap::protocol::{alice, bob}; use testutils::bob_run_until::is_xmr_locked; use testutils::SlowCancelConfig; #[tokio::test] async fn given_bob_restarts_after_xmr_is_locked_resume_swap() { testutils::setup_test(SlowCancelConfig, |mut ctx| async move { - let (bob_swap, bob_join_handle) = ctx.new_swap_as_bob().await; + let (bob_swap, bob_join_handle) = ctx.bob_swap().await; + let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_xmr_locked)); - let bob_state = bob::run_until(bob_swap, is_xmr_locked).await.unwrap(); + let alice_swap = ctx.alice_next_swap().await; + let alice_swap = tokio::spawn(alice::run(alice_swap)); + + let bob_state = bob_swap.await??; assert!(matches!(bob_state, BobState::XmrLocked { .. })); let (bob_swap, _) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; assert!(matches!(bob_swap.state, BobState::XmrLocked { .. })); - let bob_state = bob::run(bob_swap).await.unwrap(); + let bob_state = bob::run(bob_swap).await?; ctx.assert_bob_redeemed(bob_state).await; - ctx.assert_alice_redeemed().await; + let alice_state = alice_swap.await??; + ctx.assert_alice_redeemed(alice_state).await; + + Ok(()) }) .await; } diff --git a/swap/tests/testutils/mod.rs b/swap/tests/testutils/mod.rs index 58efebe3..0f80fb69 100644 --- a/swap/tests/testutils/mod.rs +++ b/swap/tests/testutils/mod.rs @@ -4,7 +4,6 @@ mod electrs; use crate::testutils; use anyhow::{Context, Result}; use bitcoin_harness::{BitcoindRpcApi, Client}; -use futures::future::RemoteHandle; use futures::Future; use get_port::get_port; use libp2p::core::Multiaddr; @@ -18,7 +17,7 @@ use swap::asb::FixedRate; use swap::bitcoin::{CancelTimelock, PunishTimelock}; use swap::database::Database; use swap::execution_params::{ExecutionParams, GetExecutionParams}; -use swap::protocol::alice::AliceState; +use swap::protocol::alice::{AliceState, Swap}; use swap::protocol::bob::BobState; use swap::protocol::{alice, bob}; use swap::seed::Seed; @@ -98,7 +97,7 @@ pub struct TestContext { alice_starting_balances: StartingBalances, alice_bitcoin_wallet: Arc, alice_monero_wallet: Arc, - alice_swap_handle: mpsc::Receiver>>, + alice_swap_handle: mpsc::Receiver, bob_params: BobParams, bob_starting_balances: StartingBalances, @@ -107,7 +106,11 @@ pub struct TestContext { } impl TestContext { - pub async fn new_swap_as_bob(&mut self) -> (bob::Swap, BobEventLoopJoinHandle) { + pub async fn alice_next_swap(&mut self) -> alice::Swap { + self.alice_swap_handle.recv().await.unwrap() + } + + pub async fn bob_swap(&mut self) -> (bob::Swap, BobEventLoopJoinHandle) { let (event_loop, event_loop_handle) = self.bob_params.new_eventloop().unwrap(); let swap = self @@ -145,10 +148,7 @@ impl TestContext { (swap, BobEventLoopJoinHandle(join_handle)) } - pub async fn assert_alice_redeemed(&mut self) { - let swap_handle = self.alice_swap_handle.recv().await.unwrap(); - let state = swap_handle.await.unwrap(); - + pub async fn assert_alice_redeemed(&mut self, state: AliceState) { assert!(matches!(state, AliceState::BtcRedeemed)); self.alice_bitcoin_wallet.sync().await.unwrap(); @@ -175,10 +175,7 @@ impl TestContext { ); } - pub async fn assert_alice_refunded(&mut self) { - let swap_handle = self.alice_swap_handle.recv().await.unwrap(); - let state = swap_handle.await.unwrap(); - + pub async fn assert_alice_refunded(&mut self, state: AliceState) { assert!(matches!(state, AliceState::XmrRefunded)); self.alice_bitcoin_wallet.sync().await.unwrap(); @@ -313,7 +310,7 @@ impl TestContext { pub async fn setup_test(_config: C, testfn: T) where T: Fn(TestContext) -> F, - F: Future, + F: Future>, C: GetExecutionParams, { let cli = Cli::default(); @@ -426,7 +423,7 @@ where bob_monero_wallet, }; - testfn(test).await; + testfn(test).await.unwrap() } fn random_prefix() -> String {