From 7d2b7bee922f65f3cf2d7c93c757b530a52a747f Mon Sep 17 00:00:00 2001 From: binarybaron <86064887+binarybaron@users.noreply.github.com> Date: Fri, 11 Aug 2023 10:50:13 +0200 Subject: [PATCH 1/6] Combine Cmd and Params --- swap/src/api/request.rs | 161 +++++++++++++++------------------------- swap/src/bin/swap.rs | 2 +- swap/src/cli/command.rs | 68 +++++++---------- swap/src/rpc/methods.rs | 76 ++++++++----------- 4 files changed, 118 insertions(+), 189 deletions(-) diff --git a/swap/src/api/request.rs b/swap/src/api/request.rs index e4360c7a..d7a58aa1 100644 --- a/swap/src/api/request.rs +++ b/swap/src/api/request.rs @@ -18,14 +18,13 @@ use std::future::Future; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; +use structopt::lazy_static::lazy_static; use tokio::sync::broadcast::Receiver; -use tokio::sync::Mutex; use tracing::{debug_span, Instrument}; use uuid::Uuid; #[derive(PartialEq, Debug)] pub struct Request { - pub params: Params, pub cmd: Method, pub shutdown: Shutdown, } @@ -72,72 +71,72 @@ impl PartialEq for Shutdown { } } -#[derive(Default, PartialEq, Debug)] -pub struct Params { - pub seller: Option, - pub bitcoin_change_address: Option, - pub monero_receive_address: Option, - pub rendezvous_point: Option, - pub swap_id: Option, - pub amount: Option, - pub server_address: Option, - pub address: Option, -} - #[derive(Debug, PartialEq)] pub enum Method { - BuyXmr, + BuyXmr { + seller: Multiaddr, + bitcoin_change_address: bitcoin::Address, + monero_receive_address: monero::Address, + swap_id: Uuid, + }, History, RawHistory, Config, - WithdrawBtc, + WithdrawBtc { + amount: Option, + address: bitcoin::Address, + }, Balance, - GetSeller, - SwapStartDate, - Resume, - CancelAndRefund, - ListSellers, + GetSeller { + swap_id: Uuid, + }, + SwapStartDate { + swap_id: Uuid, + }, + Resume { + swap_id: Uuid, + }, + CancelAndRefund { + swap_id: Uuid, + }, + ListSellers { + rendezvous_point: Multiaddr, + }, ExportBitcoinWallet, - MoneroRecovery, - StartDaemon, + MoneroRecovery { + swap_id: Uuid, + }, + StartDaemon { + server_address: Option, + } } impl Request { - pub fn new(shutdownReceiver: Receiver<()>, cmd: Method, params: Params) -> Request { + pub fn new(shutdownReceiver: Receiver<()>, cmd: Method) -> Request { Request { - params, cmd, shutdown: Shutdown::new(shutdownReceiver), } } - async fn handle_cmd(&mut self, context: Arc) -> Result { + fn has_lockable_swap_id(&self) -> Option { match self.cmd { - Method::BuyXmr => { - let swap_id = self - .params - .swap_id - .context("Parameter swap_id is missing")?; + Method::BuyXmr { swap_id, .. } + | Method::Resume { swap_id } + | Method::CancelAndRefund { swap_id } => Some(swap_id), + _ => None, + } + } + + async fn handle_cmd(mut self, context: Arc) -> Result { + match self.cmd { + Method::BuyXmr { seller, bitcoin_change_address, monero_receive_address, swap_id } => { let seed = context.config.seed.as_ref().context("Could not get seed")?; let env_config = context.config.env_config; let btc = context .bitcoin_wallet .as_ref() .context("Could not get Bitcoin wallet")?; - let seller = self - .params - .seller - .clone() - .context("Parameter seller is missing")?; - let monero_receive_address = self - .params - .monero_receive_address - .context("Parameter monero_receive_address is missing")?; - let bitcoin_change_address = self - .params - .bitcoin_change_address - .clone() - .context("Parameter bitcoin_change_address is missing")?; let bitcoin_wallet = btc; let seller_peer_id = seller @@ -247,8 +246,7 @@ impl Request { let raw_history = context.db.raw_all().await?; Ok(json!({ "raw_history": raw_history })) } - Method::GetSeller => { - let swap_id = self.params.swap_id.context("Parameter swap_id is needed")?; + Method::GetSeller { swap_id } => { let peerId = context .db .get_peer_id(swap_id) @@ -266,12 +264,7 @@ impl Request { "addresses": addresses })) } - Method::SwapStartDate => { - let swap_id = self - .params - .swap_id - .context("Parameter swap_id is missing")?; - + Method::SwapStartDate { swap_id } => { let start_date = context.db.get_swap_start_date(swap_id).await?; Ok(json!({ @@ -295,19 +288,13 @@ impl Request { "bitcoin_wallet": format!("{}/wallet", data_dir_display), })) } - Method::WithdrawBtc => { + Method::WithdrawBtc {address, amount} => { let bitcoin_wallet = context .bitcoin_wallet .as_ref() .context("Could not get Bitcoin wallet")?; - let address = self - .params - .address - .clone() - .context("Parameter address is missing")?; - - let amount = match self.params.amount { + let amount = match amount { Some(amount) => amount, None => { bitcoin_wallet @@ -330,12 +317,9 @@ impl Request { "txid": signed_tx.txid(), })) } - Method::StartDaemon => { + Method::StartDaemon {server_address} => { // Default to 127.0.0.1:1234 - let server_address = self - .params - .server_address - .unwrap_or("127.0.0.1:1234".parse().unwrap()); + let server_address = server_address.unwrap_or("127.0.0.1:1234".parse().unwrap()); let (_, server_handle) = rpc::run_server(server_address, Arc::clone(&context)).await?; @@ -368,12 +352,7 @@ impl Request { "balance": bitcoin_balance.to_sat() })) } - Method::Resume => { - let swap_id = self - .params - .swap_id - .context("Parameter swap_id is missing")?; - + Method::Resume {swap_id} => { let seller_peer_id = context.db.get_peer_id(swap_id).await?; let seller_addresses = context.db.get_addresses(seller_peer_id).await?; @@ -452,16 +431,14 @@ impl Request { "result": [] })) } - Method::CancelAndRefund => { + Method::CancelAndRefund {swap_id} => { let bitcoin_wallet = context .bitcoin_wallet .as_ref() .context("Could not get Bitcoin wallet")?; let state = cli::cancel_and_refund( - self.params - .swap_id - .context("Parameter swap_id is missing")?, + swap_id, Arc::clone(bitcoin_wallet), Arc::clone(&context.db), ) @@ -471,12 +448,7 @@ impl Request { "result": state, })) } - Method::ListSellers => { - let rendezvous_point = self - .params - .rendezvous_point - .clone() - .context("Parameter rendezvous_point is missing")?; + Method::ListSellers {rendezvous_point} => { let rendezvous_node_peer_id = rendezvous_point .extract_peer_id() .context("Rendezvous node address must contain peer ID")?; @@ -536,13 +508,11 @@ impl Request { "result": [] })) } - Method::MoneroRecovery => { + Method::MoneroRecovery {swap_id} => { let swap_state: BobState = context .db .get_state( - self.params - .swap_id - .context("Parameter swap_id is missing")?, + swap_id, ) .await? .try_into()?; @@ -585,22 +555,11 @@ impl Request { } } - pub async fn call(&mut self, context: Arc) -> Result { + pub async fn call(self, context: Arc) -> Result { // If the swap ID is set, we add it to the span - let call_span = self.params.swap_id.map_or_else( - || { - debug_span!( - "call", - method = ?self.cmd, - ) - }, - |swap_id| { - debug_span!( - "call", - method = ?self.cmd, - swap_id = swap_id.to_string(), - ) - }, + let call_span = debug_span!( + "call", + method = ?self.cmd, ); self.handle_cmd(context).instrument(call_span).await diff --git a/swap/src/bin/swap.rs b/swap/src/bin/swap.rs index e86cf75a..5dbd949c 100644 --- a/swap/src/bin/swap.rs +++ b/swap/src/bin/swap.rs @@ -22,7 +22,7 @@ use tokio::sync::broadcast; #[tokio::main] async fn main() -> Result<()> { let (tx, _) = broadcast::channel(1); - let (context, mut request) = match parse_args_and_apply_defaults(env::args_os(), tx).await? { + let (context, request) = match parse_args_and_apply_defaults(env::args_os(), tx).await? { ParseResult::Context(context, request) => (context, request), ParseResult::PrintAndExitZero { message } => { println!("{}", message); diff --git a/swap/src/cli/command.rs b/swap/src/cli/command.rs index e3c664b1..38d861a7 100644 --- a/swap/src/cli/command.rs +++ b/swap/src/cli/command.rs @@ -1,4 +1,4 @@ -use crate::api::request::{Method, Params, Request}; +use crate::api::request::{Method, Request}; use crate::api::Context; use crate::bitcoin::{bitcoin_address, Amount}; use crate::monero; @@ -80,13 +80,12 @@ where let request = Request::new( rx.subscribe(), - Method::BuyXmr, - Params { - bitcoin_change_address: Some(bitcoin_change_address), - monero_receive_address: Some(monero_receive_address), - seller: Some(seller), - ..Default::default() - }, + Method::BuyXmr { + seller, + bitcoin_change_address, + monero_receive_address, + swap_id: Uuid::new_v4(), + } ); let context = Context::build( @@ -104,21 +103,21 @@ where (context, request) } CliCommand::History => { - let request = Request::new(rx.subscribe(), Method::History, Params::default()); + let request = Request::new(rx.subscribe(), Method::History); let context = Context::build(None, None, None, data, is_testnet, debug, json, None, rx).await?; (context, request) } CliCommand::Config => { - let request = Request::new(rx.subscribe(), Method::Config, Params::default()); + let request = Request::new(rx.subscribe(), Method::Config); let context = Context::build(None, None, None, data, is_testnet, debug, json, None, rx).await?; (context, request) } CliCommand::Balance { bitcoin } => { - let request = Request::new(rx.subscribe(), Method::Balance, Params::default()); + let request = Request::new(rx.subscribe(), Method::Balance); let context = Context::build( Some(bitcoin), @@ -142,11 +141,9 @@ where } => { let request = Request::new( rx.subscribe(), - Method::StartDaemon, - Params { - server_address, - ..Default::default() - }, + Method::StartDaemon { + server_address + } ); let context = Context::build( @@ -172,11 +169,9 @@ where let request = Request::new( rx.subscribe(), - Method::WithdrawBtc, - Params { + Method::WithdrawBtc { amount, - address: Some(address), - ..Default::default() + address, }, ); @@ -202,11 +197,9 @@ where } => { let request = Request::new( rx.subscribe(), - Method::Resume, - Params { - swap_id: Some(swap_id), - ..Default::default() - }, + Method::Resume { + swap_id + } ); let context = Context::build( @@ -230,11 +223,9 @@ where } => { let request = Request::new( rx.subscribe(), - Method::CancelAndRefund, - Params { - swap_id: Some(swap_id), - ..Default::default() - }, + Method::CancelAndRefund { + swap_id + } ); let context = Context::build( @@ -257,11 +248,9 @@ where } => { let request = Request::new( rx.subscribe(), - Method::ListSellers, - Params { - rendezvous_point: Some(rendezvous_point), - ..Default::default() - }, + Method::ListSellers { + rendezvous_point + } ); let context = Context::build( @@ -283,7 +272,6 @@ where let request = Request::new( rx.subscribe(), Method::ExportBitcoinWallet, - Params::default(), ); let context = Context::build( @@ -305,11 +293,9 @@ where } => { let request = Request::new( rx.subscribe(), - Method::MoneroRecovery, - Params { - swap_id: Some(swap_id), - ..Default::default() - }, + Method::MoneroRecovery { + swap_id + } ); let context = diff --git a/swap/src/rpc/methods.rs b/swap/src/rpc/methods.rs index dbdb0479..5c564900 100644 --- a/swap/src/rpc/methods.rs +++ b/swap/src/rpc/methods.rs @@ -1,4 +1,4 @@ -use crate::api::request::{Method, Params, Request}; +use crate::api::request::{Method, Request}; use crate::api::Context; use crate::bitcoin::bitcoin_address; use crate::monero::monero_address; @@ -161,10 +161,9 @@ pub fn register_modules(context: Arc) -> RpcModule> { async fn execute_request( cmd: Method, - params: Params, context: &Arc, ) -> Result { - let mut request = Request::new(context.shutdown.subscribe(), cmd, params); + let request = Request::new(context.shutdown.subscribe(), cmd); request .call(Arc::clone(context)) .await @@ -174,74 +173,64 @@ async fn execute_request( async fn get_bitcoin_balance( context: &Arc, ) -> Result { - execute_request(Method::Balance, Params::default(), context).await + execute_request(Method::Balance, context).await } async fn get_history(context: &Arc) -> Result { - execute_request(Method::History, Params::default(), context).await + execute_request(Method::History, context).await } async fn get_raw_history( context: &Arc, ) -> Result { - execute_request(Method::RawHistory, Params::default(), context).await + execute_request(Method::RawHistory, context).await } async fn get_seller( swap_id: Uuid, context: &Arc, ) -> Result { - let params = Params { - swap_id: Some(swap_id), - ..Default::default() - }; - execute_request(Method::GetSeller, params, context).await + execute_request(Method::GetSeller { + swap_id + }, context).await } async fn get_swap_start_date( swap_id: Uuid, context: &Arc, ) -> Result { - let params = Params { - swap_id: Some(swap_id), - ..Default::default() - }; - execute_request(Method::SwapStartDate, params, context).await + execute_request(Method::SwapStartDate { + swap_id + }, context).await } async fn resume_swap( swap_id: Uuid, context: &Arc, ) -> Result { - let params = Params { - swap_id: Some(swap_id), - ..Default::default() - }; - execute_request(Method::Resume, params, context).await + execute_request(Method::Resume { + swap_id + }, context).await } async fn cancel_and_refund_swap( swap_id: Uuid, context: &Arc, ) -> Result { - let params = Params { - swap_id: Some(swap_id), - ..Default::default() - }; - execute_request(Method::CancelAndRefund, params, context).await + execute_request(Method::CancelAndRefund { + swap_id + }, context).await } async fn withdraw_btc( - withdraw_address: bitcoin::Address, + address: bitcoin::Address, amount: Option, context: &Arc, ) -> Result { - let params = Params { + execute_request(Method::WithdrawBtc { amount, - address: Some(withdraw_address), - ..Default::default() - }; - execute_request(Method::WithdrawBtc, params, context).await + address, + }, context).await } async fn buy_xmr( @@ -250,24 +239,19 @@ async fn buy_xmr( seller: Multiaddr, context: &Arc, ) -> Result { - let params = Params { - bitcoin_change_address: Some(bitcoin_change_address), - monero_receive_address: Some(monero_receive_address), - seller: Some(seller), - swap_id: Some(Uuid::new_v4()), - ..Default::default() - }; - - execute_request(Method::BuyXmr, params, context).await + execute_request(Method::BuyXmr { + seller, + swap_id: Uuid::new_v4(), + bitcoin_change_address, + monero_receive_address + }, context).await } async fn list_sellers( rendezvous_point: Multiaddr, context: &Arc, ) -> Result { - let params = Params { - rendezvous_point: Some(rendezvous_point), - ..Default::default() - }; - execute_request(Method::ListSellers, params, context).await + execute_request(Method::ListSellers { + rendezvous_point + }, context).await } From 849e6e7a147bf8707d2f244c716a83a6ad2db7cc Mon Sep 17 00:00:00 2001 From: binarybaron <86064887+binarybaron@users.noreply.github.com> Date: Fri, 11 Aug 2023 11:53:07 +0200 Subject: [PATCH 2/6] Disallow concurrent swaps --- swap/src/api/request.rs | 54 +++++++++++++++++++++++++++++------------ 1 file changed, 38 insertions(+), 16 deletions(-) diff --git a/swap/src/api/request.rs b/swap/src/api/request.rs index d7a58aa1..00ba2440 100644 --- a/swap/src/api/request.rs +++ b/swap/src/api/request.rs @@ -22,6 +22,11 @@ use structopt::lazy_static::lazy_static; use tokio::sync::broadcast::Receiver; use tracing::{debug_span, Instrument}; use uuid::Uuid; +use tokio::sync::Mutex; + +lazy_static! { + static ref SWAP_MUTEX: Mutex> = Mutex::new(None); +} #[derive(PartialEq, Debug)] pub struct Request { @@ -161,7 +166,7 @@ impl Request { .context("Could not get Tor SOCKS5 port")?, behaviour, ) - .await?; + .await?; swarm.behaviour_mut().add_address(seller_peer_id, seller); tracing::debug!(peer_id = %swarm.local_peer_id(), "Network layer initialized"); @@ -182,7 +187,7 @@ impl Request { || bitcoin_wallet.sync(), estimate_fee, ) - .await + .await { Ok(val) => val, Err(error) => match error.downcast::() { @@ -382,7 +387,7 @@ impl Request { .context("Could not get Tor SOCKS5 port")?, behaviour, ) - .await?; + .await?; let our_peer_id = swarm.local_peer_id(); tracing::debug!(peer_id = %our_peer_id, "Network layer initialized"); @@ -417,7 +422,7 @@ impl Request { event_loop_handle, monero_receive_address, ) - .await?; + .await?; tokio::select! { event_loop_result = handle => { @@ -442,7 +447,7 @@ impl Request { Arc::clone(bitcoin_wallet), Arc::clone(&context.db), ) - .await?; + .await?; Ok(json!({ "result": state, @@ -470,7 +475,7 @@ impl Request { .context("Could not get Tor SOCKS5 port")?, identity, ) - .await?; + .await?; for seller in &sellers { match seller.status { @@ -558,10 +563,26 @@ impl Request { pub async fn call(self, context: Arc) -> Result { // If the swap ID is set, we add it to the span let call_span = debug_span!( - "call", + "cmd", method = ?self.cmd, ); + if let Some(swap_id) = self.has_lockable_swap_id() { + println!("taking lock for swap_id: {}", swap_id); + let mut guard = SWAP_MUTEX.try_lock().context("Another swap is already running")?; + if guard.is_some() { + bail!("Another swap is already running"); + } + + let _ = guard.insert(swap_id.clone()); + + let result = self.handle_cmd(context).instrument(call_span).await; + guard.take(); + + println!("releasing lock for swap_id: {}", swap_id); + + return result; + } self.handle_cmd(context).instrument(call_span).await } } @@ -584,15 +605,15 @@ pub async fn determine_btc_to_swap( sync: FS, estimate_fee: FFE, ) -> Result<(bitcoin::Amount, bitcoin::Amount)> -where - TB: Future>, - FB: Fn() -> TB, - TMG: Future>, - FMG: Fn() -> TMG, - TS: Future>, - FS: Fn() -> TS, - FFE: Fn(bitcoin::Amount) -> TFE, - TFE: Future>, + where + TB: Future>, + FB: Fn() -> TB, + TMG: Future>, + FMG: Fn() -> TMG, + TS: Future>, + FS: Fn() -> TS, + FFE: Fn(bitcoin::Amount) -> TFE, + TFE: Future>, { tracing::debug!("Requesting quote"); let bid_quote = bid_quote.await?; @@ -667,3 +688,4 @@ where Ok((btc_swap_amount, fees)) } + From ffbbe2401020f27d8a1abe0427a0688b52095497 Mon Sep 17 00:00:00 2001 From: binarybaron <86064887+binarybaron@users.noreply.github.com> Date: Fri, 11 Aug 2023 15:29:59 +0200 Subject: [PATCH 3/6] Use RwLock instead of Mutex to allow for parallel reads and add get_current_swap endpoint --- swap/src/api/request.rs | 27 +++++++++++++++++---------- swap/src/rpc/methods.rs | 7 +++++++ 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/swap/src/api/request.rs b/swap/src/api/request.rs index 00ba2440..d5de5e3d 100644 --- a/swap/src/api/request.rs +++ b/swap/src/api/request.rs @@ -16,16 +16,16 @@ use std::cmp::min; use std::convert::TryInto; use std::future::Future; use std::net::SocketAddr; -use std::sync::Arc; +use std::sync::{Arc}; use std::time::Duration; use structopt::lazy_static::lazy_static; use tokio::sync::broadcast::Receiver; use tracing::{debug_span, Instrument}; use uuid::Uuid; -use tokio::sync::Mutex; +use tokio::sync::RwLock; lazy_static! { - static ref SWAP_MUTEX: Mutex> = Mutex::new(None); + static ref SWAP_LOCK: RwLock> = RwLock::new(None); } #[derive(PartialEq, Debug)] @@ -113,7 +113,8 @@ pub enum Method { }, StartDaemon { server_address: Option, - } + }, + GetCurrentSwap, } impl Request { @@ -556,7 +557,12 @@ impl Request { Ok(json!({ "result": [] })) - } + }, + Method::GetCurrentSwap => { + Ok(json!({ + "swap_id": SWAP_LOCK.read().await.clone() + })) + }, } } @@ -569,15 +575,16 @@ impl Request { if let Some(swap_id) = self.has_lockable_swap_id() { println!("taking lock for swap_id: {}", swap_id); - let mut guard = SWAP_MUTEX.try_lock().context("Another swap is already running")?; - if guard.is_some() { - bail!("Another swap is already running"); + let mut guard = SWAP_LOCK.write().await; + if let Some(running_swap_id) = guard.as_ref() { + bail!("Another swap is already running: {}", running_swap_id); } - let _ = guard.insert(swap_id.clone()); + drop(guard); let result = self.handle_cmd(context).instrument(call_span).await; - guard.take(); + + SWAP_LOCK.write().await.take(); println!("releasing lock for swap_id: {}", swap_id); diff --git a/swap/src/rpc/methods.rs b/swap/src/rpc/methods.rs index 5c564900..946b5b94 100644 --- a/swap/src/rpc/methods.rs +++ b/swap/src/rpc/methods.rs @@ -156,6 +156,9 @@ pub fn register_modules(context: Arc) -> RpcModule> { list_sellers(rendezvous_point.clone(), &context).await }) .expect("Could not register RPC method list_sellers"); + module.register_async_method("get_current_swap", |_, context| async move { + get_current_swap(&context).await + }).expect("Could not register RPC method get_current_swap"); module } @@ -170,6 +173,10 @@ async fn execute_request( .map_err(|err| jsonrpsee_core::Error::Custom(err.to_string())) } +async fn get_current_swap(context: &Arc) -> Result { + execute_request(Method::GetCurrentSwap, context).await +} + async fn get_bitcoin_balance( context: &Arc, ) -> Result { From f804254f9436287a9b3925784c5feac2a0cf8661 Mon Sep 17 00:00:00 2001 From: binarybaron <86064887+binarybaron@users.noreply.github.com> Date: Fri, 11 Aug 2023 15:39:46 +0200 Subject: [PATCH 4/6] Return wallet descriptor to RPC API caller --- swap/src/api/request.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/swap/src/api/request.rs b/swap/src/api/request.rs index d5de5e3d..a54b41be 100644 --- a/swap/src/api/request.rs +++ b/swap/src/api/request.rs @@ -511,7 +511,7 @@ impl Request { let wallet_export = bitcoin_wallet.wallet_export("cli").await?; tracing::info!(descriptor=%wallet_export.to_string(), "Exported bitcoin wallet"); Ok(json!({ - "result": [] + "descriptor": wallet_export.to_string(), })) } Method::MoneroRecovery {swap_id} => { From 433bf824f9a854f5f2a949112d4323a238268655 Mon Sep 17 00:00:00 2001 From: binarybaron <86064887+binarybaron@users.noreply.github.com> Date: Sat, 12 Aug 2023 17:16:23 +0200 Subject: [PATCH 5/6] Append all cli logs to single log file After careful consideration, I've concluded that it's not practical/possible to ensure that the previous behaviour (one log file per swap) is preserved due to limitations of the tracing-subscriber crate and a big in the built in JSON formatter --- swap/src/api.rs | 2 +- swap/src/cli/tracing.rs | 75 +++++++++++++++-------------------------- 2 files changed, 29 insertions(+), 48 deletions(-) diff --git a/swap/src/api.rs b/swap/src/api.rs index 8f3db7bb..2b86d5e2 100644 --- a/swap/src/api.rs +++ b/swap/src/api.rs @@ -93,7 +93,7 @@ impl Context { let tor_socks5_port = tor.map(|tor| tor.tor_socks5_port); START.call_once(|| { - let _ = cli::tracing::init(debug, json, data_dir.join("logs"), None); + let _ = cli::tracing::init(debug, json, data_dir.join("logs")); }); let context = Context { diff --git a/swap/src/cli/tracing.rs b/swap/src/cli/tracing.rs index a1cd77fb..21a76199 100644 --- a/swap/src/cli/tracing.rs +++ b/swap/src/cli/tracing.rs @@ -7,55 +7,36 @@ use tracing::{Event, Level, Subscriber}; use tracing_subscriber::fmt::format::{DefaultFields, Format, JsonFields}; use tracing_subscriber::fmt::time::UtcTime; use tracing_subscriber::layer::{Context, SubscriberExt}; -use tracing_subscriber::{fmt, EnvFilter, FmtSubscriber, Layer, Registry}; -use uuid::Uuid; +use tracing_subscriber::{fmt, EnvFilter, Layer, Registry}; -pub fn init(debug: bool, json: bool, dir: impl AsRef, swap_id: Option) -> Result<()> { - if let Some(swap_id) = swap_id { - let level_filter = EnvFilter::try_new("swap=debug")?; +pub fn init(debug: bool, json: bool, dir: impl AsRef) -> Result<()> { + let level_filter = EnvFilter::try_new("swap=debug")?; + let registry = Registry::default().with(level_filter); - let registry = Registry::default().with(level_filter); + let appender = + tracing_appender::rolling::never(dir.as_ref(), "swap-all.log"); + let (appender, guard) = tracing_appender::non_blocking(appender); - let appender = - tracing_appender::rolling::never(dir.as_ref(), format!("swap-{}.log", swap_id)); - let (appender, guard) = tracing_appender::non_blocking(appender); + std::mem::forget(guard); - std::mem::forget(guard); + let file_logger = registry.with( + fmt::layer() + .with_ansi(false) + .with_target(false) + .with_span_events(fmt::format::FmtSpan::FULL) + .json() + .with_writer(appender), + ); - let file_logger = registry.with( - fmt::layer() - .with_ansi(false) - .with_target(false) - .json() - .with_writer(appender), - ); - - if json && debug { - set_global_default(file_logger.with(debug_json_terminal_printer()))?; - } else if json && !debug { - set_global_default(file_logger.with(info_json_terminal_printer()))?; - } else if !json && debug { - set_global_default(file_logger.with(debug_terminal_printer()))?; - } else { - set_global_default(file_logger.with(info_terminal_printer()))?; - } + if json && debug { + set_global_default(file_logger.with(debug_json_terminal_printer()))?; + } else if json && !debug { + set_global_default(file_logger.with(info_json_terminal_printer()))?; + } else if !json && debug { + set_global_default(file_logger.with(debug_terminal_printer()))?; } else { - let level = if debug { Level::DEBUG } else { Level::INFO }; - let is_terminal = atty::is(atty::Stream::Stderr); - - let builder = FmtSubscriber::builder() - .with_env_filter(format!("swap={}", level)) - .with_writer(std::io::stderr) - .with_ansi(is_terminal) - .with_timer(UtcTime::rfc_3339()) - .with_target(false); - - if json { - builder.json().init(); - } else { - builder.init(); - } - }; + set_global_default(file_logger.with(info_terminal_printer()))?; + } tracing::info!("Logging initialized to {}", dir.as_ref().display()); Ok(()) @@ -66,17 +47,17 @@ pub struct StdErrPrinter { level: Level, } -type StdErrLayer = tracing_subscriber::fmt::Layer< +type StdErrLayer = fmt::Layer< S, DefaultFields, - Format, + Format, fn() -> std::io::Stderr, >; -type StdErrJsonLayer = tracing_subscriber::fmt::Layer< +type StdErrJsonLayer = fmt::Layer< S, JsonFields, - Format, + Format, fn() -> std::io::Stderr, >; From 1b13608d96e5fda007c8ad5d20f060b83cceb581 Mon Sep 17 00:00:00 2001 From: binarybaron <86064887+binarybaron@users.noreply.github.com> Date: Sat, 12 Aug 2023 23:15:29 +0200 Subject: [PATCH 6/6] Add get_swap_expired_timelock timelock, other small refactoring - Add get_swap_expired_timelock endpoint to return expired timelock if one exists. Fails if bitcoin lock tx has not yet published or if swap is already finished. - Rename current_epoch to expired_timelock to enforce consistent method names - Add blocks left until current expired timelock expires (next timelock expires) to ExpiredTimelock struct - Change .expect() to .unwrap() in rpc server method register because those will only fail if we register the same method twice which will never happen --- swap/src/api/request.rs | 37 ++++++++++++++++++++- swap/src/bitcoin.rs | 8 +++-- swap/src/bitcoin/cancel.rs | 12 +++++++ swap/src/bitcoin/timelocks.rs | 10 ++++-- swap/src/bitcoin/wallet.rs | 59 +++++++++++++++++++++++++++++---- swap/src/protocol/alice/swap.rs | 6 ++-- swap/src/protocol/bob/state.rs | 2 +- swap/src/protocol/bob/swap.rs | 12 +++---- swap/src/rpc/methods.rs | 41 +++++++++++++++++------ 9 files changed, 155 insertions(+), 32 deletions(-) diff --git a/swap/src/api/request.rs b/swap/src/api/request.rs index a54b41be..24b56a62 100644 --- a/swap/src/api/request.rs +++ b/swap/src/api/request.rs @@ -1,5 +1,5 @@ use crate::api::Context; -use crate::bitcoin::{Amount, TxLock}; +use crate::bitcoin::{Amount, ExpiredTimelocks, TxLock}; use crate::cli::{list_sellers, EventLoop, SellerStatus}; use crate::libp2p_ext::MultiAddrExt; use crate::network::quote::{BidQuote, ZeroQuoteReceived}; @@ -115,6 +115,9 @@ pub enum Method { server_address: Option, }, GetCurrentSwap, + GetSwapExpiredTimelock { + swap_id: Uuid, + }, } impl Request { @@ -563,6 +566,38 @@ impl Request { "swap_id": SWAP_LOCK.read().await.clone() })) }, + Method::GetSwapExpiredTimelock { swap_id } => { + let swap_state: BobState = context + .db + .get_state( + swap_id, + ) + .await? + .try_into()?; + + let bitcoin_wallet = context.bitcoin_wallet.as_ref().context("Could not get Bitcoin wallet")?; + + let timelock = match swap_state { + BobState::Started { .. } + | BobState::SafelyAborted + | BobState::SwapSetupCompleted(_) => bail!("Bitcoin lock transaction has not been published yet"), + BobState::BtcLocked { state3: state, .. } + | BobState::XmrLockProofReceived { state, .. } => state.expired_timelock(bitcoin_wallet).await, + BobState::XmrLocked(state) + | BobState::EncSigSent(state) => state.expired_timelock(bitcoin_wallet).await, + BobState::CancelTimelockExpired(state) + | BobState::BtcCancelled(state) => state.expired_timelock(bitcoin_wallet).await, + BobState::BtcPunished { .. } => Ok(ExpiredTimelocks::Punish), + // swap is already finished + BobState::BtcRefunded(_) + | BobState::BtcRedeemed(_) + | BobState::XmrRedeemed { .. } => bail!("Bitcoin have already been redeemed or refunded") + }?; + + Ok(json!({ + "timelock": timelock, + })) + }, } } diff --git a/swap/src/bitcoin.rs b/swap/src/bitcoin.rs index f3e42f63..7d50354e 100644 --- a/swap/src/bitcoin.rs +++ b/swap/src/bitcoin.rs @@ -244,10 +244,14 @@ pub fn current_epoch( } if tx_lock_status.is_confirmed_with(cancel_timelock) { - return ExpiredTimelocks::Cancel; + return ExpiredTimelocks::Cancel { + blocks_left: tx_cancel_status.blocks_left_until(punish_timelock), + } } - ExpiredTimelocks::None + ExpiredTimelocks::None { + blocks_left: tx_lock_status.blocks_left_until(cancel_timelock), + } } pub mod bitcoin_address { diff --git a/swap/src/bitcoin/cancel.rs b/swap/src/bitcoin/cancel.rs index 35b6b197..aec3fe38 100644 --- a/swap/src/bitcoin/cancel.rs +++ b/swap/src/bitcoin/cancel.rs @@ -24,6 +24,12 @@ use std::ops::Add; #[serde(transparent)] pub struct CancelTimelock(u32); +impl From for u32 { + fn from(cancel_timelock: CancelTimelock) -> Self { + cancel_timelock.0 + } +} + impl CancelTimelock { pub const fn new(number_of_blocks: u32) -> Self { Self(number_of_blocks) @@ -64,6 +70,12 @@ impl fmt::Display for CancelTimelock { #[serde(transparent)] pub struct PunishTimelock(u32); +impl From for u32 { + fn from(punish_timelock: PunishTimelock) -> Self { + punish_timelock.0 + } +} + impl PunishTimelock { pub const fn new(number_of_blocks: u32) -> Self { Self(number_of_blocks) diff --git a/swap/src/bitcoin/timelocks.rs b/swap/src/bitcoin/timelocks.rs index e8b72ea6..dee8b4a0 100644 --- a/swap/src/bitcoin/timelocks.rs +++ b/swap/src/bitcoin/timelocks.rs @@ -37,9 +37,13 @@ impl Add for BlockHeight { } } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Serialize, Debug, Clone, Copy, PartialEq, Eq)] pub enum ExpiredTimelocks { - None, - Cancel, + None { + blocks_left: u32, + }, + Cancel { + blocks_left: u32, + }, Punish, } diff --git a/swap/src/bitcoin/wallet.rs b/swap/src/bitcoin/wallet.rs index 9a8500b6..ec3bba11 100644 --- a/swap/src/bitcoin/wallet.rs +++ b/swap/src/bitcoin/wallet.rs @@ -274,7 +274,7 @@ impl Subscription { pub async fn wait_until_confirmed_with(&self, target: T) -> Result<()> where - u32: PartialOrd, + T: Into, T: Copy, { self.wait_until(|status| status.is_confirmed_with(target)) @@ -926,10 +926,19 @@ impl Confirmed { } pub fn meets_target(&self, target: T) -> bool - where - u32: PartialOrd, + where T: Into { - self.confirmations() >= target + self.confirmations() >= target.into() + } + + pub fn blocks_left_until(&self, target: T) -> u32 + where T: Into, T: Copy + { + if self.meets_target(target) { + 0 + } else { + target.into() - self.confirmations() + } } } @@ -941,8 +950,7 @@ impl ScriptStatus { /// Check if the script has met the given confirmation target. pub fn is_confirmed_with(&self, target: T) -> bool - where - u32: PartialOrd, + where T: Into { match self { ScriptStatus::Confirmed(inner) => inner.meets_target(target), @@ -950,6 +958,18 @@ impl ScriptStatus { } } + // Calculate the number of blocks left until the target is met. + pub fn blocks_left_until(&self, target: T) -> u32 + where T: Into, T: Copy + { + match self { + ScriptStatus::Confirmed(inner) => { + inner.blocks_left_until(target) + } + _ => target.into(), + } + } + pub fn has_been_seen(&self) -> bool { matches!(self, ScriptStatus::InMempool | ScriptStatus::Confirmed(_)) } @@ -1005,6 +1025,33 @@ mod tests { assert_eq!(confirmed.depth, 0) } + #[test] + fn given_depth_0_should_return_0_blocks_left_until_1() { + let script = ScriptStatus::Confirmed(Confirmed { depth: 0 }); + + let blocks_left = script.blocks_left_until(1); + + assert_eq!(blocks_left, 0) + } + + #[test] + fn given_depth_1_should_return_0_blocks_left_until_1() { + let script = ScriptStatus::Confirmed(Confirmed { depth: 1 }); + + let blocks_left = script.blocks_left_until(1); + + assert_eq!(blocks_left, 0) + } + + #[test] + fn given_depth_0_should_return_1_blocks_left_until_2() { + let script = ScriptStatus::Confirmed(Confirmed { depth: 0 }); + + let blocks_left = script.blocks_left_until(2); + + assert_eq!(blocks_left, 1) + } + #[test] fn given_one_BTC_and_100k_sats_per_vb_fees_should_not_hit_max() { // 400 weight = 100 vbyte diff --git a/swap/src/protocol/alice/swap.rs b/swap/src/protocol/alice/swap.rs index 941e5a43..df59c6a6 100644 --- a/swap/src/protocol/alice/swap.rs +++ b/swap/src/protocol/alice/swap.rs @@ -112,7 +112,7 @@ where } AliceState::BtcLocked { state3 } => { match state3.expired_timelocks(bitcoin_wallet).await? { - ExpiredTimelocks::None => { + ExpiredTimelocks::None {..} => { // Record the current monero wallet block height so we don't have to scan from // block 0 for scenarios where we create a refund wallet. let monero_wallet_restore_blockheight = monero_wallet.block_height().await?; @@ -135,7 +135,7 @@ where transfer_proof, state3, } => match state3.expired_timelocks(bitcoin_wallet).await? { - ExpiredTimelocks::None => { + ExpiredTimelocks::None {..} => { monero_wallet .watch_for_transfer(state3.lock_xmr_watch_request(transfer_proof.clone(), 1)) .await @@ -221,7 +221,7 @@ where encrypted_signature, state3, } => match state3.expired_timelocks(bitcoin_wallet).await? { - ExpiredTimelocks::None => { + ExpiredTimelocks::None {..} => { let tx_lock_status = bitcoin_wallet.subscribe_to(state3.tx_lock.clone()).await; match state3.signed_redeem_transaction(*encrypted_signature) { Ok(tx) => match bitcoin_wallet.broadcast(tx, "redeem").await { diff --git a/swap/src/protocol/bob/state.rs b/swap/src/protocol/bob/state.rs index f683ffa7..e390ec41 100644 --- a/swap/src/protocol/bob/state.rs +++ b/swap/src/protocol/bob/state.rs @@ -440,7 +440,7 @@ impl State3 { self.tx_lock.txid() } - pub async fn current_epoch( + pub async fn expired_timelock( &self, bitcoin_wallet: &bitcoin::Wallet, ) -> Result { diff --git a/swap/src/protocol/bob/swap.rs b/swap/src/protocol/bob/swap.rs index 66933a87..db8994fd 100644 --- a/swap/src/protocol/bob/swap.rs +++ b/swap/src/protocol/bob/swap.rs @@ -117,7 +117,7 @@ async fn next_state( } => { let tx_lock_status = bitcoin_wallet.subscribe_to(state3.tx_lock.clone()).await; - if let ExpiredTimelocks::None = state3.current_epoch(bitcoin_wallet).await? { + if let ExpiredTimelocks::None {..} = state3.expired_timelock(bitcoin_wallet).await? { let transfer_proof_watcher = event_loop_handle.recv_transfer_proof(); let cancel_timelock_expires = tx_lock_status.wait_until_confirmed_with(state3.cancel_timelock); @@ -156,7 +156,7 @@ async fn next_state( } => { let tx_lock_status = bitcoin_wallet.subscribe_to(state.tx_lock.clone()).await; - if let ExpiredTimelocks::None = state.current_epoch(bitcoin_wallet).await? { + if let ExpiredTimelocks::None {..} = state.expired_timelock(bitcoin_wallet).await? { let watch_request = state.lock_xmr_watch_request(lock_transfer_proof); select! { @@ -185,7 +185,7 @@ async fn next_state( BobState::XmrLocked(state) => { let tx_lock_status = bitcoin_wallet.subscribe_to(state.tx_lock.clone()).await; - if let ExpiredTimelocks::None = state.expired_timelock(bitcoin_wallet).await? { + if let ExpiredTimelocks::None {..} = state.expired_timelock(bitcoin_wallet).await? { // Alice has locked Xmr // Bob sends Alice his key @@ -209,7 +209,7 @@ async fn next_state( BobState::EncSigSent(state) => { let tx_lock_status = bitcoin_wallet.subscribe_to(state.tx_lock.clone()).await; - if let ExpiredTimelocks::None = state.expired_timelock(bitcoin_wallet).await? { + if let ExpiredTimelocks::None {..} = state.expired_timelock(bitcoin_wallet).await? { select! { state5 = state.watch_for_redeem_btc(bitcoin_wallet) => { BobState::BtcRedeemed(state5?) @@ -269,12 +269,12 @@ async fn next_state( BobState::BtcCancelled(state) => { // Bob has cancelled the swap match state.expired_timelock(bitcoin_wallet).await? { - ExpiredTimelocks::None => { + ExpiredTimelocks::None {..} => { bail!( "Internal error: canceled state reached before cancel timelock was expired" ); } - ExpiredTimelocks::Cancel => { + ExpiredTimelocks::Cancel { .. } => { state.publish_refund_btc(bitcoin_wallet).await?; BobState::BtcRefunded(state) } diff --git a/swap/src/rpc/methods.rs b/swap/src/rpc/methods.rs index 946b5b94..98ba5185 100644 --- a/swap/src/rpc/methods.rs +++ b/swap/src/rpc/methods.rs @@ -18,19 +18,19 @@ pub fn register_modules(context: Arc) -> RpcModule> { .register_async_method("get_bitcoin_balance", |_, context| async move { get_bitcoin_balance(&context).await }) - .expect("Could not register RPC method get_bitcoin_balance"); + .unwrap(); module .register_async_method("get_history", |_, context| async move { get_history(&context).await }) - .expect("Could not register RPC method get_history"); + .unwrap(); module .register_async_method("get_raw_history", |_, context| async move { get_raw_history(&context).await }) - .expect("Could not register RPC method get_raw_history"); + .unwrap(); module .register_async_method("get_seller", |params, context| async move { @@ -42,7 +42,7 @@ pub fn register_modules(context: Arc) -> RpcModule> { get_seller(*swap_id, &context).await }) - .expect("Could not register RPC method get_seller"); + .unwrap(); module .register_async_method("get_swap_start_date", |params, context| async move { @@ -54,7 +54,7 @@ pub fn register_modules(context: Arc) -> RpcModule> { get_swap_start_date(*swap_id, &context).await }) - .expect("Could not register RPC method get_swap_start_date"); + .unwrap(); module .register_async_method("resume_swap", |params, context| async move { @@ -66,7 +66,18 @@ pub fn register_modules(context: Arc) -> RpcModule> { resume_swap(*swap_id, &context).await }) - .expect("Could not register RPC method resume_swap"); + .unwrap(); + + module.register_async_method("get_swap_expired_timelock", |params, context| async move { + let params: HashMap = params.parse()?; + + let swap_id = params.get("swap_id").ok_or_else(|| { + jsonrpsee_core::Error::Custom("Does not contain swap_id".to_string()) + })?; + + get_swap_timelock(*swap_id, &context).await + }).unwrap(); + module .register_async_method("cancel_refund_swap", |params, context| async move { let params: HashMap = params.parse()?; @@ -77,7 +88,7 @@ pub fn register_modules(context: Arc) -> RpcModule> { cancel_and_refund_swap(*swap_id, &context).await }) - .expect("Could not register RPC method cancel_refund_swap"); + .unwrap(); module .register_async_method("withdraw_btc", |params, context| async move { let params: HashMap = params.parse()?; @@ -145,7 +156,7 @@ pub fn register_modules(context: Arc) -> RpcModule> { ) .await }) - .expect("Could not register RPC method buy_xmr"); + .unwrap(); module .register_async_method("list_sellers", |params, context| async move { let params: HashMap = params.parse()?; @@ -155,10 +166,11 @@ pub fn register_modules(context: Arc) -> RpcModule> { list_sellers(rendezvous_point.clone(), &context).await }) - .expect("Could not register RPC method list_sellers"); + .unwrap(); module.register_async_method("get_current_swap", |_, context| async move { get_current_swap(&context).await - }).expect("Could not register RPC method get_current_swap"); + }).unwrap(); + module } @@ -220,6 +232,15 @@ async fn resume_swap( }, context).await } +async fn get_swap_timelock( + swap_id: Uuid, + context: &Arc, +) -> Result { + execute_request(Method::GetSwapExpiredTimelock { + swap_id + }, context).await +} + async fn cancel_and_refund_swap( swap_id: Uuid, context: &Arc,