From 01739eddb1661eafda1ac3fd39ed0e8621ca2b4a Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 23 Mar 2021 13:49:57 +1100 Subject: [PATCH] Introduce a more flexible transaction subscription system Instead of watching for status changes directly on bitcoin::Wallet, we return a Subscription object back to the caller. This subscription object can be re-used multiple times. Among other things, this now allows callers of `broadcast` to decide on what to wait for given the returned Subscription object. The new API is also more concise which allows us to remove some of the functions on the actor states in favor of simple inline calls. Co-authored-by: rishflab --- swap/src/bitcoin/redeem.rs | 2 +- swap/src/bitcoin/wallet.rs | 165 ++++++++++++------ swap/src/protocol/alice/state.rs | 13 -- swap/src/protocol/alice/swap.rs | 95 +++++----- swap/src/protocol/bob/state.rs | 41 +---- swap/src/protocol/bob/swap.rs | 22 ++- ...refunds_using_cancel_and_refund_command.rs | 7 +- 7 files changed, 185 insertions(+), 160 deletions(-) diff --git a/swap/src/bitcoin/redeem.rs b/swap/src/bitcoin/redeem.rs index 6d14c201..da073693 100644 --- a/swap/src/bitcoin/redeem.rs +++ b/swap/src/bitcoin/redeem.rs @@ -15,7 +15,7 @@ use miniscript::{Descriptor, DescriptorTrait}; use sha2::Sha256; use std::collections::HashMap; -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct TxRedeem { inner: Transaction, digest: SigHash, diff --git a/swap/src/bitcoin/wallet.rs b/swap/src/bitcoin/wallet.rs index 97ae5424..e4ba57d4 100644 --- a/swap/src/bitcoin/wallet.rs +++ b/swap/src/bitcoin/wallet.rs @@ -11,14 +11,13 @@ use bdk::keys::DerivableKey; use bdk::{FeeRate, KeychainKind}; use bitcoin::Script; use reqwest::Url; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use std::convert::TryFrom; use std::fmt; -use std::future::Future; use std::path::Path; use std::sync::Arc; use std::time::{Duration, Instant}; -use tokio::sync::Mutex; +use tokio::sync::{watch, Mutex}; const SLED_TREE_NAME: &str = "default_tree"; @@ -162,14 +161,13 @@ impl Wallet { &self, transaction: Transaction, kind: &str, - ) -> Result<(Txid, impl Future> + '_)> { + ) -> Result<(Txid, Subscription)> { let txid = transaction.txid(); // to watch for confirmations, watching a single output is enough - let watcher = self.wait_for_transaction_finality( - (txid, transaction.output[0].script_pubkey.clone()), - kind.to_owned(), - ); + let subscription = self + .subscribe_to((txid, transaction.output[0].script_pubkey.clone())) + .await; self.wallet .lock() @@ -181,7 +179,7 @@ impl Wallet { tracing::info!(%txid, "Published Bitcoin {} transaction", kind); - Ok((txid, watcher)) + Ok((txid, subscription)) } pub async fn sign_and_finalize(&self, psbt: PartiallySignedTransaction) -> Result { @@ -209,63 +207,58 @@ impl Wallet { self.client.lock().await.status_of_script(tx) } - pub async fn watch_until_status( - &self, - tx: &T, - mut status_fn: impl FnMut(ScriptStatus) -> bool, - ) -> Result<()> - where - T: Watchable, - { + pub async fn subscribe_to(&self, tx: impl Watchable + Send + 'static) -> Subscription { let txid = tx.id(); + let script = tx.script(); - let mut last_status = None; + let sub = self + .client + .lock() + .await + .subscriptions + .entry((txid, script.clone())) + .or_insert_with(|| { + let (sender, receiver) = watch::channel(ScriptStatus::Unseen); + let client = self.client.clone(); - loop { - let new_status = self.client.lock().await.status_of_script(tx)?; + tokio::spawn(async move { + let mut last_status = None; - if Some(new_status) != last_status { - tracing::debug!(%txid, "Transaction is {}", new_status); - } - last_status = Some(new_status); + loop { + tokio::time::sleep(Duration::from_secs(5)).await; - if status_fn(new_status) { - break; - } + let new_status = match client.lock().await.status_of_script(&tx) { + Ok(new_status) => new_status, + Err(e) => { + tracing::warn!(%txid, "Failed to get status of script: {:#}", e); + return; + } + }; - tokio::time::sleep(Duration::from_secs(5)).await; - } + if Some(new_status) != last_status { + tracing::debug!(%txid, "Transaction is {}", new_status); + } + last_status = Some(new_status); - Ok(()) - } + let all_receivers_gone = sender.send(new_status).is_err(); - async fn wait_for_transaction_finality(&self, tx: T, kind: String) -> Result<()> - where - T: Watchable, - { - let conf_target = self.finality_confirmations; - let txid = tx.id(); + if all_receivers_gone { + tracing::debug!(%txid, "All receivers gone, removing subscription"); + client.lock().await.subscriptions.remove(&(txid, script)); + return; + } + } + }); - tracing::info!(%txid, "Waiting for {} confirmation{} of Bitcoin {} transaction", conf_target, if conf_target > 1 { "s" } else { "" }, kind); - - let mut seen_confirmations = 0; - - self.watch_until_status(&tx, |status| match status { - ScriptStatus::Confirmed(inner) => { - let confirmations = inner.confirmations(); - - if confirmations > seen_confirmations { - tracing::info!(%txid, "Bitcoin {} tx has {} out of {} confirmation{}", kind, confirmations, conf_target, if conf_target > 1 { "s" } else { "" }); - seen_confirmations = confirmations; + Subscription { + receiver, + finality_confirmations: self.finality_confirmations, + txid, } + }) + .clone(); - inner.meets_target(conf_target) - }, - _ => false - }) - .await?; - - Ok(()) + sub } /// Selects an appropriate [`FeeRate`] to be used for getting transactions @@ -276,6 +269,66 @@ impl Wallet { } } +/// Represents a subscription to the status of a given transaction. +#[derive(Debug, Clone)] +pub struct Subscription { + receiver: watch::Receiver, + finality_confirmations: u32, + txid: Txid, +} + +impl Subscription { + pub async fn wait_until_final(&self) -> Result<()> { + let conf_target = self.finality_confirmations; + let txid = self.txid; + + tracing::info!(%txid, "Waiting for {} confirmation{} of Bitcoin transaction", conf_target, if conf_target > 1 { "s" } else { "" }); + + let mut seen_confirmations = 0; + + self.wait_until(|status| match status { + ScriptStatus::Confirmed(inner) => { + let confirmations = inner.confirmations(); + + if confirmations > seen_confirmations { + tracing::info!(%txid, "Bitcoin tx has {} out of {} confirmation{}", confirmations, conf_target, if conf_target > 1 { "s" } else { "" }); + seen_confirmations = confirmations; + } + + inner.meets_target(conf_target) + }, + _ => false + }) + .await + } + + pub async fn wait_until_seen(&self) -> Result<()> { + self.wait_until(ScriptStatus::has_been_seen).await + } + + pub async fn wait_until_confirmed_with(&self, target: T) -> Result<()> + where + u32: PartialOrd, + T: Copy, + { + self.wait_until(|status| status.is_confirmed_with(target)) + .await + } + + async fn wait_until(&self, mut predicate: impl FnMut(&ScriptStatus) -> bool) -> Result<()> { + let mut receiver = self.receiver.clone(); + + while !predicate(&receiver.borrow()) { + receiver + .changed() + .await + .context("Failed while waiting for next status update")?; + } + + Ok(()) + } +} + /// Defines a watchable transaction. /// /// For a transaction to be watchable, we need to know two things: Its @@ -303,6 +356,7 @@ struct Client { last_ping: Instant, interval: Duration, script_history: BTreeMap>, + subscriptions: HashMap<(Txid, Script), Subscription>, } impl Client { @@ -317,6 +371,7 @@ impl Client { last_ping: Instant::now(), interval, script_history: Default::default(), + subscriptions: Default::default(), }) } diff --git a/swap/src/protocol/alice/state.rs b/swap/src/protocol/alice/state.rs index e0ff69df..ae9e58dd 100644 --- a/swap/src/protocol/alice/state.rs +++ b/swap/src/protocol/alice/state.rs @@ -315,19 +315,6 @@ pub struct State3 { } impl State3 { - pub async fn wait_for_cancel_timelock_to_expire( - &self, - bitcoin_wallet: &bitcoin::Wallet, - ) -> Result<()> { - bitcoin_wallet - .watch_until_status(&self.tx_lock, |status| { - status.is_confirmed_with(self.cancel_timelock) - }) - .await?; - - Ok(()) - } - pub async fn expired_timelocks( &self, bitcoin_wallet: &bitcoin::Wallet, diff --git a/swap/src/protocol/alice/swap.rs b/swap/src/protocol/alice/swap.rs index bf84bbea..b07b92ae 100644 --- a/swap/src/protocol/alice/swap.rs +++ b/swap/src/protocol/alice/swap.rs @@ -68,18 +68,12 @@ async fn next_state( Ok(match state { AliceState::Started { state3 } => { - timeout( - env_config.bob_time_to_act, - bitcoin_wallet.watch_until_status(&state3.tx_lock, |status| status.has_been_seen()), - ) - .await - .context("Failed to find lock Bitcoin tx")??; + let tx_lock_status = bitcoin_wallet.subscribe_to(state3.tx_lock.clone()).await; + timeout(env_config.bob_time_to_act, tx_lock_status.wait_until_seen()) + .await + .context("Failed to find lock Bitcoin tx")??; - bitcoin_wallet - .watch_until_status(&state3.tx_lock, |status| { - status.is_confirmed_with(env_config.bitcoin_finality_confirmations) - }) - .await?; + tx_lock_status.wait_until_final().await?; AliceState::BtcLocked { state3 } } @@ -116,37 +110,42 @@ async fn next_state( AliceState::XmrLocked { state3, monero_wallet_restore_blockheight, - } => match state3.expired_timelocks(bitcoin_wallet).await? { - ExpiredTimelocks::None => { - select! { - _ = state3.wait_for_cancel_timelock_to_expire(bitcoin_wallet) => { - AliceState::CancelTimelockExpired { - state3, - monero_wallet_restore_blockheight, - } - } - enc_sig = event_loop_handle.recv_encrypted_signature() => { - tracing::info!("Received encrypted signature"); + } => { + let tx_lock_status = bitcoin_wallet.subscribe_to(state3.tx_lock.clone()).await; - AliceState::EncSigLearned { - state3, - encrypted_signature: Box::new(enc_sig?), - monero_wallet_restore_blockheight, + match state3.expired_timelocks(bitcoin_wallet).await? { + ExpiredTimelocks::None => { + select! { + _ = tx_lock_status.wait_until_confirmed_with(state3.cancel_timelock) => { + AliceState::CancelTimelockExpired { + state3, + monero_wallet_restore_blockheight, + } + } + enc_sig = event_loop_handle.recv_encrypted_signature() => { + tracing::info!("Received encrypted signature"); + + AliceState::EncSigLearned { + state3, + encrypted_signature: Box::new(enc_sig?), + monero_wallet_restore_blockheight, + } } } } + _ => AliceState::CancelTimelockExpired { + state3, + monero_wallet_restore_blockheight, + }, } - _ => AliceState::CancelTimelockExpired { - state3, - monero_wallet_restore_blockheight, - }, - }, + } AliceState::EncSigLearned { state3, encrypted_signature, monero_wallet_restore_blockheight, } => match state3.expired_timelocks(bitcoin_wallet).await? { ExpiredTimelocks::None => { + let tx_lock_status = bitcoin_wallet.subscribe_to(state3.tx_lock.clone()).await; match TxRedeem::new(&state3.tx_lock, &state3.redeem_address).complete( *encrypted_signature, state3.a.clone(), @@ -154,7 +153,7 @@ async fn next_state( state3.B, ) { Ok(tx) => match bitcoin_wallet.broadcast(tx, "redeem").await { - Ok((_, finality)) => match finality.await { + Ok((_, subscription)) => match subscription.wait_until_final().await { Ok(_) => AliceState::BtcRedeemed, Err(e) => { bail!("Waiting for Bitcoin transaction finality failed with {}! The redeem transaction was published, but it is not ensured that the transaction was included! You're screwed.", e) @@ -162,8 +161,8 @@ async fn next_state( }, Err(e) => { error!("Publishing the redeem transaction failed with {}, attempting to wait for cancellation now. If you restart the application before the timelock is expired publishing the redeem transaction will be retried.", e); - state3 - .wait_for_cancel_timelock_to_expire(bitcoin_wallet) + tx_lock_status + .wait_until_confirmed_with(state3.cancel_timelock) .await?; AliceState::CancelTimelockExpired { @@ -174,8 +173,8 @@ async fn next_state( }, Err(e) => { error!("Constructing the redeem transaction failed with {}, attempting to wait for cancellation now.", e); - state3 - .wait_for_cancel_timelock_to_expire(bitcoin_wallet) + tx_lock_status + .wait_until_confirmed_with(state3.cancel_timelock) .await?; AliceState::CancelTimelockExpired { @@ -226,22 +225,15 @@ async fn next_state( state3, monero_wallet_restore_blockheight, } => { - let tx_refund = state3.tx_refund(); - let tx_cancel = state3.tx_cancel(); - - let seen_refund_tx = - bitcoin_wallet.watch_until_status(&tx_refund, |status| status.has_been_seen()); - - let punish_timelock_expired = bitcoin_wallet.watch_until_status(&tx_cancel, |status| { - status.is_confirmed_with(state3.punish_timelock) - }); + let tx_refund_status = bitcoin_wallet.subscribe_to(state3.tx_refund()).await; + let tx_cancel_status = bitcoin_wallet.subscribe_to(state3.tx_cancel()).await; select! { - seen_refund = seen_refund_tx => { + seen_refund = tx_refund_status.wait_until_seen() => { seen_refund.context("Failed to monitor refund transaction")?; - let published_refund_tx = bitcoin_wallet.get_raw_transaction(tx_refund.txid()).await?; + let published_refund_tx = bitcoin_wallet.get_raw_transaction(state3.tx_refund().txid()).await?; - let spend_key = tx_refund.extract_monero_private_key( + let spend_key = state3.tx_refund().extract_monero_private_key( published_refund_tx, state3.s_a, state3.a.clone(), @@ -254,7 +246,7 @@ async fn next_state( monero_wallet_restore_blockheight, } } - _ = punish_timelock_expired => { + _ = tx_cancel_status.wait_until_confirmed_with(state3.punish_timelock) => { AliceState::BtcPunishable { state3, monero_wallet_restore_blockheight, @@ -286,8 +278,9 @@ async fn next_state( )?; let punish = async { - let (txid, finality) = bitcoin_wallet.broadcast(signed_tx_punish, "punish").await?; - finality.await?; + let (txid, subscription) = + bitcoin_wallet.broadcast(signed_tx_punish, "punish").await?; + subscription.wait_until_final().await?; Result::<_, anyhow::Error>::Ok(txid) } diff --git a/swap/src/protocol/bob/state.rs b/swap/src/protocol/bob/state.rs index 1ccfc476..dd3a3bea 100644 --- a/swap/src/protocol/bob/state.rs +++ b/swap/src/protocol/bob/state.rs @@ -295,11 +295,11 @@ pub struct State3 { S_a_bitcoin: bitcoin::PublicKey, v: monero::PrivateViewKey, xmr: monero::Amount, - cancel_timelock: CancelTimelock, + pub cancel_timelock: CancelTimelock, punish_timelock: PunishTimelock, refund_address: bitcoin::Address, redeem_address: bitcoin::Address, - tx_lock: bitcoin::TxLock, + pub tx_lock: bitcoin::TxLock, tx_cancel_sig_a: Signature, tx_refund_encsig: bitcoin::EncryptedSignature, min_monero_confirmations: u64, @@ -338,18 +338,6 @@ impl State3 { } } - pub async fn wait_for_cancel_timelock_to_expire( - &self, - bitcoin_wallet: &bitcoin::Wallet, - ) -> Result<()> { - bitcoin_wallet - .watch_until_status(&self.tx_lock, |status| { - status.is_confirmed_with(self.cancel_timelock) - }) - .await?; - Ok(()) - } - pub fn cancel(&self) -> State6 { State6 { A: self.A, @@ -393,11 +381,11 @@ pub struct State4 { s_b: monero::Scalar, S_a_bitcoin: bitcoin::PublicKey, v: monero::PrivateViewKey, - cancel_timelock: CancelTimelock, + pub cancel_timelock: CancelTimelock, punish_timelock: PunishTimelock, refund_address: bitcoin::Address, redeem_address: bitcoin::Address, - tx_lock: bitcoin::TxLock, + pub tx_lock: bitcoin::TxLock, tx_cancel_sig_a: Signature, tx_refund_encsig: bitcoin::EncryptedSignature, monero_wallet_restore_blockheight: BlockHeight, @@ -414,7 +402,9 @@ impl State4 { let tx_redeem_encsig = self.b.encsign(self.S_a_bitcoin, tx_redeem.digest()); bitcoin_wallet - .watch_until_status(&tx_redeem, |status| status.has_been_seen()) + .subscribe_to(tx_redeem.clone()) + .await + .wait_until_seen() .await?; let tx_redeem_candidate = bitcoin_wallet.get_raw_transaction(tx_redeem.txid()).await?; @@ -433,19 +423,6 @@ impl State4 { }) } - pub async fn wait_for_cancel_timelock_to_expire( - &self, - bitcoin_wallet: &bitcoin::Wallet, - ) -> Result<()> { - bitcoin_wallet - .watch_until_status(&self.tx_lock, |status| { - status.is_confirmed_with(self.cancel_timelock) - }) - .await?; - - Ok(()) - } - pub async fn expired_timelock( &self, bitcoin_wallet: &bitcoin::Wallet, @@ -569,9 +546,9 @@ impl State6 { let signed_tx_refund = tx_refund.add_signatures((self.A, sig_a), (self.b.public(), sig_b))?; - let (_, finality) = bitcoin_wallet.broadcast(signed_tx_refund, "refund").await?; + let (_, subscription) = bitcoin_wallet.broadcast(signed_tx_refund, "refund").await?; - finality.await?; + subscription.wait_until_final().await?; Ok(()) } diff --git a/swap/src/protocol/bob/swap.rs b/swap/src/protocol/bob/swap.rs index ac0ef989..2df76c08 100644 --- a/swap/src/protocol/bob/swap.rs +++ b/swap/src/protocol/bob/swap.rs @@ -8,7 +8,7 @@ use crate::{bitcoin, monero}; use anyhow::{bail, Context, Result}; use rand::rngs::OsRng; use tokio::select; -use tracing::trace; +use tracing::{info, trace}; pub fn is_complete(state: &BobState) -> bool { matches!( @@ -89,10 +89,12 @@ async fn next_state( // Bob has locked Btc // Watch for Alice to Lock Xmr or for cancel timelock to elapse BobState::BtcLocked(state3) => { + let tx_lock_status = bitcoin_wallet.subscribe_to(state3.tx_lock.clone()).await; + if let ExpiredTimelocks::None = state3.current_epoch(bitcoin_wallet).await? { let transfer_proof_watcher = event_loop_handle.recv_transfer_proof(); let cancel_timelock_expires = - state3.wait_for_cancel_timelock_to_expire(bitcoin_wallet); + tx_lock_status.wait_until_confirmed_with(state3.cancel_timelock); // Record the current monero wallet block height so we don't have to scan from // block 0 once we create the redeem wallet. @@ -129,6 +131,8 @@ async fn next_state( lock_transfer_proof, monero_wallet_restore_blockheight, } => { + let tx_lock_status = bitcoin_wallet.subscribe_to(state.tx_lock.clone()).await; + if let ExpiredTimelocks::None = state.current_epoch(bitcoin_wallet).await? { let watch_request = state.lock_xmr_watch_request(lock_transfer_proof); @@ -138,13 +142,13 @@ async fn next_state( Ok(()) => BobState::XmrLocked(state.xmr_locked(monero_wallet_restore_blockheight)), Err(e) => { tracing::warn!("Waiting for refund because insufficient Monero have been locked! {}", e); - state.wait_for_cancel_timelock_to_expire(bitcoin_wallet).await?; + tx_lock_status.wait_until_confirmed_with(state.cancel_timelock).await?; BobState::CancelTimelockExpired(state.cancel()) }, } } - _ = state.wait_for_cancel_timelock_to_expire(bitcoin_wallet) => { + _ = tx_lock_status.wait_until_confirmed_with(state.cancel_timelock) => { BobState::CancelTimelockExpired(state.cancel()) } } @@ -153,6 +157,10 @@ async fn next_state( } } BobState::XmrLocked(state) => { + let tx_lock_status = bitcoin_wallet.subscribe_to(state.tx_lock.clone()).await; + + info!("{:?}", tx_lock_status); + if let ExpiredTimelocks::None = state.expired_timelock(bitcoin_wallet).await? { // Alice has locked Xmr // Bob sends Alice his key @@ -161,7 +169,7 @@ async fn next_state( _ = event_loop_handle.send_encrypted_signature(state.tx_redeem_encsig()) => { BobState::EncSigSent(state) }, - _ = state.wait_for_cancel_timelock_to_expire(bitcoin_wallet) => { + _ = tx_lock_status.wait_until_confirmed_with(state.cancel_timelock) => { BobState::CancelTimelockExpired(state.cancel()) } } @@ -170,12 +178,14 @@ async fn next_state( } } BobState::EncSigSent(state) => { + let tx_lock_status = bitcoin_wallet.subscribe_to(state.tx_lock.clone()).await; + if let ExpiredTimelocks::None = state.expired_timelock(bitcoin_wallet).await? { select! { state5 = state.watch_for_redeem_btc(bitcoin_wallet) => { BobState::BtcRedeemed(state5?) }, - _ = state.wait_for_cancel_timelock_to_expire(bitcoin_wallet) => { + _ = tx_lock_status.wait_until_confirmed_with(state.cancel_timelock) => { BobState::CancelTimelockExpired(state.cancel()) } } diff --git a/swap/tests/bob_refunds_using_cancel_and_refund_command.rs b/swap/tests/bob_refunds_using_cancel_and_refund_command.rs index dc2ea482..e04eab58 100644 --- a/swap/tests/bob_refunds_using_cancel_and_refund_command.rs +++ b/swap/tests/bob_refunds_using_cancel_and_refund_command.rs @@ -21,8 +21,11 @@ async fn given_bob_manually_refunds_after_btc_locked_bob_refunds() { // Ensure Bob's timelock is expired if let BobState::BtcLocked(state3) = bob_swap.state.clone() { - state3 - .wait_for_cancel_timelock_to_expire(bob_swap.bitcoin_wallet.as_ref()) + bob_swap + .bitcoin_wallet + .subscribe_to(state3.tx_lock) + .await + .wait_until_confirmed_with(state3.cancel_timelock) .await?; } else { panic!("Bob in unexpected state {}", bob_swap.state);