Allow blockchain calls to fail

Prior to this change, functions could not fail early on permanent errors eg. parsing a url. Merged error enums.
This commit is contained in:
rishflab 2021-02-15 16:13:29 +11:00
parent a0ef1f96ec
commit 180e778df9
6 changed files with 43 additions and 53 deletions

View File

@ -211,7 +211,7 @@ pub trait BroadcastSignedTransaction {
#[async_trait] #[async_trait]
pub trait WatchForRawTransaction { pub trait WatchForRawTransaction {
async fn watch_for_raw_transaction(&self, txid: Txid) -> Transaction; async fn watch_for_raw_transaction(&self, txid: Txid) -> Result<Transaction>;
} }
#[async_trait] #[async_trait]
@ -225,12 +225,12 @@ pub trait WaitForTransactionFinality {
#[async_trait] #[async_trait]
pub trait GetBlockHeight { pub trait GetBlockHeight {
async fn get_block_height(&self) -> BlockHeight; async fn get_block_height(&self) -> Result<BlockHeight>;
} }
#[async_trait] #[async_trait]
pub trait TransactionBlockHeight { pub trait TransactionBlockHeight {
async fn transaction_block_height(&self, txid: Txid) -> BlockHeight; async fn transaction_block_height(&self, txid: Txid) -> Result<BlockHeight>;
} }
#[async_trait] #[async_trait]
@ -259,13 +259,14 @@ pub fn recover(S: PublicKey, sig: Signature, encsig: EncryptedSignature) -> Resu
Ok(s) Ok(s)
} }
pub async fn poll_until_block_height_is_gte<B>(client: &B, target: BlockHeight) pub async fn poll_until_block_height_is_gte<B>(client: &B, target: BlockHeight) -> Result<()>
where where
B: GetBlockHeight, B: GetBlockHeight,
{ {
while client.get_block_height().await < target { while client.get_block_height().await? < target {
tokio::time::sleep(std::time::Duration::from_secs(1)).await; tokio::time::sleep(std::time::Duration::from_secs(1)).await;
} }
Ok(())
} }
pub async fn current_epoch<W>( pub async fn current_epoch<W>(
@ -277,8 +278,8 @@ pub async fn current_epoch<W>(
where where
W: WatchForRawTransaction + TransactionBlockHeight + GetBlockHeight, W: WatchForRawTransaction + TransactionBlockHeight + GetBlockHeight,
{ {
let current_block_height = bitcoin_wallet.get_block_height().await; let current_block_height = bitcoin_wallet.get_block_height().await?;
let lock_tx_height = bitcoin_wallet.transaction_block_height(lock_tx_id).await; let lock_tx_height = bitcoin_wallet.transaction_block_height(lock_tx_id).await?;
let cancel_timelock_height = lock_tx_height + cancel_timelock; let cancel_timelock_height = lock_tx_height + cancel_timelock;
let punish_timelock_height = cancel_timelock_height + punish_timelock; let punish_timelock_height = cancel_timelock_height + punish_timelock;
@ -300,9 +301,9 @@ pub async fn wait_for_cancel_timelock_to_expire<W>(
where where
W: WatchForRawTransaction + TransactionBlockHeight + GetBlockHeight, W: WatchForRawTransaction + TransactionBlockHeight + GetBlockHeight,
{ {
let tx_lock_height = bitcoin_wallet.transaction_block_height(lock_tx_id).await; let tx_lock_height = bitcoin_wallet.transaction_block_height(lock_tx_id).await?;
poll_until_block_height_is_gte(bitcoin_wallet, tx_lock_height + cancel_timelock).await; poll_until_block_height_is_gte(bitcoin_wallet, tx_lock_height + cancel_timelock).await?;
Ok(()) Ok(())
} }

View File

@ -23,6 +23,14 @@ use tokio::{sync::Mutex, time::interval};
const SLED_TREE_NAME: &str = "default_tree"; const SLED_TREE_NAME: &str = "default_tree";
#[derive(Debug)]
enum Error {
Io(reqwest::Error),
Parse(std::num::ParseIntError),
NotYetMined,
JsonDeserialisation(reqwest::Error),
}
pub struct Wallet { pub struct Wallet {
pub inner: Arc<Mutex<bdk::Wallet<ElectrumBlockchain, bdk::sled::Tree>>>, pub inner: Arc<Mutex<bdk::Wallet<ElectrumBlockchain, bdk::sled::Tree>>>,
pub network: bitcoin::Network, pub network: bitcoin::Network,
@ -150,7 +158,7 @@ impl BroadcastSignedTransaction for Wallet {
#[async_trait] #[async_trait]
impl WatchForRawTransaction for Wallet { impl WatchForRawTransaction for Wallet {
async fn watch_for_raw_transaction(&self, txid: Txid) -> Transaction { async fn watch_for_raw_transaction(&self, txid: Txid) -> Result<Transaction> {
tracing::debug!("watching for tx: {}", txid); tracing::debug!("watching for tx: {}", txid);
retry(ConstantBackoff::new(Duration::from_secs(1)), || async { retry(ConstantBackoff::new(Duration::from_secs(1)), || async {
let client = Client::new(self.rpc_url.as_ref())?; let client = Client::new(self.rpc_url.as_ref())?;
@ -159,7 +167,7 @@ impl WatchForRawTransaction for Wallet {
Ok(tx) Ok(tx)
}) })
.await .await
.expect("transient errors to be retried") .map_err(|err| anyhow!("transient errors to be retried: {:?}", err))
} }
} }
@ -174,19 +182,11 @@ impl GetRawTransaction for Wallet {
#[async_trait] #[async_trait]
impl GetBlockHeight for Wallet { impl GetBlockHeight for Wallet {
async fn get_block_height(&self) -> BlockHeight { async fn get_block_height(&self) -> Result<BlockHeight> {
// todo: create this url using the join() api in the Url type let url = self.http_url.join("blocks/tip/height")?;
let url = format!("{}{}", self.http_url.as_str(), "blocks/tip/height");
#[derive(Debug)]
enum Error {
Io(reqwest::Error),
Parse(std::num::ParseIntError),
}
let height = retry(ConstantBackoff::new(Duration::from_secs(1)), || async { let height = retry(ConstantBackoff::new(Duration::from_secs(1)), || async {
// todo: We may want to return early if we cannot connect to the electrum node
// rather than retrying
let height = reqwest::Client::new() let height = reqwest::Client::new()
.request(Method::GET, &url) .request(Method::GET, url.clone())
.send() .send()
.await .await
.map_err(Error::Io)? .map_err(Error::Io)?
@ -194,37 +194,29 @@ impl GetBlockHeight for Wallet {
.await .await
.map_err(Error::Io)? .map_err(Error::Io)?
.parse::<u32>() .parse::<u32>()
.map_err(Error::Parse)?; .map_err(|err| backoff::Error::Permanent(Error::Parse(err)))?;
Result::<_, backoff::Error<Error>>::Ok(height) Result::<_, backoff::Error<Error>>::Ok(height)
}) })
.await .await
.expect("transient errors to be retried"); .map_err(|err| anyhow!("transient errors to be retried: {:?}", err))?;
BlockHeight::new(height) Ok(BlockHeight::new(height))
} }
} }
#[async_trait] #[async_trait]
impl TransactionBlockHeight for Wallet { impl TransactionBlockHeight for Wallet {
async fn transaction_block_height(&self, txid: Txid) -> BlockHeight { async fn transaction_block_height(&self, txid: Txid) -> Result<BlockHeight> {
// todo: create this url using the join() api in the Url type let url = self.http_url.join(&format!("tx/{}/status", txid))?;
let url = format!("{}tx/{}/status", self.http_url, txid);
#[derive(Serialize, Deserialize, Debug, Clone)] #[derive(Serialize, Deserialize, Debug, Clone)]
struct TransactionStatus { struct TransactionStatus {
block_height: Option<u32>, block_height: Option<u32>,
confirmed: bool, confirmed: bool,
} }
// todo: See if we can make this error handling more elegant
// errors
#[derive(Debug)]
enum Error {
Io(reqwest::Error),
NotYetMined,
JsonDeserialisation(reqwest::Error),
}
let height = retry(ConstantBackoff::new(Duration::from_secs(1)), || async { let height = retry(ConstantBackoff::new(Duration::from_secs(1)), || async {
let resp = reqwest::Client::new() let resp = reqwest::Client::new()
.request(Method::GET, &url) .request(Method::GET, url.clone())
.send() .send()
.await .await
.map_err(|err| backoff::Error::Transient(Error::Io(err)))?; .map_err(|err| backoff::Error::Transient(Error::Io(err)))?;
@ -241,9 +233,9 @@ impl TransactionBlockHeight for Wallet {
Result::<_, backoff::Error<Error>>::Ok(block_height) Result::<_, backoff::Error<Error>>::Ok(block_height)
}) })
.await .await
.expect("transient errors to be retried"); .map_err(|err| anyhow!("transient errors to be retried: {:?}", err))?;
BlockHeight::new(height) Ok(BlockHeight::new(height))
} }
} }
@ -260,9 +252,9 @@ impl WaitForTransactionFinality for Wallet {
let mut interval = interval(execution_params.bitcoin_avg_block_time / 4); let mut interval = interval(execution_params.bitcoin_avg_block_time / 4);
loop { loop {
let tx_block_height = self.transaction_block_height(txid).await; let tx_block_height = self.transaction_block_height(txid).await?;
tracing::debug!("tx_block_height: {:?}", tx_block_height); tracing::debug!("tx_block_height: {:?}", tx_block_height);
let block_height = self.get_block_height().await; let block_height = self.get_block_height().await?;
tracing::debug!("latest_block_height: {:?}", block_height); tracing::debug!("latest_block_height: {:?}", block_height);
if let Some(confirmations) = block_height.checked_sub(tx_block_height) { if let Some(confirmations) = block_height.checked_sub(tx_block_height) {
tracing::debug!("confirmations: {:?}", confirmations); tracing::debug!("confirmations: {:?}", confirmations);

View File

@ -42,7 +42,7 @@ where
bitcoin_wallet.watch_for_raw_transaction(lock_bitcoin_txid), bitcoin_wallet.watch_for_raw_transaction(lock_bitcoin_txid),
) )
.await .await
.context("Failed to find lock Bitcoin tx")?; .context("Failed to find lock Bitcoin tx")??;
// // We saw the transaction in the mempool, waiting for it to be confirmed. // // We saw the transaction in the mempool, waiting for it to be confirmed.
bitcoin_wallet bitcoin_wallet
@ -158,8 +158,9 @@ where
// First wait for cancel timelock to expire // First wait for cancel timelock to expire
let tx_lock_height = bitcoin_wallet let tx_lock_height = bitcoin_wallet
.transaction_block_height(tx_lock.txid()) .transaction_block_height(tx_lock.txid())
.await; .await?;
poll_until_block_height_is_gte(bitcoin_wallet.as_ref(), tx_lock_height + cancel_timelock).await; poll_until_block_height_is_gte(bitcoin_wallet.as_ref(), tx_lock_height + cancel_timelock)
.await?;
let tx_cancel = bitcoin::TxCancel::new(&tx_lock, cancel_timelock, a.public(), B); let tx_cancel = bitcoin::TxCancel::new(&tx_lock, cancel_timelock, a.public(), B);
@ -216,7 +217,7 @@ where
match select(punish_timelock_expired, seen_refund_tx).await { match select(punish_timelock_expired, seen_refund_tx).await {
Either::Left(_) => Ok((tx_refund, None)), Either::Left(_) => Ok((tx_refund, None)),
Either::Right((published_refund_tx, _)) => Ok((tx_refund, Some(published_refund_tx))), Either::Right((published_refund_tx, _)) => Ok((tx_refund, Some(published_refund_tx?))),
} }
} }

View File

@ -292,7 +292,7 @@ async fn run_until_internal(
AliceState::BtcCancelled { state3, tx_cancel } => { AliceState::BtcCancelled { state3, tx_cancel } => {
let tx_cancel_height = bitcoin_wallet let tx_cancel_height = bitcoin_wallet
.transaction_block_height(tx_cancel.txid()) .transaction_block_height(tx_cancel.txid())
.await; .await?;
let (tx_refund, published_refund_tx) = wait_for_bitcoin_refund( let (tx_refund, published_refund_tx) = wait_for_bitcoin_refund(
&tx_cancel, &tx_cancel,
@ -388,7 +388,7 @@ async fn run_until_internal(
match select(refund_tx_seen, punish_tx_finalised).await { match select(refund_tx_seen, punish_tx_finalised).await {
Either::Left((published_refund_tx, _)) => { Either::Left((published_refund_tx, _)) => {
let spend_key = extract_monero_private_key( let spend_key = extract_monero_private_key(
published_refund_tx, published_refund_tx?,
tx_refund, tx_refund,
state3.s_a, state3.s_a,
state3.a.clone(), state3.a.clone(),

View File

@ -480,7 +480,7 @@ impl State4 {
let tx_redeem_candidate = bitcoin_wallet let tx_redeem_candidate = bitcoin_wallet
.watch_for_raw_transaction(tx_redeem.txid()) .watch_for_raw_transaction(tx_redeem.txid())
.await; .await?;
let tx_redeem_sig = let tx_redeem_sig =
tx_redeem.extract_signature_by_key(tx_redeem_candidate, self.b.public())?; tx_redeem.extract_signature_by_key(tx_redeem_candidate, self.b.public())?;

View File

@ -155,11 +155,7 @@ impl TestContext {
let swap_handle = self.alice_swap_handle.recv().await.unwrap(); let swap_handle = self.alice_swap_handle.recv().await.unwrap();
let state = swap_handle.await.unwrap(); let state = swap_handle.await.unwrap();
assert!( assert!(matches!(state, AliceState::XmrRefunded));
matches!(state, AliceState::XmrRefunded),
"Alice state is not XmrRefunded: {}",
state
);
self.alice_bitcoin_wallet self.alice_bitcoin_wallet
.sync_wallet() .sync_wallet()