From 9b503f336767bab202ca44c8213883ea20c7efcb Mon Sep 17 00:00:00 2001 From: binarybaron <86064887+binarybaron@users.noreply.github.com> Date: Sun, 20 Aug 2023 00:18:44 +0200 Subject: [PATCH] Ensure correct tracing spans --- swap/src/api/request.rs | 324 +++++++++++++++++++++++----------------- 1 file changed, 188 insertions(+), 136 deletions(-) diff --git a/swap/src/api/request.rs b/swap/src/api/request.rs index ba16cf1c..06838531 100644 --- a/swap/src/api/request.rs +++ b/swap/src/api/request.rs @@ -15,11 +15,12 @@ use qrcode::QrCode; use serde_json::json; use std::cmp::min; use std::convert::TryInto; +use std::fmt::Formatter; use std::future::Future; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; -use tracing::{debug_span, Instrument}; +use tracing::{debug_span, Instrument, Span}; use uuid::Uuid; #[derive(PartialEq, Debug)] @@ -66,12 +67,62 @@ pub enum Method { SuspendCurrentSwap, } +impl Method { + fn get_tracing_span(&self) -> Span { + match self { + Method::Balance => debug_span!("method", name = "Balance"), + Method::BuyXmr { swap_id, .. } => { + debug_span!("method", name="BuyXmr", swap_id=%swap_id) + } + Method::CancelAndRefund { swap_id } => { + debug_span!("method", name="CancelAndRefund", swap_id=%swap_id) + } + Method::Resume { swap_id } => { + debug_span!("method", name="Resume", swap_id=%swap_id) + } + Method::Config => debug_span!("method", name = "Config"), + Method::ExportBitcoinWallet => { + debug_span!("method", name = "ExportBitcoinWallet") + } + Method::GetCurrentSwap => { + debug_span!("method", name = "GetCurrentSwap") + } + Method::GetSwapInfo { .. } => { + debug_span!("method", name = "GetSwapInfo") + } + Method::History => debug_span!("method", name = "History"), + Method::ListSellers { .. } => { + debug_span!("method", name = "ListSellers") + } + Method::MoneroRecovery { .. } => { + debug_span!("method", name = "MoneroRecovery") + } + Method::RawHistory => debug_span!("method", name = "RawHistory"), + Method::StartDaemon { .. } => { + debug_span!("method", name = "StartDaemon") + } + Method::SuspendCurrentSwap => { + debug_span!("method", name = "SuspendCurrentSwap") + } + Method::WithdrawBtc { .. } => { + debug_span!("method", name = "WithdrawBtc") + } + } + } +} + impl Request { pub fn new(cmd: Method) -> Request { Request { cmd } } - async fn handle_cmd(self, context: Arc) -> Result { + // We pass the outer tracing span down to this function such that it can be passed down to other spawned tokio tasks + // This ensures that tasks like the event_loop are all part of the same tracing span + async fn handle_cmd( + self, + context: Arc, + tracing_span: Span, + ) -> Result { match self.cmd { Method::SuspendCurrentSwap => { context.swap_lock.send_suspend_signal().await?; @@ -151,6 +202,8 @@ impl Request { } => { context.swap_lock.acquire_swap_lock(swap_id).await?; + let tracing_span_clone = tracing_span.clone(); + tokio::spawn(async move { tokio::select! { biased; @@ -195,7 +248,7 @@ impl Request { let (event_loop, mut event_loop_handle) = EventLoop::new(swap_id, swarm, seller_peer_id)?; - let event_loop = tokio::spawn(event_loop.run()); + let event_loop = tokio::spawn(event_loop.run().instrument(tracing_span_clone.clone())); let max_givable = || bitcoin_wallet.max_giveable(TxLock::script_size()); let estimate_fee = |amount| bitcoin_wallet.estimate_fee(TxLock::weight(), amount); @@ -278,12 +331,139 @@ impl Request { .release_swap_lock() .await .expect("Could not release swap lock"); - }); + }.instrument(tracing_span)); Ok(json!({ "swapId": swap_id.to_string(), })) } + Method::Resume { swap_id } => { + context.swap_lock.acquire_swap_lock(swap_id).await?; + let tracing_span_clone = tracing_span.clone(); + + tokio::spawn(async move { + tokio::select! { + _ = async { + let seller_peer_id = context.db.get_peer_id(swap_id).await?; + let seller_addresses = context.db.get_addresses(seller_peer_id).await?; + + let seed = context + .config + .seed + .as_ref() + .context("Could not get seed")? + .derive_libp2p_identity(); + + let behaviour = cli::Behaviour::new( + seller_peer_id, + context.config.env_config, + Arc::clone( + context + .bitcoin_wallet + .as_ref() + .context("Could not get Bitcoin wallet")?, + ), + (seed.clone(), context.config.namespace), + ); + let mut swarm = swarm::cli( + seed.clone(), + context + .config + .tor_socks5_port + .context("Could not get Tor SOCKS5 port")?, + behaviour, + ) + .await?; + let our_peer_id = swarm.local_peer_id(); + + tracing::debug!(peer_id = %our_peer_id, "Network layer initialized"); + + for seller_address in seller_addresses { + swarm + .behaviour_mut() + .add_address(seller_peer_id, seller_address); + } + + let (event_loop, event_loop_handle) = + EventLoop::new(swap_id, swarm, seller_peer_id)?; + let handle = tokio::spawn(event_loop.run().instrument(tracing_span_clone)); + + let monero_receive_address = context.db.get_monero_address(swap_id).await?; + let swap = Swap::from_db( + Arc::clone(&context.db), + swap_id, + Arc::clone( + context + .bitcoin_wallet + .as_ref() + .context("Could not get Bitcoin wallet")?, + ), + Arc::clone( + context + .monero_wallet + .as_ref() + .context("Could not get Monero wallet")?, + ), + context.config.env_config, + event_loop_handle, + monero_receive_address, + ) + .await?; + + tokio::select! { + event_loop_result = handle => { + event_loop_result?; + }, + swap_result = bob::run(swap) => { + swap_result?; + } + }; + Ok::<(), anyhow::Error>(()) + } => { + () + }, + _ = context.swap_lock.listen_for_swap_force_suspension() => { + tracing::info!("Shutdown signal received, exiting"); + () + } + } + context + .swap_lock + .release_swap_lock() + .await + .expect("Could not release swap lock"); + }.instrument(tracing_span)); + Ok(json!({ + "result": "ok", + })) + } + Method::CancelAndRefund { swap_id } => { + let bitcoin_wallet = context + .bitcoin_wallet + .as_ref() + .context("Could not get Bitcoin wallet")?; + + context.swap_lock.acquire_swap_lock(swap_id).await?; + + let state = cli::cancel_and_refund( + swap_id, + Arc::clone(bitcoin_wallet), + Arc::clone(&context.db), + ) + .await; + + context + .swap_lock + .release_swap_lock() + .await + .expect("Could not release swap lock"); + + state.map(|state| { + json!({ + "result": state, + }) + }) + } Method::History => { let swaps = context.db.all().await?; let mut vec: Vec<(Uuid, String)> = Vec::new(); @@ -377,132 +557,6 @@ impl Request { "balance": bitcoin_balance.to_sat() })) } - Method::Resume { swap_id } => { - context.swap_lock.acquire_swap_lock(swap_id).await?; - - tokio::spawn(async move { - tokio::select! { - _ = async { - let seller_peer_id = context.db.get_peer_id(swap_id).await?; - let seller_addresses = context.db.get_addresses(seller_peer_id).await?; - - let seed = context - .config - .seed - .as_ref() - .context("Could not get seed")? - .derive_libp2p_identity(); - - let behaviour = cli::Behaviour::new( - seller_peer_id, - context.config.env_config, - Arc::clone( - context - .bitcoin_wallet - .as_ref() - .context("Could not get Bitcoin wallet")?, - ), - (seed.clone(), context.config.namespace), - ); - let mut swarm = swarm::cli( - seed.clone(), - context - .config - .tor_socks5_port - .context("Could not get Tor SOCKS5 port")?, - behaviour, - ) - .await?; - let our_peer_id = swarm.local_peer_id(); - - tracing::debug!(peer_id = %our_peer_id, "Network layer initialized"); - - for seller_address in seller_addresses { - swarm - .behaviour_mut() - .add_address(seller_peer_id, seller_address); - } - - let (event_loop, event_loop_handle) = - EventLoop::new(swap_id, swarm, seller_peer_id)?; - let handle = tokio::spawn(event_loop.run()); - - let monero_receive_address = context.db.get_monero_address(swap_id).await?; - let swap = Swap::from_db( - Arc::clone(&context.db), - swap_id, - Arc::clone( - context - .bitcoin_wallet - .as_ref() - .context("Could not get Bitcoin wallet")?, - ), - Arc::clone( - context - .monero_wallet - .as_ref() - .context("Could not get Monero wallet")?, - ), - context.config.env_config, - event_loop_handle, - monero_receive_address, - ) - .await?; - - tokio::select! { - event_loop_result = handle => { - event_loop_result?; - }, - swap_result = bob::run(swap) => { - swap_result?; - } - }; - Ok::<(), anyhow::Error>(()) - } => { - () - }, - _ = context.swap_lock.listen_for_swap_force_suspension() => { - tracing::info!("Shutdown signal received, exiting"); - () - } - } - context - .swap_lock - .release_swap_lock() - .await - .expect("Could not release swap lock"); - }); - Ok(json!({ - "result": "ok", - })) - } - Method::CancelAndRefund { swap_id } => { - let bitcoin_wallet = context - .bitcoin_wallet - .as_ref() - .context("Could not get Bitcoin wallet")?; - - context.swap_lock.acquire_swap_lock(swap_id).await?; - - let state = cli::cancel_and_refund( - swap_id, - Arc::clone(bitcoin_wallet), - Arc::clone(&context.db), - ) - .await; - - context - .swap_lock - .release_swap_lock() - .await - .expect("Could not release swap lock"); - - state.map(|state| { - json!({ - "result": state, - }) - }) - } Method::ListSellers { rendezvous_point } => { let rendezvous_node_peer_id = rendezvous_point .extract_peer_id() @@ -608,13 +662,11 @@ impl Request { } pub async fn call(self, context: Arc) -> Result { - // If the swap ID is set, we add it to the span - let call_span = debug_span!( - "cmd", - method = ?self.cmd, - ); + let method_span = self.cmd.get_tracing_span(); - self.handle_cmd(context).instrument(call_span).await + self.handle_cmd(context, method_span.clone()) + .instrument(method_span) + .await } }