361: Introduce a more flexible transaction subscription system  r=rishflab a=thomaseizinger

TODO:

- [x] Make sure we unsubscribe once all receivers are gone. How do we handle repeated subscriptions?

Will squash the last 4 or 5 commits once approved

Co-authored-by: Thomas Eizinger <thomas@eizinger.io>
This commit is contained in:
bors[bot] 2021-03-30 00:00:41 +00:00 committed by GitHub
commit bf771b0211
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 185 additions and 160 deletions

View File

@ -15,7 +15,7 @@ use miniscript::{Descriptor, DescriptorTrait};
use sha2::Sha256;
use std::collections::HashMap;
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct TxRedeem {
inner: Transaction,
digest: SigHash,

View File

@ -11,14 +11,13 @@ use bdk::keys::DerivableKey;
use bdk::{FeeRate, KeychainKind};
use bitcoin::Script;
use reqwest::Url;
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use std::convert::TryFrom;
use std::fmt;
use std::future::Future;
use std::path::Path;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Mutex;
use tokio::sync::{watch, Mutex};
const SLED_TREE_NAME: &str = "default_tree";
@ -162,14 +161,13 @@ impl Wallet {
&self,
transaction: Transaction,
kind: &str,
) -> Result<(Txid, impl Future<Output = Result<()>> + '_)> {
) -> Result<(Txid, Subscription)> {
let txid = transaction.txid();
// to watch for confirmations, watching a single output is enough
let watcher = self.wait_for_transaction_finality(
(txid, transaction.output[0].script_pubkey.clone()),
kind.to_owned(),
);
let subscription = self
.subscribe_to((txid, transaction.output[0].script_pubkey.clone()))
.await;
self.wallet
.lock()
@ -181,7 +179,7 @@ impl Wallet {
tracing::info!(%txid, "Published Bitcoin {} transaction", kind);
Ok((txid, watcher))
Ok((txid, subscription))
}
pub async fn sign_and_finalize(&self, psbt: PartiallySignedTransaction) -> Result<Transaction> {
@ -209,63 +207,58 @@ impl Wallet {
self.client.lock().await.status_of_script(tx)
}
pub async fn watch_until_status<T>(
&self,
tx: &T,
mut status_fn: impl FnMut(ScriptStatus) -> bool,
) -> Result<()>
where
T: Watchable,
{
pub async fn subscribe_to(&self, tx: impl Watchable + Send + 'static) -> Subscription {
let txid = tx.id();
let script = tx.script();
let mut last_status = None;
let sub = self
.client
.lock()
.await
.subscriptions
.entry((txid, script.clone()))
.or_insert_with(|| {
let (sender, receiver) = watch::channel(ScriptStatus::Unseen);
let client = self.client.clone();
loop {
let new_status = self.client.lock().await.status_of_script(tx)?;
tokio::spawn(async move {
let mut last_status = None;
if Some(new_status) != last_status {
tracing::debug!(%txid, "Transaction is {}", new_status);
}
last_status = Some(new_status);
loop {
tokio::time::sleep(Duration::from_secs(5)).await;
if status_fn(new_status) {
break;
}
let new_status = match client.lock().await.status_of_script(&tx) {
Ok(new_status) => new_status,
Err(e) => {
tracing::warn!(%txid, "Failed to get status of script: {:#}", e);
return;
}
};
tokio::time::sleep(Duration::from_secs(5)).await;
}
if Some(new_status) != last_status {
tracing::debug!(%txid, "Transaction is {}", new_status);
}
last_status = Some(new_status);
Ok(())
}
let all_receivers_gone = sender.send(new_status).is_err();
async fn wait_for_transaction_finality<T>(&self, tx: T, kind: String) -> Result<()>
where
T: Watchable,
{
let conf_target = self.finality_confirmations;
let txid = tx.id();
if all_receivers_gone {
tracing::debug!(%txid, "All receivers gone, removing subscription");
client.lock().await.subscriptions.remove(&(txid, script));
return;
}
}
});
tracing::info!(%txid, "Waiting for {} confirmation{} of Bitcoin {} transaction", conf_target, if conf_target > 1 { "s" } else { "" }, kind);
let mut seen_confirmations = 0;
self.watch_until_status(&tx, |status| match status {
ScriptStatus::Confirmed(inner) => {
let confirmations = inner.confirmations();
if confirmations > seen_confirmations {
tracing::info!(%txid, "Bitcoin {} tx has {} out of {} confirmation{}", kind, confirmations, conf_target, if conf_target > 1 { "s" } else { "" });
seen_confirmations = confirmations;
Subscription {
receiver,
finality_confirmations: self.finality_confirmations,
txid,
}
})
.clone();
inner.meets_target(conf_target)
},
_ => false
})
.await?;
Ok(())
sub
}
/// Selects an appropriate [`FeeRate`] to be used for getting transactions
@ -276,6 +269,66 @@ impl Wallet {
}
}
/// Represents a subscription to the status of a given transaction.
#[derive(Debug, Clone)]
pub struct Subscription {
receiver: watch::Receiver<ScriptStatus>,
finality_confirmations: u32,
txid: Txid,
}
impl Subscription {
pub async fn wait_until_final(&self) -> Result<()> {
let conf_target = self.finality_confirmations;
let txid = self.txid;
tracing::info!(%txid, "Waiting for {} confirmation{} of Bitcoin transaction", conf_target, if conf_target > 1 { "s" } else { "" });
let mut seen_confirmations = 0;
self.wait_until(|status| match status {
ScriptStatus::Confirmed(inner) => {
let confirmations = inner.confirmations();
if confirmations > seen_confirmations {
tracing::info!(%txid, "Bitcoin tx has {} out of {} confirmation{}", confirmations, conf_target, if conf_target > 1 { "s" } else { "" });
seen_confirmations = confirmations;
}
inner.meets_target(conf_target)
},
_ => false
})
.await
}
pub async fn wait_until_seen(&self) -> Result<()> {
self.wait_until(ScriptStatus::has_been_seen).await
}
pub async fn wait_until_confirmed_with<T>(&self, target: T) -> Result<()>
where
u32: PartialOrd<T>,
T: Copy,
{
self.wait_until(|status| status.is_confirmed_with(target))
.await
}
async fn wait_until(&self, mut predicate: impl FnMut(&ScriptStatus) -> bool) -> Result<()> {
let mut receiver = self.receiver.clone();
while !predicate(&receiver.borrow()) {
receiver
.changed()
.await
.context("Failed while waiting for next status update")?;
}
Ok(())
}
}
/// Defines a watchable transaction.
///
/// For a transaction to be watchable, we need to know two things: Its
@ -303,6 +356,7 @@ struct Client {
last_ping: Instant,
interval: Duration,
script_history: BTreeMap<Script, Vec<GetHistoryRes>>,
subscriptions: HashMap<(Txid, Script), Subscription>,
}
impl Client {
@ -317,6 +371,7 @@ impl Client {
last_ping: Instant::now(),
interval,
script_history: Default::default(),
subscriptions: Default::default(),
})
}

View File

@ -315,19 +315,6 @@ pub struct State3 {
}
impl State3 {
pub async fn wait_for_cancel_timelock_to_expire(
&self,
bitcoin_wallet: &bitcoin::Wallet,
) -> Result<()> {
bitcoin_wallet
.watch_until_status(&self.tx_lock, |status| {
status.is_confirmed_with(self.cancel_timelock)
})
.await?;
Ok(())
}
pub async fn expired_timelocks(
&self,
bitcoin_wallet: &bitcoin::Wallet,

View File

@ -68,18 +68,12 @@ async fn next_state(
Ok(match state {
AliceState::Started { state3 } => {
timeout(
env_config.bob_time_to_act,
bitcoin_wallet.watch_until_status(&state3.tx_lock, |status| status.has_been_seen()),
)
.await
.context("Failed to find lock Bitcoin tx")??;
let tx_lock_status = bitcoin_wallet.subscribe_to(state3.tx_lock.clone()).await;
timeout(env_config.bob_time_to_act, tx_lock_status.wait_until_seen())
.await
.context("Failed to find lock Bitcoin tx")??;
bitcoin_wallet
.watch_until_status(&state3.tx_lock, |status| {
status.is_confirmed_with(env_config.bitcoin_finality_confirmations)
})
.await?;
tx_lock_status.wait_until_final().await?;
AliceState::BtcLocked { state3 }
}
@ -116,37 +110,42 @@ async fn next_state(
AliceState::XmrLocked {
state3,
monero_wallet_restore_blockheight,
} => match state3.expired_timelocks(bitcoin_wallet).await? {
ExpiredTimelocks::None => {
select! {
_ = state3.wait_for_cancel_timelock_to_expire(bitcoin_wallet) => {
AliceState::CancelTimelockExpired {
state3,
monero_wallet_restore_blockheight,
}
}
enc_sig = event_loop_handle.recv_encrypted_signature() => {
tracing::info!("Received encrypted signature");
} => {
let tx_lock_status = bitcoin_wallet.subscribe_to(state3.tx_lock.clone()).await;
AliceState::EncSigLearned {
state3,
encrypted_signature: Box::new(enc_sig?),
monero_wallet_restore_blockheight,
match state3.expired_timelocks(bitcoin_wallet).await? {
ExpiredTimelocks::None => {
select! {
_ = tx_lock_status.wait_until_confirmed_with(state3.cancel_timelock) => {
AliceState::CancelTimelockExpired {
state3,
monero_wallet_restore_blockheight,
}
}
enc_sig = event_loop_handle.recv_encrypted_signature() => {
tracing::info!("Received encrypted signature");
AliceState::EncSigLearned {
state3,
encrypted_signature: Box::new(enc_sig?),
monero_wallet_restore_blockheight,
}
}
}
}
_ => AliceState::CancelTimelockExpired {
state3,
monero_wallet_restore_blockheight,
},
}
_ => AliceState::CancelTimelockExpired {
state3,
monero_wallet_restore_blockheight,
},
},
}
AliceState::EncSigLearned {
state3,
encrypted_signature,
monero_wallet_restore_blockheight,
} => match state3.expired_timelocks(bitcoin_wallet).await? {
ExpiredTimelocks::None => {
let tx_lock_status = bitcoin_wallet.subscribe_to(state3.tx_lock.clone()).await;
match TxRedeem::new(&state3.tx_lock, &state3.redeem_address).complete(
*encrypted_signature,
state3.a.clone(),
@ -154,7 +153,7 @@ async fn next_state(
state3.B,
) {
Ok(tx) => match bitcoin_wallet.broadcast(tx, "redeem").await {
Ok((_, finality)) => match finality.await {
Ok((_, subscription)) => match subscription.wait_until_final().await {
Ok(_) => AliceState::BtcRedeemed,
Err(e) => {
bail!("Waiting for Bitcoin transaction finality failed with {}! The redeem transaction was published, but it is not ensured that the transaction was included! You're screwed.", e)
@ -162,8 +161,8 @@ async fn next_state(
},
Err(e) => {
error!("Publishing the redeem transaction failed with {}, attempting to wait for cancellation now. If you restart the application before the timelock is expired publishing the redeem transaction will be retried.", e);
state3
.wait_for_cancel_timelock_to_expire(bitcoin_wallet)
tx_lock_status
.wait_until_confirmed_with(state3.cancel_timelock)
.await?;
AliceState::CancelTimelockExpired {
@ -174,8 +173,8 @@ async fn next_state(
},
Err(e) => {
error!("Constructing the redeem transaction failed with {}, attempting to wait for cancellation now.", e);
state3
.wait_for_cancel_timelock_to_expire(bitcoin_wallet)
tx_lock_status
.wait_until_confirmed_with(state3.cancel_timelock)
.await?;
AliceState::CancelTimelockExpired {
@ -226,22 +225,15 @@ async fn next_state(
state3,
monero_wallet_restore_blockheight,
} => {
let tx_refund = state3.tx_refund();
let tx_cancel = state3.tx_cancel();
let seen_refund_tx =
bitcoin_wallet.watch_until_status(&tx_refund, |status| status.has_been_seen());
let punish_timelock_expired = bitcoin_wallet.watch_until_status(&tx_cancel, |status| {
status.is_confirmed_with(state3.punish_timelock)
});
let tx_refund_status = bitcoin_wallet.subscribe_to(state3.tx_refund()).await;
let tx_cancel_status = bitcoin_wallet.subscribe_to(state3.tx_cancel()).await;
select! {
seen_refund = seen_refund_tx => {
seen_refund = tx_refund_status.wait_until_seen() => {
seen_refund.context("Failed to monitor refund transaction")?;
let published_refund_tx = bitcoin_wallet.get_raw_transaction(tx_refund.txid()).await?;
let published_refund_tx = bitcoin_wallet.get_raw_transaction(state3.tx_refund().txid()).await?;
let spend_key = tx_refund.extract_monero_private_key(
let spend_key = state3.tx_refund().extract_monero_private_key(
published_refund_tx,
state3.s_a,
state3.a.clone(),
@ -254,7 +246,7 @@ async fn next_state(
monero_wallet_restore_blockheight,
}
}
_ = punish_timelock_expired => {
_ = tx_cancel_status.wait_until_confirmed_with(state3.punish_timelock) => {
AliceState::BtcPunishable {
state3,
monero_wallet_restore_blockheight,
@ -286,8 +278,9 @@ async fn next_state(
)?;
let punish = async {
let (txid, finality) = bitcoin_wallet.broadcast(signed_tx_punish, "punish").await?;
finality.await?;
let (txid, subscription) =
bitcoin_wallet.broadcast(signed_tx_punish, "punish").await?;
subscription.wait_until_final().await?;
Result::<_, anyhow::Error>::Ok(txid)
}

View File

@ -295,11 +295,11 @@ pub struct State3 {
S_a_bitcoin: bitcoin::PublicKey,
v: monero::PrivateViewKey,
xmr: monero::Amount,
cancel_timelock: CancelTimelock,
pub cancel_timelock: CancelTimelock,
punish_timelock: PunishTimelock,
refund_address: bitcoin::Address,
redeem_address: bitcoin::Address,
tx_lock: bitcoin::TxLock,
pub tx_lock: bitcoin::TxLock,
tx_cancel_sig_a: Signature,
tx_refund_encsig: bitcoin::EncryptedSignature,
min_monero_confirmations: u64,
@ -338,18 +338,6 @@ impl State3 {
}
}
pub async fn wait_for_cancel_timelock_to_expire(
&self,
bitcoin_wallet: &bitcoin::Wallet,
) -> Result<()> {
bitcoin_wallet
.watch_until_status(&self.tx_lock, |status| {
status.is_confirmed_with(self.cancel_timelock)
})
.await?;
Ok(())
}
pub fn cancel(&self) -> State6 {
State6 {
A: self.A,
@ -393,11 +381,11 @@ pub struct State4 {
s_b: monero::Scalar,
S_a_bitcoin: bitcoin::PublicKey,
v: monero::PrivateViewKey,
cancel_timelock: CancelTimelock,
pub cancel_timelock: CancelTimelock,
punish_timelock: PunishTimelock,
refund_address: bitcoin::Address,
redeem_address: bitcoin::Address,
tx_lock: bitcoin::TxLock,
pub tx_lock: bitcoin::TxLock,
tx_cancel_sig_a: Signature,
tx_refund_encsig: bitcoin::EncryptedSignature,
monero_wallet_restore_blockheight: BlockHeight,
@ -414,7 +402,9 @@ impl State4 {
let tx_redeem_encsig = self.b.encsign(self.S_a_bitcoin, tx_redeem.digest());
bitcoin_wallet
.watch_until_status(&tx_redeem, |status| status.has_been_seen())
.subscribe_to(tx_redeem.clone())
.await
.wait_until_seen()
.await?;
let tx_redeem_candidate = bitcoin_wallet.get_raw_transaction(tx_redeem.txid()).await?;
@ -433,19 +423,6 @@ impl State4 {
})
}
pub async fn wait_for_cancel_timelock_to_expire(
&self,
bitcoin_wallet: &bitcoin::Wallet,
) -> Result<()> {
bitcoin_wallet
.watch_until_status(&self.tx_lock, |status| {
status.is_confirmed_with(self.cancel_timelock)
})
.await?;
Ok(())
}
pub async fn expired_timelock(
&self,
bitcoin_wallet: &bitcoin::Wallet,
@ -569,9 +546,9 @@ impl State6 {
let signed_tx_refund =
tx_refund.add_signatures((self.A, sig_a), (self.b.public(), sig_b))?;
let (_, finality) = bitcoin_wallet.broadcast(signed_tx_refund, "refund").await?;
let (_, subscription) = bitcoin_wallet.broadcast(signed_tx_refund, "refund").await?;
finality.await?;
subscription.wait_until_final().await?;
Ok(())
}

View File

@ -8,7 +8,7 @@ use crate::{bitcoin, monero};
use anyhow::{bail, Context, Result};
use rand::rngs::OsRng;
use tokio::select;
use tracing::trace;
use tracing::{info, trace};
pub fn is_complete(state: &BobState) -> bool {
matches!(
@ -89,10 +89,12 @@ async fn next_state(
// Bob has locked Btc
// Watch for Alice to Lock Xmr or for cancel timelock to elapse
BobState::BtcLocked(state3) => {
let tx_lock_status = bitcoin_wallet.subscribe_to(state3.tx_lock.clone()).await;
if let ExpiredTimelocks::None = state3.current_epoch(bitcoin_wallet).await? {
let transfer_proof_watcher = event_loop_handle.recv_transfer_proof();
let cancel_timelock_expires =
state3.wait_for_cancel_timelock_to_expire(bitcoin_wallet);
tx_lock_status.wait_until_confirmed_with(state3.cancel_timelock);
// Record the current monero wallet block height so we don't have to scan from
// block 0 once we create the redeem wallet.
@ -129,6 +131,8 @@ async fn next_state(
lock_transfer_proof,
monero_wallet_restore_blockheight,
} => {
let tx_lock_status = bitcoin_wallet.subscribe_to(state.tx_lock.clone()).await;
if let ExpiredTimelocks::None = state.current_epoch(bitcoin_wallet).await? {
let watch_request = state.lock_xmr_watch_request(lock_transfer_proof);
@ -138,13 +142,13 @@ async fn next_state(
Ok(()) => BobState::XmrLocked(state.xmr_locked(monero_wallet_restore_blockheight)),
Err(e) => {
tracing::warn!("Waiting for refund because insufficient Monero have been locked! {}", e);
state.wait_for_cancel_timelock_to_expire(bitcoin_wallet).await?;
tx_lock_status.wait_until_confirmed_with(state.cancel_timelock).await?;
BobState::CancelTimelockExpired(state.cancel())
},
}
}
_ = state.wait_for_cancel_timelock_to_expire(bitcoin_wallet) => {
_ = tx_lock_status.wait_until_confirmed_with(state.cancel_timelock) => {
BobState::CancelTimelockExpired(state.cancel())
}
}
@ -153,6 +157,10 @@ async fn next_state(
}
}
BobState::XmrLocked(state) => {
let tx_lock_status = bitcoin_wallet.subscribe_to(state.tx_lock.clone()).await;
info!("{:?}", tx_lock_status);
if let ExpiredTimelocks::None = state.expired_timelock(bitcoin_wallet).await? {
// Alice has locked Xmr
// Bob sends Alice his key
@ -161,7 +169,7 @@ async fn next_state(
_ = event_loop_handle.send_encrypted_signature(state.tx_redeem_encsig()) => {
BobState::EncSigSent(state)
},
_ = state.wait_for_cancel_timelock_to_expire(bitcoin_wallet) => {
_ = tx_lock_status.wait_until_confirmed_with(state.cancel_timelock) => {
BobState::CancelTimelockExpired(state.cancel())
}
}
@ -170,12 +178,14 @@ async fn next_state(
}
}
BobState::EncSigSent(state) => {
let tx_lock_status = bitcoin_wallet.subscribe_to(state.tx_lock.clone()).await;
if let ExpiredTimelocks::None = state.expired_timelock(bitcoin_wallet).await? {
select! {
state5 = state.watch_for_redeem_btc(bitcoin_wallet) => {
BobState::BtcRedeemed(state5?)
},
_ = state.wait_for_cancel_timelock_to_expire(bitcoin_wallet) => {
_ = tx_lock_status.wait_until_confirmed_with(state.cancel_timelock) => {
BobState::CancelTimelockExpired(state.cancel())
}
}

View File

@ -21,8 +21,11 @@ async fn given_bob_manually_refunds_after_btc_locked_bob_refunds() {
// Ensure Bob's timelock is expired
if let BobState::BtcLocked(state3) = bob_swap.state.clone() {
state3
.wait_for_cancel_timelock_to_expire(bob_swap.bitcoin_wallet.as_ref())
bob_swap
.bitcoin_wallet
.subscribe_to(state3.tx_lock)
.await
.wait_until_confirmed_with(state3.cancel_timelock)
.await?;
} else {
panic!("Bob in unexpected state {}", bob_swap.state);