Alice resumes swaps

This commit is contained in:
Daniel Karzel 2021-03-26 15:38:30 +11:00 committed by Thomas Eizinger
parent b6e4fb4f9d
commit 2135a6e53e
No known key found for this signature in database
GPG Key ID: 651AC83A6C6C8B96
8 changed files with 262 additions and 51 deletions

View File

@ -110,10 +110,12 @@ jobs:
happy_path,
happy_path_restart_bob_after_xmr_locked,
happy_path_restart_bob_before_xmr_locked,
happy_path_restart_alice_after_xmr_locked,
bob_refunds_using_cancel_and_refund_command,
bob_refunds_using_cancel_and_refund_command_timelock_not_expired,
bob_refunds_using_cancel_and_refund_command_timelock_not_expired_force,
punish
punish,
alice_punishes_after_restart_punish_timelock_expired
]
runs-on: ubuntu-latest
steps:

View File

@ -15,6 +15,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- A changelog file.
- Automatic resume of unfinished swaps for the `asb` upon startup.
Unfinished swaps from earlier versions will be skipped.
### Fixed

View File

@ -8,9 +8,11 @@ status = [
"test (macos-latest)",
"docker_tests (happy_path)",
"docker_tests (happy_path_restart_bob_after_xmr_locked)",
"docker_tests (happy_path_restart_alice_after_xmr_locked)",
"docker_tests (happy_path_restart_bob_before_xmr_locked)",
"docker_tests (bob_refunds_using_cancel_and_refund_command)",
"docker_tests (bob_refunds_using_cancel_and_refund_command_timelock_not_expired_force)",
"docker_tests (bob_refunds_using_cancel_and_refund_command_timelock_not_expired)",
"docker_tests (punish)"
"docker_tests (punish)",
"docker_tests (alice_punishes_after_restart_punish_timelock_expired)"
]

View File

@ -1,6 +1,6 @@
use crate::asb::Rate;
#[derive(Clone, Debug)]
#[derive(Clone, Copy, Debug)]
pub struct FixedRate(Rate);
impl FixedRate {

View File

@ -84,6 +84,43 @@ where
// terminate forever.
self.send_transfer_proof.push(future::pending().boxed());
let unfinished_swaps = match self.db.unfinished_alice() {
Ok(unfinished_swaps) => unfinished_swaps,
Err(_) => {
tracing::error!("Failed to load unfinished swaps");
return;
}
};
for (swap_id, state) in unfinished_swaps {
let peer_id = match self.db.get_peer_id(swap_id) {
Ok(peer_id) => peer_id,
Err(_) => {
tracing::warn!(%swap_id, "Resuming swap skipped because no peer-id found for swap in database");
continue;
}
};
let handle = self.new_handle(peer_id);
let swap = Swap {
event_loop_handle: handle,
bitcoin_wallet: self.bitcoin_wallet.clone(),
monero_wallet: self.monero_wallet.clone(),
env_config: self.env_config,
db: self.db.clone(),
state: state.into(),
swap_id,
};
match self.swap_sender.send(swap).await {
Ok(_) => tracing::info!(%swap_id, "Resuming swap"),
Err(_) => {
tracing::warn!(%swap_id, "Failed to resume swap because receiver has been dropped")
}
}
}
loop {
tokio::select! {
swarm_event = self.swarm.next_event() => {
@ -264,8 +301,18 @@ where
swap_id,
};
if let Err(error) = self.swap_sender.send(swap).await {
tracing::warn!(%swap_id, "Swap cannot be spawned: {}", error);
// TODO: Consider adding separate components for start/rsume of swaps
// swaps save peer id so we can resume
match self.db.insert_peer_id(swap_id, bob_peer_id).await {
Ok(_) => {
if let Err(error) = self.swap_sender.send(swap).await {
tracing::warn!(%swap_id, "Swap cannot be spawned: {}", error);
}
}
Err(error) => {
tracing::warn!(%swap_id, "Unable to save peer-id, swap cannot be spawned: {}", error);
}
}
}

View File

@ -0,0 +1,62 @@
pub mod harness;
use harness::alice_run_until::is_xmr_lock_transaction_sent;
use harness::bob_run_until::is_btc_locked;
use harness::FastPunishConfig;
use swap::protocol::alice::AliceState;
use swap::protocol::bob::BobState;
use swap::protocol::{alice, bob};
/// Bob locks Btc and Alice locks Xmr. Bob does not act; he fails to send Alice
/// the encsig and fail to refund or redeem. Alice punishes.
#[tokio::test]
async fn alice_punishes_after_restart_if_punish_timelock_expired() {
harness::setup_test(FastPunishConfig, |mut ctx| async move {
let (bob_swap, bob_join_handle) = ctx.bob_swap().await;
let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked));
let alice_swap = ctx.alice_next_swap().await;
let alice_bitcoin_wallet = alice_swap.bitcoin_wallet.clone();
let alice_swap = tokio::spawn(alice::run_until(alice_swap, is_xmr_lock_transaction_sent));
let bob_state = bob_swap.await??;
assert!(matches!(bob_state, BobState::BtcLocked { .. }));
let alice_state = alice_swap.await??;
// Ensure punish timelock is expired
if let AliceState::XmrLockTransactionSent { state3, .. } = alice_state {
alice_bitcoin_wallet
.subscribe_to(state3.tx_lock)
.await
.wait_until_confirmed_with(state3.punish_timelock)
.await?;
} else {
panic!(
"\
Alice in unexpected state {}",
alice_state
);
}
ctx.restart_alice().await;
let alice_swap = ctx.alice_next_swap().await;
let alice_swap = tokio::spawn(alice::run(alice_swap));
let alice_state = alice_swap.await??;
ctx.assert_alice_punished(alice_state).await;
// Restart Bob after Alice punished to ensure Bob transitions to
// punished and does not run indefinitely
let (bob_swap, _) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await;
assert!(matches!(bob_swap.state, BobState::BtcLocked { .. }));
let bob_state = bob::run(bob_swap).await?;
ctx.assert_bob_punished(bob_state).await;
Ok(())
})
.await;
}

View File

@ -0,0 +1,40 @@
pub mod harness;
use harness::alice_run_until::is_xmr_lock_transaction_sent;
use harness::SlowCancelConfig;
use swap::protocol::alice::AliceState;
use swap::protocol::{alice, bob};
#[tokio::test]
async fn given_alice_restarts_after_xmr_is_locked_resume_swap() {
harness::setup_test(SlowCancelConfig, |mut ctx| async move {
let (bob_swap, _) = ctx.bob_swap().await;
let bob_swap = tokio::spawn(bob::run(bob_swap));
let alice_swap = ctx.alice_next_swap().await;
let alice_swap = tokio::spawn(alice::run_until(alice_swap, is_xmr_lock_transaction_sent));
let alice_state = alice_swap.await??;
assert!(matches!(
alice_state,
AliceState::XmrLockTransactionSent { .. }
));
ctx.restart_alice().await;
let alice_swap = ctx.alice_next_swap().await;
assert!(matches!(
alice_swap.state,
AliceState::XmrLockTransactionSent { .. }
));
let alice_state = alice::run(alice_swap).await?;
ctx.assert_alice_redeemed(alice_state).await;
let bob_state = bob_swap.await??;
ctx.assert_bob_redeemed(bob_state).await;
Ok(())
})
.await;
}

View File

@ -29,8 +29,9 @@ use tempfile::tempdir;
use testcontainers::clients::Cli;
use testcontainers::{Container, Docker, RunArgs};
use tokio::sync::mpsc;
use tokio::sync::mpsc::Receiver;
use tokio::task::JoinHandle;
use tokio::time::interval;
use tokio::time::{interval, timeout};
use tracing_subscriber::util::SubscriberInitExt;
use url::Url;
use uuid::Uuid;
@ -79,24 +80,40 @@ impl BobParams {
}
}
pub struct BobEventLoopJoinHandle(JoinHandle<()>);
pub struct BobApplicationHandle(JoinHandle<()>);
impl BobEventLoopJoinHandle {
impl BobApplicationHandle {
pub fn abort(&self) {
self.0.abort()
}
}
pub struct AliceEventLoopJoinHandle(JoinHandle<()>);
pub struct AliceApplicationHandle {
handle: JoinHandle<()>,
peer_id: PeerId,
}
impl AliceApplicationHandle {
pub fn abort(&self) {
self.handle.abort()
}
}
pub struct TestContext {
env_config: Config,
btc_amount: bitcoin::Amount,
xmr_amount: monero::Amount,
alice_seed: Seed,
alice_db_path: PathBuf,
alice_listen_address: Multiaddr,
alice_starting_balances: StartingBalances,
alice_bitcoin_wallet: Arc<bitcoin::Wallet>,
alice_monero_wallet: Arc<monero::Wallet>,
alice_swap_handle: mpsc::Receiver<Swap>,
alice_handle: AliceApplicationHandle,
bob_params: BobParams,
bob_starting_balances: StartingBalances,
@ -105,11 +122,30 @@ pub struct TestContext {
}
impl TestContext {
pub async fn alice_next_swap(&mut self) -> alice::Swap {
self.alice_swap_handle.recv().await.unwrap()
pub async fn restart_alice(&mut self) {
self.alice_handle.abort();
let (alice_handle, alice_swap_handle) = start_alice(
&self.alice_seed,
self.alice_db_path.clone(),
self.alice_listen_address.clone(),
self.env_config,
self.alice_bitcoin_wallet.clone(),
self.alice_monero_wallet.clone(),
);
self.alice_handle = alice_handle;
self.alice_swap_handle = alice_swap_handle;
}
pub async fn bob_swap(&mut self) -> (bob::Swap, BobEventLoopJoinHandle) {
pub async fn alice_next_swap(&mut self) -> alice::Swap {
timeout(Duration::from_secs(10), self.alice_swap_handle.recv())
.await
.expect("No Alice swap within 10 seconds, aborting because this test is waiting for a swap forever...")
.unwrap()
}
pub async fn bob_swap(&mut self) -> (bob::Swap, BobApplicationHandle) {
let (event_loop, event_loop_handle) = self.bob_params.new_eventloop().unwrap();
let swap = self
@ -123,13 +159,13 @@ impl TestContext {
let join_handle = tokio::spawn(event_loop.run());
(swap, BobEventLoopJoinHandle(join_handle))
(swap, BobApplicationHandle(join_handle))
}
pub async fn stop_and_resume_bob_from_db(
&mut self,
join_handle: BobEventLoopJoinHandle,
) -> (bob::Swap, BobEventLoopJoinHandle) {
join_handle: BobApplicationHandle,
) -> (bob::Swap, BobApplicationHandle) {
join_handle.abort();
let (event_loop, event_loop_handle) = self.bob_params.new_eventloop().unwrap();
@ -144,7 +180,7 @@ impl TestContext {
let join_handle = tokio::spawn(event_loop.run());
(swap, BobEventLoopJoinHandle(join_handle))
(swap, BobApplicationHandle(join_handle))
}
pub async fn assert_alice_redeemed(&mut self, state: AliceState) {
@ -462,20 +498,12 @@ where
btc: bitcoin::Amount::ZERO,
};
let port = get_port().expect("Failed to find a free port");
let alice_listen_address: Multiaddr = format!("/ip4/127.0.0.1/tcp/{}", port)
.parse()
.expect("failed to parse Alice's address");
let electrs_rpc_port = containers
.electrs
.get_host_port(harness::electrs::RPC_PORT)
.expect("Could not map electrs rpc port");
let alice_seed = Seed::random().unwrap();
let bob_seed = Seed::random().unwrap();
let (alice_bitcoin_wallet, alice_monero_wallet) = init_test_wallets(
MONERO_WALLET_NAME_ALICE,
containers.bitcoind_url.clone(),
@ -483,16 +511,27 @@ where
alice_starting_balances.clone(),
tempdir().unwrap().path(),
electrs_rpc_port,
alice_seed,
&alice_seed,
env_config,
)
.await;
let db_path = tempdir().unwrap();
let alice_db = Arc::new(Database::open(db_path.path()).unwrap());
let alice_listen_port = get_port().expect("Failed to find a free port");
let alice_listen_address: Multiaddr = format!("/ip4/127.0.0.1/tcp/{}", alice_listen_port)
.parse()
.expect("failed to parse Alice's address");
let alice_seed = Seed::random().unwrap();
let alice_db_path = tempdir().unwrap().into_path();
let (alice_handle, alice_swap_handle) = start_alice(
&alice_seed,
alice_db_path.clone(),
alice_listen_address.clone(),
env_config,
alice_bitcoin_wallet.clone(),
alice_monero_wallet.clone(),
);
let bob_seed = Seed::random().unwrap();
let bob_starting_balances = StartingBalances {
xmr: monero::Amount::ZERO,
btc: btc_amount * 10,
@ -505,47 +544,34 @@ where
bob_starting_balances.clone(),
tempdir().unwrap().path(),
electrs_rpc_port,
bob_seed,
&bob_seed,
env_config,
)
.await;
let mut alice_swarm = swarm::new::<alice::Behaviour>(&alice_seed).unwrap();
Swarm::listen_on(&mut alice_swarm, alice_listen_address.clone()).unwrap();
let (alice_event_loop, alice_swap_handle) = alice::EventLoop::new(
alice_swarm,
env_config,
alice_bitcoin_wallet.clone(),
alice_monero_wallet.clone(),
alice_db,
FixedRate::default(),
bitcoin::Amount::ONE_BTC,
)
.unwrap();
let alice_peer_id = alice_event_loop.peer_id();
tokio::spawn(alice_event_loop.run());
let bob_params = BobParams {
seed: Seed::random().unwrap(),
db_path: tempdir().unwrap().path().to_path_buf(),
swap_id: Uuid::new_v4(),
bitcoin_wallet: bob_bitcoin_wallet.clone(),
monero_wallet: bob_monero_wallet.clone(),
alice_address: alice_listen_address,
alice_peer_id,
alice_address: alice_listen_address.clone(),
alice_peer_id: alice_handle.peer_id,
env_config,
};
let test = TestContext {
env_config,
btc_amount,
xmr_amount,
alice_seed,
alice_db_path,
alice_listen_address,
alice_starting_balances,
alice_bitcoin_wallet,
alice_monero_wallet,
alice_swap_handle,
alice_handle,
bob_params,
bob_starting_balances,
bob_bitcoin_wallet,
@ -555,6 +581,36 @@ where
testfn(test).await.unwrap()
}
fn start_alice(
seed: &Seed,
db_path: PathBuf,
listen_address: Multiaddr,
env_config: Config,
bitcoin_wallet: Arc<bitcoin::Wallet>,
monero_wallet: Arc<monero::Wallet>,
) -> (AliceApplicationHandle, Receiver<alice::Swap>) {
let db = Arc::new(Database::open(db_path.as_path()).unwrap());
let mut swarm = swarm::new::<alice::Behaviour>(&seed).unwrap();
Swarm::listen_on(&mut swarm, listen_address).unwrap();
let (event_loop, swap_handle) = alice::EventLoop::new(
swarm,
env_config,
bitcoin_wallet,
monero_wallet,
db,
FixedRate::default(),
bitcoin::Amount::ONE_BTC,
)
.unwrap();
let peer_id = event_loop.peer_id();
let handle = tokio::spawn(event_loop.run());
(AliceApplicationHandle { handle, peer_id }, swap_handle)
}
fn random_prefix() -> String {
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
@ -714,7 +770,7 @@ async fn init_test_wallets(
starting_balances: StartingBalances,
datadir: &Path,
electrum_rpc_port: u16,
seed: Seed,
seed: &Seed,
env_config: Config,
) -> (Arc<bitcoin::Wallet>, Arc<monero::Wallet>) {
monero
@ -847,7 +903,7 @@ impl GetConfig for FastPunishConfig {
fn get_config() -> Config {
Config {
bitcoin_cancel_timelock: CancelTimelock::new(10),
bitcoin_punish_timelock: PunishTimelock::new(1),
bitcoin_punish_timelock: PunishTimelock::new(10),
..env::Regtest::get_config()
}
}