diff --git a/swap/src/api/request.rs b/swap/src/api/request.rs index ba16cf1c..f29f73a3 100644 --- a/swap/src/api/request.rs +++ b/swap/src/api/request.rs @@ -19,12 +19,13 @@ use std::future::Future; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; -use tracing::{debug_span, Instrument}; +use tracing::{debug_span, Instrument, Span, field}; use uuid::Uuid; #[derive(PartialEq, Debug)] pub struct Request { pub cmd: Method, + pub log_reference: Option, } #[derive(Debug, PartialEq)] @@ -66,21 +67,83 @@ pub enum Method { SuspendCurrentSwap, } +impl Method { + fn get_tracing_span(&self, log_reference_id: Option) -> Span { + let span = match self { + Method::Balance => debug_span!("method", name = "Balance", log_reference_id=field::Empty), + Method::BuyXmr { swap_id, .. } => { + debug_span!("method", name="BuyXmr", swap_id=%swap_id, log_reference_id=field::Empty) + } + Method::CancelAndRefund { swap_id } => { + debug_span!("method", name="CancelAndRefund", swap_id=%swap_id, log_reference_id=field::Empty) + } + Method::Resume { swap_id } => { + debug_span!("method", name="Resume", swap_id=%swap_id, log_reference_id=field::Empty) + } + Method::Config => debug_span!("method", name = "Config", log_reference_id=field::Empty), + Method::ExportBitcoinWallet => { + debug_span!("method", name = "ExportBitcoinWallet", log_reference_id=field::Empty) + } + Method::GetCurrentSwap => { + debug_span!("method", name = "GetCurrentSwap", log_reference_id=field::Empty) + } + Method::GetSwapInfo { .. } => { + debug_span!("method", name = "GetSwapInfo", log_reference_id=field::Empty) + } + Method::History => debug_span!("method", name = "History", log_reference_id=field::Empty), + Method::ListSellers { .. } => { + debug_span!("method", name = "ListSellers", log_reference_id=field::Empty) + } + Method::MoneroRecovery { .. } => { + debug_span!("method", name = "MoneroRecovery", log_reference_id=field::Empty) + } + Method::RawHistory => debug_span!("method", name = "RawHistory", log_reference_id=field::Empty), + Method::StartDaemon { .. } => { + debug_span!("method", name = "StartDaemon", log_reference_id=field::Empty) + } + Method::SuspendCurrentSwap => { + debug_span!("method", name = "SuspendCurrentSwap", log_reference_id=field::Empty) + } + Method::WithdrawBtc { .. } => { + debug_span!("method", name = "WithdrawBtc", log_reference_id=field::Empty) + } + }; + if let Some(log_reference_id) = log_reference_id { + span.record("log_reference_id", &log_reference_id.as_str()); + } + span + } +} + impl Request { pub fn new(cmd: Method) -> Request { - Request { cmd } + Request { cmd, log_reference: None } } - async fn handle_cmd(self, context: Arc) -> Result { + pub fn with_id(cmd: Method, id: Option) -> Request { + Request { cmd, log_reference: id } + } + + // We pass the outer tracing span down to this function such that it can be passed down to other spawned tokio tasks + // This ensures that tasks like the event_loop are all part of the same tracing span + async fn handle_cmd( + self, + context: Arc, + ) -> Result { match self.cmd { Method::SuspendCurrentSwap => { - context.swap_lock.send_suspend_signal().await?; let swap_id = context.swap_lock.get_current_swap_id().await; - Ok(json!({ - "swapId": swap_id, - "success": true - })) + if swap_id.is_some() { + context.swap_lock.send_suspend_signal().await?; + + Ok(json!({ + "success": true, + "swapId": swap_id.unwrap() + })) + } else { + bail!("No swap is currently running") + } } Method::GetSwapInfo { swap_id } => { let bitcoin_wallet = context @@ -106,7 +169,7 @@ impl Request { let start_date = context.db.get_swap_start_date(swap_id).await?; - let state_name = format!("{:?}", swap_state); + let state_name = format!("{}", swap_state); // variable timelock: Option> let timelock = match swap_state { @@ -132,6 +195,7 @@ impl Request { // Add txids Ok(json!({ + "swapId": swap_id, "seller": { "peerId": peerId.to_string(), "addresses": addresses @@ -140,6 +204,7 @@ impl Request { "startDate": start_date, // If none return null, if some unwrap and return as json "timelock": timelock.map(|tl| tl.map(|tl| json!(tl)).unwrap_or(json!(null))).unwrap_or(json!(null)), + // Use display to get the string representation of the state "stateName": state_name, })) } @@ -195,7 +260,7 @@ impl Request { let (event_loop, mut event_loop_handle) = EventLoop::new(swap_id, swarm, seller_peer_id)?; - let event_loop = tokio::spawn(event_loop.run()); + let event_loop = tokio::spawn(event_loop.run().instrument(Span::current())); let max_givable = || bitcoin_wallet.max_giveable(TxLock::script_size()); let estimate_fee = |amount| bitcoin_wallet.estimate_fee(TxLock::weight(), amount); @@ -278,12 +343,138 @@ impl Request { .release_swap_lock() .await .expect("Could not release swap lock"); - }); + }.instrument(Span::current())); Ok(json!({ "swapId": swap_id.to_string(), })) } + Method::Resume { swap_id } => { + context.swap_lock.acquire_swap_lock(swap_id).await?; + + tokio::spawn(async move { + tokio::select! { + _ = async { + let seller_peer_id = context.db.get_peer_id(swap_id).await?; + let seller_addresses = context.db.get_addresses(seller_peer_id).await?; + + let seed = context + .config + .seed + .as_ref() + .context("Could not get seed")? + .derive_libp2p_identity(); + + let behaviour = cli::Behaviour::new( + seller_peer_id, + context.config.env_config, + Arc::clone( + context + .bitcoin_wallet + .as_ref() + .context("Could not get Bitcoin wallet")?, + ), + (seed.clone(), context.config.namespace), + ); + let mut swarm = swarm::cli( + seed.clone(), + context + .config + .tor_socks5_port + .context("Could not get Tor SOCKS5 port")?, + behaviour, + ) + .await?; + let our_peer_id = swarm.local_peer_id(); + + tracing::debug!(peer_id = %our_peer_id, "Network layer initialized"); + + for seller_address in seller_addresses { + swarm + .behaviour_mut() + .add_address(seller_peer_id, seller_address); + } + + let (event_loop, event_loop_handle) = + EventLoop::new(swap_id, swarm, seller_peer_id)?; + let handle = tokio::spawn(event_loop.run().instrument(Span::current())); + + let monero_receive_address = context.db.get_monero_address(swap_id).await?; + let swap = Swap::from_db( + Arc::clone(&context.db), + swap_id, + Arc::clone( + context + .bitcoin_wallet + .as_ref() + .context("Could not get Bitcoin wallet")?, + ), + Arc::clone( + context + .monero_wallet + .as_ref() + .context("Could not get Monero wallet")?, + ), + context.config.env_config, + event_loop_handle, + monero_receive_address, + ) + .await?; + + tokio::select! { + event_loop_result = handle => { + event_loop_result?; + }, + swap_result = bob::run(swap) => { + swap_result?; + } + }; + Ok::<(), anyhow::Error>(()) + } => { + () + }, + _ = context.swap_lock.listen_for_swap_force_suspension() => { + tracing::info!("Shutdown signal received, exiting"); + () + } + } + context + .swap_lock + .release_swap_lock() + .await + .expect("Could not release swap lock"); + }.instrument(Span::current())); + Ok(json!({ + "result": "ok", + })) + } + Method::CancelAndRefund { swap_id } => { + let bitcoin_wallet = context + .bitcoin_wallet + .as_ref() + .context("Could not get Bitcoin wallet")?; + + context.swap_lock.acquire_swap_lock(swap_id).await?; + + let state = cli::cancel_and_refund( + swap_id, + Arc::clone(bitcoin_wallet), + Arc::clone(&context.db), + ) + .await; + + context + .swap_lock + .release_swap_lock() + .await + .expect("Could not release swap lock"); + + state.map(|state| { + json!({ + "result": state, + }) + }) + } Method::History => { let swaps = context.db.all().await?; let mut vec: Vec<(Uuid, String)> = Vec::new(); @@ -377,132 +568,6 @@ impl Request { "balance": bitcoin_balance.to_sat() })) } - Method::Resume { swap_id } => { - context.swap_lock.acquire_swap_lock(swap_id).await?; - - tokio::spawn(async move { - tokio::select! { - _ = async { - let seller_peer_id = context.db.get_peer_id(swap_id).await?; - let seller_addresses = context.db.get_addresses(seller_peer_id).await?; - - let seed = context - .config - .seed - .as_ref() - .context("Could not get seed")? - .derive_libp2p_identity(); - - let behaviour = cli::Behaviour::new( - seller_peer_id, - context.config.env_config, - Arc::clone( - context - .bitcoin_wallet - .as_ref() - .context("Could not get Bitcoin wallet")?, - ), - (seed.clone(), context.config.namespace), - ); - let mut swarm = swarm::cli( - seed.clone(), - context - .config - .tor_socks5_port - .context("Could not get Tor SOCKS5 port")?, - behaviour, - ) - .await?; - let our_peer_id = swarm.local_peer_id(); - - tracing::debug!(peer_id = %our_peer_id, "Network layer initialized"); - - for seller_address in seller_addresses { - swarm - .behaviour_mut() - .add_address(seller_peer_id, seller_address); - } - - let (event_loop, event_loop_handle) = - EventLoop::new(swap_id, swarm, seller_peer_id)?; - let handle = tokio::spawn(event_loop.run()); - - let monero_receive_address = context.db.get_monero_address(swap_id).await?; - let swap = Swap::from_db( - Arc::clone(&context.db), - swap_id, - Arc::clone( - context - .bitcoin_wallet - .as_ref() - .context("Could not get Bitcoin wallet")?, - ), - Arc::clone( - context - .monero_wallet - .as_ref() - .context("Could not get Monero wallet")?, - ), - context.config.env_config, - event_loop_handle, - monero_receive_address, - ) - .await?; - - tokio::select! { - event_loop_result = handle => { - event_loop_result?; - }, - swap_result = bob::run(swap) => { - swap_result?; - } - }; - Ok::<(), anyhow::Error>(()) - } => { - () - }, - _ = context.swap_lock.listen_for_swap_force_suspension() => { - tracing::info!("Shutdown signal received, exiting"); - () - } - } - context - .swap_lock - .release_swap_lock() - .await - .expect("Could not release swap lock"); - }); - Ok(json!({ - "result": "ok", - })) - } - Method::CancelAndRefund { swap_id } => { - let bitcoin_wallet = context - .bitcoin_wallet - .as_ref() - .context("Could not get Bitcoin wallet")?; - - context.swap_lock.acquire_swap_lock(swap_id).await?; - - let state = cli::cancel_and_refund( - swap_id, - Arc::clone(bitcoin_wallet), - Arc::clone(&context.db), - ) - .await; - - context - .swap_lock - .release_swap_lock() - .await - .expect("Could not release swap lock"); - - state.map(|state| { - json!({ - "result": state, - }) - }) - } Method::ListSellers { rendezvous_point } => { let rendezvous_node_peer_id = rendezvous_point .extract_peer_id() @@ -608,13 +673,11 @@ impl Request { } pub async fn call(self, context: Arc) -> Result { - // If the swap ID is set, we add it to the span - let call_span = debug_span!( - "cmd", - method = ?self.cmd, - ); + let method_span = self.cmd.get_tracing_span(self.log_reference.clone()).clone(); - self.handle_cmd(context).instrument(call_span).await + self.handle_cmd(context) + .instrument(method_span) + .await } } @@ -627,6 +690,7 @@ fn qr_code(value: &impl ToString) -> Result { .build(); Ok(qr_code) } + pub async fn determine_btc_to_swap( json: bool, bid_quote: impl Future>, diff --git a/swap/src/bitcoin/wallet.rs b/swap/src/bitcoin/wallet.rs index 6aed30da..c1f8565c 100644 --- a/swap/src/bitcoin/wallet.rs +++ b/swap/src/bitcoin/wallet.rs @@ -24,6 +24,7 @@ use std::path::Path; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::sync::{watch, Mutex}; +use tracing::{Instrument, Span}; const SLED_TREE_NAME: &str = "default_tree"; @@ -192,7 +193,7 @@ impl Wallet { tokio::time::sleep(Duration::from_secs(5)).await; } - }); + }.instrument(Span::current())); Subscription { receiver, diff --git a/swap/src/cli/tracing.rs b/swap/src/cli/tracing.rs index 5a360d1e..1ede1e8a 100644 --- a/swap/src/cli/tracing.rs +++ b/swap/src/cli/tracing.rs @@ -22,7 +22,6 @@ pub fn init(debug: bool, json: bool, dir: impl AsRef) -> Result<()> { fmt::layer() .with_ansi(false) .with_target(false) - .with_span_events(fmt::format::FmtSpan::FULL) .json() .with_writer(appender), ); diff --git a/swap/src/rpc/methods.rs b/swap/src/rpc/methods.rs index 0572636c..32559a1b 100644 --- a/swap/src/rpc/methods.rs +++ b/swap/src/rpc/methods.rs @@ -9,73 +9,93 @@ use libp2p::core::Multiaddr; use std::collections::HashMap; use std::str::FromStr; use std::sync::Arc; +use jsonrpsee::types::Params; use uuid::Uuid; pub fn register_modules(context: Arc) -> RpcModule> { let mut module = RpcModule::new(context); module - .register_async_method("suspend_current_swap", |_, context| async move { - execute_request(Method::SuspendCurrentSwap, &context).await + .register_async_method("suspend_current_swap", |params, context| async move { + execute_request(params, Method::SuspendCurrentSwap, &context).await }) .unwrap(); module - .register_async_method("get_swap_info", |params, context| async move { - let params: HashMap = params.parse()?; + .register_async_method("get_swap_info", |params_raw, context| async move { + let params: HashMap = params_raw.parse()?; let swap_id = params.get("swap_id").ok_or_else(|| { jsonrpsee_core::Error::Custom("Does not contain swap_id".to_string()) })?; - get_swap_info(*swap_id, &context).await + execute_request( + params_raw, + Method::GetSwapInfo { + swap_id: *swap_id, + }, + &context, + ).await }) .unwrap(); module - .register_async_method("get_bitcoin_balance", |_, context| async move { - get_bitcoin_balance(&context).await + .register_async_method("get_bitcoin_balance", |params, context| async move { + execute_request(params, Method::Balance, &context).await }) .unwrap(); module - .register_async_method("get_history", |_, context| async move { - get_history(&context).await + .register_async_method("get_history", |params, context| async move { + execute_request(params, Method::History, &context).await }) .unwrap(); module - .register_async_method("get_raw_history", |_, context| async move { - get_raw_history(&context).await + .register_async_method("get_raw_history", |params, context| async move { + execute_request(params, Method::RawHistory, &context).await }) .unwrap(); module - .register_async_method("resume_swap", |params, context| async move { - let params: HashMap = params.parse()?; + .register_async_method("resume_swap", |params_raw, context| async move { + let params: HashMap = params_raw.parse()?; let swap_id = params.get("swap_id").ok_or_else(|| { jsonrpsee_core::Error::Custom("Does not contain swap_id".to_string()) })?; - resume_swap(*swap_id, &context).await + execute_request( + params_raw, + Method::Resume { + swap_id: *swap_id, + }, + &context, + ).await }) .unwrap(); module - .register_async_method("cancel_refund_swap", |params, context| async move { - let params: HashMap = params.parse()?; + .register_async_method("cancel_refund_swap", |params_raw, context| async move { + let params: HashMap = params_raw.parse()?; let swap_id = params.get("swap_id").ok_or_else(|| { jsonrpsee_core::Error::Custom("Does not contain swap_id".to_string()) })?; - cancel_and_refund_swap(*swap_id, &context).await + execute_request( + params_raw, + Method::CancelAndRefund { + swap_id: *swap_id, + }, + &context, + ).await }) .unwrap(); + module - .register_async_method("withdraw_btc", |params, context| async move { - let params: HashMap = params.parse()?; + .register_async_method("withdraw_btc", |params_raw, context| async move { + let params: HashMap = params_raw.parse()?; let amount = if let Some(amount_str) = params.get("amount") { Some( @@ -96,12 +116,19 @@ pub fn register_modules(context: Arc) -> RpcModule> { let withdraw_address = bitcoin_address::validate(withdraw_address, context.config.is_testnet)?; - withdraw_btc(withdraw_address, amount, &context).await + execute_request( + params_raw, + Method::WithdrawBtc { + amount, + address: withdraw_address, + }, + &context, + ).await }) .expect("Could not register RPC method withdraw_btc"); module - .register_async_method("buy_xmr", |params, context| async move { - let params: HashMap = params.parse()?; + .register_async_method("buy_xmr", |params_raw, context| async move { + let params: HashMap = params_raw.parse()?; let bitcoin_change_address = bitcoin::Address::from_str( params.get("bitcoin_change_address").ok_or_else(|| { @@ -132,29 +159,38 @@ pub fn register_modules(context: Arc) -> RpcModule> { })?) .map_err(|err| jsonrpsee_core::Error::Custom(err.to_string()))?; - buy_xmr( - bitcoin_change_address, - monero_receive_address, - seller, + execute_request( + params_raw, + Method::BuyXmr { + bitcoin_change_address, + monero_receive_address, + seller, + swap_id: Uuid::new_v4(), + }, &context, - ) - .await + ).await }) .unwrap(); module - .register_async_method("list_sellers", |params, context| async move { - let params: HashMap = params.parse()?; + .register_async_method("list_sellers", |params_raw, context| async move { + let params: HashMap = params_raw.parse()?; let rendezvous_point = params.get("rendezvous_point").ok_or_else(|| { jsonrpsee_core::Error::Custom("Does not contain rendezvous_point".to_string()) })?; - list_sellers(rendezvous_point.clone(), &context).await + execute_request( + params_raw, + Method::ListSellers { + rendezvous_point: rendezvous_point.clone(), + }, + &context, + ).await }) .unwrap(); module - .register_async_method("get_current_swap", |_, context| async move { - get_current_swap(&context).await + .register_async_method("get_current_swap", |params, context| async move { + execute_request(params, Method::GetCurrentSwap, &context).await }) .unwrap(); @@ -162,88 +198,19 @@ pub fn register_modules(context: Arc) -> RpcModule> { } async fn execute_request( + params: Params<'static>, cmd: Method, context: &Arc, ) -> Result { - let request = Request::new(cmd); + let params_parsed = params.parse::>() + .map_err(|err| jsonrpsee_core::Error::Custom(err.to_string()))?; + + let reference_id = params_parsed + .get("log_reference_id"); + + let request = Request::with_id(cmd, reference_id.map(|s| s.clone())); request .call(Arc::clone(context)) .await .map_err(|err| jsonrpsee_core::Error::Custom(err.to_string())) -} - -async fn get_current_swap( - context: &Arc, -) -> Result { - execute_request(Method::GetCurrentSwap, context).await -} - -async fn get_bitcoin_balance( - context: &Arc, -) -> Result { - execute_request(Method::Balance, context).await -} - -async fn get_history(context: &Arc) -> Result { - execute_request(Method::History, context).await -} - -async fn get_raw_history( - context: &Arc, -) -> Result { - execute_request(Method::RawHistory, context).await -} - -async fn get_swap_info( - swap_id: Uuid, - context: &Arc, -) -> Result { - execute_request(Method::GetSwapInfo { swap_id }, context).await -} - -async fn resume_swap( - swap_id: Uuid, - context: &Arc, -) -> Result { - execute_request(Method::Resume { swap_id }, context).await -} - -async fn cancel_and_refund_swap( - swap_id: Uuid, - context: &Arc, -) -> Result { - execute_request(Method::CancelAndRefund { swap_id }, context).await -} - -async fn withdraw_btc( - address: bitcoin::Address, - amount: Option, - context: &Arc, -) -> Result { - execute_request(Method::WithdrawBtc { amount, address }, context).await -} - -async fn buy_xmr( - bitcoin_change_address: bitcoin::Address, - monero_receive_address: monero::Address, - seller: Multiaddr, - context: &Arc, -) -> Result { - execute_request( - Method::BuyXmr { - seller, - swap_id: Uuid::new_v4(), - bitcoin_change_address, - monero_receive_address, - }, - context, - ) - .await -} - -async fn list_sellers( - rendezvous_point: Multiaddr, - context: &Arc, -) -> Result { - execute_request(Method::ListSellers { rendezvous_point }, context).await -} +} \ No newline at end of file