From 849e6e7a147bf8707d2f244c716a83a6ad2db7cc Mon Sep 17 00:00:00 2001 From: binarybaron <86064887+binarybaron@users.noreply.github.com> Date: Fri, 11 Aug 2023 11:53:07 +0200 Subject: [PATCH] Disallow concurrent swaps --- swap/src/api/request.rs | 54 +++++++++++++++++++++++++++++------------ 1 file changed, 38 insertions(+), 16 deletions(-) diff --git a/swap/src/api/request.rs b/swap/src/api/request.rs index d7a58aa1..00ba2440 100644 --- a/swap/src/api/request.rs +++ b/swap/src/api/request.rs @@ -22,6 +22,11 @@ use structopt::lazy_static::lazy_static; use tokio::sync::broadcast::Receiver; use tracing::{debug_span, Instrument}; use uuid::Uuid; +use tokio::sync::Mutex; + +lazy_static! { + static ref SWAP_MUTEX: Mutex> = Mutex::new(None); +} #[derive(PartialEq, Debug)] pub struct Request { @@ -161,7 +166,7 @@ impl Request { .context("Could not get Tor SOCKS5 port")?, behaviour, ) - .await?; + .await?; swarm.behaviour_mut().add_address(seller_peer_id, seller); tracing::debug!(peer_id = %swarm.local_peer_id(), "Network layer initialized"); @@ -182,7 +187,7 @@ impl Request { || bitcoin_wallet.sync(), estimate_fee, ) - .await + .await { Ok(val) => val, Err(error) => match error.downcast::() { @@ -382,7 +387,7 @@ impl Request { .context("Could not get Tor SOCKS5 port")?, behaviour, ) - .await?; + .await?; let our_peer_id = swarm.local_peer_id(); tracing::debug!(peer_id = %our_peer_id, "Network layer initialized"); @@ -417,7 +422,7 @@ impl Request { event_loop_handle, monero_receive_address, ) - .await?; + .await?; tokio::select! { event_loop_result = handle => { @@ -442,7 +447,7 @@ impl Request { Arc::clone(bitcoin_wallet), Arc::clone(&context.db), ) - .await?; + .await?; Ok(json!({ "result": state, @@ -470,7 +475,7 @@ impl Request { .context("Could not get Tor SOCKS5 port")?, identity, ) - .await?; + .await?; for seller in &sellers { match seller.status { @@ -558,10 +563,26 @@ impl Request { pub async fn call(self, context: Arc) -> Result { // If the swap ID is set, we add it to the span let call_span = debug_span!( - "call", + "cmd", method = ?self.cmd, ); + if let Some(swap_id) = self.has_lockable_swap_id() { + println!("taking lock for swap_id: {}", swap_id); + let mut guard = SWAP_MUTEX.try_lock().context("Another swap is already running")?; + if guard.is_some() { + bail!("Another swap is already running"); + } + + let _ = guard.insert(swap_id.clone()); + + let result = self.handle_cmd(context).instrument(call_span).await; + guard.take(); + + println!("releasing lock for swap_id: {}", swap_id); + + return result; + } self.handle_cmd(context).instrument(call_span).await } } @@ -584,15 +605,15 @@ pub async fn determine_btc_to_swap( sync: FS, estimate_fee: FFE, ) -> Result<(bitcoin::Amount, bitcoin::Amount)> -where - TB: Future>, - FB: Fn() -> TB, - TMG: Future>, - FMG: Fn() -> TMG, - TS: Future>, - FS: Fn() -> TS, - FFE: Fn(bitcoin::Amount) -> TFE, - TFE: Future>, + where + TB: Future>, + FB: Fn() -> TB, + TMG: Future>, + FMG: Fn() -> TMG, + TS: Future>, + FS: Fn() -> TS, + FFE: Fn(bitcoin::Amount) -> TFE, + TFE: Future>, { tracing::debug!("Requesting quote"); let bid_quote = bid_quote.await?; @@ -667,3 +688,4 @@ where Ok((btc_swap_amount, fees)) } +