From 63de5d407d2c4b5acbd0d763ddfb7b8f3fbf6d87 Mon Sep 17 00:00:00 2001 From: Mohan <86064887+binarybaron@users.noreply.github.com> Date: Tue, 3 Jun 2025 11:06:51 +0200 Subject: [PATCH] refactor(wallet): Optimize Bitcoin timelock subscriptions for BDK upgrade (#374) --- swap/src/asb/recovery/redeem.rs | 1 + swap/src/bitcoin/cancel.rs | 6 +++- swap/src/bitcoin/wallet.rs | 64 +++++++++++++++++++++++++++------ swap/src/protocol/alice/swap.rs | 26 +++++++++++--- 4 files changed, 82 insertions(+), 15 deletions(-) diff --git a/swap/src/asb/recovery/redeem.rs b/swap/src/asb/recovery/redeem.rs index 47f4a048..f0abe254 100644 --- a/swap/src/asb/recovery/redeem.rs +++ b/swap/src/asb/recovery/redeem.rs @@ -57,6 +57,7 @@ pub async fn redeem( } AliceState::BtcRedeemTransactionPublished { state3 } => { let subscription = bitcoin_wallet.subscribe_to(state3.tx_redeem()).await; + if let Finality::Await = finality { subscription.wait_until_final().await?; } diff --git a/swap/src/bitcoin/cancel.rs b/swap/src/bitcoin/cancel.rs index d3d01d71..ed29019b 100644 --- a/swap/src/bitcoin/cancel.rs +++ b/swap/src/bitcoin/cancel.rs @@ -17,7 +17,7 @@ use serde::{Deserialize, Serialize}; use std::cmp::Ordering; use std::collections::HashMap; use std::fmt; -use std::ops::Add; +use std::ops::{Add}; use typeshare::typeshare; /// Represent a timelock, expressed in relative block height as defined in @@ -39,6 +39,10 @@ impl CancelTimelock { pub const fn new(number_of_blocks: u32) -> Self { Self(number_of_blocks) } + + pub fn half(&self) -> CancelTimelock { + Self(self.0 / 2) + } } impl Add for BlockHeight { diff --git a/swap/src/bitcoin/wallet.rs b/swap/src/bitcoin/wallet.rs index 7546e0a7..dadf55db 100644 --- a/swap/src/bitcoin/wallet.rs +++ b/swap/src/bitcoin/wallet.rs @@ -680,13 +680,26 @@ impl Wallet { where T: Watchable, { - self.electrum_client.lock().await.status_of_script(tx) + self.electrum_client.lock().await.status_of_script(tx, true) } pub async fn subscribe_to(&self, tx: impl Watchable + Send + 'static) -> Subscription { let txid = tx.id(); let script = tx.script(); + let initial_status = match self + .electrum_client + .lock() + .await + .status_of_script(&tx, false) + { + Ok(status) => Some(status), + Err(err) => { + tracing::debug!(%txid, %err, "Failed to get initial status for subscription. We won't notify the caller and will try again later."); + None + } + }; + let sub = self .electrum_client .lock() @@ -698,12 +711,12 @@ impl Wallet { let client = self.electrum_client.clone(); tokio::spawn(async move { - let mut last_status = None; + let mut last_status = initial_status; loop { let new_status = client.lock() .await - .status_of_script(&tx) + .status_of_script(&tx, false) .unwrap_or_else(|error| { tracing::warn!(%txid, "Failed to get status of script: {:#}", error); ScriptStatus::Retrying @@ -1511,6 +1524,18 @@ impl Client { Ok(()) } + /// Update the client state for a single script. + /// + /// As opposed to [`update_state`] this function does not + /// check the time since the last update before refreshing + /// It therefore also does not take a [`force`] parameter + pub fn update_state_single(&mut self, script: &impl Watchable) -> Result<()> { + self.update_script_history(script)?; + self.update_block_height()?; + + Ok(()) + } + /// Update the block height. fn update_block_height(&mut self) -> Result<()> { let latest_block = self @@ -1535,7 +1560,7 @@ impl Client { fn update_script_histories(&mut self) -> Result<()> { let scripts = self.script_history.keys().map(|s| s.as_script()); - let histories = self + let histories: Vec> = self .electrum .inner .batch_script_get_history(scripts) @@ -1555,6 +1580,17 @@ impl Client { Ok(()) } + /// Update the script history of a single script. + pub fn update_script_history(&mut self, script: &impl Watchable) -> Result<()> { + let (script, _) = script.script_and_txid(); + + let history = self.electrum.inner.script_get_history(script.as_script())?; + + self.script_history.insert(script, history); + + Ok(()) + } + /// Broadcast a transaction to the network. pub fn transaction_broadcast(&self, transaction: &Transaction) -> Result> { // Broadcast the transaction to the network. @@ -1570,21 +1606,29 @@ impl Client { } /// Get the status of a script. - pub fn status_of_script(&mut self, script: &impl Watchable) -> Result { - let (script, txid) = script.script_and_txid(); + pub fn status_of_script( + &mut self, + script: &impl Watchable, + force: bool, + ) -> Result { + let (script_buf, txid) = script.script_and_txid(); - if !self.script_history.contains_key(&script) { - self.script_history.insert(script.clone(), vec![]); + if !self.script_history.contains_key(&script_buf) { + self.script_history.insert(script_buf.clone(), vec![]); // Immediately refetch the status of the script // when we first subscribe to it. - self.update_state(true)?; + self.update_state_single(script)?; + } else if force { + // Immediately refetch the status of the script + // when [`force`] is set to true + self.update_state_single(script)?; } else { // Otherwise, don't force a refetch. self.update_state(false)?; } - let history = self.script_history.entry(script).or_default(); + let history = self.script_history.entry(script_buf).or_default(); let history_of_tx: Vec<&GetHistoryRes> = history .iter() diff --git a/swap/src/protocol/alice/swap.rs b/swap/src/protocol/alice/swap.rs index a6cf5712..859da5b5 100644 --- a/swap/src/protocol/alice/swap.rs +++ b/swap/src/protocol/alice/swap.rs @@ -74,6 +74,7 @@ where Ok(match state { AliceState::Started { state3 } => { let tx_lock_status = bitcoin_wallet.subscribe_to(state3.tx_lock.clone()).await; + match timeout( env_config.bitcoin_lock_mempool_timeout, tx_lock_status.wait_until_seen(), @@ -249,11 +250,12 @@ where transfer_proof, state3, } => { - let tx_lock_status = bitcoin_wallet.subscribe_to(state3.tx_lock.clone()).await; + let tx_lock_status_subscription = + bitcoin_wallet.subscribe_to(state3.tx_lock.clone()).await; select! { biased; // make sure the cancel timelock expiry future is polled first - result = tx_lock_status.wait_until_confirmed_with(state3.cancel_timelock) => { + result = tx_lock_status_subscription.wait_until_confirmed_with(state3.cancel_timelock) => { result?; AliceState::CancelTimelockExpired { monero_wallet_restore_blockheight, @@ -262,6 +264,20 @@ where } } enc_sig = event_loop_handle.recv_encrypted_signature() => { + // Fetch the status as early as possible to update the internal cache of our Electurm client + // Prevents redundant network requests later on when we redeem the Bitcoin + let tx_lock_status = bitcoin_wallet.status_of_script(&state3.tx_lock.clone()).await?; + + if tx_lock_status.is_confirmed_with(state3.cancel_timelock.half()) { + tx_lock_status_subscription.wait_until_confirmed_with(state3.cancel_timelock).await?; + + return Ok(AliceState::CancelTimelockExpired { + monero_wallet_restore_blockheight, + transfer_proof, + state3, + }) + } + tracing::info!("Received encrypted signature"); AliceState::EncSigLearned { @@ -399,8 +415,10 @@ where transfer_proof, state3, } => { - let tx_refund_status = bitcoin_wallet.subscribe_to(state3.tx_refund()).await; - let tx_cancel_status = bitcoin_wallet.subscribe_to(state3.tx_cancel()).await; + let (tx_refund_status, tx_cancel_status) = tokio::join!( + bitcoin_wallet.subscribe_to(state3.tx_refund()), + bitcoin_wallet.subscribe_to(state3.tx_cancel()) + ); select! { seen_refund = tx_refund_status.wait_until_seen() => {