refactor(wallet): Optimize Bitcoin timelock subscriptions for BDK upgrade (#374)

This commit is contained in:
Mohan 2025-06-03 11:06:51 +02:00 committed by GitHub
parent 3c7f863b3b
commit 63de5d407d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 82 additions and 15 deletions

View file

@ -57,6 +57,7 @@ pub async fn redeem(
} }
AliceState::BtcRedeemTransactionPublished { state3 } => { AliceState::BtcRedeemTransactionPublished { state3 } => {
let subscription = bitcoin_wallet.subscribe_to(state3.tx_redeem()).await; let subscription = bitcoin_wallet.subscribe_to(state3.tx_redeem()).await;
if let Finality::Await = finality { if let Finality::Await = finality {
subscription.wait_until_final().await?; subscription.wait_until_final().await?;
} }

View file

@ -17,7 +17,7 @@ use serde::{Deserialize, Serialize};
use std::cmp::Ordering; use std::cmp::Ordering;
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt; use std::fmt;
use std::ops::Add; use std::ops::{Add};
use typeshare::typeshare; use typeshare::typeshare;
/// Represent a timelock, expressed in relative block height as defined in /// Represent a timelock, expressed in relative block height as defined in
@ -39,6 +39,10 @@ impl CancelTimelock {
pub const fn new(number_of_blocks: u32) -> Self { pub const fn new(number_of_blocks: u32) -> Self {
Self(number_of_blocks) Self(number_of_blocks)
} }
pub fn half(&self) -> CancelTimelock {
Self(self.0 / 2)
}
} }
impl Add<CancelTimelock> for BlockHeight { impl Add<CancelTimelock> for BlockHeight {

View file

@ -680,13 +680,26 @@ impl Wallet {
where where
T: Watchable, T: Watchable,
{ {
self.electrum_client.lock().await.status_of_script(tx) self.electrum_client.lock().await.status_of_script(tx, true)
} }
pub async fn subscribe_to(&self, tx: impl Watchable + Send + 'static) -> Subscription { pub async fn subscribe_to(&self, tx: impl Watchable + Send + 'static) -> Subscription {
let txid = tx.id(); let txid = tx.id();
let script = tx.script(); let script = tx.script();
let initial_status = match self
.electrum_client
.lock()
.await
.status_of_script(&tx, false)
{
Ok(status) => Some(status),
Err(err) => {
tracing::debug!(%txid, %err, "Failed to get initial status for subscription. We won't notify the caller and will try again later.");
None
}
};
let sub = self let sub = self
.electrum_client .electrum_client
.lock() .lock()
@ -698,12 +711,12 @@ impl Wallet {
let client = self.electrum_client.clone(); let client = self.electrum_client.clone();
tokio::spawn(async move { tokio::spawn(async move {
let mut last_status = None; let mut last_status = initial_status;
loop { loop {
let new_status = client.lock() let new_status = client.lock()
.await .await
.status_of_script(&tx) .status_of_script(&tx, false)
.unwrap_or_else(|error| { .unwrap_or_else(|error| {
tracing::warn!(%txid, "Failed to get status of script: {:#}", error); tracing::warn!(%txid, "Failed to get status of script: {:#}", error);
ScriptStatus::Retrying ScriptStatus::Retrying
@ -1511,6 +1524,18 @@ impl Client {
Ok(()) Ok(())
} }
/// Update the client state for a single script.
///
/// As opposed to [`update_state`] this function does not
/// check the time since the last update before refreshing
/// It therefore also does not take a [`force`] parameter
pub fn update_state_single(&mut self, script: &impl Watchable) -> Result<()> {
self.update_script_history(script)?;
self.update_block_height()?;
Ok(())
}
/// Update the block height. /// Update the block height.
fn update_block_height(&mut self) -> Result<()> { fn update_block_height(&mut self) -> Result<()> {
let latest_block = self let latest_block = self
@ -1535,7 +1560,7 @@ impl Client {
fn update_script_histories(&mut self) -> Result<()> { fn update_script_histories(&mut self) -> Result<()> {
let scripts = self.script_history.keys().map(|s| s.as_script()); let scripts = self.script_history.keys().map(|s| s.as_script());
let histories = self let histories: Vec<Vec<GetHistoryRes>> = self
.electrum .electrum
.inner .inner
.batch_script_get_history(scripts) .batch_script_get_history(scripts)
@ -1555,6 +1580,17 @@ impl Client {
Ok(()) Ok(())
} }
/// Update the script history of a single script.
pub fn update_script_history(&mut self, script: &impl Watchable) -> Result<()> {
let (script, _) = script.script_and_txid();
let history = self.electrum.inner.script_get_history(script.as_script())?;
self.script_history.insert(script, history);
Ok(())
}
/// Broadcast a transaction to the network. /// Broadcast a transaction to the network.
pub fn transaction_broadcast(&self, transaction: &Transaction) -> Result<Arc<Txid>> { pub fn transaction_broadcast(&self, transaction: &Transaction) -> Result<Arc<Txid>> {
// Broadcast the transaction to the network. // Broadcast the transaction to the network.
@ -1570,21 +1606,29 @@ impl Client {
} }
/// Get the status of a script. /// Get the status of a script.
pub fn status_of_script(&mut self, script: &impl Watchable) -> Result<ScriptStatus> { pub fn status_of_script(
let (script, txid) = script.script_and_txid(); &mut self,
script: &impl Watchable,
force: bool,
) -> Result<ScriptStatus> {
let (script_buf, txid) = script.script_and_txid();
if !self.script_history.contains_key(&script) { if !self.script_history.contains_key(&script_buf) {
self.script_history.insert(script.clone(), vec![]); self.script_history.insert(script_buf.clone(), vec![]);
// Immediately refetch the status of the script // Immediately refetch the status of the script
// when we first subscribe to it. // when we first subscribe to it.
self.update_state(true)?; self.update_state_single(script)?;
} else if force {
// Immediately refetch the status of the script
// when [`force`] is set to true
self.update_state_single(script)?;
} else { } else {
// Otherwise, don't force a refetch. // Otherwise, don't force a refetch.
self.update_state(false)?; self.update_state(false)?;
} }
let history = self.script_history.entry(script).or_default(); let history = self.script_history.entry(script_buf).or_default();
let history_of_tx: Vec<&GetHistoryRes> = history let history_of_tx: Vec<&GetHistoryRes> = history
.iter() .iter()

View file

@ -74,6 +74,7 @@ where
Ok(match state { Ok(match state {
AliceState::Started { state3 } => { AliceState::Started { state3 } => {
let tx_lock_status = bitcoin_wallet.subscribe_to(state3.tx_lock.clone()).await; let tx_lock_status = bitcoin_wallet.subscribe_to(state3.tx_lock.clone()).await;
match timeout( match timeout(
env_config.bitcoin_lock_mempool_timeout, env_config.bitcoin_lock_mempool_timeout,
tx_lock_status.wait_until_seen(), tx_lock_status.wait_until_seen(),
@ -249,11 +250,12 @@ where
transfer_proof, transfer_proof,
state3, state3,
} => { } => {
let tx_lock_status = bitcoin_wallet.subscribe_to(state3.tx_lock.clone()).await; let tx_lock_status_subscription =
bitcoin_wallet.subscribe_to(state3.tx_lock.clone()).await;
select! { select! {
biased; // make sure the cancel timelock expiry future is polled first biased; // make sure the cancel timelock expiry future is polled first
result = tx_lock_status.wait_until_confirmed_with(state3.cancel_timelock) => { result = tx_lock_status_subscription.wait_until_confirmed_with(state3.cancel_timelock) => {
result?; result?;
AliceState::CancelTimelockExpired { AliceState::CancelTimelockExpired {
monero_wallet_restore_blockheight, monero_wallet_restore_blockheight,
@ -262,6 +264,20 @@ where
} }
} }
enc_sig = event_loop_handle.recv_encrypted_signature() => { enc_sig = event_loop_handle.recv_encrypted_signature() => {
// Fetch the status as early as possible to update the internal cache of our Electurm client
// Prevents redundant network requests later on when we redeem the Bitcoin
let tx_lock_status = bitcoin_wallet.status_of_script(&state3.tx_lock.clone()).await?;
if tx_lock_status.is_confirmed_with(state3.cancel_timelock.half()) {
tx_lock_status_subscription.wait_until_confirmed_with(state3.cancel_timelock).await?;
return Ok(AliceState::CancelTimelockExpired {
monero_wallet_restore_blockheight,
transfer_proof,
state3,
})
}
tracing::info!("Received encrypted signature"); tracing::info!("Received encrypted signature");
AliceState::EncSigLearned { AliceState::EncSigLearned {
@ -399,8 +415,10 @@ where
transfer_proof, transfer_proof,
state3, state3,
} => { } => {
let tx_refund_status = bitcoin_wallet.subscribe_to(state3.tx_refund()).await; let (tx_refund_status, tx_cancel_status) = tokio::join!(
let tx_cancel_status = bitcoin_wallet.subscribe_to(state3.tx_cancel()).await; bitcoin_wallet.subscribe_to(state3.tx_refund()),
bitcoin_wallet.subscribe_to(state3.tx_cancel())
);
select! { select! {
seen_refund = tx_refund_status.wait_until_seen() => { seen_refund = tx_refund_status.wait_until_seen() => {