diff --git a/swap/src/bin/swap_cli.rs b/swap/src/bin/swap_cli.rs index 13a2725c..653c5800 100644 --- a/swap/src/bin/swap_cli.rs +++ b/swap/src/bin/swap_cli.rs @@ -118,8 +118,16 @@ async fn main() -> Result<()> { ); let (swap, event_loop) = bob_factory.with_init_params(send_bitcoin).build().await?; - tokio::spawn(async move { event_loop.run().await }); - bob::run(swap).await?; + let handle = tokio::spawn(async move { event_loop.run().await }); + let swap = bob::run(swap); + tokio::select! { + event_loop_result = handle => { + event_loop_result??; + }, + swap_result = swap => { + swap_result?; + } + } } Command::History => { let mut table = Table::new(); @@ -159,9 +167,16 @@ async fn main() -> Result<()> { execution_params, ); let (swap, event_loop) = bob_factory.build().await?; - - tokio::spawn(async move { event_loop.run().await }); - bob::run(swap).await?; + let handle = tokio::spawn(async move { event_loop.run().await }); + let swap = bob::run(swap); + tokio::select! { + event_loop_result = handle => { + event_loop_result??; + }, + swap_result = swap => { + swap_result?; + } + } } Command::Cancel(Cancel::BuyXmr { swap_id, @@ -191,27 +206,33 @@ async fn main() -> Result<()> { execution_params, ); let (swap, event_loop) = bob_factory.build().await?; + let handle = tokio::spawn(async move { event_loop.run().await }); - tokio::spawn(async move { event_loop.run().await }); - - match bob::cancel( + let cancel = bob::cancel( swap.swap_id, swap.state, swap.bitcoin_wallet, swap.db, force, - ) - .await? - { - Ok((txid, _)) => { - info!("Cancel transaction successfully published with id {}", txid) - } - Err(CancelError::CancelTimelockNotExpiredYet) => error!( - "The Cancel Transaction cannot be published yet, \ - because the timelock has not expired. Please try again later." - ), - Err(CancelError::CancelTxAlreadyPublished) => { - warn!("The Cancel Transaction has already been published.") + ); + + tokio::select! { + event_loop_result = handle => { + event_loop_result??; + }, + cancel_result = cancel => { + match cancel_result? { + Ok((txid, _)) => { + info!("Cancel transaction successfully published with id {}", txid) + } + Err(CancelError::CancelTimelockNotExpiredYet) => error!( + "The Cancel Transaction cannot be published yet, \ + because the timelock has not expired. Please try again later." + ), + Err(CancelError::CancelTxAlreadyPublished) => { + warn!("The Cancel Transaction has already been published.") + } + } } } } @@ -244,19 +265,26 @@ async fn main() -> Result<()> { ); let (swap, event_loop) = bob_factory.build().await?; - tokio::spawn(async move { event_loop.run().await }); - bob::refund( + let handle = tokio::spawn(async move { event_loop.run().await }); + let refund = bob::refund( swap.swap_id, swap.state, swap.execution_params, swap.bitcoin_wallet, swap.db, force, - ) - .await??; + ); + + tokio::select! { + event_loop_result = handle => { + event_loop_result??; + }, + refund_result = refund => { + refund_result??; + } + } } }; - Ok(()) } diff --git a/swap/src/protocol/bob.rs b/swap/src/protocol/bob.rs index 5ca668bd..4a3e8d38 100644 --- a/swap/src/protocol/bob.rs +++ b/swap/src/protocol/bob.rs @@ -225,7 +225,9 @@ impl From for OutEvent { channel, }, AckSent => OutEvent::ResponseSent, - Failure(err) => OutEvent::CommunicationError(err.context("Failure with Transfer Proof")), + Failure(err) => { + OutEvent::CommunicationError(err.context("Failure with Transfer Proof")) + } } } } @@ -235,7 +237,9 @@ impl From for OutEvent { use encrypted_signature::OutEvent::*; match event { Acknowledged => OutEvent::EncryptedSignatureAcknowledged, - Failure(err) => OutEvent::CommunicationError(err.context("Failure with Encrypted Signature")), + Failure(err) => { + OutEvent::CommunicationError(err.context("Failure with Encrypted Signature")) + } } } } diff --git a/swap/src/protocol/bob/event_loop.rs b/swap/src/protocol/bob/event_loop.rs index 23ca57e1..57a99675 100644 --- a/swap/src/protocol/bob/event_loop.rs +++ b/swap/src/protocol/bob/event_loop.rs @@ -7,10 +7,10 @@ use crate::{ bob::{Behaviour, OutEvent, QuoteRequest, State0, State2}, }, }; -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, bail, Result}; use futures::FutureExt; use libp2p::{core::Multiaddr, PeerId}; -use std::sync::Arc; +use std::{convert::Infallible, sync::Arc}; use tokio::sync::mpsc::{Receiver, Sender}; use tracing::{debug, error, info}; @@ -167,7 +167,7 @@ impl EventLoop { Ok((event_loop, handle)) } - pub async fn run(mut self) { + pub async fn run(mut self) -> Result { loop { tokio::select! { swarm_event = self.swarm.next().fuse() => { @@ -193,7 +193,7 @@ impl EventLoop { } OutEvent::ResponseSent => {} OutEvent::CommunicationError(err) => { - error!("Communication error: {:#}", err) + bail!("Communication error: {:#}", err) } } }, diff --git a/swap/tests/testutils/mod.rs b/swap/tests/testutils/mod.rs index 646a3cc4..7619e18b 100644 --- a/swap/tests/testutils/mod.rs +++ b/swap/tests/testutils/mod.rs @@ -9,6 +9,7 @@ use get_port::get_port; use libp2p::{core::Multiaddr, PeerId}; use monero_harness::{image, Monero}; use std::{ + convert::Infallible, path::{Path, PathBuf}, sync::Arc, time::Duration, @@ -67,7 +68,7 @@ impl BobParams { } } -pub struct BobEventLoopJoinHandle(JoinHandle<()>); +pub struct BobEventLoopJoinHandle(JoinHandle>); impl BobEventLoopJoinHandle { pub fn abort(&self) {