201: Fix ASB - Prevent the future from being stopped in production r=da-kami a=da-kami



Co-authored-by: Daniel Karzel <daniel@comit.network>
This commit is contained in:
bors[bot] 2021-02-17 04:45:48 +00:00 committed by GitHub
commit 8537b88a68
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 40 additions and 22 deletions

View File

@ -7,7 +7,7 @@ use crate::{
execution_params::ExecutionParams, execution_params::ExecutionParams,
}; };
use ::bitcoin::{util::psbt::PartiallySignedTransaction, Txid}; use ::bitcoin::{util::psbt::PartiallySignedTransaction, Txid};
use anyhow::{anyhow, bail, Result}; use anyhow::{anyhow, bail, Context, Result};
use async_trait::async_trait; use async_trait::async_trait;
use backoff::{backoff::Constant as ConstantBackoff, tokio::retry}; use backoff::{backoff::Constant as ConstantBackoff, tokio::retry};
use bdk::{ use bdk::{
@ -23,12 +23,18 @@ use tokio::{sync::Mutex, time::interval};
const SLED_TREE_NAME: &str = "default_tree"; const SLED_TREE_NAME: &str = "default_tree";
#[derive(Debug)] #[derive(Debug, thiserror::Error)]
enum Error { enum Error {
#[error("Sending the request failed")]
Io(reqwest::Error), Io(reqwest::Error),
#[error("Conversion to Integer failed")]
Parse(std::num::ParseIntError), Parse(std::num::ParseIntError),
#[error("The transaction is not minded yet")]
NotYetMined, NotYetMined,
JsonDeserialisation(reqwest::Error), #[error("Deserialization failed")]
JsonDeserialization(reqwest::Error),
#[error("Electrum client error")]
ElectrumClient(electrum_client::Error),
} }
pub struct Wallet { pub struct Wallet {
@ -162,14 +168,24 @@ impl BroadcastSignedTransaction for Wallet {
impl WatchForRawTransaction for Wallet { impl WatchForRawTransaction for Wallet {
async fn watch_for_raw_transaction(&self, txid: Txid) -> Result<Transaction> { async fn watch_for_raw_transaction(&self, txid: Txid) -> Result<Transaction> {
tracing::debug!("watching for tx: {}", txid); tracing::debug!("watching for tx: {}", txid);
retry(ConstantBackoff::new(Duration::from_secs(1)), || async { let tx = retry(ConstantBackoff::new(Duration::from_secs(1)), || async {
let client = Client::new(self.rpc_url.as_ref())?; let client = Client::new(self.rpc_url.as_ref())
let tx = client.transaction_get(&txid)?; .map_err(|err| backoff::Error::Permanent(Error::ElectrumClient(err)))?;
tracing::debug!("found tx: {}", txid);
Ok(tx) let tx = client.transaction_get(&txid).map_err(|err| match err {
electrum_client::Error::Protocol(err) => {
tracing::debug!("Received protocol error {} from Electrum, retrying...", err);
backoff::Error::Transient(Error::NotYetMined)
}
err => backoff::Error::Permanent(Error::ElectrumClient(err)),
})?;
Result::<_, backoff::Error<Error>>::Ok(tx)
}) })
.await .await
.map_err(|err| anyhow!("transient errors to be retried: {:?}", err)) .context("transient errors to be retried")?;
Ok(tx)
} }
} }
@ -200,7 +216,7 @@ impl GetBlockHeight for Wallet {
Result::<_, backoff::Error<Error>>::Ok(height) Result::<_, backoff::Error<Error>>::Ok(height)
}) })
.await .await
.map_err(|err| anyhow!("transient errors to be retried: {:?}", err))?; .context("transient errors to be retried")?;
Ok(BlockHeight::new(height)) Ok(BlockHeight::new(height))
} }
@ -225,7 +241,7 @@ impl TransactionBlockHeight for Wallet {
let tx_status: TransactionStatus = resp let tx_status: TransactionStatus = resp
.json() .json()
.await .await
.map_err(|err| backoff::Error::Permanent(Error::JsonDeserialisation(err)))?; .map_err(|err| backoff::Error::Permanent(Error::JsonDeserialization(err)))?;
let block_height = tx_status let block_height = tx_status
.block_height .block_height
@ -234,7 +250,7 @@ impl TransactionBlockHeight for Wallet {
Result::<_, backoff::Error<Error>>::Ok(block_height) Result::<_, backoff::Error<Error>>::Ok(block_height)
}) })
.await .await
.map_err(|err| anyhow!("transient errors to be retried: {:?}", err))?; .context("transient errors to be retried")?;
Ok(BlockHeight::new(height)) Ok(BlockHeight::new(height))
} }

View File

@ -20,8 +20,8 @@ use libp2p::{
}; };
use rand::rngs::OsRng; use rand::rngs::OsRng;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::{broadcast, mpsc}; use tokio::sync::{broadcast, mpsc, mpsc::error::SendError};
use tracing::{debug, error, trace, warn}; use tracing::{debug, error, trace};
use uuid::Uuid; use uuid::Uuid;
// TODO: Use dynamic // TODO: Use dynamic
@ -246,14 +246,16 @@ impl EventLoop {
.build() .build()
.await?; .await?;
let (remote, remote_handle) = alice::run(swap).remote_handle(); let (swap, swap_handle) = alice::run(swap).remote_handle();
tokio::spawn(remote); tokio::spawn(swap);
let _ = self // For testing purposes the handle is currently sent via a channel so we can
.swap_handle_sender // await it. If a remote handle is dropped, the future of the swap is
.send(remote_handle) // also stopped. If we error upon sending the handle through the channel
.await // we have to call forget to detach the handle from the swap future.
.map_err(|err| warn!("Could not send swap handle over channel: {:?}", err)); if let Err(SendError(handle)) = self.swap_handle_sender.send(swap_handle).await {
handle.forget();
}
Ok(()) Ok(())
} }

View File

@ -86,7 +86,7 @@ async fn run_until_internal(
swap_id: Uuid, swap_id: Uuid,
db: Arc<Database>, db: Arc<Database>,
) -> Result<AliceState> { ) -> Result<AliceState> {
info!("Current state:{}", state); info!("Current state: {}", state);
if is_target_state(&state) { if is_target_state(&state) {
Ok(state) Ok(state)
} else { } else {