diff --git a/swap/src/api.rs b/swap/src/api.rs index 9e19f89a..68d16877 100644 --- a/swap/src/api.rs +++ b/swap/src/api.rs @@ -203,13 +203,13 @@ fn env_config_from(testnet: bool) -> EnvConfig { } #[cfg(test)] pub mod api_test { - use crate::api::request::{Method, Params, Request, Shutdown}; - use crate::tor::DEFAULT_SOCKS5_PORT; + use super::*; + use crate::api::request::{Method, Params, Request}; + use libp2p::Multiaddr; use std::str::FromStr; use tokio::sync::broadcast; use uuid::Uuid; - use super::*; pub const MULTI_ADDRESS: &str = "/ip4/127.0.0.1/tcp/9939/p2p/12D3KooWCdMKjesXMJz1SiZ7HgotrxuqhQJbP5sgBm2BwP1cqThi"; @@ -240,10 +240,11 @@ pub mod api_test { debug, json, is_testnet, - data_dir + data_dir, } } } + impl Request { pub fn buy_xmr(is_testnet: bool, tx: broadcast::Sender<()>) -> Request { let seller = Multiaddr::from_str(MULTI_ADDRESS).unwrap(); @@ -263,49 +264,34 @@ pub mod api_test { } }; - Request { - params: Params { - seller: Some(seller), - bitcoin_change_address: Some(bitcoin_change_address), - monero_receive_address: Some(monero_receive_address), - ..Default::default() - }, - cmd: Method::BuyXmr, - shutdown: Shutdown::new(tx.subscribe()), - } + Request::new(tx.subscribe(), Method::BuyXmr, Params { + seller: Some(seller), + bitcoin_change_address: Some(bitcoin_change_address), + monero_receive_address: Some(monero_receive_address), + swap_id: Some(Uuid::new_v4()), + ..Default::default() + }) } pub fn resume(tx: broadcast::Sender<()>) -> Request { - Request { - params: Params { - swap_id: Some(Uuid::from_str(SWAP_ID).unwrap()), - ..Default::default() - }, - cmd: Method::Resume, - shutdown: Shutdown::new(tx.subscribe()), - } + Request::new(tx.subscribe(), Method::Resume, Params { + swap_id: Some(Uuid::from_str(SWAP_ID).unwrap()), + ..Default::default() + }) } pub fn cancel(tx: broadcast::Sender<()>) -> Request { - Request { - params: Params { - swap_id: Some(Uuid::from_str(SWAP_ID).unwrap()), - ..Default::default() - }, - cmd: Method::CancelAndRefund, - shutdown: Shutdown::new(tx.subscribe()), - } + Request::new(tx.subscribe(), Method::CancelAndRefund, Params { + swap_id: Some(Uuid::from_str(SWAP_ID).unwrap()), + ..Default::default() + }) } pub fn refund(tx: broadcast::Sender<()>) -> Request { - Request { - params: Params { - swap_id: Some(Uuid::from_str(SWAP_ID).unwrap()), - ..Default::default() - }, - cmd: Method::CancelAndRefund, - shutdown: Shutdown::new(tx.subscribe()), - } + Request::new(tx.subscribe(), Method::CancelAndRefund, Params { + swap_id: Some(Uuid::from_str(SWAP_ID).unwrap()), + ..Default::default() + }) } } } diff --git a/swap/src/api/request.rs b/swap/src/api/request.rs index ebc0f111..52fcce72 100644 --- a/swap/src/api/request.rs +++ b/swap/src/api/request.rs @@ -15,10 +15,12 @@ use serde_json::json; use std::cmp::min; use std::convert::TryInto; use std::future::Future; +use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; -use std::net::SocketAddr; use tokio::sync::broadcast; +use tokio::sync::broadcast::Receiver; +use tracing::{debug_span, Instrument}; use uuid::Uuid; #[derive(PartialEq, Debug)] @@ -29,7 +31,7 @@ pub struct Request { } impl Shutdown { - pub fn new(notify: broadcast::Receiver<()>) -> Shutdown { + pub fn new(notify: Receiver<()>) -> Shutdown { Shutdown { shutdown: false, notify, @@ -101,11 +103,21 @@ pub enum Method { } impl Request { - pub async fn call(&mut self, context: Arc) -> Result { - let result = match self.cmd { - Method::BuyXmr => { - let swap_id = Uuid::new_v4(); + pub fn new(shutdownReceiver: Receiver<()>, cmd: Method, params: Params) -> Request { + Request { + params, + cmd, + shutdown: Shutdown::new(shutdownReceiver), + } + } + async fn handle_cmd(&mut self, context: Arc) -> Result { + match self.cmd { + Method::BuyXmr => { + let swap_id = self + .params + .swap_id + .context("Parameter swap_id is missing")?; let seed = context.config.seed.as_ref().context("Could not get seed")?; let env_config = context.config.env_config; let btc = context @@ -217,9 +229,9 @@ impl Request { .context("Failed to complete swap")?; } } - json!({ + Ok(json!({ "empty": "true" - }) + })) } Method::History => { let swaps = context.db.all().await?; @@ -229,11 +241,11 @@ impl Request { vec.push((swap_id, state.to_string())); } - json!({ "swaps": vec }) + Ok(json!({ "swaps": vec })) } Method::RawHistory => { let raw_history = context.db.raw_all().await?; - json!({ "raw_history": raw_history }) + Ok(json!({ "raw_history": raw_history })) } Method::GetSeller => { let swap_id = self.params.swap_id.context("Parameter swap_id is needed")?; @@ -249,10 +261,10 @@ impl Request { .await .with_context(|| "Could not get addressess")?; - json!({ + Ok(json!({ "peerId": peerId.to_base58(), "addresses": addresses - }) + })) } Method::SwapStartDate => { let swap_id = self @@ -262,9 +274,9 @@ impl Request { let start_date = context.db.get_swap_start_date(swap_id).await?; - json!({ + Ok(json!({ "start_date": start_date, - }) + })) } Method::Config => { let data_dir_display = context.config.data_dir.display(); @@ -275,13 +287,13 @@ impl Request { tracing::info!(path=%format!("{}/monero", data_dir_display), "Monero-wallet-rpc directory"); tracing::info!(path=%format!("{}/wallet", data_dir_display), "Internal bitcoin wallet directory"); - json!({ + Ok(json!({ "log_files": format!("{}/logs", data_dir_display), "sqlite": format!("{}/sqlite", data_dir_display), "seed": format!("{}/seed.pem", data_dir_display), "monero-wallet-rpc": format!("{}/monero", data_dir_display), "bitcoin_wallet": format!("{}/wallet", data_dir_display), - }) + })) } Method::WithdrawBtc => { let bitcoin_wallet = context @@ -312,22 +324,20 @@ impl Request { .broadcast(signed_tx.clone(), "withdraw") .await?; - json!({ + Ok(json!({ "signed_tx": signed_tx, "amount": amount.to_sat(), "txid": signed_tx.txid(), - }) + })) } Method::StartDaemon => { let server_address = match self.params.server_address { Some(address) => address, - None => { - "127.0.0.1:3456".parse()? - } + None => "127.0.0.1:3456".parse()?, }; - - let (_, server_handle) = rpc::run_server(server_address, Arc::clone(&context)).await?; + let (_, server_handle) = + rpc::run_server(server_address, Arc::clone(&context)).await?; loop { tokio::select! { @@ -353,9 +363,9 @@ impl Request { "Checked Bitcoin balance", ); - json!({ + Ok(json!({ "balance": bitcoin_balance.to_sat() - }) + })) } Method::Resume => { let swap_id = self @@ -437,9 +447,9 @@ impl Request { swap_result?; } } - json!({ + Ok(json!({ "result": [] - }) + })) } Method::CancelAndRefund => { let bitcoin_wallet = context @@ -456,9 +466,9 @@ impl Request { ) .await?; - json!({ + Ok(json!({ "result": state, - }) + })) } Method::ListSellers => { let rendezvous_point = self @@ -511,7 +521,7 @@ impl Request { } } - json!({ "sellers": sellers }) + Ok(json!({ "sellers": sellers })) } Method::ExportBitcoinWallet => { let bitcoin_wallet = context @@ -521,9 +531,9 @@ impl Request { let wallet_export = bitcoin_wallet.wallet_export("cli").await?; tracing::info!(descriptor=%wallet_export.to_string(), "Exported bitcoin wallet"); - json!({ + Ok(json!({ "result": [] - }) + })) } Method::MoneroRecovery => { let swap_state: BobState = context @@ -567,12 +577,32 @@ impl Request { println!("Spend key: {}", spend_key); } } - json!({ + Ok(json!({ "result": [] - }) + })) } - }; - Ok(result) + } + } + + pub async fn call(&mut self, context: Arc) -> Result { + // If the swap ID is set, we add it to the span + let call_span = self.params.swap_id.map_or_else( + || { + debug_span!( + "call", + method = ?self.cmd, + ) + }, + |swap_id| { + debug_span!( + "call", + method = ?self.cmd, + swap_id = swap_id.to_string(), + ) + }, + ); + + self.handle_cmd(context).instrument(call_span).await } } diff --git a/swap/src/cli/command.rs b/swap/src/cli/command.rs index f70df8dc..5dfaa14d 100644 --- a/swap/src/cli/command.rs +++ b/swap/src/cli/command.rs @@ -1,4 +1,4 @@ -use crate::api::request::{Method, Params, Request, Shutdown}; +use crate::api::request::{Method, Params, Request}; use crate::api::Context; use crate::bitcoin::{bitcoin_address, Amount}; use crate::monero; @@ -78,16 +78,12 @@ where let bitcoin_change_address = bitcoin_address::validate(bitcoin_change_address, is_testnet)?; - let request = Request { - params: Params { - bitcoin_change_address: Some(bitcoin_change_address), - monero_receive_address: Some(monero_receive_address), - seller: Some(seller), - ..Default::default() - }, - cmd: Method::BuyXmr, - shutdown: Shutdown::new(rx.subscribe()), - }; + let request = Request::new(rx.subscribe(), Method::BuyXmr, Params { + bitcoin_change_address: Some(bitcoin_change_address), + monero_receive_address: Some(monero_receive_address), + seller: Some(seller), + ..Default::default() + }); let context = Context::build( Some(bitcoin), @@ -104,33 +100,21 @@ where (context, request) } CliCommand::History => { - let request = Request { - params: Params::default(), - cmd: Method::History, - shutdown: Shutdown::new(rx.subscribe()), - }; + let request = Request::new(rx.subscribe(), Method::History, Params::default()); let context = Context::build(None, None, None, data, is_testnet, debug, json, None, rx).await?; (context, request) } CliCommand::Config => { - let request = Request { - params: Params::default(), - cmd: Method::Config, - shutdown: Shutdown::new(rx.subscribe()), - }; + let request = Request::new(rx.subscribe(), Method::Config, Params::default()); let context = Context::build(None, None, None, data, is_testnet, debug, json, None, rx).await?; (context, request) } CliCommand::Balance { bitcoin } => { - let request = Request { - params: Params::default(), - cmd: Method::Balance, - shutdown: Shutdown::new(rx.subscribe()), - }; + let request = Request::new(rx.subscribe(), Method::Balance, Params::default()); let context = Context::build( Some(bitcoin), @@ -152,11 +136,7 @@ where monero, tor, } => { - let request = Request { - params: Params::default(), - cmd: Method::StartDaemon, - shutdown: Shutdown::new(rx.subscribe()), - }; + let request = Request::new(rx.subscribe(), Method::StartDaemon, Params::default()); let context = Context::build( Some(bitcoin), @@ -179,15 +159,11 @@ where } => { let address = bitcoin_address::validate(address, is_testnet)?; - let request = Request { - params: Params { - amount, - address: Some(address), - ..Default::default() - }, - cmd: Method::WithdrawBtc, - shutdown: Shutdown::new(rx.subscribe()), - }; + let request = Request::new(rx.subscribe(), Method::WithdrawBtc, Params { + amount, + address: Some(address), + ..Default::default() + }); let context = Context::build( Some(bitcoin), @@ -209,14 +185,10 @@ where monero, tor, } => { - let request = Request { - params: Params { - swap_id: Some(swap_id), - ..Default::default() - }, - cmd: Method::Resume, - shutdown: Shutdown::new(rx.subscribe()), - }; + let request = Request::new(rx.subscribe(), Method::Resume, Params { + swap_id: Some(swap_id), + ..Default::default() + }); let context = Context::build( Some(bitcoin), @@ -237,14 +209,10 @@ where bitcoin, tor, } => { - let request = Request { - params: Params { - swap_id: Some(swap_id), - ..Default::default() - }, - cmd: Method::CancelAndRefund, - shutdown: Shutdown::new(rx.subscribe()), - }; + let request = Request::new(rx.subscribe(), Method::CancelAndRefund, Params { + swap_id: Some(swap_id), + ..Default::default() + }); let context = Context::build( Some(bitcoin), @@ -264,14 +232,10 @@ where rendezvous_point, tor, } => { - let request = Request { - params: Params { - rendezvous_point: Some(rendezvous_point), - ..Default::default() - }, - cmd: Method::ListSellers, - shutdown: Shutdown::new(rx.subscribe()), - }; + let request = Request::new(rx.subscribe(), Method::ListSellers, Params { + rendezvous_point: Some(rendezvous_point), + ..Default::default() + }); let context = Context::build( None, @@ -289,11 +253,11 @@ where (context, request) } CliCommand::ExportBitcoinWallet { bitcoin } => { - let request = Request { - params: Params::default(), - cmd: Method::ExportBitcoinWallet, - shutdown: Shutdown::new(rx.subscribe()), - }; + let request = Request::new( + rx.subscribe(), + Method::ExportBitcoinWallet, + Params::default(), + ); let context = Context::build( Some(bitcoin), @@ -309,15 +273,13 @@ where .await?; (context, request) } - CliCommand::MoneroRecovery { swap_id } => { - let request = Request { - params: Params { - swap_id: Some(swap_id.swap_id), - ..Default::default() - }, - cmd: Method::MoneroRecovery, - shutdown: Shutdown::new(rx.subscribe()), - }; + CliCommand::MoneroRecovery { + swap_id: SwapId { swap_id }, + } => { + let request = Request::new(rx.subscribe(), Method::MoneroRecovery, Params { + swap_id: Some(swap_id), + ..Default::default() + }); let context = Context::build(None, None, None, data, is_testnet, debug, json, None, rx).await?; @@ -575,7 +537,7 @@ struct Seller { #[cfg(test)] mod tests { use super::*; - use crate::tor::DEFAULT_SOCKS5_PORT; + use crate::api::api_test::*; use crate::api::Config; diff --git a/swap/src/rpc/methods.rs b/swap/src/rpc/methods.rs index 8addc7a9..1f98b50e 100644 --- a/swap/src/rpc/methods.rs +++ b/swap/src/rpc/methods.rs @@ -1,4 +1,4 @@ -use crate::api::request::{Method, Params, Request, Shutdown}; +use crate::api::request::{Method, Params, Request}; use crate::api::Context; use crate::bitcoin::bitcoin_address; use crate::monero::monero_address; @@ -142,129 +142,78 @@ pub fn register_modules(context: Arc) -> RpcModule> { module } +async fn execute_request( + cmd: Method, + params: Params, + context: &Arc, +) -> Result { + let mut request = Request::new(context.shutdown.subscribe(), cmd, params); + request + .call(Arc::clone(context)) + .await + .map_err(|err| jsonrpsee_core::Error::Custom(err.to_string())) +} + async fn get_bitcoin_balance( context: &Arc, ) -> Result { - let mut request = Request { - params: Params::default(), - cmd: Method::Balance, - shutdown: Shutdown::new(context.shutdown.subscribe()), - }; - let balance = request - .call(Arc::clone(context)) - .await - .map_err(|err| jsonrpsee_core::Error::Custom(err.to_string()))?; - - Ok(balance) + execute_request(Method::Balance, Params::default(), context).await } async fn get_history(context: &Arc) -> Result { - let mut request = Request { - params: Params::default(), - cmd: Method::History, - shutdown: Shutdown::new(context.shutdown.subscribe()), - }; - let history = request - .call(Arc::clone(context)) - .await - .map_err(|err| jsonrpsee_core::Error::Custom(err.to_string()))?; - - Ok(history) + execute_request(Method::History, Params::default(), context).await } + async fn get_raw_history( context: &Arc, ) -> Result { - let mut request = Request { - params: Params::default(), - cmd: Method::RawHistory, - shutdown: Shutdown::new(context.shutdown.subscribe()), - }; - let history = request - .call(Arc::clone(context)) - .await - .map_err(|err| jsonrpsee_core::Error::Custom(err.to_string()))?; - - Ok(history) + execute_request(Method::RawHistory, Params::default(), context).await } async fn get_seller( swap_id: Uuid, context: &Arc, ) -> Result { - let mut request = Request { - params: Params { - swap_id: Some(swap_id), - ..Default::default() - }, - cmd: Method::GetSeller, - shutdown: Shutdown::new(context.shutdown.subscribe()), + let params = Params { + swap_id: Some(swap_id), + ..Default::default() }; - let result = request - .call(Arc::clone(context)) - .await - .map_err(|err| jsonrpsee_core::Error::Custom(err.to_string()))?; - - Ok(result) + execute_request(Method::GetSeller, params, context).await } async fn get_swap_start_date( swap_id: Uuid, context: &Arc, ) -> Result { - let mut request = Request { - params: Params { - swap_id: Some(swap_id), - ..Default::default() - }, - cmd: Method::SwapStartDate, - shutdown: Shutdown::new(context.shutdown.subscribe()), + let params = Params { + swap_id: Some(swap_id), + ..Default::default() }; - let result = request - .call(Arc::clone(context)) - .await - .map_err(|err| jsonrpsee_core::Error::Custom(err.to_string()))?; - - Ok(result) + execute_request(Method::SwapStartDate, params, context).await } async fn resume_swap( swap_id: Uuid, context: &Arc, ) -> Result { - let mut request = Request { - params: Params { - swap_id: Some(swap_id), - ..Default::default() - }, - cmd: Method::Resume, - shutdown: Shutdown::new(context.shutdown.subscribe()), + let params = Params { + swap_id: Some(swap_id), + ..Default::default() }; - - let result = request - .call(Arc::clone(context)) - .await - .map_err(|err| jsonrpsee_core::Error::Custom(err.to_string()))?; - Ok(result) + execute_request(Method::Resume, params, context).await } + async fn withdraw_btc( withdraw_address: bitcoin::Address, amount: Option, context: &Arc, ) -> Result { - let mut request = Request { - params: Params { - amount, - address: Some(withdraw_address), - ..Default::default() - }, - cmd: Method::WithdrawBtc, - shutdown: Shutdown::new(context.shutdown.subscribe()), + let params = Params { + amount, + address: Some(withdraw_address), + ..Default::default() }; - let result = request - .call(Arc::clone(context)) - .await - .map_err(|err| jsonrpsee_core::Error::Custom(err.to_string()))?; - Ok(result) + execute_request(Method::WithdrawBtc, params, context).await } async fn buy_xmr( @@ -273,38 +222,24 @@ async fn buy_xmr( seller: Multiaddr, context: &Arc, ) -> Result { - let mut request = Request { - params: Params { - bitcoin_change_address: Some(bitcoin_change_address), - monero_receive_address: Some(monero_receive_address), - seller: Some(seller), - ..Default::default() - }, - cmd: Method::BuyXmr, - shutdown: Shutdown::new(context.shutdown.subscribe()), + let params = Params { + bitcoin_change_address: Some(bitcoin_change_address), + monero_receive_address: Some(monero_receive_address), + seller: Some(seller), + swap_id: Some(Uuid::new_v4()), + ..Default::default() }; - let swap = request - .call(Arc::clone(context)) - .await - .map_err(|err| jsonrpsee_core::Error::Custom(err.to_string()))?; - Ok(swap) + + execute_request(Method::BuyXmr, params, context).await } async fn list_sellers( rendezvous_point: Multiaddr, context: &Arc, ) -> Result { - let mut request = Request { - params: Params { - rendezvous_point: Some(rendezvous_point), - ..Default::default() - }, - cmd: Method::ListSellers, - shutdown: Shutdown::new(context.shutdown.subscribe()), + let params = Params { + rendezvous_point: Some(rendezvous_point), + ..Default::default() }; - let result = request - .call(Arc::clone(context)) - .await - .map_err(|err| jsonrpsee_core::Error::Custom(err.to_string()))?; - Ok(result) + execute_request(Method::ListSellers, params, context).await } diff --git a/swap/tests/rpc.rs b/swap/tests/rpc.rs index 578d2902..2700cef4 100644 --- a/swap/tests/rpc.rs +++ b/swap/tests/rpc.rs @@ -1,22 +1,22 @@ -use anyhow::{bail, Context as AnyContext, Result}; -use futures::Future; +use anyhow::{Result}; + use jsonrpsee::ws_client::WsClientBuilder; -use jsonrpsee::{rpc_params, RpcModule}; +use jsonrpsee::{rpc_params}; use jsonrpsee_core::client::ClientT; use jsonrpsee_core::params::ObjectParams; -use jsonrpsee_types::error::CallError; + use sequential_test::sequential; -use serde_json::{json, Value}; + use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use swap::api::request::{Method, Params, Request, Shutdown}; -use swap::api::{Config, Context}; +use swap::api::{Context}; use swap::cli::command::{Bitcoin, Monero}; -use testcontainers::clients::Cli; -use testcontainers::{Container, Docker, RunArgs}; + + use tokio::sync::broadcast; -use tokio::time::{interval, timeout}; + use uuid::Uuid; #[cfg(test)] @@ -42,15 +42,11 @@ pub async fn initialize_context() -> (Arc, Request) { bitcoin_target_block: None, }; - let monero = Monero { + let _monero = Monero { monero_daemon_address: None, }; - let mut request = Request { - params: Params::default(), - cmd: Method::StartDaemon, - shutdown: Shutdown::new(tx.subscribe()), - }; + let request = Request::new(tx.subscribe(), Method::StartDaemon, Params::default()); let context = Context::build( Some(bitcoin),