Refactor monero::Wallet::watch_for_transfer to not use backoff

Instead, we use a regular loop and extract everything into a function
that can be independently tested.
`backoff` would be useful to retry the actual call to the node.
This commit is contained in:
Thomas Eizinger 2021-03-11 16:06:28 +11:00
parent f0a2134f76
commit 82738b111e
No known key found for this signature in database
GPG Key ID: 651AC83A6C6C8B96
4 changed files with 182 additions and 62 deletions

View File

@ -24,7 +24,7 @@ use swap::asb::config::{
initial_setup, query_user_for_initial_testnet_config, read_config, Config, ConfigNotInitialized,
};
use swap::database::Database;
use swap::execution_params::GetExecutionParams;
use swap::execution_params::{ExecutionParams, GetExecutionParams};
use swap::fs::default_config_path;
use swap::monero::Amount;
use swap::protocol::alice::EventLoop;
@ -84,6 +84,7 @@ async fn main() -> Result<()> {
config.clone(),
&wallet_data_dir,
seed.derive_extended_private_key(BITCOIN_NETWORK)?,
execution_params,
)
.await?;
@ -131,6 +132,7 @@ async fn init_wallets(
config: Config,
bitcoin_wallet_data_dir: &Path,
key: impl DerivableKey<Segwitv0> + Clone,
execution_params: ExecutionParams,
) -> Result<(bitcoin::Wallet, monero::Wallet)> {
let bitcoin_wallet = bitcoin::Wallet::new(
config.bitcoin.electrum_rpc_url,
@ -153,6 +155,7 @@ async fn init_wallets(
config.monero.wallet_rpc_url.clone(),
MONERO_NETWORK,
DEFAULT_WALLET_NAME.to_string(),
execution_params,
);
// Setup the Monero wallet

View File

@ -23,7 +23,7 @@ use swap::bitcoin::{Amount, TxLock};
use swap::cli::command::{AliceConnectParams, Arguments, Command, MoneroParams};
use swap::cli::config::{read_config, Config};
use swap::database::Database;
use swap::execution_params::GetExecutionParams;
use swap::execution_params::{ExecutionParams, GetExecutionParams};
use swap::network::quote::BidQuote;
use swap::protocol::bob;
use swap::protocol::bob::{Builder, EventLoop};
@ -104,8 +104,13 @@ async fn main() -> Result<()> {
}
let bitcoin_wallet = init_bitcoin_wallet(bitcoin_network, &config, seed).await?;
let (monero_wallet, _process) =
init_monero_wallet(monero_network, &config, monero_daemon_host).await?;
let (monero_wallet, _process) = init_monero_wallet(
monero_network,
&config,
monero_daemon_host,
execution_params,
)
.await?;
let bitcoin_wallet = Arc::new(bitcoin_wallet);
let (event_loop, mut event_loop_handle) = EventLoop::new(
&seed.derive_libp2p_identity(),
@ -184,8 +189,13 @@ async fn main() -> Result<()> {
}
let bitcoin_wallet = init_bitcoin_wallet(bitcoin_network, &config, seed).await?;
let (monero_wallet, _process) =
init_monero_wallet(monero_network, &config, monero_daemon_host).await?;
let (monero_wallet, _process) = init_monero_wallet(
monero_network,
&config,
monero_daemon_host,
execution_params,
)
.await?;
let bitcoin_wallet = Arc::new(bitcoin_wallet);
let (event_loop, event_loop_handle) = EventLoop::new(
@ -282,6 +292,7 @@ async fn init_monero_wallet(
monero_network: monero::Network,
config: &Config,
monero_daemon_host: String,
execution_params: ExecutionParams,
) -> Result<(monero::Wallet, monero::WalletRpcProcess)> {
const MONERO_BLOCKCHAIN_MONITORING_WALLET_NAME: &str = "swap-tool-blockchain-monitoring-wallet";
@ -295,6 +306,7 @@ async fn init_monero_wallet(
monero_wallet_rpc_process.endpoint(),
monero_network,
MONERO_BLOCKCHAIN_MONITORING_WALLET_NAME.to_string(),
execution_params,
);
monero_wallet.open_or_create().await?;

View File

@ -1,17 +1,17 @@
use crate::execution_params::ExecutionParams;
use crate::monero::{
Amount, InsufficientFunds, PrivateViewKey, PublicViewKey, TransferProof, TxHash,
};
use ::monero::{Address, Network, PrivateKey, PublicKey};
use anyhow::{Context, Result};
use backoff::backoff::Constant as ConstantBackoff;
use backoff::future::retry;
use bitcoin::hashes::core::sync::atomic::AtomicU32;
use monero_rpc::wallet;
use monero_rpc::wallet::{BlockHeight, Refreshed};
use monero_rpc::wallet::{BlockHeight, CheckTxKey, Refreshed};
use std::cmp::min;
use std::future::Future;
use std::str::FromStr;
use std::sync::atomic::Ordering;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::time::Interval;
use tracing::{debug, info};
use url::Url;
@ -20,22 +20,30 @@ pub struct Wallet {
inner: Mutex<wallet::Client>,
network: Network,
name: String,
exec_params: ExecutionParams,
}
impl Wallet {
pub fn new(url: Url, network: Network, name: String) -> Self {
pub fn new(url: Url, network: Network, name: String, exec_params: ExecutionParams) -> Self {
Self {
inner: Mutex::new(wallet::Client::new(url)),
network,
name,
exec_params,
}
}
pub fn new_with_client(client: wallet::Client, network: Network, name: String) -> Self {
pub fn new_with_client(
client: wallet::Client,
network: Network,
name: String,
exec_params: ExecutionParams,
) -> Self {
Self {
inner: Mutex::new(client),
network,
name,
exec_params,
}
}
@ -157,63 +165,35 @@ impl Wallet {
public_spend_key: PublicKey,
public_view_key: PublicViewKey,
transfer_proof: TransferProof,
expected_amount: Amount,
expected: Amount,
conf_target: u32,
) -> Result<(), InsufficientFunds> {
let txid = &transfer_proof.tx_hash();
let txid = transfer_proof.tx_hash();
tracing::info!(%txid, "Waiting for {} confirmation{} of Monero transaction", conf_target, if conf_target > 1 { "s" } else { "" });
enum Error {
TxNotFound,
InsufficientConfirmations,
InsufficientFunds { expected: Amount, actual: Amount },
}
let address = Address::standard(self.network, public_spend_key, public_view_key.into());
let confirmations = AtomicU32::new(0u32);
let check_interval = tokio::time::interval(min(
self.exec_params.monero_avg_block_time / 10,
Duration::from_secs(1),
));
let key = &transfer_proof.tx_key().to_string();
let res = retry(ConstantBackoff::new(Duration::from_secs(1)), || async {
// NOTE: Currently, this is conflicting IO errors with the transaction not being
// in the blockchain yet, or not having enough confirmations on it. All these
// errors warrant a retry, but the strategy should probably differ per case
let proof = self
.inner
.lock()
.await
.check_tx_key(
&String::from(transfer_proof.tx_hash()),
&transfer_proof.tx_key().to_string(),
&address.to_string(),
)
.await
.map_err(|_| backoff::Error::Transient(Error::TxNotFound))?;
if proof.received != expected_amount.as_piconero() {
return Err(backoff::Error::Permanent(Error::InsufficientFunds {
expected: expected_amount,
actual: Amount::from_piconero(proof.received),
}));
}
if proof.confirmations >= confirmations.load(Ordering::SeqCst) {
confirmations.store(proof.confirmations, Ordering::SeqCst);
info!(%txid, "Monero lock tx has {} out of {} confirmations", proof.confirmations, conf_target);
}
if proof.confirmations < conf_target {
return Err(backoff::Error::Transient(Error::InsufficientConfirmations));
}
Ok(proof)
})
.await;
if let Err(Error::InsufficientFunds { expected, actual }) = res {
return Err(InsufficientFunds { expected, actual });
};
wait_for_confirmations(
txid.0,
|txid| async move {
self.inner
.lock()
.await
.check_tx_key(&txid, &key, &address.to_string())
.await
},
check_interval,
expected,
conf_target,
)
.await?;
Ok(())
}
@ -255,3 +235,124 @@ impl Wallet {
Amount::from_monero(0.000_03f64).expect("static fee to be convertible without problems")
}
}
async fn wait_for_confirmations<Fut>(
txid: String,
fetch_tx: impl Fn(String) -> Fut,
mut check_interval: Interval,
expected: Amount,
conf_target: u32,
) -> Result<(), InsufficientFunds>
where
Fut: Future<Output = Result<CheckTxKey>>,
{
let mut seen_confirmations = 0u32;
while seen_confirmations < conf_target {
let tx = match fetch_tx(txid.clone()).await {
Ok(proof) => proof,
Err(error) => {
tracing::debug!(%txid, "Failed to retrieve tx from blockchain: {:#}", error);
continue; // treating every error as transient and retrying
// is obviously wrong but the jsonrpc client is
// too primitive to differentiate between all the
// cases
}
};
let received = Amount::from_piconero(tx.received);
if received != expected {
return Err(InsufficientFunds {
expected,
actual: received,
});
}
if tx.confirmations > seen_confirmations {
seen_confirmations = tx.confirmations;
info!(%txid, "Monero lock tx has {} out of {} confirmations", tx.confirmations, conf_target);
}
check_interval.tick().await;
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use monero_rpc::wallet::CheckTxKey;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
#[tokio::test]
async fn given_exact_confirmations_does_not_fetch_tx_again() {
let requests = Arc::new(AtomicU32::new(0));
let result = wait_for_confirmations(
String::from("TXID"),
move |_| {
let requests = requests.clone();
async move {
match requests.fetch_add(1, Ordering::SeqCst) {
0 => Ok(CheckTxKey {
confirmations: 10,
received: 100,
}),
_ => panic!("should not be called more than once"),
}
}
},
tokio::time::interval(Duration::from_millis(10)),
Amount::from_piconero(100),
10,
)
.await;
assert!(result.is_ok())
}
/// A test that allows us to easily, visually verify if the log output is as
/// we desire.
///
/// We want the following properties:
/// - Only print confirmations if they changed i.e. not every time we
/// request them
/// - Also print the last one, i.e. 10 / 10
#[tokio::test]
async fn visual_log_check() {
let _ = tracing_subscriber::fmt().with_test_writer().try_init();
const MAX_REQUESTS: u32 = 20;
let requests = Arc::new(AtomicU32::new(0));
let result = wait_for_confirmations(
String::from("TXID"),
move |_| {
let requests = requests.clone();
async move {
match requests.fetch_add(1, Ordering::SeqCst) {
requests if requests <= MAX_REQUESTS => {
Ok(CheckTxKey {
confirmations: requests / 2, /* every 2nd request "yields" a
* confirmation */
received: 100,
})
}
_ => panic!("should not be called more than {} times", MAX_REQUESTS),
}
}
},
tokio::time::interval(Duration::from_millis(10)),
Amount::from_piconero(100),
10,
)
.await;
assert!(result.is_ok())
}
}

View File

@ -359,6 +359,7 @@ where
electrs_rpc_port,
electrs_http_port,
alice_seed,
execution_params,
)
.await;
@ -381,6 +382,7 @@ where
electrs_rpc_port,
electrs_http_port,
bob_seed,
execution_params,
)
.await;
@ -590,6 +592,7 @@ async fn init_test_wallets(
electrum_rpc_port: u16,
electrum_http_port: u16,
seed: Seed,
execution_params: ExecutionParams,
) -> (Arc<bitcoin::Wallet>, Arc<monero::Wallet>) {
monero
.init(vec![(name, starting_balances.xmr.as_piconero())])
@ -600,6 +603,7 @@ async fn init_test_wallets(
monero.wallet(name).unwrap().client(),
monero::Network::default(),
name.to_string(),
execution_params,
);
let electrum_rpc_url = {