diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5ea64ba1..0d448bb1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -110,10 +110,12 @@ jobs: happy_path, happy_path_restart_bob_after_xmr_locked, happy_path_restart_bob_before_xmr_locked, + happy_path_restart_alice_after_xmr_locked, bob_refunds_using_cancel_and_refund_command, bob_refunds_using_cancel_and_refund_command_timelock_not_expired, bob_refunds_using_cancel_and_refund_command_timelock_not_expired_force, - punish + punish, + alice_punishes_after_restart_punish_timelock_expired ] runs-on: ubuntu-latest steps: diff --git a/CHANGELOG.md b/CHANGELOG.md index 0beb192e..71d1b9ac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - A changelog file. +- Automatic resume of unfinished swaps for the `asb` upon startup. + Unfinished swaps from earlier versions will be skipped. ### Fixed diff --git a/bors.toml b/bors.toml index 294fc31b..9882cfa8 100644 --- a/bors.toml +++ b/bors.toml @@ -8,9 +8,11 @@ status = [ "test (macos-latest)", "docker_tests (happy_path)", "docker_tests (happy_path_restart_bob_after_xmr_locked)", + "docker_tests (happy_path_restart_alice_after_xmr_locked)", "docker_tests (happy_path_restart_bob_before_xmr_locked)", "docker_tests (bob_refunds_using_cancel_and_refund_command)", "docker_tests (bob_refunds_using_cancel_and_refund_command_timelock_not_expired_force)", "docker_tests (bob_refunds_using_cancel_and_refund_command_timelock_not_expired)", - "docker_tests (punish)" + "docker_tests (punish)", + "docker_tests (alice_punishes_after_restart_punish_timelock_expired)" ] diff --git a/swap/src/asb/fixed_rate.rs b/swap/src/asb/fixed_rate.rs index b78da380..bf42be53 100644 --- a/swap/src/asb/fixed_rate.rs +++ b/swap/src/asb/fixed_rate.rs @@ -1,6 +1,6 @@ use crate::asb::Rate; -#[derive(Clone, Debug)] +#[derive(Clone, Copy, Debug)] pub struct FixedRate(Rate); impl FixedRate { diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index 24948741..2ed686fe 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -84,6 +84,43 @@ where // terminate forever. self.send_transfer_proof.push(future::pending().boxed()); + let unfinished_swaps = match self.db.unfinished_alice() { + Ok(unfinished_swaps) => unfinished_swaps, + Err(_) => { + tracing::error!("Failed to load unfinished swaps"); + return; + } + }; + + for (swap_id, state) in unfinished_swaps { + let peer_id = match self.db.get_peer_id(swap_id) { + Ok(peer_id) => peer_id, + Err(_) => { + tracing::warn!(%swap_id, "Resuming swap skipped because no peer-id found for swap in database"); + continue; + } + }; + + let handle = self.new_handle(peer_id); + + let swap = Swap { + event_loop_handle: handle, + bitcoin_wallet: self.bitcoin_wallet.clone(), + monero_wallet: self.monero_wallet.clone(), + env_config: self.env_config, + db: self.db.clone(), + state: state.into(), + swap_id, + }; + + match self.swap_sender.send(swap).await { + Ok(_) => tracing::info!(%swap_id, "Resuming swap"), + Err(_) => { + tracing::warn!(%swap_id, "Failed to resume swap because receiver has been dropped") + } + } + } + loop { tokio::select! { swarm_event = self.swarm.next_event() => { @@ -264,8 +301,18 @@ where swap_id, }; - if let Err(error) = self.swap_sender.send(swap).await { - tracing::warn!(%swap_id, "Swap cannot be spawned: {}", error); + // TODO: Consider adding separate components for start/rsume of swaps + + // swaps save peer id so we can resume + match self.db.insert_peer_id(swap_id, bob_peer_id).await { + Ok(_) => { + if let Err(error) = self.swap_sender.send(swap).await { + tracing::warn!(%swap_id, "Swap cannot be spawned: {}", error); + } + } + Err(error) => { + tracing::warn!(%swap_id, "Unable to save peer-id, swap cannot be spawned: {}", error); + } } } diff --git a/swap/tests/alice_punishes_after_restart_punish_timelock_expired.rs b/swap/tests/alice_punishes_after_restart_punish_timelock_expired.rs new file mode 100644 index 00000000..5ede52a3 --- /dev/null +++ b/swap/tests/alice_punishes_after_restart_punish_timelock_expired.rs @@ -0,0 +1,62 @@ +pub mod harness; + +use harness::alice_run_until::is_xmr_lock_transaction_sent; +use harness::bob_run_until::is_btc_locked; +use harness::FastPunishConfig; +use swap::protocol::alice::AliceState; +use swap::protocol::bob::BobState; +use swap::protocol::{alice, bob}; + +/// Bob locks Btc and Alice locks Xmr. Bob does not act; he fails to send Alice +/// the encsig and fail to refund or redeem. Alice punishes. +#[tokio::test] +async fn alice_punishes_after_restart_if_punish_timelock_expired() { + harness::setup_test(FastPunishConfig, |mut ctx| async move { + let (bob_swap, bob_join_handle) = ctx.bob_swap().await; + let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked)); + + let alice_swap = ctx.alice_next_swap().await; + let alice_bitcoin_wallet = alice_swap.bitcoin_wallet.clone(); + + let alice_swap = tokio::spawn(alice::run_until(alice_swap, is_xmr_lock_transaction_sent)); + + let bob_state = bob_swap.await??; + assert!(matches!(bob_state, BobState::BtcLocked { .. })); + + let alice_state = alice_swap.await??; + + // Ensure punish timelock is expired + if let AliceState::XmrLockTransactionSent { state3, .. } = alice_state { + alice_bitcoin_wallet + .subscribe_to(state3.tx_lock) + .await + .wait_until_confirmed_with(state3.punish_timelock) + .await?; + } else { + panic!( + "\ + Alice in unexpected state {}", + alice_state + ); + } + + ctx.restart_alice().await; + let alice_swap = ctx.alice_next_swap().await; + let alice_swap = tokio::spawn(alice::run(alice_swap)); + + let alice_state = alice_swap.await??; + ctx.assert_alice_punished(alice_state).await; + + // Restart Bob after Alice punished to ensure Bob transitions to + // punished and does not run indefinitely + let (bob_swap, _) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; + assert!(matches!(bob_swap.state, BobState::BtcLocked { .. })); + + let bob_state = bob::run(bob_swap).await?; + + ctx.assert_bob_punished(bob_state).await; + + Ok(()) + }) + .await; +} diff --git a/swap/tests/happy_path_restart_alice_after_xmr_locked.rs b/swap/tests/happy_path_restart_alice_after_xmr_locked.rs new file mode 100644 index 00000000..97a6f04a --- /dev/null +++ b/swap/tests/happy_path_restart_alice_after_xmr_locked.rs @@ -0,0 +1,40 @@ +pub mod harness; + +use harness::alice_run_until::is_xmr_lock_transaction_sent; +use harness::SlowCancelConfig; +use swap::protocol::alice::AliceState; +use swap::protocol::{alice, bob}; + +#[tokio::test] +async fn given_alice_restarts_after_xmr_is_locked_resume_swap() { + harness::setup_test(SlowCancelConfig, |mut ctx| async move { + let (bob_swap, _) = ctx.bob_swap().await; + let bob_swap = tokio::spawn(bob::run(bob_swap)); + + let alice_swap = ctx.alice_next_swap().await; + let alice_swap = tokio::spawn(alice::run_until(alice_swap, is_xmr_lock_transaction_sent)); + + let alice_state = alice_swap.await??; + + assert!(matches!( + alice_state, + AliceState::XmrLockTransactionSent { .. } + )); + + ctx.restart_alice().await; + let alice_swap = ctx.alice_next_swap().await; + assert!(matches!( + alice_swap.state, + AliceState::XmrLockTransactionSent { .. } + )); + + let alice_state = alice::run(alice_swap).await?; + ctx.assert_alice_redeemed(alice_state).await; + + let bob_state = bob_swap.await??; + ctx.assert_bob_redeemed(bob_state).await; + + Ok(()) + }) + .await; +} diff --git a/swap/tests/harness/mod.rs b/swap/tests/harness/mod.rs index bb4f7444..ad0eb1e8 100644 --- a/swap/tests/harness/mod.rs +++ b/swap/tests/harness/mod.rs @@ -29,8 +29,9 @@ use tempfile::tempdir; use testcontainers::clients::Cli; use testcontainers::{Container, Docker, RunArgs}; use tokio::sync::mpsc; +use tokio::sync::mpsc::Receiver; use tokio::task::JoinHandle; -use tokio::time::interval; +use tokio::time::{interval, timeout}; use tracing_subscriber::util::SubscriberInitExt; use url::Url; use uuid::Uuid; @@ -79,24 +80,40 @@ impl BobParams { } } -pub struct BobEventLoopJoinHandle(JoinHandle<()>); +pub struct BobApplicationHandle(JoinHandle<()>); -impl BobEventLoopJoinHandle { +impl BobApplicationHandle { pub fn abort(&self) { self.0.abort() } } -pub struct AliceEventLoopJoinHandle(JoinHandle<()>); +pub struct AliceApplicationHandle { + handle: JoinHandle<()>, + peer_id: PeerId, +} + +impl AliceApplicationHandle { + pub fn abort(&self) { + self.handle.abort() + } +} pub struct TestContext { + env_config: Config, + btc_amount: bitcoin::Amount, xmr_amount: monero::Amount, + alice_seed: Seed, + alice_db_path: PathBuf, + alice_listen_address: Multiaddr, + alice_starting_balances: StartingBalances, alice_bitcoin_wallet: Arc, alice_monero_wallet: Arc, alice_swap_handle: mpsc::Receiver, + alice_handle: AliceApplicationHandle, bob_params: BobParams, bob_starting_balances: StartingBalances, @@ -105,11 +122,30 @@ pub struct TestContext { } impl TestContext { - pub async fn alice_next_swap(&mut self) -> alice::Swap { - self.alice_swap_handle.recv().await.unwrap() + pub async fn restart_alice(&mut self) { + self.alice_handle.abort(); + + let (alice_handle, alice_swap_handle) = start_alice( + &self.alice_seed, + self.alice_db_path.clone(), + self.alice_listen_address.clone(), + self.env_config, + self.alice_bitcoin_wallet.clone(), + self.alice_monero_wallet.clone(), + ); + + self.alice_handle = alice_handle; + self.alice_swap_handle = alice_swap_handle; } - pub async fn bob_swap(&mut self) -> (bob::Swap, BobEventLoopJoinHandle) { + pub async fn alice_next_swap(&mut self) -> alice::Swap { + timeout(Duration::from_secs(10), self.alice_swap_handle.recv()) + .await + .expect("No Alice swap within 10 seconds, aborting because this test is waiting for a swap forever...") + .unwrap() + } + + pub async fn bob_swap(&mut self) -> (bob::Swap, BobApplicationHandle) { let (event_loop, event_loop_handle) = self.bob_params.new_eventloop().unwrap(); let swap = self @@ -123,13 +159,13 @@ impl TestContext { let join_handle = tokio::spawn(event_loop.run()); - (swap, BobEventLoopJoinHandle(join_handle)) + (swap, BobApplicationHandle(join_handle)) } pub async fn stop_and_resume_bob_from_db( &mut self, - join_handle: BobEventLoopJoinHandle, - ) -> (bob::Swap, BobEventLoopJoinHandle) { + join_handle: BobApplicationHandle, + ) -> (bob::Swap, BobApplicationHandle) { join_handle.abort(); let (event_loop, event_loop_handle) = self.bob_params.new_eventloop().unwrap(); @@ -144,7 +180,7 @@ impl TestContext { let join_handle = tokio::spawn(event_loop.run()); - (swap, BobEventLoopJoinHandle(join_handle)) + (swap, BobApplicationHandle(join_handle)) } pub async fn assert_alice_redeemed(&mut self, state: AliceState) { @@ -462,20 +498,12 @@ where btc: bitcoin::Amount::ZERO, }; - let port = get_port().expect("Failed to find a free port"); - - let alice_listen_address: Multiaddr = format!("/ip4/127.0.0.1/tcp/{}", port) - .parse() - .expect("failed to parse Alice's address"); - let electrs_rpc_port = containers .electrs .get_host_port(harness::electrs::RPC_PORT) .expect("Could not map electrs rpc port"); let alice_seed = Seed::random().unwrap(); - let bob_seed = Seed::random().unwrap(); - let (alice_bitcoin_wallet, alice_monero_wallet) = init_test_wallets( MONERO_WALLET_NAME_ALICE, containers.bitcoind_url.clone(), @@ -483,16 +511,27 @@ where alice_starting_balances.clone(), tempdir().unwrap().path(), electrs_rpc_port, - alice_seed, + &alice_seed, env_config, ) .await; - let db_path = tempdir().unwrap(); - let alice_db = Arc::new(Database::open(db_path.path()).unwrap()); + let alice_listen_port = get_port().expect("Failed to find a free port"); + let alice_listen_address: Multiaddr = format!("/ip4/127.0.0.1/tcp/{}", alice_listen_port) + .parse() + .expect("failed to parse Alice's address"); - let alice_seed = Seed::random().unwrap(); + let alice_db_path = tempdir().unwrap().into_path(); + let (alice_handle, alice_swap_handle) = start_alice( + &alice_seed, + alice_db_path.clone(), + alice_listen_address.clone(), + env_config, + alice_bitcoin_wallet.clone(), + alice_monero_wallet.clone(), + ); + let bob_seed = Seed::random().unwrap(); let bob_starting_balances = StartingBalances { xmr: monero::Amount::ZERO, btc: btc_amount * 10, @@ -505,47 +544,34 @@ where bob_starting_balances.clone(), tempdir().unwrap().path(), electrs_rpc_port, - bob_seed, + &bob_seed, env_config, ) .await; - let mut alice_swarm = swarm::new::(&alice_seed).unwrap(); - Swarm::listen_on(&mut alice_swarm, alice_listen_address.clone()).unwrap(); - - let (alice_event_loop, alice_swap_handle) = alice::EventLoop::new( - alice_swarm, - env_config, - alice_bitcoin_wallet.clone(), - alice_monero_wallet.clone(), - alice_db, - FixedRate::default(), - bitcoin::Amount::ONE_BTC, - ) - .unwrap(); - - let alice_peer_id = alice_event_loop.peer_id(); - - tokio::spawn(alice_event_loop.run()); - let bob_params = BobParams { seed: Seed::random().unwrap(), db_path: tempdir().unwrap().path().to_path_buf(), swap_id: Uuid::new_v4(), bitcoin_wallet: bob_bitcoin_wallet.clone(), monero_wallet: bob_monero_wallet.clone(), - alice_address: alice_listen_address, - alice_peer_id, + alice_address: alice_listen_address.clone(), + alice_peer_id: alice_handle.peer_id, env_config, }; let test = TestContext { + env_config, btc_amount, xmr_amount, + alice_seed, + alice_db_path, + alice_listen_address, alice_starting_balances, alice_bitcoin_wallet, alice_monero_wallet, alice_swap_handle, + alice_handle, bob_params, bob_starting_balances, bob_bitcoin_wallet, @@ -555,6 +581,36 @@ where testfn(test).await.unwrap() } +fn start_alice( + seed: &Seed, + db_path: PathBuf, + listen_address: Multiaddr, + env_config: Config, + bitcoin_wallet: Arc, + monero_wallet: Arc, +) -> (AliceApplicationHandle, Receiver) { + let db = Arc::new(Database::open(db_path.as_path()).unwrap()); + + let mut swarm = swarm::new::(&seed).unwrap(); + Swarm::listen_on(&mut swarm, listen_address).unwrap(); + + let (event_loop, swap_handle) = alice::EventLoop::new( + swarm, + env_config, + bitcoin_wallet, + monero_wallet, + db, + FixedRate::default(), + bitcoin::Amount::ONE_BTC, + ) + .unwrap(); + + let peer_id = event_loop.peer_id(); + let handle = tokio::spawn(event_loop.run()); + + (AliceApplicationHandle { handle, peer_id }, swap_handle) +} + fn random_prefix() -> String { use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; @@ -714,7 +770,7 @@ async fn init_test_wallets( starting_balances: StartingBalances, datadir: &Path, electrum_rpc_port: u16, - seed: Seed, + seed: &Seed, env_config: Config, ) -> (Arc, Arc) { monero @@ -847,7 +903,7 @@ impl GetConfig for FastPunishConfig { fn get_config() -> Config { Config { bitcoin_cancel_timelock: CancelTimelock::new(10), - bitcoin_punish_timelock: PunishTimelock::new(1), + bitcoin_punish_timelock: PunishTimelock::new(10), ..env::Regtest::get_config() } }