From c5827f84caef2c33cca4ed82b427a9cefd4e0ba0 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 26 Mar 2021 15:01:17 +1100 Subject: [PATCH] Refactor recursive function to loop This should get rid of the ever-growing stack size issue. --- .github/workflows/ci.yml | 1 - Cargo.lock | 12 ---- swap/Cargo.toml | 1 - swap/src/protocol/alice/swap.rs | 90 ++++++++++--------------- swap/src/protocol/bob/swap.rs | 113 +++++++++++++------------------- 5 files changed, 80 insertions(+), 137 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8a548a8f..5ea64ba1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -127,4 +127,3 @@ jobs: run: cargo test --package swap --all-features --test ${{ matrix.test_name }} "" env: MONERO_ADDITIONAL_SLEEP_PERIOD: 60000 - RUST_MIN_STACK: 16777216 # 16 MB. Default is 8MB. This is fine as in tests we start 2 programs: Alice and Bob. diff --git a/Cargo.lock b/Cargo.lock index 604d3e4f..abb9815d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -136,17 +136,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "async-recursion" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7d78656ba01f1b93024b7c3a0467f1608e4be67d725749fdcd7d2c7678fd7a2" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "async-trait" version = "0.1.48" @@ -3433,7 +3422,6 @@ version = "0.3.0" dependencies = [ "anyhow", "async-compression", - "async-recursion", "async-trait", "atty", "backoff", diff --git a/swap/Cargo.toml b/swap/Cargo.toml index e6502c98..9d5a88be 100644 --- a/swap/Cargo.toml +++ b/swap/Cargo.toml @@ -11,7 +11,6 @@ name = "swap" [dependencies] anyhow = "1" async-compression = { version = "0.3", features = ["bzip2", "tokio"] } -async-recursion = "0.3" async-trait = "0.1" atty = "0.2" backoff = { version = "0.3", features = ["tokio"] } diff --git a/swap/src/protocol/alice/swap.rs b/swap/src/protocol/alice/swap.rs index 4a767c79..bf84bbea 100644 --- a/swap/src/protocol/alice/swap.rs +++ b/swap/src/protocol/alice/swap.rs @@ -1,7 +1,6 @@ //! Run an XMR/BTC swap in the role of Alice. //! Alice holds XMR and wishes receive BTC. use crate::bitcoin::{ExpiredTimelocks, TxRedeem}; -use crate::database::Database; use crate::env::Config; use crate::monero_ext::ScalarExt; use crate::protocol::alice; @@ -9,13 +8,10 @@ use crate::protocol::alice::event_loop::EventLoopHandle; use crate::protocol::alice::AliceState; use crate::{bitcoin, database, monero}; use anyhow::{bail, Context, Result}; -use async_recursion::async_recursion; use rand::{CryptoRng, RngCore}; -use std::sync::Arc; use tokio::select; use tokio::time::timeout; use tracing::{error, info}; -use uuid::Uuid; trait Rng: RngCore + CryptoRng + Send {} @@ -37,41 +33,40 @@ pub async fn run(swap: alice::Swap) -> Result { #[tracing::instrument(name = "swap", skip(swap,is_target_state), fields(id = %swap.swap_id))] pub async fn run_until( - swap: alice::Swap, + mut swap: alice::Swap, is_target_state: fn(&AliceState) -> bool, ) -> Result { - run_until_internal( - swap.state, - is_target_state, - swap.event_loop_handle, - swap.bitcoin_wallet, - swap.monero_wallet, - swap.env_config, - swap.swap_id, - swap.db, - ) - .await -} + let mut current_state = swap.state; -// State machine driver for swap execution -#[async_recursion] -#[allow(clippy::too_many_arguments)] -async fn run_until_internal( - state: AliceState, - is_target_state: fn(&AliceState) -> bool, - mut event_loop_handle: EventLoopHandle, - bitcoin_wallet: Arc, - monero_wallet: Arc, - env_config: Config, - swap_id: Uuid, - db: Arc, -) -> Result { - info!("Current state: {}", state); - if is_target_state(&state) { - return Ok(state); + while !is_target_state(¤t_state) { + current_state = next_state( + current_state, + &mut swap.event_loop_handle, + swap.bitcoin_wallet.as_ref(), + swap.monero_wallet.as_ref(), + &swap.env_config, + ) + .await?; + + let db_state = (¤t_state).into(); + swap.db + .insert_latest_state(swap.swap_id, database::Swap::Alice(db_state)) + .await?; } - let new_state = match state { + Ok(current_state) +} + +async fn next_state( + state: AliceState, + event_loop_handle: &mut EventLoopHandle, + bitcoin_wallet: &bitcoin::Wallet, + monero_wallet: &monero::Wallet, + env_config: &Config, +) -> Result { + info!("Current state: {}", state); + + Ok(match state { AliceState::Started { state3 } => { timeout( env_config.bob_time_to_act, @@ -121,10 +116,10 @@ async fn run_until_internal( AliceState::XmrLocked { state3, monero_wallet_restore_blockheight, - } => match state3.expired_timelocks(bitcoin_wallet.as_ref()).await? { + } => match state3.expired_timelocks(bitcoin_wallet).await? { ExpiredTimelocks::None => { select! { - _ = state3.wait_for_cancel_timelock_to_expire(bitcoin_wallet.as_ref()) => { + _ = state3.wait_for_cancel_timelock_to_expire(bitcoin_wallet) => { AliceState::CancelTimelockExpired { state3, monero_wallet_restore_blockheight, @@ -150,7 +145,7 @@ async fn run_until_internal( state3, encrypted_signature, monero_wallet_restore_blockheight, - } => match state3.expired_timelocks(bitcoin_wallet.as_ref()).await? { + } => match state3.expired_timelocks(bitcoin_wallet).await? { ExpiredTimelocks::None => { match TxRedeem::new(&state3.tx_lock, &state3.redeem_address).complete( *encrypted_signature, @@ -168,7 +163,7 @@ async fn run_until_internal( Err(e) => { error!("Publishing the redeem transaction failed with {}, attempting to wait for cancellation now. If you restart the application before the timelock is expired publishing the redeem transaction will be retried.", e); state3 - .wait_for_cancel_timelock_to_expire(bitcoin_wallet.as_ref()) + .wait_for_cancel_timelock_to_expire(bitcoin_wallet) .await?; AliceState::CancelTimelockExpired { @@ -180,7 +175,7 @@ async fn run_until_internal( Err(e) => { error!("Constructing the redeem transaction failed with {}, attempting to wait for cancellation now.", e); state3 - .wait_for_cancel_timelock_to_expire(bitcoin_wallet.as_ref()) + .wait_for_cancel_timelock_to_expire(bitcoin_wallet) .await?; AliceState::CancelTimelockExpired { @@ -336,20 +331,5 @@ async fn run_until_internal( AliceState::BtcRedeemed => AliceState::BtcRedeemed, AliceState::BtcPunished => AliceState::BtcPunished, AliceState::SafelyAborted => AliceState::SafelyAborted, - }; - - let db_state = (&new_state).into(); - db.insert_latest_state(swap_id, database::Swap::Alice(db_state)) - .await?; - run_until_internal( - new_state, - is_target_state, - event_loop_handle, - bitcoin_wallet, - monero_wallet, - env_config, - swap_id, - db, - ) - .await + }) } diff --git a/swap/src/protocol/bob/swap.rs b/swap/src/protocol/bob/swap.rs index 0241c703..ac0ef989 100644 --- a/swap/src/protocol/bob/swap.rs +++ b/swap/src/protocol/bob/swap.rs @@ -1,17 +1,14 @@ use crate::bitcoin::ExpiredTimelocks; -use crate::database::{Database, Swap}; +use crate::database::Swap; use crate::env::Config; use crate::protocol::bob; use crate::protocol::bob::event_loop::EventLoopHandle; use crate::protocol::bob::state::*; use crate::{bitcoin, monero}; use anyhow::{bail, Context, Result}; -use async_recursion::async_recursion; use rand::rngs::OsRng; -use std::sync::Arc; use tokio::select; use tracing::trace; -use uuid::Uuid; pub fn is_complete(state: &BobState) -> bool { matches!( @@ -29,49 +26,48 @@ pub async fn run(swap: bob::Swap) -> Result { } pub async fn run_until( - swap: bob::Swap, + mut swap: bob::Swap, is_target_state: fn(&BobState) -> bool, ) -> Result { - run_until_internal( - swap.state, - is_target_state, - swap.event_loop_handle, - swap.db, - swap.bitcoin_wallet, - swap.monero_wallet, - swap.swap_id, - swap.env_config, - swap.receive_monero_address, - ) - .await + let mut current_state = swap.state; + + while !is_target_state(¤t_state) { + current_state = next_state( + current_state, + &mut swap.event_loop_handle, + swap.bitcoin_wallet.as_ref(), + swap.monero_wallet.as_ref(), + &swap.env_config, + swap.receive_monero_address, + ) + .await?; + + let db_state = current_state.clone().into(); + swap.db + .insert_latest_state(swap.swap_id, Swap::Bob(db_state)) + .await?; + } + + Ok(current_state) } -// State machine driver for swap execution -#[allow(clippy::too_many_arguments)] -#[async_recursion] -async fn run_until_internal( +async fn next_state( state: BobState, - is_target_state: fn(&BobState) -> bool, - mut event_loop_handle: EventLoopHandle, - db: Database, - bitcoin_wallet: Arc, - monero_wallet: Arc, - swap_id: Uuid, - env_config: Config, + event_loop_handle: &mut EventLoopHandle, + bitcoin_wallet: &bitcoin::Wallet, + monero_wallet: &monero::Wallet, + env_config: &Config, receive_monero_address: monero::Address, ) -> Result { trace!("Current state: {}", state); - if is_target_state(&state) { - return Ok(state); - } - let new_state = match state { + Ok(match state { BobState::Started { btc_amount } => { let bitcoin_refund_address = bitcoin_wallet.new_address().await?; let state2 = request_price_and_setup( btc_amount, - &mut event_loop_handle, + event_loop_handle, env_config, bitcoin_refund_address, ) @@ -93,10 +89,10 @@ async fn run_until_internal( // Bob has locked Btc // Watch for Alice to Lock Xmr or for cancel timelock to elapse BobState::BtcLocked(state3) => { - if let ExpiredTimelocks::None = state3.current_epoch(bitcoin_wallet.as_ref()).await? { + if let ExpiredTimelocks::None = state3.current_epoch(bitcoin_wallet).await? { let transfer_proof_watcher = event_loop_handle.recv_transfer_proof(); let cancel_timelock_expires = - state3.wait_for_cancel_timelock_to_expire(bitcoin_wallet.as_ref()); + state3.wait_for_cancel_timelock_to_expire(bitcoin_wallet); // Record the current monero wallet block height so we don't have to scan from // block 0 once we create the redeem wallet. @@ -133,7 +129,7 @@ async fn run_until_internal( lock_transfer_proof, monero_wallet_restore_blockheight, } => { - if let ExpiredTimelocks::None = state.current_epoch(bitcoin_wallet.as_ref()).await? { + if let ExpiredTimelocks::None = state.current_epoch(bitcoin_wallet).await? { let watch_request = state.lock_xmr_watch_request(lock_transfer_proof); select! { @@ -142,13 +138,13 @@ async fn run_until_internal( Ok(()) => BobState::XmrLocked(state.xmr_locked(monero_wallet_restore_blockheight)), Err(e) => { tracing::warn!("Waiting for refund because insufficient Monero have been locked! {}", e); - state.wait_for_cancel_timelock_to_expire(bitcoin_wallet.as_ref()).await?; + state.wait_for_cancel_timelock_to_expire(bitcoin_wallet).await?; BobState::CancelTimelockExpired(state.cancel()) }, } } - _ = state.wait_for_cancel_timelock_to_expire(bitcoin_wallet.as_ref()) => { + _ = state.wait_for_cancel_timelock_to_expire(bitcoin_wallet) => { BobState::CancelTimelockExpired(state.cancel()) } } @@ -157,7 +153,7 @@ async fn run_until_internal( } } BobState::XmrLocked(state) => { - if let ExpiredTimelocks::None = state.expired_timelock(bitcoin_wallet.as_ref()).await? { + if let ExpiredTimelocks::None = state.expired_timelock(bitcoin_wallet).await? { // Alice has locked Xmr // Bob sends Alice his key @@ -165,7 +161,7 @@ async fn run_until_internal( _ = event_loop_handle.send_encrypted_signature(state.tx_redeem_encsig()) => { BobState::EncSigSent(state) }, - _ = state.wait_for_cancel_timelock_to_expire(bitcoin_wallet.as_ref()) => { + _ = state.wait_for_cancel_timelock_to_expire(bitcoin_wallet) => { BobState::CancelTimelockExpired(state.cancel()) } } @@ -174,12 +170,12 @@ async fn run_until_internal( } } BobState::EncSigSent(state) => { - if let ExpiredTimelocks::None = state.expired_timelock(bitcoin_wallet.as_ref()).await? { + if let ExpiredTimelocks::None = state.expired_timelock(bitcoin_wallet).await? { select! { - state5 = state.watch_for_redeem_btc(bitcoin_wallet.as_ref()) => { + state5 = state.watch_for_redeem_btc(bitcoin_wallet) => { BobState::BtcRedeemed(state5?) }, - _ = state.wait_for_cancel_timelock_to_expire(bitcoin_wallet.as_ref()) => { + _ = state.wait_for_cancel_timelock_to_expire(bitcoin_wallet) => { BobState::CancelTimelockExpired(state.cancel()) } } @@ -210,26 +206,22 @@ async fn run_until_internal( } } BobState::CancelTimelockExpired(state4) => { - if state4 - .check_for_tx_cancel(bitcoin_wallet.as_ref()) - .await - .is_err() - { - state4.submit_tx_cancel(bitcoin_wallet.as_ref()).await?; + if state4.check_for_tx_cancel(bitcoin_wallet).await.is_err() { + state4.submit_tx_cancel(bitcoin_wallet).await?; } BobState::BtcCancelled(state4) } BobState::BtcCancelled(state) => { // Bob has cancelled the swap - match state.expired_timelock(bitcoin_wallet.as_ref()).await? { + match state.expired_timelock(bitcoin_wallet).await? { ExpiredTimelocks::None => { bail!( "Internal error: canceled state reached before cancel timelock was expired" ); } ExpiredTimelocks::Cancel => { - state.refund_btc(bitcoin_wallet.as_ref()).await?; + state.refund_btc(bitcoin_wallet).await?; BobState::BtcRefunded(state) } ExpiredTimelocks::Punish => BobState::BtcPunished { @@ -241,28 +233,13 @@ async fn run_until_internal( BobState::BtcPunished { tx_lock_id } => BobState::BtcPunished { tx_lock_id }, BobState::SafelyAborted => BobState::SafelyAborted, BobState::XmrRedeemed { tx_lock_id } => BobState::XmrRedeemed { tx_lock_id }, - }; - - let db_state = new_state.clone().into(); - db.insert_latest_state(swap_id, Swap::Bob(db_state)).await?; - run_until_internal( - new_state, - is_target_state, - event_loop_handle, - db, - bitcoin_wallet, - monero_wallet, - swap_id, - env_config, - receive_monero_address, - ) - .await + }) } pub async fn request_price_and_setup( btc: bitcoin::Amount, event_loop_handle: &mut EventLoopHandle, - env_config: Config, + env_config: &Config, bitcoin_refund_address: bitcoin::Address, ) -> Result { let xmr = event_loop_handle.request_spot_price(btc).await?;