diff --git a/swap/src/bitcoin/wallet.rs b/swap/src/bitcoin/wallet.rs index c7574f84..39accc48 100644 --- a/swap/src/bitcoin/wallet.rs +++ b/swap/src/bitcoin/wallet.rs @@ -7,7 +7,7 @@ use crate::{ execution_params::ExecutionParams, }; use ::bitcoin::{util::psbt::PartiallySignedTransaction, Txid}; -use anyhow::{anyhow, bail, Result}; +use anyhow::{anyhow, bail, Context, Result}; use async_trait::async_trait; use backoff::{backoff::Constant as ConstantBackoff, tokio::retry}; use bdk::{ @@ -23,12 +23,18 @@ use tokio::{sync::Mutex, time::interval}; const SLED_TREE_NAME: &str = "default_tree"; -#[derive(Debug)] +#[derive(Debug, thiserror::Error)] enum Error { + #[error("Sending the request failed")] Io(reqwest::Error), + #[error("Conversion to Integer failed")] Parse(std::num::ParseIntError), + #[error("The transaction is not minded yet")] NotYetMined, - JsonDeserialisation(reqwest::Error), + #[error("Deserialization failed")] + JsonDeserialization(reqwest::Error), + #[error("Electrum client error")] + ElectrumClient(electrum_client::Error), } pub struct Wallet { @@ -162,14 +168,24 @@ impl BroadcastSignedTransaction for Wallet { impl WatchForRawTransaction for Wallet { async fn watch_for_raw_transaction(&self, txid: Txid) -> Result { tracing::debug!("watching for tx: {}", txid); - retry(ConstantBackoff::new(Duration::from_secs(1)), || async { - let client = Client::new(self.rpc_url.as_ref())?; - let tx = client.transaction_get(&txid)?; - tracing::debug!("found tx: {}", txid); - Ok(tx) + let tx = retry(ConstantBackoff::new(Duration::from_secs(1)), || async { + let client = Client::new(self.rpc_url.as_ref()) + .map_err(|err| backoff::Error::Permanent(Error::ElectrumClient(err)))?; + + let tx = client.transaction_get(&txid).map_err(|err| match err { + electrum_client::Error::Protocol(err) => { + tracing::debug!("Received protocol error {} from Electrum, retrying...", err); + backoff::Error::Transient(Error::NotYetMined) + } + err => backoff::Error::Permanent(Error::ElectrumClient(err)), + })?; + + Result::<_, backoff::Error>::Ok(tx) }) .await - .map_err(|err| anyhow!("transient errors to be retried: {:?}", err)) + .context("transient errors to be retried")?; + + Ok(tx) } } @@ -200,7 +216,7 @@ impl GetBlockHeight for Wallet { Result::<_, backoff::Error>::Ok(height) }) .await - .map_err(|err| anyhow!("transient errors to be retried: {:?}", err))?; + .context("transient errors to be retried")?; Ok(BlockHeight::new(height)) } @@ -225,7 +241,7 @@ impl TransactionBlockHeight for Wallet { let tx_status: TransactionStatus = resp .json() .await - .map_err(|err| backoff::Error::Permanent(Error::JsonDeserialisation(err)))?; + .map_err(|err| backoff::Error::Permanent(Error::JsonDeserialization(err)))?; let block_height = tx_status .block_height @@ -234,7 +250,7 @@ impl TransactionBlockHeight for Wallet { Result::<_, backoff::Error>::Ok(block_height) }) .await - .map_err(|err| anyhow!("transient errors to be retried: {:?}", err))?; + .context("transient errors to be retried")?; Ok(BlockHeight::new(height)) } diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index df0e5ba9..a02bb24d 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -20,8 +20,8 @@ use libp2p::{ }; use rand::rngs::OsRng; use std::sync::Arc; -use tokio::sync::{broadcast, mpsc}; -use tracing::{debug, error, trace, warn}; +use tokio::sync::{broadcast, mpsc, mpsc::error::SendError}; +use tracing::{debug, error, trace}; use uuid::Uuid; // TODO: Use dynamic @@ -246,14 +246,16 @@ impl EventLoop { .build() .await?; - let (remote, remote_handle) = alice::run(swap).remote_handle(); - tokio::spawn(remote); + let (swap, swap_handle) = alice::run(swap).remote_handle(); + tokio::spawn(swap); - let _ = self - .swap_handle_sender - .send(remote_handle) - .await - .map_err(|err| warn!("Could not send swap handle over channel: {:?}", err)); + // For testing purposes the handle is currently sent via a channel so we can + // await it. If a remote handle is dropped, the future of the swap is + // also stopped. If we error upon sending the handle through the channel + // we have to call forget to detach the handle from the swap future. + if let Err(SendError(handle)) = self.swap_handle_sender.send(swap_handle).await { + handle.forget(); + } Ok(()) } diff --git a/swap/src/protocol/alice/swap.rs b/swap/src/protocol/alice/swap.rs index 7d8129d0..8e2870cb 100644 --- a/swap/src/protocol/alice/swap.rs +++ b/swap/src/protocol/alice/swap.rs @@ -86,7 +86,7 @@ async fn run_until_internal( swap_id: Uuid, db: Arc, ) -> Result { - info!("Current state:{}", state); + info!("Current state: {}", state); if is_target_state(&state) { Ok(state) } else {