From 82738b111e4975b310b70b564152306ab52d37f9 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 11 Mar 2021 16:06:28 +1100 Subject: [PATCH] Refactor `monero::Wallet::watch_for_transfer` to not use `backoff` Instead, we use a regular loop and extract everything into a function that can be independently tested. `backoff` would be useful to retry the actual call to the node. --- swap/src/bin/asb.rs | 5 +- swap/src/bin/swap.rs | 22 +++- swap/src/monero/wallet.rs | 213 ++++++++++++++++++++++++++---------- swap/tests/testutils/mod.rs | 4 + 4 files changed, 182 insertions(+), 62 deletions(-) diff --git a/swap/src/bin/asb.rs b/swap/src/bin/asb.rs index a40e46f9..01746d0e 100644 --- a/swap/src/bin/asb.rs +++ b/swap/src/bin/asb.rs @@ -24,7 +24,7 @@ use swap::asb::config::{ initial_setup, query_user_for_initial_testnet_config, read_config, Config, ConfigNotInitialized, }; use swap::database::Database; -use swap::execution_params::GetExecutionParams; +use swap::execution_params::{ExecutionParams, GetExecutionParams}; use swap::fs::default_config_path; use swap::monero::Amount; use swap::protocol::alice::EventLoop; @@ -84,6 +84,7 @@ async fn main() -> Result<()> { config.clone(), &wallet_data_dir, seed.derive_extended_private_key(BITCOIN_NETWORK)?, + execution_params, ) .await?; @@ -131,6 +132,7 @@ async fn init_wallets( config: Config, bitcoin_wallet_data_dir: &Path, key: impl DerivableKey + Clone, + execution_params: ExecutionParams, ) -> Result<(bitcoin::Wallet, monero::Wallet)> { let bitcoin_wallet = bitcoin::Wallet::new( config.bitcoin.electrum_rpc_url, @@ -153,6 +155,7 @@ async fn init_wallets( config.monero.wallet_rpc_url.clone(), MONERO_NETWORK, DEFAULT_WALLET_NAME.to_string(), + execution_params, ); // Setup the Monero wallet diff --git a/swap/src/bin/swap.rs b/swap/src/bin/swap.rs index 9bb981f8..c8e64866 100644 --- a/swap/src/bin/swap.rs +++ b/swap/src/bin/swap.rs @@ -23,7 +23,7 @@ use swap::bitcoin::{Amount, TxLock}; use swap::cli::command::{AliceConnectParams, Arguments, Command, MoneroParams}; use swap::cli::config::{read_config, Config}; use swap::database::Database; -use swap::execution_params::GetExecutionParams; +use swap::execution_params::{ExecutionParams, GetExecutionParams}; use swap::network::quote::BidQuote; use swap::protocol::bob; use swap::protocol::bob::{Builder, EventLoop}; @@ -104,8 +104,13 @@ async fn main() -> Result<()> { } let bitcoin_wallet = init_bitcoin_wallet(bitcoin_network, &config, seed).await?; - let (monero_wallet, _process) = - init_monero_wallet(monero_network, &config, monero_daemon_host).await?; + let (monero_wallet, _process) = init_monero_wallet( + monero_network, + &config, + monero_daemon_host, + execution_params, + ) + .await?; let bitcoin_wallet = Arc::new(bitcoin_wallet); let (event_loop, mut event_loop_handle) = EventLoop::new( &seed.derive_libp2p_identity(), @@ -184,8 +189,13 @@ async fn main() -> Result<()> { } let bitcoin_wallet = init_bitcoin_wallet(bitcoin_network, &config, seed).await?; - let (monero_wallet, _process) = - init_monero_wallet(monero_network, &config, monero_daemon_host).await?; + let (monero_wallet, _process) = init_monero_wallet( + monero_network, + &config, + monero_daemon_host, + execution_params, + ) + .await?; let bitcoin_wallet = Arc::new(bitcoin_wallet); let (event_loop, event_loop_handle) = EventLoop::new( @@ -282,6 +292,7 @@ async fn init_monero_wallet( monero_network: monero::Network, config: &Config, monero_daemon_host: String, + execution_params: ExecutionParams, ) -> Result<(monero::Wallet, monero::WalletRpcProcess)> { const MONERO_BLOCKCHAIN_MONITORING_WALLET_NAME: &str = "swap-tool-blockchain-monitoring-wallet"; @@ -295,6 +306,7 @@ async fn init_monero_wallet( monero_wallet_rpc_process.endpoint(), monero_network, MONERO_BLOCKCHAIN_MONITORING_WALLET_NAME.to_string(), + execution_params, ); monero_wallet.open_or_create().await?; diff --git a/swap/src/monero/wallet.rs b/swap/src/monero/wallet.rs index 44f1b8f8..bc762462 100644 --- a/swap/src/monero/wallet.rs +++ b/swap/src/monero/wallet.rs @@ -1,17 +1,17 @@ +use crate::execution_params::ExecutionParams; use crate::monero::{ Amount, InsufficientFunds, PrivateViewKey, PublicViewKey, TransferProof, TxHash, }; use ::monero::{Address, Network, PrivateKey, PublicKey}; use anyhow::{Context, Result}; -use backoff::backoff::Constant as ConstantBackoff; -use backoff::future::retry; -use bitcoin::hashes::core::sync::atomic::AtomicU32; use monero_rpc::wallet; -use monero_rpc::wallet::{BlockHeight, Refreshed}; +use monero_rpc::wallet::{BlockHeight, CheckTxKey, Refreshed}; +use std::cmp::min; +use std::future::Future; use std::str::FromStr; -use std::sync::atomic::Ordering; use std::time::Duration; use tokio::sync::Mutex; +use tokio::time::Interval; use tracing::{debug, info}; use url::Url; @@ -20,22 +20,30 @@ pub struct Wallet { inner: Mutex, network: Network, name: String, + exec_params: ExecutionParams, } impl Wallet { - pub fn new(url: Url, network: Network, name: String) -> Self { + pub fn new(url: Url, network: Network, name: String, exec_params: ExecutionParams) -> Self { Self { inner: Mutex::new(wallet::Client::new(url)), network, name, + exec_params, } } - pub fn new_with_client(client: wallet::Client, network: Network, name: String) -> Self { + pub fn new_with_client( + client: wallet::Client, + network: Network, + name: String, + exec_params: ExecutionParams, + ) -> Self { Self { inner: Mutex::new(client), network, name, + exec_params, } } @@ -157,63 +165,35 @@ impl Wallet { public_spend_key: PublicKey, public_view_key: PublicViewKey, transfer_proof: TransferProof, - expected_amount: Amount, + expected: Amount, conf_target: u32, ) -> Result<(), InsufficientFunds> { - let txid = &transfer_proof.tx_hash(); + let txid = transfer_proof.tx_hash(); tracing::info!(%txid, "Waiting for {} confirmation{} of Monero transaction", conf_target, if conf_target > 1 { "s" } else { "" }); - enum Error { - TxNotFound, - InsufficientConfirmations, - InsufficientFunds { expected: Amount, actual: Amount }, - } - let address = Address::standard(self.network, public_spend_key, public_view_key.into()); - let confirmations = AtomicU32::new(0u32); + let check_interval = tokio::time::interval(min( + self.exec_params.monero_avg_block_time / 10, + Duration::from_secs(1), + )); + let key = &transfer_proof.tx_key().to_string(); - let res = retry(ConstantBackoff::new(Duration::from_secs(1)), || async { - // NOTE: Currently, this is conflicting IO errors with the transaction not being - // in the blockchain yet, or not having enough confirmations on it. All these - // errors warrant a retry, but the strategy should probably differ per case - let proof = self - .inner - .lock() - .await - .check_tx_key( - &String::from(transfer_proof.tx_hash()), - &transfer_proof.tx_key().to_string(), - &address.to_string(), - ) - .await - .map_err(|_| backoff::Error::Transient(Error::TxNotFound))?; - - if proof.received != expected_amount.as_piconero() { - return Err(backoff::Error::Permanent(Error::InsufficientFunds { - expected: expected_amount, - actual: Amount::from_piconero(proof.received), - })); - } - - if proof.confirmations >= confirmations.load(Ordering::SeqCst) { - confirmations.store(proof.confirmations, Ordering::SeqCst); - - info!(%txid, "Monero lock tx has {} out of {} confirmations", proof.confirmations, conf_target); - } - - if proof.confirmations < conf_target { - return Err(backoff::Error::Transient(Error::InsufficientConfirmations)); - } - - Ok(proof) - }) - .await; - - if let Err(Error::InsufficientFunds { expected, actual }) = res { - return Err(InsufficientFunds { expected, actual }); - }; + wait_for_confirmations( + txid.0, + |txid| async move { + self.inner + .lock() + .await + .check_tx_key(&txid, &key, &address.to_string()) + .await + }, + check_interval, + expected, + conf_target, + ) + .await?; Ok(()) } @@ -255,3 +235,124 @@ impl Wallet { Amount::from_monero(0.000_03f64).expect("static fee to be convertible without problems") } } + +async fn wait_for_confirmations( + txid: String, + fetch_tx: impl Fn(String) -> Fut, + mut check_interval: Interval, + expected: Amount, + conf_target: u32, +) -> Result<(), InsufficientFunds> +where + Fut: Future>, +{ + let mut seen_confirmations = 0u32; + + while seen_confirmations < conf_target { + let tx = match fetch_tx(txid.clone()).await { + Ok(proof) => proof, + Err(error) => { + tracing::debug!(%txid, "Failed to retrieve tx from blockchain: {:#}", error); + continue; // treating every error as transient and retrying + // is obviously wrong but the jsonrpc client is + // too primitive to differentiate between all the + // cases + } + }; + + let received = Amount::from_piconero(tx.received); + + if received != expected { + return Err(InsufficientFunds { + expected, + actual: received, + }); + } + + if tx.confirmations > seen_confirmations { + seen_confirmations = tx.confirmations; + info!(%txid, "Monero lock tx has {} out of {} confirmations", tx.confirmations, conf_target); + } + + check_interval.tick().await; + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use monero_rpc::wallet::CheckTxKey; + use std::sync::atomic::{AtomicU32, Ordering}; + use std::sync::Arc; + + #[tokio::test] + async fn given_exact_confirmations_does_not_fetch_tx_again() { + let requests = Arc::new(AtomicU32::new(0)); + + let result = wait_for_confirmations( + String::from("TXID"), + move |_| { + let requests = requests.clone(); + + async move { + match requests.fetch_add(1, Ordering::SeqCst) { + 0 => Ok(CheckTxKey { + confirmations: 10, + received: 100, + }), + _ => panic!("should not be called more than once"), + } + } + }, + tokio::time::interval(Duration::from_millis(10)), + Amount::from_piconero(100), + 10, + ) + .await; + + assert!(result.is_ok()) + } + + /// A test that allows us to easily, visually verify if the log output is as + /// we desire. + /// + /// We want the following properties: + /// - Only print confirmations if they changed i.e. not every time we + /// request them + /// - Also print the last one, i.e. 10 / 10 + #[tokio::test] + async fn visual_log_check() { + let _ = tracing_subscriber::fmt().with_test_writer().try_init(); + const MAX_REQUESTS: u32 = 20; + + let requests = Arc::new(AtomicU32::new(0)); + + let result = wait_for_confirmations( + String::from("TXID"), + move |_| { + let requests = requests.clone(); + + async move { + match requests.fetch_add(1, Ordering::SeqCst) { + requests if requests <= MAX_REQUESTS => { + Ok(CheckTxKey { + confirmations: requests / 2, /* every 2nd request "yields" a + * confirmation */ + received: 100, + }) + } + _ => panic!("should not be called more than {} times", MAX_REQUESTS), + } + } + }, + tokio::time::interval(Duration::from_millis(10)), + Amount::from_piconero(100), + 10, + ) + .await; + + assert!(result.is_ok()) + } +} diff --git a/swap/tests/testutils/mod.rs b/swap/tests/testutils/mod.rs index 7b15d44f..3b233090 100644 --- a/swap/tests/testutils/mod.rs +++ b/swap/tests/testutils/mod.rs @@ -359,6 +359,7 @@ where electrs_rpc_port, electrs_http_port, alice_seed, + execution_params, ) .await; @@ -381,6 +382,7 @@ where electrs_rpc_port, electrs_http_port, bob_seed, + execution_params, ) .await; @@ -590,6 +592,7 @@ async fn init_test_wallets( electrum_rpc_port: u16, electrum_http_port: u16, seed: Seed, + execution_params: ExecutionParams, ) -> (Arc, Arc) { monero .init(vec![(name, starting_balances.xmr.as_piconero())]) @@ -600,6 +603,7 @@ async fn init_test_wallets( monero.wallet(name).unwrap().client(), monero::Network::default(), name.to_string(), + execution_params, ); let electrum_rpc_url = {