feat(asb): Retry publishing Bitcoin redeem transaction (#221)

This commit is contained in:
binarybaron 2024-12-04 15:24:10 +01:00 committed by GitHub
parent 13c7bf8a04
commit c5894eacdc
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 159 additions and 94 deletions

View file

@ -717,28 +717,40 @@ impl EventLoopHandle {
let transfer_proof = self.build_transfer_proof_request(msg);
backoff::future::retry(backoff, || async {
// Create a oneshot channel to receive the acknowledgment of the transfer proof
let (singular_sender, singular_receiver) = oneshot::channel();
backoff::future::retry_notify(
backoff,
|| async {
// Create a oneshot channel to receive the acknowledgment of the transfer proof
let (singular_sender, singular_receiver) = oneshot::channel();
if let Err(err) = sender.send((self.peer, transfer_proof.clone(), singular_sender)) {
let err = anyhow!(err).context("Failed to communicate transfer proof through event loop channel");
tracing::error!(%err, swap_id = %self.swap_id, "Failed to send transfer proof");
return Err(backoff::Error::permanent(err));
}
if let Err(err) = sender.send((self.peer, transfer_proof.clone(), singular_sender))
{
return Err(backoff::Error::permanent(anyhow!(err).context(
"Failed to communicate transfer proof through event loop channel",
)));
}
match singular_receiver.await {
Ok(Ok(())) => Ok(()),
Ok(Err(err)) => {
tracing::warn!(%err, "Failed to send transfer proof due to a network error. We will retry");
Err(backoff::Error::transient(anyhow!(err)))
match singular_receiver.await {
Ok(Ok(())) => Ok(()),
Ok(Err(err)) => Err(backoff::Error::transient(
anyhow!(err)
.context("A network error occurred while sending the transfer proof"),
)),
Err(_) => Err(backoff::Error::permanent(anyhow!(
"The sender channel should never be closed without sending a response"
))),
}
Err(_) => {
Err(backoff::Error::permanent(anyhow!("The sender channel should never be closed without sending a response")))
}
}
})
.await?;
},
|e, wait_time: Duration| {
tracing::warn!(
swap_id = %self.swap_id,
error = ?e,
"Failed to send transfer proof. We will retry in {} seconds",
wait_time.as_secs()
)
},
)
.await?;
self.transfer_proof_sender.take();

View file

@ -406,13 +406,12 @@ impl EventLoopHandle {
let backoff = Self::create_retry_config(EXECUTION_SETUP_PROTOCOL_TIMEOUT);
backoff::future::retry(backoff, || async {
backoff::future::retry_notify(backoff, || async {
match self.execution_setup_sender.send_receive(swap.clone()).await {
Ok(Ok(state2)) => Ok(state2),
// These are errors thrown by the swap_setup/bob behaviour
Ok(Err(err)) => {
tracing::warn!(%err, "Failed to setup swap. Will retry");
Err(backoff::Error::transient(err))
Err(backoff::Error::transient(err.context("A network error occurred while setting up the swap")))
}
// This will happen if we don't establish a connection to Alice within the timeout of the MPSC channel
// The protocol does not dial Alice it self
@ -424,6 +423,12 @@ impl EventLoopHandle {
unreachable!("We never drop the receiver of the execution setup channel, so this should never happen")
}
}
}, |err, wait_time: Duration| {
tracing::warn!(
error = ?err,
"Failed to setup swap. We will retry in {} seconds",
wait_time.as_secs()
)
})
.await
.context("Failed to setup swap after retries")
@ -448,17 +453,22 @@ impl EventLoopHandle {
let backoff = Self::create_retry_config(REQUEST_RESPONSE_PROTOCOL_TIMEOUT);
backoff::future::retry(backoff, || async {
backoff::future::retry_notify(backoff, || async {
match self.quote_sender.send_receive(()).await {
Ok(Ok(quote)) => Ok(quote),
Ok(Err(err)) => {
tracing::warn!(%err, "Failed to request quote due to network error. Will retry");
Err(backoff::Error::transient(err))
Err(backoff::Error::transient(anyhow!(err).context("A network error occurred while requesting a quote")))
}
Err(_) => {
unreachable!("We initiate the quote channel without a timeout and store both the sender and receiver in the same struct, so this should never happen");
}
}
}, |err, wait_time: Duration| {
tracing::warn!(
error = ?err,
"Failed to request quote. We will retry in {} seconds",
wait_time.as_secs()
)
})
.await
.context("Failed to request quote after retries")
@ -469,17 +479,22 @@ impl EventLoopHandle {
let backoff = Self::create_retry_config(REQUEST_RESPONSE_PROTOCOL_TIMEOUT);
backoff::future::retry(backoff, || async {
backoff::future::retry_notify(backoff, || async {
match self.cooperative_xmr_redeem_sender.send_receive(()).await {
Ok(Ok(response)) => Ok(response),
Ok(Err(err)) => {
tracing::warn!(%err, "Failed to request cooperative XMR redeem due to network error. Will retry");
Err(backoff::Error::transient(err))
Err(backoff::Error::transient(anyhow!(err).context("A network error occurred while requesting cooperative XMR redeem")))
}
Err(_) => {
unreachable!("We initiate the cooperative xmr redeem channel without a timeout and store both the sender and receiver in the same struct, so this should never happen");
}
}
}, |err, wait_time: Duration| {
tracing::warn!(
error = ?err,
"Failed to request cooperative XMR redeem. We will retry in {} seconds",
wait_time.as_secs()
)
})
.await
.context("Failed to request cooperative XMR redeem after retries")
@ -497,17 +512,22 @@ impl EventLoopHandle {
.with_max_interval(REQUEST_RESPONSE_PROTOCOL_TIMEOUT)
.build();
backoff::future::retry(backoff, || async {
backoff::future::retry_notify(backoff, || async {
match self.encrypted_signature_sender.send_receive(tx_redeem_encsig.clone()).await {
Ok(Ok(_)) => Ok(()),
Ok(Err(err)) => {
tracing::warn!(%err, "Failed to send encrypted signature due to a network error. Will retry");
Err(backoff::Error::transient(err))
Err(backoff::Error::transient(anyhow!(err).context("A network error occurred while sending the encrypted signature")))
}
Err(_) => {
unreachable!("We initiate the encrypted signature channel without a timeout and store both the sender and receiver in the same struct, so this should never happen");
}
}
}, |err, wait_time: Duration| {
tracing::warn!(
error = ?err,
"Failed to send encrypted signature. We will retry in {} seconds",
wait_time.as_secs()
)
})
.await
.context("Failed to send encrypted signature after retries")

View file

@ -7,6 +7,7 @@ use crate::bitcoin::ExpiredTimelocks;
use crate::env::Config;
use crate::protocol::alice::{AliceState, Swap};
use crate::{bitcoin, monero};
use ::bitcoin::consensus::encode::serialize_hex;
use anyhow::{bail, Context, Result};
use tokio::select;
use tokio::time::timeout;
@ -120,7 +121,7 @@ where
.with_max_interval(Duration::from_secs(60))
.build();
let transfer_proof = backoff::future::retry(backoff, || async {
let transfer_proof = backoff::future::retry_notify(backoff, || async {
// We check the status of the Bitcoin lock transaction
// If the swap is cancelled, there is no need to lock the Monero funds anymore
// because there is no way for the swap to succeed.
@ -136,13 +137,7 @@ where
let monero_wallet_restore_blockheight = monero_wallet
.block_height()
.await
.inspect_err(|e| {
tracing::warn!(
swap_id = %swap_id,
error = ?e,
"Failed to get Monero wallet block height while trying to lock XMR. We will retry."
)
})
.context("Failed to get Monero wallet block height")
.map_err(backoff::Error::transient)?;
// Lock the Monero
@ -150,16 +145,18 @@ where
.transfer(state3.lock_xmr_transfer_request())
.await
.map(|proof| Some((monero_wallet_restore_blockheight, proof)))
.inspect_err(|e| {
tracing::warn!(
swap_id = %swap_id,
error = ?e,
"Failed to lock Monero. Make sure your monero-wallet-rpc is connected to a synced daemon and enough funds are available. We will retry."
)
})
.context("Failed to transfer Monero. Make sure your monero-wallet-rpc is connected to a synced daemon and enough funds are available.")
.map_err(backoff::Error::transient)
}, |e, wait_time: Duration| {
tracing::warn!(
swap_id = %swap_id,
error = ?e,
"Failed to lock Monero. We will retry in {} seconds",
wait_time.as_secs()
)
})
.await?;
.await
.expect("We should never run out of retries while locking Monero");
match transfer_proof {
// If the transfer was successful, we transition to the next state
@ -229,7 +226,7 @@ where
// TODO: We should already listen for the encrypted signature here.
//
// If we send Bob the transfer proof, but for whatever reason we do not receive an acknoledgement from him
// we would be stuck in this state forever (deadlock). By listening for the encrypted signature here we
// we would be stuck in this state forever until the timelock expires. By listening for the encrypted signature here we
// can still proceed to the next state even if Bob does not respond with an acknoledgement.
result = tx_lock_status.wait_until_confirmed_with(state3.cancel_timelock) => {
result?;
@ -275,56 +272,88 @@ where
transfer_proof,
encrypted_signature,
state3,
} => match state3.expired_timelocks(bitcoin_wallet).await? {
ExpiredTimelocks::None { .. } => {
let tx_lock_status = bitcoin_wallet.subscribe_to(state3.tx_lock.clone()).await;
match state3.signed_redeem_transaction(*encrypted_signature) {
// TODO: We should retry publishing the redeem transaction if it fails
Ok(tx) => match bitcoin_wallet.broadcast(tx, "redeem").await {
Ok((_, subscription)) => match subscription.wait_until_seen().await {
Ok(_) => AliceState::BtcRedeemTransactionPublished { state3 },
Err(e) => {
bail!("Waiting for Bitcoin redeem transaction to be in mempool failed with {}! The redeem transaction was published, but it is not ensured that the transaction was included! You're screwed.", e)
}
},
Err(error) => {
tracing::error!("Failed to publish redeem transaction: {:#}", error);
tx_lock_status
.wait_until_confirmed_with(state3.cancel_timelock)
.await?;
} => {
// Try to sign the redeem transaction, otherwise wait for the cancel timelock to expire
let tx_redeem = match state3.signed_redeem_transaction(*encrypted_signature) {
Ok(tx_redeem) => tx_redeem,
Err(error) => {
tracing::error!("Failed to construct redeem transaction: {:#}", error);
tracing::info!(
timelock = %state3.cancel_timelock,
"Waiting for cancellation timelock to expire",
);
AliceState::CancelTimelockExpired {
monero_wallet_restore_blockheight,
transfer_proof,
state3,
}
}
},
Err(error) => {
tracing::error!("Failed to construct redeem transaction: {:#}", error);
tracing::info!(
timelock = %state3.cancel_timelock,
"Waiting for cancellation timelock to expire",
);
let tx_lock_status = bitcoin_wallet.subscribe_to(state3.tx_lock.clone()).await;
tx_lock_status
.wait_until_confirmed_with(state3.cancel_timelock)
.await?;
tx_lock_status
.wait_until_confirmed_with(state3.cancel_timelock)
.await?;
AliceState::CancelTimelockExpired {
monero_wallet_restore_blockheight,
transfer_proof,
state3,
}
return Ok(AliceState::CancelTimelockExpired {
monero_wallet_restore_blockheight,
transfer_proof,
state3,
});
}
};
// Retry indefinitely to publish the redeem transaction, until the cancel timelock expires
// Publishing the redeem transaction might fail on the first try due to any number of reasons
let backoff = backoff::ExponentialBackoffBuilder::new()
.with_max_elapsed_time(None)
.with_max_interval(Duration::from_secs(60))
.build();
match backoff::future::retry_notify(backoff.clone(), || async {
// If the cancel timelock is expired, there is no need to try to publish the redeem transaction anymore
if !matches!(
state3.expired_timelocks(bitcoin_wallet).await?,
ExpiredTimelocks::None { .. }
) {
return Ok(None);
}
bitcoin_wallet
.broadcast(tx_redeem.clone(), "redeem")
.await
.map(Some)
.map_err(backoff::Error::transient)
}, |e, wait_time: Duration| {
tracing::warn!(
swap_id = %swap_id,
error = ?e,
"Failed to broadcast Bitcoin redeem transaction. We will retry in {} seconds",
wait_time.as_secs()
)
})
.await
.expect("We should never run out of retries while publishing the Bitcoin redeem transaction")
{
// We successfully published the redeem transaction
// We wait until we see the transaction in the mempool before transitioning to the next state
Some((txid, subscription)) => match subscription.wait_until_seen().await {
Ok(_) => AliceState::BtcRedeemTransactionPublished { state3 },
Err(e) => {
// We extract the txid and the hex representation of the transaction
// this'll allow the user to manually re-publish the transaction
let tx_hex = serialize_hex(&tx_redeem);
bail!("Waiting for Bitcoin redeem transaction to be in mempool failed with {}! The redeem transaction was published, but it is not ensured that the transaction was included! You might be screwed. You can try to manually re-publish the transaction (TxID: {}, Tx Hex: {})", e, txid, tx_hex)
}
},
// Cancel timelock expired before we could publish the redeem transaction
None => {
tracing::error!("We were unable to publish the redeem transaction before the timelock expired.");
AliceState::CancelTimelockExpired {
monero_wallet_restore_blockheight,
transfer_proof,
state3,
}
}
}
_ => AliceState::CancelTimelockExpired {
monero_wallet_restore_blockheight,
transfer_proof,
state3,
},
},
}
AliceState::BtcRedeemTransactionPublished { state3 } => {
let subscription = bitcoin_wallet.subscribe_to(state3.tx_redeem()).await;
@ -347,7 +376,7 @@ where
// gets published once the cancel timelock expires.
if let Err(e) = state3.submit_tx_cancel(bitcoin_wallet).await {
tracing::debug!(
"Assuming cancel transaction is already broadcasted because: {:#}",
"Assuming cancel transaction is already broadcasted because we failed to publish: {:#}",
e
)
}
@ -415,6 +444,8 @@ where
transfer_proof,
state3,
} => {
// TODO: We should retry indefinitely here until we find the refund transaction
// TODO: If we crash while we are waiting for the punish_tx to be confirmed (punish_btc waits until confirmation), we will remain in this state forever because we will attempt to re-publish the punish transaction
let punish = state3.punish_btc(bitcoin_wallet).await;
match punish {
@ -433,7 +464,8 @@ where
let published_refund_tx = bitcoin_wallet
.get_raw_transaction(state3.tx_refund().txid())
.await?;
.await
.context("Failed to fetch refund transaction after assuming it was included because the punish transaction failed")?;
let spend_key = state3.extract_monero_private_key(published_refund_tx)?;