diff --git a/swap/src/api.rs b/swap/src/api.rs index b9f81437..6dbf14f2 100644 --- a/swap/src/api.rs +++ b/swap/src/api.rs @@ -7,12 +7,12 @@ use crate::network::rendezvous::XmrBtcNamespace; use crate::protocol::Database; use crate::seed::Seed; use crate::{bitcoin, cli, monero}; -use anyhow::{Context as AnyContext, Result}; +use anyhow::{bail, Context as AnyContext, Error, Result}; use std::fmt; use std::net::SocketAddr; use std::path::PathBuf; use std::sync::{Arc, Once}; -use tokio::sync::{broadcast, Mutex, broadcast::Receiver, broadcast::Sender}; +use tokio::sync::{broadcast, broadcast::Receiver, broadcast::Sender, RwLock}; use url::Url; static START: Once = Once::new(); @@ -30,43 +30,72 @@ pub struct Config { pub is_testnet: bool, } -impl Shutdown { - pub fn new(listen: Receiver<()>) -> Shutdown { - let (notify, _) = broadcast::channel(16); - Shutdown { - shutdown: Mutex::new(false), - listen: Mutex::new(listen), - notify +use uuid::Uuid; + +pub struct SwapLock { + current_swap: RwLock>, + _suspension_rec: Receiver<()>, + suspension_trigger: Sender<()>, +} + +impl SwapLock { + pub fn new() -> Self { + let (suspension_trigger, _suspension_rec) = broadcast::channel(10); + SwapLock { + current_swap: RwLock::new(None), + _suspension_rec, + suspension_trigger, } } - /// Receive the shutdown notice, waiting if necessary. - pub async fn recv(&self) { - // If the shutdown signal has already been received, then return - // immediately. - let mut guard_shutdown = self.shutdown.lock().await; - if *guard_shutdown { - return; + pub async fn listen_for_swap_force_suspension(&self) -> Result<(), Error> { + let mut listener = self.suspension_trigger.subscribe(); + let event = listener.recv().await; + match event { + Ok(_) => Ok(()), + Err(e) => { + tracing::error!("Error receiving swap suspension signal: {}", e); + bail!(e) + } + } + } + + pub async fn acquire_swap_lock(&self, swap_id: Uuid) -> Result<(), Error> { + let mut current_swap = self.current_swap.write().await; + if current_swap.is_some() { + bail!("There already exists an active swap lock"); } - let _ = self.listen.lock().await.recv().await; + tracing::debug!(swap_id = %swap_id, "Acquiring swap lock"); + *current_swap = Some(swap_id); + Ok(()) + } - // Remember that the signal has been received. - *guard_shutdown = true; + pub async fn get_current_swap_id(&self) -> Option { + let current_swap = self.current_swap.read().await.clone(); + current_swap + } - // Send shutdown request to child tasks + pub async fn send_suspend_signal(&self) -> Result<(), Error> { + let _ = self.suspension_trigger.send(())?; + Ok(()) + } + pub async fn release_swap_lock(&self) -> Result { + let mut current_swap = self.current_swap.write().await; + if let Some(swap_id) = current_swap.as_ref() { + tracing::debug!(swap_id = %swap_id, "Releasing swap lock"); + + let prev_swap_id = swap_id.clone(); + *current_swap = None; + drop(current_swap); + Ok(prev_swap_id) + } else { + bail!("There is no current swap lock to release"); + } } } -#[derive(Debug)] -pub struct Shutdown { - shutdown: Mutex, - listen: Mutex>, - notify: Sender<()>, -} - - // workaround for warning over monero_rpc_process which we must own but not read #[allow(dead_code)] pub struct Context { @@ -74,8 +103,8 @@ pub struct Context { bitcoin_wallet: Option>, monero_wallet: Option>, monero_rpc_process: Option, + swap_lock: Arc, pub config: Config, - pub shutdown: Arc, } impl Context { @@ -88,7 +117,6 @@ impl Context { debug: bool, json: bool, server_address: Option, - sender: broadcast::Sender<()>, ) -> Result { let data_dir = data::data_dir_from(data, is_testnet)?; let env_config = env_config_from(is_testnet); @@ -148,7 +176,7 @@ impl Context { is_testnet, data_dir, }, - shutdown: Arc::new(Shutdown::new(sender.subscribe())), + swap_lock: Arc::new(SwapLock::new()), }; Ok(context) @@ -239,7 +267,7 @@ fn env_config_from(testnet: bool) -> EnvConfig { #[cfg(test)] pub mod api_test { use super::*; - use crate::api::request::{Method, Params, Request}; + use crate::api::request::{Method, Request}; use libp2p::Multiaddr; use std::str::FromStr; diff --git a/swap/src/api/request.rs b/swap/src/api/request.rs index 0c47b36c..ba16cf1c 100644 --- a/swap/src/api/request.rs +++ b/swap/src/api/request.rs @@ -19,21 +19,14 @@ use std::future::Future; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; -use structopt::lazy_static::lazy_static; use tracing::{debug_span, Instrument}; use uuid::Uuid; -use tokio::sync::RwLock; - -lazy_static! { - static ref SWAP_LOCK: RwLock> = RwLock::new(None); -} #[derive(PartialEq, Debug)] pub struct Request { - pub cmd: Method + pub cmd: Method, } - #[derive(Debug, PartialEq)] pub enum Method { BuyXmr { @@ -70,26 +63,25 @@ pub enum Method { GetSwapInfo { swap_id: Uuid, }, + SuspendCurrentSwap, } impl Request { pub fn new(cmd: Method) -> Request { - Request { - cmd - } - } - - fn has_lockable_swap_id(&self) -> Option { - match self.cmd { - Method::BuyXmr { swap_id, .. } - | Method::Resume { swap_id } - | Method::CancelAndRefund { swap_id } => Some(swap_id), - _ => None, - } + Request { cmd } } async fn handle_cmd(self, context: Arc) -> Result { match self.cmd { + Method::SuspendCurrentSwap => { + context.swap_lock.send_suspend_signal().await?; + let swap_id = context.swap_lock.get_current_swap_id().await; + + Ok(json!({ + "swapId": swap_id, + "success": true + })) + } Method::GetSwapInfo { swap_id } => { let bitcoin_wallet = context .bitcoin_wallet @@ -157,125 +149,139 @@ impl Request { monero_receive_address, swap_id, } => { - let seed = context.config.seed.as_ref().context("Could not get seed")?; - let env_config = context.config.env_config; - let btc = context - .bitcoin_wallet - .as_ref() - .context("Could not get Bitcoin wallet")?; + context.swap_lock.acquire_swap_lock(swap_id).await?; - let bitcoin_wallet = btc; - let seller_peer_id = seller - .extract_peer_id() - .context("Seller address must contain peer ID")?; - context - .db - .insert_address(seller_peer_id, seller.clone()) - .await?; - - let behaviour = cli::Behaviour::new( - seller_peer_id, - env_config, - bitcoin_wallet.clone(), - (seed.derive_libp2p_identity(), context.config.namespace), - ); - let mut swarm = swarm::cli( - seed.derive_libp2p_identity(), - context - .config - .tor_socks5_port - .context("Could not get Tor SOCKS5 port")?, - behaviour, - ) - .await?; - swarm.behaviour_mut().add_address(seller_peer_id, seller); - - tracing::debug!(peer_id = %swarm.local_peer_id(), "Network layer initialized"); - - let (event_loop, mut event_loop_handle) = - EventLoop::new(swap_id, swarm, seller_peer_id)?; - let event_loop = tokio::spawn(event_loop.run()); - - let max_givable = || bitcoin_wallet.max_giveable(TxLock::script_size()); - let estimate_fee = |amount| bitcoin_wallet.estimate_fee(TxLock::weight(), amount); - - let (amount, fees) = match determine_btc_to_swap( - context.config.json, - event_loop_handle.request_quote(), - bitcoin_wallet.new_address(), - || bitcoin_wallet.balance(), - max_givable, - || bitcoin_wallet.sync(), - estimate_fee, - ) - .await - { - Ok(val) => val, - Err(error) => match error.downcast::() { - Ok(_) => { - bail!("Seller's XMR balance is currently too low to initiate a swap, please try again later") - } - Err(other) => bail!(other), - }, - }; - - tracing::info!(%amount, %fees, "Determined swap amount"); - - context.db.insert_peer_id(swap_id, seller_peer_id).await?; - - context - .db - .insert_monero_address(swap_id, monero_receive_address) - .await?; - let monero_wallet = context - .monero_wallet - .as_ref() - .context("Could not get Monero wallet")?; - - let swap = Swap::new( - Arc::clone(&context.db), - swap_id, - Arc::clone(bitcoin_wallet), - Arc::clone(monero_wallet), - env_config, - event_loop_handle, - monero_receive_address, - bitcoin_change_address, - amount, - ); - let mut halt = context.shutdown.notify.subscribe(); - - // execution will halt if the server daemon is stopped or a cancel running swap - // request is sent tokio::spawn(async move { tokio::select! { - result = event_loop => { - match result { - Ok(_) => { - tracing::debug!(%swap_id, "EventLoop completed") - } - Err(error) => { - tracing::error!(%swap_id, "EventLoop failed: {:#}", error) - } - } + biased; + _ = context.swap_lock.listen_for_swap_force_suspension() => { + tracing::info!("Shutdown signal received, exiting"); + () }, - result = bob::run(swap) => { - match result { - Ok(state) => { - tracing::debug!(%swap_id, state=%state, "Swap completed") - } - Err(error) => { - tracing::error!(%swap_id, "Failed to complete swap: {:#}", error) - } + _ = async { + let seed = context.config.seed.as_ref().context("Could not get seed")?; + let env_config = context.config.env_config; + let bitcoin_wallet = context + .bitcoin_wallet + .as_ref() + .context("Could not get Bitcoin wallet")?; + + let seller_peer_id = seller + .extract_peer_id() + .context("Seller address must contain peer ID")?; + context + .db + .insert_address(seller_peer_id, seller.clone()) + .await?; + + let behaviour = cli::Behaviour::new( + seller_peer_id, + env_config, + bitcoin_wallet.clone(), + (seed.derive_libp2p_identity(), context.config.namespace), + ); + let mut swarm = swarm::cli( + seed.derive_libp2p_identity(), + context + .config + .tor_socks5_port + .context("Could not get Tor SOCKS5 port")?, + behaviour, + ) + .await?; + swarm.behaviour_mut().add_address(seller_peer_id, seller); + + tracing::debug!(peer_id = %swarm.local_peer_id(), "Network layer initialized"); + + let (event_loop, mut event_loop_handle) = + EventLoop::new(swap_id, swarm, seller_peer_id)?; + let event_loop = tokio::spawn(event_loop.run()); + + let max_givable = || bitcoin_wallet.max_giveable(TxLock::script_size()); + let estimate_fee = |amount| bitcoin_wallet.estimate_fee(TxLock::weight(), amount); + + let (amount, fees) = match determine_btc_to_swap( + context.config.json, + event_loop_handle.request_quote(), + bitcoin_wallet.new_address(), + || bitcoin_wallet.balance(), + max_givable, + || bitcoin_wallet.sync(), + estimate_fee, + ) + .await + { + Ok(val) => val, + Err(error) => match error.downcast::() { + Ok(_) => { + bail!("Seller's XMR balance is currently too low to initiate a swap, please try again later") + } + Err(other) => bail!(other), + }, + }; + + tracing::info!(%amount, %fees, "Determined swap amount"); + + context.db.insert_peer_id(swap_id, seller_peer_id).await?; + + context + .db + .insert_monero_address(swap_id, monero_receive_address) + .await?; + let monero_wallet = context + .monero_wallet + .as_ref() + .context("Could not get Monero wallet")?; + + let swap = Swap::new( + Arc::clone(&context.db), + swap_id, + Arc::clone(bitcoin_wallet), + Arc::clone(monero_wallet), + env_config, + event_loop_handle, + monero_receive_address, + bitcoin_change_address, + amount, + ); + + tokio::select! { + result = event_loop => { + match result { + Ok(_) => { + tracing::debug!(%swap_id, "EventLoop completed") + } + Err(error) => { + tracing::error!(%swap_id, "EventLoop failed: {:#}", error) + } + } + }, + result = bob::run(swap) => { + match result { + Ok(state) => { + tracing::debug!(%swap_id, state=%state, "Swap completed") + } + Err(error) => { + tracing::error!(%swap_id, "Failed to complete swap: {:#}", error) + } + } + }, } + tracing::debug!(%swap_id, "Swap completed"); + Ok(()) + } => { + () } - _ = halt.recv() => { - tracing::debug!(%swap_id, "Swap cancel signal received while running swap") - } - } + }; + context + .swap_lock + .release_swap_lock() + .await + .expect("Could not release swap lock"); }); + Ok(json!({ - "empty": "true" + "swapId": swap_id.to_string(), })) } Method::History => { @@ -343,21 +349,16 @@ impl Request { // Default to 127.0.0.1:1234 let server_address = server_address.unwrap_or("127.0.0.1:1234".parse().unwrap()); - let (_, server_handle) = + let (addr, server_handle) = rpc::run_server(server_address, Arc::clone(&context)).await?; - loop { - let shutdown = Arc::clone(&context.shutdown); - tokio::select! { - _ = shutdown.recv() => { - server_handle.stop()?; - context.shutdown.notify.send(())?; - return Ok(json!({ - "result": [] - })) - } - } - } + tracing::info!(%addr, "Started RPC server"); + + server_handle.stopped().await; + + tracing::info!("Server RPC server"); + + Ok(json!({})) } Method::Balance => { let bitcoin_wallet = context @@ -376,101 +377,131 @@ impl Request { "balance": bitcoin_balance.to_sat() })) } - Method::Resume {swap_id} => { - let seller_peer_id = context.db.get_peer_id(swap_id).await?; - let seller_addresses = context.db.get_addresses(seller_peer_id).await?; + Method::Resume { swap_id } => { + context.swap_lock.acquire_swap_lock(swap_id).await?; - let seed = context - .config - .seed - .as_ref() - .context("Could not get seed")? - .derive_libp2p_identity(); + tokio::spawn(async move { + tokio::select! { + _ = async { + let seller_peer_id = context.db.get_peer_id(swap_id).await?; + let seller_addresses = context.db.get_addresses(seller_peer_id).await?; - let behaviour = cli::Behaviour::new( - seller_peer_id, - context.config.env_config, - Arc::clone( - context - .bitcoin_wallet - .as_ref() - .context("Could not get Bitcoin wallet")?, - ), - (seed.clone(), context.config.namespace), - ); - let mut swarm = swarm::cli( - seed.clone(), - context - .config - .tor_socks5_port - .context("Could not get Tor SOCKS5 port")?, - behaviour, - ) - .await?; - let our_peer_id = swarm.local_peer_id(); + let seed = context + .config + .seed + .as_ref() + .context("Could not get seed")? + .derive_libp2p_identity(); - tracing::debug!(peer_id = %our_peer_id, "Network layer initialized"); + let behaviour = cli::Behaviour::new( + seller_peer_id, + context.config.env_config, + Arc::clone( + context + .bitcoin_wallet + .as_ref() + .context("Could not get Bitcoin wallet")?, + ), + (seed.clone(), context.config.namespace), + ); + let mut swarm = swarm::cli( + seed.clone(), + context + .config + .tor_socks5_port + .context("Could not get Tor SOCKS5 port")?, + behaviour, + ) + .await?; + let our_peer_id = swarm.local_peer_id(); - for seller_address in seller_addresses { - swarm - .behaviour_mut() - .add_address(seller_peer_id, seller_address); - } + tracing::debug!(peer_id = %our_peer_id, "Network layer initialized"); - let (event_loop, event_loop_handle) = - EventLoop::new(swap_id, swarm, seller_peer_id)?; - let handle = tokio::spawn(event_loop.run()); + for seller_address in seller_addresses { + swarm + .behaviour_mut() + .add_address(seller_peer_id, seller_address); + } - let monero_receive_address = context.db.get_monero_address(swap_id).await?; - let swap = Swap::from_db( - Arc::clone(&context.db), - swap_id, - Arc::clone( - context - .bitcoin_wallet - .as_ref() - .context("Could not get Bitcoin wallet")?, - ), - Arc::clone( - context - .monero_wallet - .as_ref() - .context("Could not get Monero wallet")?, - ), - context.config.env_config, - event_loop_handle, - monero_receive_address, - ) - .await?; + let (event_loop, event_loop_handle) = + EventLoop::new(swap_id, swarm, seller_peer_id)?; + let handle = tokio::spawn(event_loop.run()); - tokio::select! { - event_loop_result = handle => { - event_loop_result?; - }, - swap_result = bob::run(swap) => { - swap_result?; + let monero_receive_address = context.db.get_monero_address(swap_id).await?; + let swap = Swap::from_db( + Arc::clone(&context.db), + swap_id, + Arc::clone( + context + .bitcoin_wallet + .as_ref() + .context("Could not get Bitcoin wallet")?, + ), + Arc::clone( + context + .monero_wallet + .as_ref() + .context("Could not get Monero wallet")?, + ), + context.config.env_config, + event_loop_handle, + monero_receive_address, + ) + .await?; + + tokio::select! { + event_loop_result = handle => { + event_loop_result?; + }, + swap_result = bob::run(swap) => { + swap_result?; + } + }; + Ok::<(), anyhow::Error>(()) + } => { + () + }, + _ = context.swap_lock.listen_for_swap_force_suspension() => { + tracing::info!("Shutdown signal received, exiting"); + () + } } - } + context + .swap_lock + .release_swap_lock() + .await + .expect("Could not release swap lock"); + }); Ok(json!({ - "result": [] + "result": "ok", })) } - Method::CancelAndRefund {swap_id} => { + Method::CancelAndRefund { swap_id } => { let bitcoin_wallet = context .bitcoin_wallet .as_ref() .context("Could not get Bitcoin wallet")?; + context.swap_lock.acquire_swap_lock(swap_id).await?; + let state = cli::cancel_and_refund( swap_id, Arc::clone(bitcoin_wallet), Arc::clone(&context.db), ) - .await?; + .await; - Ok(json!({ - "result": state, - })) + context + .swap_lock + .release_swap_lock() + .await + .expect("Could not release swap lock"); + + state.map(|state| { + json!({ + "result": state, + }) + }) } Method::ListSellers { rendezvous_point } => { let rendezvous_node_peer_id = rendezvous_point @@ -494,7 +525,7 @@ impl Request { .context("Could not get Tor SOCKS5 port")?, identity, ) - .await?; + .await?; for seller in &sellers { match seller.status { @@ -571,7 +602,7 @@ impl Request { })) } Method::GetCurrentSwap => Ok(json!({ - "swap_id": SWAP_LOCK.read().await.clone() + "swap_id": context.swap_lock.get_current_swap_id().await })), } } @@ -583,23 +614,6 @@ impl Request { method = ?self.cmd, ); - if let Some(swap_id) = self.has_lockable_swap_id() { - println!("taking lock for swap_id: {}", swap_id); - let mut guard = SWAP_LOCK.write().await; - if let Some(running_swap_id) = guard.as_ref() { - bail!("Another swap is already running: {}", running_swap_id); - } - let _ = guard.insert(swap_id.clone()); - drop(guard); - - let result = self.handle_cmd(context).instrument(call_span).await; - - SWAP_LOCK.write().await.take(); - - println!("releasing lock for swap_id: {}", swap_id); - - return result; - } self.handle_cmd(context).instrument(call_span).await } } diff --git a/swap/src/bin/swap.rs b/swap/src/bin/swap.rs index 54a1447f..4d920fae 100644 --- a/swap/src/bin/swap.rs +++ b/swap/src/bin/swap.rs @@ -22,7 +22,7 @@ use tokio::sync::broadcast; #[tokio::main] async fn main() -> Result<()> { let (tx, _) = broadcast::channel(1); - let (context, mut request) = match parse_args_and_apply_defaults(env::args_os(), tx.clone()).await? { + let (context, request) = match parse_args_and_apply_defaults(env::args_os()).await? { ParseResult::Context(context, request) => (context, request), ParseResult::PrintAndExitZero { message } => { println!("{}", message); diff --git a/swap/src/bitcoin.rs b/swap/src/bitcoin.rs index 7d50354e..683f1dd5 100644 --- a/swap/src/bitcoin.rs +++ b/swap/src/bitcoin.rs @@ -246,7 +246,7 @@ pub fn current_epoch( if tx_lock_status.is_confirmed_with(cancel_timelock) { return ExpiredTimelocks::Cancel { blocks_left: tx_cancel_status.blocks_left_until(punish_timelock), - } + }; } ExpiredTimelocks::None { diff --git a/swap/src/bitcoin/timelocks.rs b/swap/src/bitcoin/timelocks.rs index dee8b4a0..427bef80 100644 --- a/swap/src/bitcoin/timelocks.rs +++ b/swap/src/bitcoin/timelocks.rs @@ -39,11 +39,7 @@ impl Add for BlockHeight { #[derive(Serialize, Debug, Clone, Copy, PartialEq, Eq)] pub enum ExpiredTimelocks { - None { - blocks_left: u32, - }, - Cancel { - blocks_left: u32, - }, + None { blocks_left: u32 }, + Cancel { blocks_left: u32 }, Punish, } diff --git a/swap/src/bitcoin/wallet.rs b/swap/src/bitcoin/wallet.rs index ec3bba11..6aed30da 100644 --- a/swap/src/bitcoin/wallet.rs +++ b/swap/src/bitcoin/wallet.rs @@ -926,13 +926,16 @@ impl Confirmed { } pub fn meets_target(&self, target: T) -> bool - where T: Into + where + T: Into, { self.confirmations() >= target.into() } pub fn blocks_left_until(&self, target: T) -> u32 - where T: Into, T: Copy + where + T: Into, + T: Copy, { if self.meets_target(target) { 0 @@ -950,7 +953,8 @@ impl ScriptStatus { /// Check if the script has met the given confirmation target. pub fn is_confirmed_with(&self, target: T) -> bool - where T: Into + where + T: Into, { match self { ScriptStatus::Confirmed(inner) => inner.meets_target(target), @@ -960,12 +964,12 @@ impl ScriptStatus { // Calculate the number of blocks left until the target is met. pub fn blocks_left_until(&self, target: T) -> u32 - where T: Into, T: Copy + where + T: Into, + T: Copy, { match self { - ScriptStatus::Confirmed(inner) => { - inner.blocks_left_until(target) - } + ScriptStatus::Confirmed(inner) => inner.blocks_left_until(target), _ => target.into(), } } diff --git a/swap/src/cli/command.rs b/swap/src/cli/command.rs index b558671d..6f21e98a 100644 --- a/swap/src/cli/command.rs +++ b/swap/src/cli/command.rs @@ -11,7 +11,6 @@ use std::path::PathBuf; use std::str::FromStr; use std::sync::Arc; use structopt::{clap, StructOpt}; -use tokio::sync::broadcast; use url::Url; use uuid::Uuid; @@ -42,10 +41,7 @@ pub enum ParseResult { PrintAndExitZero { message: String }, } -pub async fn parse_args_and_apply_defaults( - raw_args: I, - rx: broadcast::Sender<()>, -) -> Result +pub async fn parse_args_and_apply_defaults(raw_args: I) -> Result where I: IntoIterator, T: Into + Clone, @@ -78,14 +74,12 @@ where let bitcoin_change_address = bitcoin_address::validate(bitcoin_change_address, is_testnet)?; - let request = Request::new( - Method::BuyXmr { - seller, - bitcoin_change_address, - monero_receive_address, - swap_id: Uuid::new_v4(), - } - ); + let request = Request::new(Method::BuyXmr { + seller, + bitcoin_change_address, + monero_receive_address, + swap_id: Uuid::new_v4(), + }); let context = Context::build( Some(bitcoin), @@ -96,7 +90,6 @@ where debug, json, None, - rx, ) .await?; (context, request) @@ -105,14 +98,14 @@ where let request = Request::new(Method::History); let context = - Context::build(None, None, None, data, is_testnet, debug, json, None, rx).await?; + Context::build(None, None, None, data, is_testnet, debug, json, None).await?; (context, request) } CliCommand::Config => { let request = Request::new(Method::Config); let context = - Context::build(None, None, None, data, is_testnet, debug, json, None, rx).await?; + Context::build(None, None, None, data, is_testnet, debug, json, None).await?; (context, request) } CliCommand::Balance { bitcoin } => { @@ -127,7 +120,6 @@ where debug, json, None, - rx, ) .await?; (context, request) @@ -138,11 +130,7 @@ where monero, tor, } => { - let request = Request::new( - Method::StartDaemon { - server_address - } - ); + let request = Request::new(Method::StartDaemon { server_address }); let context = Context::build( Some(bitcoin), @@ -153,7 +141,6 @@ where debug, json, server_address, - rx, ) .await?; (context, request) @@ -165,12 +152,7 @@ where } => { let address = bitcoin_address::validate(address, is_testnet)?; - let request = Request::new( - Method::WithdrawBtc { - amount, - address, - }, - ); + let request = Request::new(Method::WithdrawBtc { amount, address }); let context = Context::build( Some(bitcoin), @@ -181,7 +163,6 @@ where debug, json, None, - rx, ) .await?; (context, request) @@ -192,11 +173,7 @@ where monero, tor, } => { - let request = Request::new( - Method::Resume { - swap_id - } - ); + let request = Request::new(Method::Resume { swap_id }); let context = Context::build( Some(bitcoin), @@ -207,7 +184,6 @@ where debug, json, None, - rx, ) .await?; (context, request) @@ -217,11 +193,7 @@ where bitcoin, tor, } => { - let request = Request::new( - Method::CancelAndRefund { - swap_id - } - ); + let request = Request::new(Method::CancelAndRefund { swap_id }); let context = Context::build( Some(bitcoin), @@ -232,7 +204,6 @@ where debug, json, None, - rx, ) .await?; (context, request) @@ -241,31 +212,15 @@ where rendezvous_point, tor, } => { - let request = Request::new( - Method::ListSellers { - rendezvous_point - } - ); + let request = Request::new(Method::ListSellers { rendezvous_point }); - let context = Context::build( - None, - None, - Some(tor), - data, - is_testnet, - debug, - json, - None, - rx, - ) - .await?; + let context = + Context::build(None, None, Some(tor), data, is_testnet, debug, json, None).await?; (context, request) } CliCommand::ExportBitcoinWallet { bitcoin } => { - let request = Request::new( - Method::ExportBitcoinWallet, - ); + let request = Request::new(Method::ExportBitcoinWallet); let context = Context::build( Some(bitcoin), @@ -276,7 +231,6 @@ where debug, json, None, - rx, ) .await?; (context, request) @@ -284,14 +238,10 @@ where CliCommand::MoneroRecovery { swap_id: SwapId { swap_id }, } => { - let request = Request::new( - Method::MoneroRecovery { - swap_id - } - ); + let request = Request::new(Method::MoneroRecovery { swap_id }); let context = - Context::build(None, None, None, data, is_testnet, debug, json, None, rx).await?; + Context::build(None, None, None, data, is_testnet, debug, json, None).await?; (context, request) } @@ -570,15 +520,12 @@ mod tests { MULTI_ADDRESS, ]; - let (tx, _) = broadcast::channel(1); - let args = parse_args_and_apply_defaults(raw_ars, tx.clone()) - .await - .unwrap(); + let args = parse_args_and_apply_defaults(raw_ars).await.unwrap(); let (is_testnet, debug, json) = (false, false, false); let (expected_config, expected_request) = ( Config::default(is_testnet, None, debug, json), - Request::buy_xmr(is_testnet, tx.clone()), + Request::buy_xmr(is_testnet), ); let (actual_config, actual_request) = match args { @@ -605,15 +552,12 @@ mod tests { MULTI_ADDRESS, ]; - let (tx, _) = broadcast::channel(1); - let args = parse_args_and_apply_defaults(raw_ars, tx.clone()) - .await - .unwrap(); + let args = parse_args_and_apply_defaults(raw_ars).await.unwrap(); let (is_testnet, debug, json) = (true, false, false); let (expected_config, expected_request) = ( Config::default(is_testnet, None, debug, json), - Request::buy_xmr(is_testnet, tx.clone()), + Request::buy_xmr(is_testnet), ); let (actual_config, actual_request) = match args { @@ -639,10 +583,7 @@ mod tests { MULTI_ADDRESS, ]; - let (tx, _) = broadcast::channel(1); - let err = parse_args_and_apply_defaults(raw_ars, tx.clone()) - .await - .unwrap_err(); + let err = parse_args_and_apply_defaults(raw_ars).await.unwrap_err(); assert_eq!( err.downcast_ref::().unwrap(), @@ -668,10 +609,7 @@ mod tests { MULTI_ADDRESS, ]; - let (tx, _) = broadcast::channel(1); - let err = parse_args_and_apply_defaults(raw_ars, tx.clone()) - .await - .unwrap_err(); + let err = parse_args_and_apply_defaults(raw_ars).await.unwrap_err(); assert_eq!( err.downcast_ref::().unwrap(), @@ -687,15 +625,12 @@ mod tests { async fn given_resume_on_mainnet_then_defaults_to_mainnet() { let raw_ars = vec![BINARY_NAME, "resume", "--swap-id", SWAP_ID]; - let (tx, _) = broadcast::channel(1); - let args = parse_args_and_apply_defaults(raw_ars, tx.clone()) - .await - .unwrap(); + let args = parse_args_and_apply_defaults(raw_ars).await.unwrap(); let (is_testnet, debug, json) = (false, false, false); let (expected_config, expected_request) = ( Config::default(is_testnet, None, debug, json), - Request::resume(tx.clone()), + Request::resume(), ); let (actual_config, actual_request) = match args { @@ -713,14 +648,12 @@ mod tests { let raw_ars = vec![BINARY_NAME, "--testnet", "resume", "--swap-id", SWAP_ID]; let (tx, _) = broadcast::channel(1); - let args = parse_args_and_apply_defaults(raw_ars, tx.clone()) - .await - .unwrap(); + let args = parse_args_and_apply_defaults(raw_ars).await.unwrap(); let (is_testnet, debug, json) = (true, false, false); let (expected_config, expected_request) = ( Config::default(is_testnet, None, debug, json), - Request::resume(tx.clone()), + Request::resume(), ); let (actual_config, actual_request) = match args { @@ -738,15 +671,13 @@ mod tests { let raw_ars = vec![BINARY_NAME, "cancel", "--swap-id", SWAP_ID]; let (tx, _) = broadcast::channel(1); - let args = parse_args_and_apply_defaults(raw_ars, tx.clone()) - .await - .unwrap(); + let args = parse_args_and_apply_defaults(raw_ars).await.unwrap(); let (is_testnet, debug, json) = (false, false, false); let (expected_config, expected_request) = ( Config::default(is_testnet, None, debug, json), - Request::cancel(tx.clone()), + Request::cancel(), ); let (actual_config, actual_request) = match args { @@ -764,14 +695,12 @@ mod tests { let raw_ars = vec![BINARY_NAME, "--testnet", "cancel", "--swap-id", SWAP_ID]; let (tx, _) = broadcast::channel(1); - let args = parse_args_and_apply_defaults(raw_ars, tx.clone()) - .await - .unwrap(); + let args = parse_args_and_apply_defaults(raw_ars).await.unwrap(); let (is_testnet, debug, json) = (true, false, false); let (expected_config, expected_request) = ( Config::default(is_testnet, None, debug, json), - Request::cancel(tx.clone()), + Request::cancel(), ); let (actual_config, actual_request) = match args { @@ -789,14 +718,12 @@ mod tests { let raw_ars = vec![BINARY_NAME, "refund", "--swap-id", SWAP_ID]; let (tx, _) = broadcast::channel(1); - let args = parse_args_and_apply_defaults(raw_ars, tx.clone()) - .await - .unwrap(); + let args = parse_args_and_apply_defaults(raw_ars).await.unwrap(); let (is_testnet, debug, json) = (false, false, false); let (expected_config, expected_request) = ( Config::default(is_testnet, None, debug, json), - Request::refund(tx.clone()), + Request::refund(), ); let (actual_config, actual_request) = match args { @@ -814,14 +741,12 @@ mod tests { let raw_ars = vec![BINARY_NAME, "--testnet", "refund", "--swap-id", SWAP_ID]; let (tx, _) = broadcast::channel(1); - let args = parse_args_and_apply_defaults(raw_ars, tx.clone()) - .await - .unwrap(); + let args = parse_args_and_apply_defaults(raw_ars).await.unwrap(); let (is_testnet, debug, json) = (true, false, false); let (expected_config, expected_request) = ( Config::default(is_testnet, None, debug, json), - Request::refund(tx.clone()), + Request::refund(), ); let (actual_config, actual_request) = match args { @@ -850,15 +775,13 @@ mod tests { ]; let (tx, _) = broadcast::channel(1); - let args = parse_args_and_apply_defaults(raw_ars, tx.clone()) - .await - .unwrap(); + let args = parse_args_and_apply_defaults(raw_ars).await.unwrap(); let (is_testnet, debug, json) = (false, false, false); let data_dir = PathBuf::from_str(ARGS_DATA_DIR).unwrap(); let (expected_config, expected_request) = ( Config::default(is_testnet, Some(data_dir.clone()), debug, json), - Request::buy_xmr(is_testnet, tx.clone()), + Request::buy_xmr(is_testnet), ); let (actual_config, actual_request) = match args { @@ -889,14 +812,12 @@ mod tests { let (tx, _) = broadcast::channel(1); let data_dir = PathBuf::from_str(ARGS_DATA_DIR).unwrap(); - let args = parse_args_and_apply_defaults(raw_ars, tx.clone()) - .await - .unwrap(); + let args = parse_args_and_apply_defaults(raw_ars).await.unwrap(); let (is_testnet, debug, json) = (true, false, false); let (expected_config, expected_request) = ( Config::default(is_testnet, Some(data_dir.clone()), debug, json), - Request::buy_xmr(is_testnet, tx.clone()), + Request::buy_xmr(is_testnet), ); let (actual_config, actual_request) = match args { @@ -922,14 +843,12 @@ mod tests { let (tx, _) = broadcast::channel(1); let data_dir = PathBuf::from_str(ARGS_DATA_DIR).unwrap(); - let args = parse_args_and_apply_defaults(raw_ars, tx.clone()) - .await - .unwrap(); + let args = parse_args_and_apply_defaults(raw_ars).await.unwrap(); let (is_testnet, debug, json) = (false, false, false); let (expected_config, expected_request) = ( Config::default(is_testnet, Some(data_dir.clone()), debug, json), - Request::resume(tx.clone()), + Request::resume(), ); let (actual_config, actual_request) = match args { @@ -956,14 +875,12 @@ mod tests { let (tx, _) = broadcast::channel(1); let data_dir = PathBuf::from_str(ARGS_DATA_DIR).unwrap(); - let args = parse_args_and_apply_defaults(raw_ars, tx.clone()) - .await - .unwrap(); + let args = parse_args_and_apply_defaults(raw_ars).await.unwrap(); let (is_testnet, debug, json) = (true, false, false); let (expected_config, expected_request) = ( Config::default(is_testnet, Some(data_dir.clone()), debug, json), - Request::resume(tx.clone()), + Request::resume(), ); let (actual_config, actual_request) = match args { @@ -991,14 +908,12 @@ mod tests { ]; let (tx, _) = broadcast::channel(1); - let args = parse_args_and_apply_defaults(raw_ars, tx.clone()) - .await - .unwrap(); + let args = parse_args_and_apply_defaults(raw_ars).await.unwrap(); let (is_testnet, debug, json) = (false, true, false); let (expected_config, expected_request) = ( Config::default(is_testnet, None, debug, json), - Request::buy_xmr(is_testnet, tx.clone()), + Request::buy_xmr(is_testnet), ); let (actual_config, actual_request) = match args { @@ -1027,14 +942,12 @@ mod tests { ]; let (tx, _) = broadcast::channel(1); - let args = parse_args_and_apply_defaults(raw_ars, tx.clone()) - .await - .unwrap(); + let args = parse_args_and_apply_defaults(raw_ars).await.unwrap(); let (is_testnet, debug, json) = (true, true, false); let (expected_config, expected_request) = ( Config::default(is_testnet, None, debug, json), - Request::buy_xmr(is_testnet, tx.clone()), + Request::buy_xmr(is_testnet), ); let (actual_config, actual_request) = match args { @@ -1052,14 +965,12 @@ mod tests { let raw_ars = vec![BINARY_NAME, "--debug", "resume", "--swap-id", SWAP_ID]; let (tx, _) = broadcast::channel(1); - let args = parse_args_and_apply_defaults(raw_ars, tx.clone()) - .await - .unwrap(); + let args = parse_args_and_apply_defaults(raw_ars).await.unwrap(); let (is_testnet, debug, json) = (false, true, false); let (expected_config, expected_request) = ( Config::default(is_testnet, None, debug, json), - Request::resume(tx.clone()), + Request::resume(), ); let (actual_config, actual_request) = match args { @@ -1084,14 +995,12 @@ mod tests { ]; let (tx, _) = broadcast::channel(1); - let args = parse_args_and_apply_defaults(raw_ars, tx.clone()) - .await - .unwrap(); + let args = parse_args_and_apply_defaults(raw_ars).await.unwrap(); let (is_testnet, debug, json) = (true, true, false); let (expected_config, expected_request) = ( Config::default(is_testnet, None, debug, json), - Request::resume(tx.clone()), + Request::resume(), ); let (actual_config, actual_request) = match args { @@ -1119,15 +1028,13 @@ mod tests { ]; let (tx, _) = broadcast::channel(1); - let args = parse_args_and_apply_defaults(raw_ars, tx.clone()) - .await - .unwrap(); + let args = parse_args_and_apply_defaults(raw_ars).await.unwrap(); let (is_testnet, debug, json) = (false, false, true); let data_dir = data_dir_path_cli(is_testnet); let (expected_config, expected_request) = ( Config::default(is_testnet, None, debug, json), - Request::buy_xmr(is_testnet, tx.clone()), + Request::buy_xmr(is_testnet), ); let (actual_config, actual_request) = match args { @@ -1156,14 +1063,12 @@ mod tests { ]; let (tx, _) = broadcast::channel(1); - let args = parse_args_and_apply_defaults(raw_ars, tx.clone()) - .await - .unwrap(); + let args = parse_args_and_apply_defaults(raw_ars).await.unwrap(); let (is_testnet, debug, json) = (true, false, true); let (expected_config, expected_request) = ( Config::default(is_testnet, None, debug, json), - Request::buy_xmr(is_testnet, tx.clone()), + Request::buy_xmr(is_testnet), ); let (actual_config, actual_request) = match args { @@ -1180,14 +1085,12 @@ mod tests { async fn given_resume_on_mainnet_with_json_then_json_set() { let (tx, _) = broadcast::channel(1); let raw_ars = vec![BINARY_NAME, "--json", "resume", "--swap-id", SWAP_ID]; - let args = parse_args_and_apply_defaults(raw_ars, tx.clone()) - .await - .unwrap(); + let args = parse_args_and_apply_defaults(raw_ars).await.unwrap(); let (is_testnet, debug, json) = (false, false, true); let (expected_config, expected_request) = ( Config::default(is_testnet, None, debug, json), - Request::resume(tx.clone()), + Request::resume(), ); let (actual_config, actual_request) = match args { @@ -1212,14 +1115,12 @@ mod tests { ]; let (tx, _) = broadcast::channel(1); - let args = parse_args_and_apply_defaults(raw_ars, tx.clone()) - .await - .unwrap(); + let args = parse_args_and_apply_defaults(raw_ars).await.unwrap(); let (is_testnet, debug, json) = (true, false, true); let (expected_config, expected_request) = ( Config::default(is_testnet, None, debug, json), - Request::resume(tx.clone()), + Request::resume(), ); let (actual_config, actual_request) = match args { @@ -1245,9 +1146,7 @@ mod tests { MULTI_ADDRESS, ]; let (tx, _) = broadcast::channel(1); - let result = parse_args_and_apply_defaults(raw_ars, tx.clone()) - .await - .unwrap_err(); + let result = parse_args_and_apply_defaults(raw_ars).await.unwrap_err(); let raw_ars = vec![ BINARY_NAME, @@ -1259,9 +1158,7 @@ mod tests { "--seller", MULTI_ADDRESS, ]; - let result = parse_args_and_apply_defaults(raw_ars, tx.clone()) - .await - .unwrap_err(); + let result = parse_args_and_apply_defaults(raw_ars).await.unwrap_err(); let raw_ars = vec![ BINARY_NAME, @@ -1273,9 +1170,7 @@ mod tests { "--seller", MULTI_ADDRESS, ]; - let result = parse_args_and_apply_defaults(raw_ars, tx.clone()) - .await - .unwrap(); + let result = parse_args_and_apply_defaults(raw_ars).await.unwrap(); assert!(matches!(result, ParseResult::Context(_, _))); } @@ -1294,9 +1189,7 @@ mod tests { "--seller", MULTI_ADDRESS, ]; - let result = parse_args_and_apply_defaults(raw_ars, tx.clone()) - .await - .unwrap_err(); + let result = parse_args_and_apply_defaults(raw_ars).await.unwrap_err(); let raw_ars = vec![ BINARY_NAME, @@ -1309,9 +1202,7 @@ mod tests { "--seller", MULTI_ADDRESS, ]; - let result = parse_args_and_apply_defaults(raw_ars, tx.clone()) - .await - .unwrap_err(); + let result = parse_args_and_apply_defaults(raw_ars).await.unwrap_err(); let raw_ars = vec![ BINARY_NAME, @@ -1324,9 +1215,7 @@ mod tests { "--seller", MULTI_ADDRESS, ]; - let result = parse_args_and_apply_defaults(raw_ars, tx.clone()) - .await - .unwrap(); + let result = parse_args_and_apply_defaults(raw_ars).await.unwrap(); assert!(matches!(result, ParseResult::Context(_, _))); } diff --git a/swap/src/cli/tracing.rs b/swap/src/cli/tracing.rs index 21a76199..5a360d1e 100644 --- a/swap/src/cli/tracing.rs +++ b/swap/src/cli/tracing.rs @@ -13,8 +13,7 @@ pub fn init(debug: bool, json: bool, dir: impl AsRef) -> Result<()> { let level_filter = EnvFilter::try_new("swap=debug")?; let registry = Registry::default().with(level_filter); - let appender = - tracing_appender::rolling::never(dir.as_ref(), "swap-all.log"); + let appender = tracing_appender::rolling::never(dir.as_ref(), "swap-all.log"); let (appender, guard) = tracing_appender::non_blocking(appender); std::mem::forget(guard); @@ -47,19 +46,11 @@ pub struct StdErrPrinter { level: Level, } -type StdErrLayer = fmt::Layer< - S, - DefaultFields, - Format, - fn() -> std::io::Stderr, ->; +type StdErrLayer = + fmt::Layer, fn() -> std::io::Stderr>; -type StdErrJsonLayer = fmt::Layer< - S, - JsonFields, - Format, - fn() -> std::io::Stderr, ->; +type StdErrJsonLayer = + fmt::Layer, fn() -> std::io::Stderr>; fn debug_terminal_printer() -> StdErrPrinter>> { let is_terminal = atty::is(atty::Stream::Stderr); diff --git a/swap/src/protocol/alice/swap.rs b/swap/src/protocol/alice/swap.rs index df59c6a6..0eef7dcd 100644 --- a/swap/src/protocol/alice/swap.rs +++ b/swap/src/protocol/alice/swap.rs @@ -112,7 +112,7 @@ where } AliceState::BtcLocked { state3 } => { match state3.expired_timelocks(bitcoin_wallet).await? { - ExpiredTimelocks::None {..} => { + ExpiredTimelocks::None { .. } => { // Record the current monero wallet block height so we don't have to scan from // block 0 for scenarios where we create a refund wallet. let monero_wallet_restore_blockheight = monero_wallet.block_height().await?; @@ -135,7 +135,7 @@ where transfer_proof, state3, } => match state3.expired_timelocks(bitcoin_wallet).await? { - ExpiredTimelocks::None {..} => { + ExpiredTimelocks::None { .. } => { monero_wallet .watch_for_transfer(state3.lock_xmr_watch_request(transfer_proof.clone(), 1)) .await @@ -221,7 +221,7 @@ where encrypted_signature, state3, } => match state3.expired_timelocks(bitcoin_wallet).await? { - ExpiredTimelocks::None {..} => { + ExpiredTimelocks::None { .. } => { let tx_lock_status = bitcoin_wallet.subscribe_to(state3.tx_lock.clone()).await; match state3.signed_redeem_transaction(*encrypted_signature) { Ok(tx) => match bitcoin_wallet.broadcast(tx, "redeem").await { diff --git a/swap/src/protocol/bob/swap.rs b/swap/src/protocol/bob/swap.rs index db8994fd..6f5de482 100644 --- a/swap/src/protocol/bob/swap.rs +++ b/swap/src/protocol/bob/swap.rs @@ -117,7 +117,7 @@ async fn next_state( } => { let tx_lock_status = bitcoin_wallet.subscribe_to(state3.tx_lock.clone()).await; - if let ExpiredTimelocks::None {..} = state3.expired_timelock(bitcoin_wallet).await? { + if let ExpiredTimelocks::None { .. } = state3.expired_timelock(bitcoin_wallet).await? { let transfer_proof_watcher = event_loop_handle.recv_transfer_proof(); let cancel_timelock_expires = tx_lock_status.wait_until_confirmed_with(state3.cancel_timelock); @@ -156,7 +156,7 @@ async fn next_state( } => { let tx_lock_status = bitcoin_wallet.subscribe_to(state.tx_lock.clone()).await; - if let ExpiredTimelocks::None {..} = state.expired_timelock(bitcoin_wallet).await? { + if let ExpiredTimelocks::None { .. } = state.expired_timelock(bitcoin_wallet).await? { let watch_request = state.lock_xmr_watch_request(lock_transfer_proof); select! { @@ -185,7 +185,7 @@ async fn next_state( BobState::XmrLocked(state) => { let tx_lock_status = bitcoin_wallet.subscribe_to(state.tx_lock.clone()).await; - if let ExpiredTimelocks::None {..} = state.expired_timelock(bitcoin_wallet).await? { + if let ExpiredTimelocks::None { .. } = state.expired_timelock(bitcoin_wallet).await? { // Alice has locked Xmr // Bob sends Alice his key @@ -209,7 +209,7 @@ async fn next_state( BobState::EncSigSent(state) => { let tx_lock_status = bitcoin_wallet.subscribe_to(state.tx_lock.clone()).await; - if let ExpiredTimelocks::None {..} = state.expired_timelock(bitcoin_wallet).await? { + if let ExpiredTimelocks::None { .. } = state.expired_timelock(bitcoin_wallet).await? { select! { state5 = state.watch_for_redeem_btc(bitcoin_wallet) => { BobState::BtcRedeemed(state5?) @@ -269,7 +269,7 @@ async fn next_state( BobState::BtcCancelled(state) => { // Bob has cancelled the swap match state.expired_timelock(bitcoin_wallet).await? { - ExpiredTimelocks::None {..} => { + ExpiredTimelocks::None { .. } => { bail!( "Internal error: canceled state reached before cancel timelock was expired" ); diff --git a/swap/src/rpc.rs b/swap/src/rpc.rs index 45ed6d6d..252e6e9a 100644 --- a/swap/src/rpc.rs +++ b/swap/src/rpc.rs @@ -26,7 +26,6 @@ pub async fn run_server( let addr = server.local_addr()?; let server_handle = server.start(modules)?; - tracing::info!(%addr, "Started RPC server"); Ok((addr, server_handle)) } diff --git a/swap/src/rpc/methods.rs b/swap/src/rpc/methods.rs index 797f7607..0572636c 100644 --- a/swap/src/rpc/methods.rs +++ b/swap/src/rpc/methods.rs @@ -14,6 +14,12 @@ use uuid::Uuid; pub fn register_modules(context: Arc) -> RpcModule> { let mut module = RpcModule::new(context); + module + .register_async_method("suspend_current_swap", |_, context| async move { + execute_request(Method::SuspendCurrentSwap, &context).await + }) + .unwrap(); + module .register_async_method("get_swap_info", |params, context| async move { let params: HashMap = params.parse()?;