diff --git a/swap/src/bin/nectar.rs b/swap/src/bin/nectar.rs index f91b7932..3b649a1f 100644 --- a/swap/src/bin/nectar.rs +++ b/swap/src/bin/nectar.rs @@ -82,7 +82,7 @@ async fn main() -> Result<()> { let (bitcoin_wallet, monero_wallet) = init_wallets(config.clone()).await?; - let mut event_loop = EventLoop::new( + let (mut event_loop, _) = EventLoop::new( config.network.listen, seed, execution_params, diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index 08576161..159c046f 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -6,20 +6,23 @@ use crate::{ network::{transport, TokioExecutor}, protocol::{ alice, - alice::{Behaviour, Builder, OutEvent, QuoteResponse, State0, State3, TransferProof}, + alice::{ + AliceState, Behaviour, Builder, OutEvent, QuoteResponse, State0, State3, TransferProof, + }, bob::{EncryptedSignature, QuoteRequest}, SwapAmounts, }, seed::Seed, }; use anyhow::{anyhow, Context, Result}; +use futures::future::RemoteHandle; use libp2p::{ core::Multiaddr, futures::FutureExt, request_response::ResponseChannel, PeerId, Swarm, }; use rand::rngs::OsRng; use std::{collections::HashMap, sync::Arc}; use tokio::sync::{broadcast, mpsc}; -use tracing::{debug, error, trace}; +use tracing::{debug, error, trace, warn}; use uuid::Uuid; // TODO: Use dynamic @@ -95,6 +98,8 @@ pub struct EventLoop { // Only used to produce new handles send_transfer_proof_sender: mpsc::Sender<(PeerId, TransferProof)>, + + swap_handle_sender: mpsc::Sender>>, } impl EventLoop { @@ -105,7 +110,7 @@ impl EventLoop { bitcoin_wallet: Arc, monero_wallet: Arc, db: Arc, - ) -> Result { + ) -> Result<(Self, mpsc::Receiver>>)> { let identity = network::Seed::new(seed).derive_libp2p_identity(); let behaviour = Behaviour::default(); let transport = transport::build(&identity)?; @@ -122,8 +127,9 @@ impl EventLoop { let recv_encrypted_signature = BroadcastChannels::default(); let send_transfer_proof = MpscChannels::default(); + let swap_handle = MpscChannels::default(); - Ok(EventLoop { + let event_loop = EventLoop { swarm, peer_id, execution_params, @@ -135,7 +141,9 @@ impl EventLoop { recv_encrypted_signature: recv_encrypted_signature.sender, send_transfer_proof: send_transfer_proof.receiver, send_transfer_proof_sender: send_transfer_proof.sender, - }) + swap_handle_sender: swap_handle.sender, + }; + Ok((event_loop, swap_handle.receiver)) } pub fn new_handle(&self) -> EventLoopHandle { @@ -257,7 +265,14 @@ impl EventLoop { .build() .await?; - tokio::spawn(async move { alice::run(swap).await }); + let (remote, remote_handle) = alice::run(swap).remote_handle(); + tokio::spawn(async move { remote.await }); + + let _ = self + .swap_handle_sender + .send(remote_handle) + .await + .map_err(|err| warn!("Could not send swap handle over channel: {:?}", err)); Ok(()) } diff --git a/swap/tests/testutils/mod.rs b/swap/tests/testutils/mod.rs index b18bf57f..444f4b9c 100644 --- a/swap/tests/testutils/mod.rs +++ b/swap/tests/testutils/mod.rs @@ -1,6 +1,6 @@ use crate::testutils; use bitcoin_harness::Bitcoind; -use futures::Future; +use futures::{future::RemoteHandle, Future}; use get_port::get_port; use libp2p::{core::Multiaddr, PeerId}; use monero_harness::{image, Monero}; @@ -8,7 +8,7 @@ use std::{path::PathBuf, sync::Arc}; use swap::{ bitcoin, bitcoin::Timelock, - database::{Database, Swap}, + database::Database, execution_params, execution_params::{ExecutionParams, GetExecutionParams}, monero, @@ -17,7 +17,7 @@ use swap::{ }; use tempfile::tempdir; use testcontainers::{clients::Cli, Container}; -use tokio::task::JoinHandle; +use tokio::{sync::mpsc, task::JoinHandle}; use tracing_core::dispatcher::DefaultGuard; use tracing_log::LogTracer; use uuid::Uuid; @@ -28,13 +28,6 @@ pub struct StartingBalances { pub btc: bitcoin::Amount, } -struct AliceParams { - seed: Seed, - execution_params: ExecutionParams, - db: Arc, - listen_address: Multiaddr, -} - #[derive(Debug, Clone)] struct BobParams { seed: Seed, @@ -75,10 +68,10 @@ pub struct AliceEventLoopJoinHandle(JoinHandle<()>); pub struct TestContext { swap_amounts: SwapAmounts, - alice_params: AliceParams, alice_starting_balances: StartingBalances, alice_bitcoin_wallet: Arc, alice_monero_wallet: Arc, + alice_swap_handle: mpsc::Receiver>>, bob_params: BobParams, bob_starting_balances: StartingBalances, @@ -114,16 +107,9 @@ impl TestContext { (swap, BobEventLoopJoinHandle(join_handle)) } - pub async fn assert_alice_redeemed(&self) { - let mut states = self.alice_params.db.all().unwrap(); - - assert_eq!(states.len(), 1, "Expected only one swap in Alice's db"); - - let (_swap_id, state) = states.pop().unwrap(); - let state = match state { - Swap::Alice(state) => state.into(), - Swap::Bob(_) => panic!("Bob state in Alice db is unexpected"), - }; + pub async fn assert_alice_redeemed(&mut self) { + let swap_handle = self.alice_swap_handle.recv().await.unwrap(); + let state = swap_handle.await.unwrap(); assert!(matches!(state, AliceState::BtcRedeemed)); @@ -143,16 +129,9 @@ impl TestContext { assert!(xmr_balance_after_swap <= self.alice_starting_balances.xmr - self.swap_amounts.xmr); } - pub async fn assert_alice_refunded(&self) { - let mut states = self.alice_params.db.all().unwrap(); - - assert_eq!(states.len(), 1, "Expected only one swap in Alice's db"); - - let (_swap_id, state) = states.pop().unwrap(); - let state = match state { - Swap::Alice(state) => state.into(), - Swap::Bob(_) => panic!("Bob state in Alice db is unexpected"), - }; + pub async fn assert_alice_refunded(&mut self) { + let swap_handle = self.alice_swap_handle.recv().await.unwrap(); + let state = swap_handle.await.unwrap(); assert!( matches!(state, AliceState::XmrRefunded), @@ -313,7 +292,7 @@ where let port = get_port().expect("Failed to find a free port"); - let listen_address: Multiaddr = format!("/ip4/127.0.0.1/tcp/{}", port) + let alice_listen_address: Multiaddr = format!("/ip4/127.0.0.1/tcp/{}", port) .parse() .expect("failed to parse Alice's address"); @@ -328,12 +307,7 @@ where let db_path = tempdir().unwrap(); let alice_db = Arc::new(Database::open(db_path.path()).unwrap()); - let alice_params = AliceParams { - seed: Seed::random().unwrap(), - execution_params, - db: alice_db.clone(), - listen_address, - }; + let alice_seed = Seed::random().unwrap(); let bob_starting_balances = StartingBalances { xmr: monero::Amount::ZERO, @@ -348,10 +322,10 @@ where ) .await; - let mut alice_event_loop = alice::EventLoop::new( - alice_params.listen_address.clone(), - alice_params.seed, - alice_params.execution_params, + let (mut alice_event_loop, alice_swap_handle) = alice::EventLoop::new( + alice_listen_address.clone(), + alice_seed, + execution_params, alice_bitcoin_wallet.clone(), alice_monero_wallet.clone(), alice_db, @@ -370,17 +344,17 @@ where swap_id: Uuid::new_v4(), bitcoin_wallet: bob_bitcoin_wallet.clone(), monero_wallet: bob_monero_wallet.clone(), - alice_address: alice_params.listen_address.clone(), + alice_address: alice_listen_address, alice_peer_id, execution_params, }; let test = TestContext { swap_amounts, - alice_params, alice_starting_balances, alice_bitcoin_wallet, alice_monero_wallet, + alice_swap_handle, bob_params, bob_starting_balances, bob_bitcoin_wallet,