Refactor recursive function to loop

This should get rid of the ever-growing stack size issue.
This commit is contained in:
Thomas Eizinger 2021-03-26 15:01:17 +11:00
parent fc175a3f53
commit c5827f84ca
No known key found for this signature in database
GPG Key ID: 651AC83A6C6C8B96
5 changed files with 80 additions and 137 deletions

View File

@ -127,4 +127,3 @@ jobs:
run: cargo test --package swap --all-features --test ${{ matrix.test_name }} "" run: cargo test --package swap --all-features --test ${{ matrix.test_name }} ""
env: env:
MONERO_ADDITIONAL_SLEEP_PERIOD: 60000 MONERO_ADDITIONAL_SLEEP_PERIOD: 60000
RUST_MIN_STACK: 16777216 # 16 MB. Default is 8MB. This is fine as in tests we start 2 programs: Alice and Bob.

12
Cargo.lock generated
View File

@ -136,17 +136,6 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "async-recursion"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7d78656ba01f1b93024b7c3a0467f1608e4be67d725749fdcd7d2c7678fd7a2"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "async-trait" name = "async-trait"
version = "0.1.48" version = "0.1.48"
@ -3433,7 +3422,6 @@ version = "0.3.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-compression", "async-compression",
"async-recursion",
"async-trait", "async-trait",
"atty", "atty",
"backoff", "backoff",

View File

@ -11,7 +11,6 @@ name = "swap"
[dependencies] [dependencies]
anyhow = "1" anyhow = "1"
async-compression = { version = "0.3", features = ["bzip2", "tokio"] } async-compression = { version = "0.3", features = ["bzip2", "tokio"] }
async-recursion = "0.3"
async-trait = "0.1" async-trait = "0.1"
atty = "0.2" atty = "0.2"
backoff = { version = "0.3", features = ["tokio"] } backoff = { version = "0.3", features = ["tokio"] }

View File

@ -1,7 +1,6 @@
//! Run an XMR/BTC swap in the role of Alice. //! Run an XMR/BTC swap in the role of Alice.
//! Alice holds XMR and wishes receive BTC. //! Alice holds XMR and wishes receive BTC.
use crate::bitcoin::{ExpiredTimelocks, TxRedeem}; use crate::bitcoin::{ExpiredTimelocks, TxRedeem};
use crate::database::Database;
use crate::env::Config; use crate::env::Config;
use crate::monero_ext::ScalarExt; use crate::monero_ext::ScalarExt;
use crate::protocol::alice; use crate::protocol::alice;
@ -9,13 +8,10 @@ use crate::protocol::alice::event_loop::EventLoopHandle;
use crate::protocol::alice::AliceState; use crate::protocol::alice::AliceState;
use crate::{bitcoin, database, monero}; use crate::{bitcoin, database, monero};
use anyhow::{bail, Context, Result}; use anyhow::{bail, Context, Result};
use async_recursion::async_recursion;
use rand::{CryptoRng, RngCore}; use rand::{CryptoRng, RngCore};
use std::sync::Arc;
use tokio::select; use tokio::select;
use tokio::time::timeout; use tokio::time::timeout;
use tracing::{error, info}; use tracing::{error, info};
use uuid::Uuid;
trait Rng: RngCore + CryptoRng + Send {} trait Rng: RngCore + CryptoRng + Send {}
@ -37,41 +33,40 @@ pub async fn run(swap: alice::Swap) -> Result<AliceState> {
#[tracing::instrument(name = "swap", skip(swap,is_target_state), fields(id = %swap.swap_id))] #[tracing::instrument(name = "swap", skip(swap,is_target_state), fields(id = %swap.swap_id))]
pub async fn run_until( pub async fn run_until(
swap: alice::Swap, mut swap: alice::Swap,
is_target_state: fn(&AliceState) -> bool, is_target_state: fn(&AliceState) -> bool,
) -> Result<AliceState> { ) -> Result<AliceState> {
run_until_internal( let mut current_state = swap.state;
swap.state,
is_target_state,
swap.event_loop_handle,
swap.bitcoin_wallet,
swap.monero_wallet,
swap.env_config,
swap.swap_id,
swap.db,
)
.await
}
// State machine driver for swap execution while !is_target_state(&current_state) {
#[async_recursion] current_state = next_state(
#[allow(clippy::too_many_arguments)] current_state,
async fn run_until_internal( &mut swap.event_loop_handle,
state: AliceState, swap.bitcoin_wallet.as_ref(),
is_target_state: fn(&AliceState) -> bool, swap.monero_wallet.as_ref(),
mut event_loop_handle: EventLoopHandle, &swap.env_config,
bitcoin_wallet: Arc<bitcoin::Wallet>, )
monero_wallet: Arc<monero::Wallet>, .await?;
env_config: Config,
swap_id: Uuid, let db_state = (&current_state).into();
db: Arc<Database>, swap.db
) -> Result<AliceState> { .insert_latest_state(swap.swap_id, database::Swap::Alice(db_state))
info!("Current state: {}", state); .await?;
if is_target_state(&state) {
return Ok(state);
} }
let new_state = match state { Ok(current_state)
}
async fn next_state(
state: AliceState,
event_loop_handle: &mut EventLoopHandle,
bitcoin_wallet: &bitcoin::Wallet,
monero_wallet: &monero::Wallet,
env_config: &Config,
) -> Result<AliceState> {
info!("Current state: {}", state);
Ok(match state {
AliceState::Started { state3 } => { AliceState::Started { state3 } => {
timeout( timeout(
env_config.bob_time_to_act, env_config.bob_time_to_act,
@ -121,10 +116,10 @@ async fn run_until_internal(
AliceState::XmrLocked { AliceState::XmrLocked {
state3, state3,
monero_wallet_restore_blockheight, monero_wallet_restore_blockheight,
} => match state3.expired_timelocks(bitcoin_wallet.as_ref()).await? { } => match state3.expired_timelocks(bitcoin_wallet).await? {
ExpiredTimelocks::None => { ExpiredTimelocks::None => {
select! { select! {
_ = state3.wait_for_cancel_timelock_to_expire(bitcoin_wallet.as_ref()) => { _ = state3.wait_for_cancel_timelock_to_expire(bitcoin_wallet) => {
AliceState::CancelTimelockExpired { AliceState::CancelTimelockExpired {
state3, state3,
monero_wallet_restore_blockheight, monero_wallet_restore_blockheight,
@ -150,7 +145,7 @@ async fn run_until_internal(
state3, state3,
encrypted_signature, encrypted_signature,
monero_wallet_restore_blockheight, monero_wallet_restore_blockheight,
} => match state3.expired_timelocks(bitcoin_wallet.as_ref()).await? { } => match state3.expired_timelocks(bitcoin_wallet).await? {
ExpiredTimelocks::None => { ExpiredTimelocks::None => {
match TxRedeem::new(&state3.tx_lock, &state3.redeem_address).complete( match TxRedeem::new(&state3.tx_lock, &state3.redeem_address).complete(
*encrypted_signature, *encrypted_signature,
@ -168,7 +163,7 @@ async fn run_until_internal(
Err(e) => { Err(e) => {
error!("Publishing the redeem transaction failed with {}, attempting to wait for cancellation now. If you restart the application before the timelock is expired publishing the redeem transaction will be retried.", e); error!("Publishing the redeem transaction failed with {}, attempting to wait for cancellation now. If you restart the application before the timelock is expired publishing the redeem transaction will be retried.", e);
state3 state3
.wait_for_cancel_timelock_to_expire(bitcoin_wallet.as_ref()) .wait_for_cancel_timelock_to_expire(bitcoin_wallet)
.await?; .await?;
AliceState::CancelTimelockExpired { AliceState::CancelTimelockExpired {
@ -180,7 +175,7 @@ async fn run_until_internal(
Err(e) => { Err(e) => {
error!("Constructing the redeem transaction failed with {}, attempting to wait for cancellation now.", e); error!("Constructing the redeem transaction failed with {}, attempting to wait for cancellation now.", e);
state3 state3
.wait_for_cancel_timelock_to_expire(bitcoin_wallet.as_ref()) .wait_for_cancel_timelock_to_expire(bitcoin_wallet)
.await?; .await?;
AliceState::CancelTimelockExpired { AliceState::CancelTimelockExpired {
@ -336,20 +331,5 @@ async fn run_until_internal(
AliceState::BtcRedeemed => AliceState::BtcRedeemed, AliceState::BtcRedeemed => AliceState::BtcRedeemed,
AliceState::BtcPunished => AliceState::BtcPunished, AliceState::BtcPunished => AliceState::BtcPunished,
AliceState::SafelyAborted => AliceState::SafelyAborted, AliceState::SafelyAborted => AliceState::SafelyAborted,
}; })
let db_state = (&new_state).into();
db.insert_latest_state(swap_id, database::Swap::Alice(db_state))
.await?;
run_until_internal(
new_state,
is_target_state,
event_loop_handle,
bitcoin_wallet,
monero_wallet,
env_config,
swap_id,
db,
)
.await
} }

View File

@ -1,17 +1,14 @@
use crate::bitcoin::ExpiredTimelocks; use crate::bitcoin::ExpiredTimelocks;
use crate::database::{Database, Swap}; use crate::database::Swap;
use crate::env::Config; use crate::env::Config;
use crate::protocol::bob; use crate::protocol::bob;
use crate::protocol::bob::event_loop::EventLoopHandle; use crate::protocol::bob::event_loop::EventLoopHandle;
use crate::protocol::bob::state::*; use crate::protocol::bob::state::*;
use crate::{bitcoin, monero}; use crate::{bitcoin, monero};
use anyhow::{bail, Context, Result}; use anyhow::{bail, Context, Result};
use async_recursion::async_recursion;
use rand::rngs::OsRng; use rand::rngs::OsRng;
use std::sync::Arc;
use tokio::select; use tokio::select;
use tracing::trace; use tracing::trace;
use uuid::Uuid;
pub fn is_complete(state: &BobState) -> bool { pub fn is_complete(state: &BobState) -> bool {
matches!( matches!(
@ -29,49 +26,48 @@ pub async fn run(swap: bob::Swap) -> Result<BobState> {
} }
pub async fn run_until( pub async fn run_until(
swap: bob::Swap, mut swap: bob::Swap,
is_target_state: fn(&BobState) -> bool, is_target_state: fn(&BobState) -> bool,
) -> Result<BobState> { ) -> Result<BobState> {
run_until_internal( let mut current_state = swap.state;
swap.state,
is_target_state, while !is_target_state(&current_state) {
swap.event_loop_handle, current_state = next_state(
swap.db, current_state,
swap.bitcoin_wallet, &mut swap.event_loop_handle,
swap.monero_wallet, swap.bitcoin_wallet.as_ref(),
swap.swap_id, swap.monero_wallet.as_ref(),
swap.env_config, &swap.env_config,
swap.receive_monero_address, swap.receive_monero_address,
) )
.await .await?;
let db_state = current_state.clone().into();
swap.db
.insert_latest_state(swap.swap_id, Swap::Bob(db_state))
.await?;
}
Ok(current_state)
} }
// State machine driver for swap execution async fn next_state(
#[allow(clippy::too_many_arguments)]
#[async_recursion]
async fn run_until_internal(
state: BobState, state: BobState,
is_target_state: fn(&BobState) -> bool, event_loop_handle: &mut EventLoopHandle,
mut event_loop_handle: EventLoopHandle, bitcoin_wallet: &bitcoin::Wallet,
db: Database, monero_wallet: &monero::Wallet,
bitcoin_wallet: Arc<bitcoin::Wallet>, env_config: &Config,
monero_wallet: Arc<monero::Wallet>,
swap_id: Uuid,
env_config: Config,
receive_monero_address: monero::Address, receive_monero_address: monero::Address,
) -> Result<BobState> { ) -> Result<BobState> {
trace!("Current state: {}", state); trace!("Current state: {}", state);
if is_target_state(&state) {
return Ok(state);
}
let new_state = match state { Ok(match state {
BobState::Started { btc_amount } => { BobState::Started { btc_amount } => {
let bitcoin_refund_address = bitcoin_wallet.new_address().await?; let bitcoin_refund_address = bitcoin_wallet.new_address().await?;
let state2 = request_price_and_setup( let state2 = request_price_and_setup(
btc_amount, btc_amount,
&mut event_loop_handle, event_loop_handle,
env_config, env_config,
bitcoin_refund_address, bitcoin_refund_address,
) )
@ -93,10 +89,10 @@ async fn run_until_internal(
// Bob has locked Btc // Bob has locked Btc
// Watch for Alice to Lock Xmr or for cancel timelock to elapse // Watch for Alice to Lock Xmr or for cancel timelock to elapse
BobState::BtcLocked(state3) => { BobState::BtcLocked(state3) => {
if let ExpiredTimelocks::None = state3.current_epoch(bitcoin_wallet.as_ref()).await? { if let ExpiredTimelocks::None = state3.current_epoch(bitcoin_wallet).await? {
let transfer_proof_watcher = event_loop_handle.recv_transfer_proof(); let transfer_proof_watcher = event_loop_handle.recv_transfer_proof();
let cancel_timelock_expires = let cancel_timelock_expires =
state3.wait_for_cancel_timelock_to_expire(bitcoin_wallet.as_ref()); state3.wait_for_cancel_timelock_to_expire(bitcoin_wallet);
// Record the current monero wallet block height so we don't have to scan from // Record the current monero wallet block height so we don't have to scan from
// block 0 once we create the redeem wallet. // block 0 once we create the redeem wallet.
@ -133,7 +129,7 @@ async fn run_until_internal(
lock_transfer_proof, lock_transfer_proof,
monero_wallet_restore_blockheight, monero_wallet_restore_blockheight,
} => { } => {
if let ExpiredTimelocks::None = state.current_epoch(bitcoin_wallet.as_ref()).await? { if let ExpiredTimelocks::None = state.current_epoch(bitcoin_wallet).await? {
let watch_request = state.lock_xmr_watch_request(lock_transfer_proof); let watch_request = state.lock_xmr_watch_request(lock_transfer_proof);
select! { select! {
@ -142,13 +138,13 @@ async fn run_until_internal(
Ok(()) => BobState::XmrLocked(state.xmr_locked(monero_wallet_restore_blockheight)), Ok(()) => BobState::XmrLocked(state.xmr_locked(monero_wallet_restore_blockheight)),
Err(e) => { Err(e) => {
tracing::warn!("Waiting for refund because insufficient Monero have been locked! {}", e); tracing::warn!("Waiting for refund because insufficient Monero have been locked! {}", e);
state.wait_for_cancel_timelock_to_expire(bitcoin_wallet.as_ref()).await?; state.wait_for_cancel_timelock_to_expire(bitcoin_wallet).await?;
BobState::CancelTimelockExpired(state.cancel()) BobState::CancelTimelockExpired(state.cancel())
}, },
} }
} }
_ = state.wait_for_cancel_timelock_to_expire(bitcoin_wallet.as_ref()) => { _ = state.wait_for_cancel_timelock_to_expire(bitcoin_wallet) => {
BobState::CancelTimelockExpired(state.cancel()) BobState::CancelTimelockExpired(state.cancel())
} }
} }
@ -157,7 +153,7 @@ async fn run_until_internal(
} }
} }
BobState::XmrLocked(state) => { BobState::XmrLocked(state) => {
if let ExpiredTimelocks::None = state.expired_timelock(bitcoin_wallet.as_ref()).await? { if let ExpiredTimelocks::None = state.expired_timelock(bitcoin_wallet).await? {
// Alice has locked Xmr // Alice has locked Xmr
// Bob sends Alice his key // Bob sends Alice his key
@ -165,7 +161,7 @@ async fn run_until_internal(
_ = event_loop_handle.send_encrypted_signature(state.tx_redeem_encsig()) => { _ = event_loop_handle.send_encrypted_signature(state.tx_redeem_encsig()) => {
BobState::EncSigSent(state) BobState::EncSigSent(state)
}, },
_ = state.wait_for_cancel_timelock_to_expire(bitcoin_wallet.as_ref()) => { _ = state.wait_for_cancel_timelock_to_expire(bitcoin_wallet) => {
BobState::CancelTimelockExpired(state.cancel()) BobState::CancelTimelockExpired(state.cancel())
} }
} }
@ -174,12 +170,12 @@ async fn run_until_internal(
} }
} }
BobState::EncSigSent(state) => { BobState::EncSigSent(state) => {
if let ExpiredTimelocks::None = state.expired_timelock(bitcoin_wallet.as_ref()).await? { if let ExpiredTimelocks::None = state.expired_timelock(bitcoin_wallet).await? {
select! { select! {
state5 = state.watch_for_redeem_btc(bitcoin_wallet.as_ref()) => { state5 = state.watch_for_redeem_btc(bitcoin_wallet) => {
BobState::BtcRedeemed(state5?) BobState::BtcRedeemed(state5?)
}, },
_ = state.wait_for_cancel_timelock_to_expire(bitcoin_wallet.as_ref()) => { _ = state.wait_for_cancel_timelock_to_expire(bitcoin_wallet) => {
BobState::CancelTimelockExpired(state.cancel()) BobState::CancelTimelockExpired(state.cancel())
} }
} }
@ -210,26 +206,22 @@ async fn run_until_internal(
} }
} }
BobState::CancelTimelockExpired(state4) => { BobState::CancelTimelockExpired(state4) => {
if state4 if state4.check_for_tx_cancel(bitcoin_wallet).await.is_err() {
.check_for_tx_cancel(bitcoin_wallet.as_ref()) state4.submit_tx_cancel(bitcoin_wallet).await?;
.await
.is_err()
{
state4.submit_tx_cancel(bitcoin_wallet.as_ref()).await?;
} }
BobState::BtcCancelled(state4) BobState::BtcCancelled(state4)
} }
BobState::BtcCancelled(state) => { BobState::BtcCancelled(state) => {
// Bob has cancelled the swap // Bob has cancelled the swap
match state.expired_timelock(bitcoin_wallet.as_ref()).await? { match state.expired_timelock(bitcoin_wallet).await? {
ExpiredTimelocks::None => { ExpiredTimelocks::None => {
bail!( bail!(
"Internal error: canceled state reached before cancel timelock was expired" "Internal error: canceled state reached before cancel timelock was expired"
); );
} }
ExpiredTimelocks::Cancel => { ExpiredTimelocks::Cancel => {
state.refund_btc(bitcoin_wallet.as_ref()).await?; state.refund_btc(bitcoin_wallet).await?;
BobState::BtcRefunded(state) BobState::BtcRefunded(state)
} }
ExpiredTimelocks::Punish => BobState::BtcPunished { ExpiredTimelocks::Punish => BobState::BtcPunished {
@ -241,28 +233,13 @@ async fn run_until_internal(
BobState::BtcPunished { tx_lock_id } => BobState::BtcPunished { tx_lock_id }, BobState::BtcPunished { tx_lock_id } => BobState::BtcPunished { tx_lock_id },
BobState::SafelyAborted => BobState::SafelyAborted, BobState::SafelyAborted => BobState::SafelyAborted,
BobState::XmrRedeemed { tx_lock_id } => BobState::XmrRedeemed { tx_lock_id }, BobState::XmrRedeemed { tx_lock_id } => BobState::XmrRedeemed { tx_lock_id },
}; })
let db_state = new_state.clone().into();
db.insert_latest_state(swap_id, Swap::Bob(db_state)).await?;
run_until_internal(
new_state,
is_target_state,
event_loop_handle,
db,
bitcoin_wallet,
monero_wallet,
swap_id,
env_config,
receive_monero_address,
)
.await
} }
pub async fn request_price_and_setup( pub async fn request_price_and_setup(
btc: bitcoin::Amount, btc: bitcoin::Amount,
event_loop_handle: &mut EventLoopHandle, event_loop_handle: &mut EventLoopHandle,
env_config: Config, env_config: &Config,
bitcoin_refund_address: bitcoin::Address, bitcoin_refund_address: bitcoin::Address,
) -> Result<bob::state::State2> { ) -> Result<bob::state::State2> {
let xmr = event_loop_handle.request_spot_price(btc).await?; let xmr = event_loop_handle.request_spot_price(btc).await?;