WIP: Tried to final state outliving the actor

This commit is contained in:
rishflab 2021-01-14 14:40:49 +11:00
parent 0fe1f7e173
commit 9e6b56564d
6 changed files with 120 additions and 124 deletions

View File

@ -30,7 +30,7 @@ use swap::{
alice, alice,
alice::{swap::AliceActor, AliceState}, alice::{swap::AliceActor, AliceState},
bob, bob,
bob::BobState, bob::{swap::BobActor, BobState},
}, },
seed::Seed, seed::Seed,
trace::init_tracing, trace::init_tracing,
@ -332,18 +332,12 @@ async fn bob_swap(
let bob_behaviour = bob::Behaviour::new(network::Seed::new(seed)); let bob_behaviour = bob::Behaviour::new(network::Seed::new(seed));
let bob_transport = build(bob_behaviour.identity())?; let bob_transport = build(bob_behaviour.identity())?;
let (event_loop, handle) = let (event_loop, event_loop_handle) =
bob::event_loop::EventLoop::new(bob_transport, bob_behaviour, alice_peer_id, alice_addr)?; bob::event_loop::EventLoop::new(bob_transport, bob_behaviour, alice_peer_id, alice_addr)?;
let swap = bob::swap::swap( let mut bob_actor = BobActor::new(event_loop_handle, bitcoin_wallet, monero_wallet, db);
state,
handle, let swap = bob_actor.swap(state, swap_id);
db,
bitcoin_wallet.clone(),
monero_wallet.clone(),
OsRng,
swap_id,
);
tokio::spawn(event_loop.run()); tokio::spawn(event_loop.run());
swap.await swap.await

View File

@ -59,6 +59,7 @@ impl AliceActor {
} }
} }
// TODO: Make a swap abstraction that contains the state and swap id
pub async fn swap(self, start_state: AliceState) -> Result<AliceState> { pub async fn swap(self, start_state: AliceState) -> Result<AliceState> {
self.run_until(start_state, is_complete).await self.run_until(start_state, is_complete).await
} }

View File

@ -1,6 +1,6 @@
use anyhow::{bail, Result}; use anyhow::{bail, Result};
use async_recursion::async_recursion; use async_recursion::async_recursion;
use rand::{CryptoRng, RngCore}; use rand::{rngs::OsRng, CryptoRng, RngCore};
use std::sync::Arc; use std::sync::Arc;
use tokio::select; use tokio::select;
use tracing::info; use tracing::info;
@ -8,122 +8,103 @@ use uuid::Uuid;
use crate::{ use crate::{
bitcoin, bitcoin,
config::Config,
database::{Database, Swap}, database::{Database, Swap},
monero, monero,
protocol::bob::{self, event_loop::EventLoopHandle, state::*}, protocol::bob::{self, event_loop::EventLoopHandle, state::*},
ExpiredTimelocks, SwapAmounts, ExpiredTimelocks, SwapAmounts,
}; };
pub struct BobActor<R> pub struct BobActor {
where
R: RngCore + CryptoRng + Send,
{
event_loop_handle: EventLoopHandle, event_loop_handle: EventLoopHandle,
bitcoin_wallet: Arc<bitcoin::Wallet>, bitcoin_wallet: Arc<bitcoin::Wallet>,
monero_wallet: Arc<monero::Wallet>, monero_wallet: Arc<monero::Wallet>,
db: Database, db: Database,
config: Config,
swap_id: Uuid,
rng: R,
} }
impl<R> BobActor<R> impl BobActor {
where
R: RngCore + CryptoRng + Send,
{
pub fn new( pub fn new(
event_loop_handle: EventLoopHandle, event_loop_handle: EventLoopHandle,
bitcoin_wallet: Arc<bitcoin::Wallet>, bitcoin_wallet: Arc<bitcoin::Wallet>,
monero_wallet: Arc<monero::Wallet>, monero_wallet: Arc<monero::Wallet>,
db: Database, db: Database,
config: Config,
swap_id: Uuid,
rng: R,
) -> Self { ) -> Self {
Self { Self {
event_loop_handle, event_loop_handle,
bitcoin_wallet, bitcoin_wallet,
monero_wallet, monero_wallet,
db, db,
config,
swap_id,
rng,
} }
} }
pub async fn swap(self, start_state: BobState) -> Result<BobState> { // TODO: Make a swap abstraction that contains the state and swap id
self.run_until(start_state, is_complete).await pub async fn swap(&mut self, start_state: BobState, swap_id: Uuid) -> Result<BobState> {
self.run_until(start_state, is_complete, swap_id).await
} }
// State machine driver for swap execution // State machine driver for swap execution
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
#[async_recursion] #[async_recursion]
pub async fn run_until( pub async fn run_until(
mut self, &mut self,
state: BobState, state: BobState,
is_target_state: fn(&BobState) -> bool, is_target_state: fn(&BobState) -> bool,
swap_id: Uuid,
) -> Result<BobState> { ) -> Result<BobState> {
let BobActor {
mut event_loop_handle,
bitcoin_wallet,
monero_wallet,
db,
config,
swap_id,
mut rng,
} = self;
info!("Current state: {}", state); info!("Current state: {}", state);
if is_target_state(&state) { if is_target_state(&state) {
Ok(state) Ok(state)
} else { } else {
match state { match state {
BobState::Started { state0, amounts } => { BobState::Started { state0, amounts } => {
event_loop_handle.dial().await?; self.event_loop_handle.dial().await?;
let state2 = negotiate( let state2 = negotiate(
state0, state0,
amounts, amounts,
&mut event_loop_handle, &mut self.event_loop_handle,
&mut rng, &mut OsRng,
bitcoin_wallet.clone(), self.bitcoin_wallet.clone(),
) )
.await?; .await?;
let state = BobState::Negotiated(state2); let state = BobState::Negotiated(state2);
let db_state = state.clone().into(); let db_state = state.clone().into();
db.insert_latest_state(swap_id, Swap::Bob(db_state)).await?; self.db
self.run_until(state, is_target_state).await .insert_latest_state(swap_id, Swap::Bob(db_state))
.await?;
self.run_until(state, is_target_state, swap_id).await
} }
BobState::Negotiated(state2) => { BobState::Negotiated(state2) => {
// Do not lock Bitcoin if not connected to Alice. // Do not lock Bitcoin if not connected to Alice.
event_loop_handle.dial().await?; self.event_loop_handle.dial().await?;
// Alice and Bob have exchanged info // Alice and Bob have exchanged info
let state3 = state2.lock_btc(bitcoin_wallet.as_ref()).await?; let state3 = state2.lock_btc(self.bitcoin_wallet.as_ref()).await?;
let state = BobState::BtcLocked(state3); let state = BobState::BtcLocked(state3);
let db_state = state.clone().into(); let db_state = state.clone().into();
db.insert_latest_state(swap_id, Swap::Bob(db_state)).await?; self.db
self.run_until(state, is_target_state).await .insert_latest_state(swap_id, Swap::Bob(db_state))
.await?;
self.run_until(state, is_target_state, swap_id).await
} }
// Bob has locked Btc // Bob has locked Btc
// Watch for Alice to Lock Xmr or for cancel timelock to elapse // Watch for Alice to Lock Xmr or for cancel timelock to elapse
BobState::BtcLocked(state3) => { BobState::BtcLocked(state3) => {
let state = if let ExpiredTimelocks::None = let state = if let ExpiredTimelocks::None =
state3.current_epoch(bitcoin_wallet.as_ref()).await? state3.current_epoch(self.bitcoin_wallet.as_ref()).await?
{ {
event_loop_handle.dial().await?; self.event_loop_handle.dial().await?;
let msg2_watcher = event_loop_handle.recv_message2(); let msg2_watcher = self.event_loop_handle.recv_message2();
let cancel_timelock_expires = let cancel_timelock_expires =
state3.wait_for_cancel_timelock_to_expire(bitcoin_wallet.as_ref()); state3.wait_for_cancel_timelock_to_expire(self.bitcoin_wallet.as_ref());
select! { select! {
msg2 = msg2_watcher => { msg2 = msg2_watcher => {
let xmr_lock_watcher = state3.clone() let xmr_lock_watcher = state3.clone()
.watch_for_lock_xmr(monero_wallet.as_ref(), msg2?); .watch_for_lock_xmr(self.monero_wallet.as_ref(), msg2?);
let cancel_timelock_expires = state3.wait_for_cancel_timelock_to_expire(bitcoin_wallet.as_ref()); let cancel_timelock_expires = state3.wait_for_cancel_timelock_to_expire(self.bitcoin_wallet.as_ref());
select! { select! {
state4 = xmr_lock_watcher => { state4 = xmr_lock_watcher => {
@ -146,14 +127,16 @@ where
BobState::CancelTimelockExpired(state4) BobState::CancelTimelockExpired(state4)
}; };
let db_state = state.clone().into(); let db_state = state.clone().into();
db.insert_latest_state(swap_id, Swap::Bob(db_state)).await?; self.db
self.run_until(state, is_target_state).await .insert_latest_state(swap_id, Swap::Bob(db_state))
.await?;
self.run_until(state, is_target_state, swap_id).await
} }
BobState::XmrLocked(state) => { BobState::XmrLocked(state) => {
let state = if let ExpiredTimelocks::None = let state = if let ExpiredTimelocks::None =
state.expired_timelock(bitcoin_wallet.as_ref()).await? state.expired_timelock(self.bitcoin_wallet.as_ref()).await?
{ {
event_loop_handle.dial().await?; self.event_loop_handle.dial().await?;
// Alice has locked Xmr // Alice has locked Xmr
// Bob sends Alice his key // Bob sends Alice his key
let tx_redeem_encsig = state.tx_redeem_encsig(); let tx_redeem_encsig = state.tx_redeem_encsig();
@ -161,8 +144,8 @@ where
let state4_clone = state.clone(); let state4_clone = state.clone();
// TODO(Franck): Refund if message cannot be sent. // TODO(Franck): Refund if message cannot be sent.
let enc_sig_sent_watcher = let enc_sig_sent_watcher =
event_loop_handle.send_message3(tx_redeem_encsig); self.event_loop_handle.send_message3(tx_redeem_encsig);
let bitcoin_wallet = bitcoin_wallet.clone(); let bitcoin_wallet = self.bitcoin_wallet.clone();
let cancel_timelock_expires = state4_clone let cancel_timelock_expires = state4_clone
.wait_for_cancel_timelock_to_expire(bitcoin_wallet.as_ref()); .wait_for_cancel_timelock_to_expire(bitcoin_wallet.as_ref());
@ -178,18 +161,20 @@ where
BobState::CancelTimelockExpired(state) BobState::CancelTimelockExpired(state)
}; };
let db_state = state.clone().into(); let db_state = state.clone().into();
db.insert_latest_state(swap_id, Swap::Bob(db_state)).await?; self.db
self.run_until(state, is_target_state).await .insert_latest_state(swap_id, Swap::Bob(db_state))
.await?;
self.run_until(state, is_target_state, swap_id).await
} }
BobState::EncSigSent(state) => { BobState::EncSigSent(state) => {
let state = if let ExpiredTimelocks::None = let state = if let ExpiredTimelocks::None =
state.expired_timelock(bitcoin_wallet.as_ref()).await? state.expired_timelock(self.bitcoin_wallet.as_ref()).await?
{ {
let state_clone = state.clone(); let state_clone = state.clone();
let redeem_watcher = let redeem_watcher =
state_clone.watch_for_redeem_btc(bitcoin_wallet.as_ref()); state_clone.watch_for_redeem_btc(self.bitcoin_wallet.as_ref());
let cancel_timelock_expires = let cancel_timelock_expires = state_clone
state_clone.wait_for_cancel_timelock_to_expire(bitcoin_wallet.as_ref()); .wait_for_cancel_timelock_to_expire(self.bitcoin_wallet.as_ref());
select! { select! {
state5 = redeem_watcher => { state5 = redeem_watcher => {
@ -204,49 +189,58 @@ where
}; };
let db_state = state.clone().into(); let db_state = state.clone().into();
db.insert_latest_state(swap_id, Swap::Bob(db_state)).await?; self.db
self.run_until(state, is_target_state).await .insert_latest_state(swap_id, Swap::Bob(db_state))
.await?;
self.run_until(state, is_target_state, swap_id).await
} }
BobState::BtcRedeemed(state) => { BobState::BtcRedeemed(state) => {
// Bob redeems XMR using revealed s_a // Bob redeems XMR using revealed s_a
state.claim_xmr(monero_wallet.as_ref()).await?; state.claim_xmr(self.monero_wallet.as_ref()).await?;
let state = BobState::XmrRedeemed; let state = BobState::XmrRedeemed;
let db_state = state.clone().into(); let db_state = state.clone().into();
db.insert_latest_state(swap_id, Swap::Bob(db_state)).await?; self.db
self.run_until(state, is_target_state).await .insert_latest_state(swap_id, Swap::Bob(db_state))
.await?;
self.run_until(state, is_target_state, swap_id).await
} }
BobState::CancelTimelockExpired(state4) => { BobState::CancelTimelockExpired(state4) => {
if state4 if state4
.check_for_tx_cancel(bitcoin_wallet.as_ref()) .check_for_tx_cancel(self.bitcoin_wallet.as_ref())
.await .await
.is_err() .is_err()
{ {
state4.submit_tx_cancel(bitcoin_wallet.as_ref()).await?; state4
.submit_tx_cancel(self.bitcoin_wallet.as_ref())
.await?;
} }
let state = BobState::BtcCancelled(state4); let state = BobState::BtcCancelled(state4);
db.insert_latest_state(swap_id, Swap::Bob(state.clone().into())) self.db
.insert_latest_state(swap_id, Swap::Bob(state.clone().into()))
.await?; .await?;
self.run_until(state, is_target_state).await self.run_until(state, is_target_state, swap_id).await
} }
BobState::BtcCancelled(state) => { BobState::BtcCancelled(state) => {
// Bob has cancelled the swap // Bob has cancelled the swap
let state = match state.expired_timelock(bitcoin_wallet.as_ref()).await? { let state = match state.expired_timelock(self.bitcoin_wallet.as_ref()).await? {
ExpiredTimelocks::None => { ExpiredTimelocks::None => {
bail!("Internal error: canceled state reached before cancel timelock was expired"); bail!("Internal error: canceled state reached before cancel timelock was expired");
} }
ExpiredTimelocks::Cancel => { ExpiredTimelocks::Cancel => {
state.refund_btc(bitcoin_wallet.as_ref()).await?; state.refund_btc(self.bitcoin_wallet.as_ref()).await?;
BobState::BtcRefunded(state) BobState::BtcRefunded(state)
} }
ExpiredTimelocks::Punish => BobState::BtcPunished, ExpiredTimelocks::Punish => BobState::BtcPunished,
}; };
let db_state = state.clone().into(); let db_state = state.clone().into();
db.insert_latest_state(swap_id, Swap::Bob(db_state)).await?; self.db
self.run_until(state, is_target_state).await .insert_latest_state(swap_id, Swap::Bob(db_state))
.await?;
self.run_until(state, is_target_state, swap_id).await
} }
BobState::BtcRefunded(state4) => Ok(BobState::BtcRefunded(state4)), BobState::BtcRefunded(state4) => Ok(BobState::BtcRefunded(state4)),
BobState::BtcPunished => Ok(BobState::BtcPunished), BobState::BtcPunished => Ok(BobState::BtcPunished),

View File

@ -14,3 +14,18 @@ async fn happy_path() {
}) })
.await; .await;
} }
// #[tokio::test]
// async fn happy_path() {
// testutils::test(|alice_node, bob_node| async move {
// let alice_start_state = unimplemented!();
// let bob_start_state = unimplemented!();
//
// let (alice_end_state, bob_end_state) =
// join!(alice_node.swap(alice_start_state), bob_node.swap(bo));
//
// alice_node.assert_btc_redeemed(alice_end_state);
// bob_node.assert_btc_redeemed(bob_end_state);
// })
// .await;
// }

View File

@ -23,7 +23,6 @@ use tokio::select;
use uuid::Uuid; use uuid::Uuid;
pub struct Alice { pub struct Alice {
state: AliceState,
actor: AliceActor, actor: AliceActor,
event_loop: EventLoop, event_loop: EventLoop,
} }
@ -39,7 +38,7 @@ impl Alice {
listen: Multiaddr, listen: Multiaddr,
config: Config, config: Config,
seed: Seed, seed: Seed,
) -> Alice { ) -> (Alice, AliceState) {
let (alice_btc_wallet, alice_xmr_wallet) = init_wallets( let (alice_btc_wallet, alice_xmr_wallet) = init_wallets(
"alice", "alice",
bitcoind, bitcoind,
@ -66,23 +65,25 @@ impl Alice {
Uuid::new_v4(), Uuid::new_v4(),
); );
Alice { (
state: alice_start_state, Alice {
actor: alice_actor, actor: alice_actor,
event_loop, event_loop,
} },
alice_start_state,
)
} }
pub fn peer_id(&self) -> PeerId { pub fn peer_id(&self) -> PeerId {
self.event_loop.peer_id() self.event_loop.peer_id()
} }
pub async fn swap(mut self) -> Result<()> { pub async fn swap(&mut self) -> Result<()> {
let final_state = select! { let final_state = select! {
res = self.actor.swap(self.state) => res.unwrap(), res = self.actor.swap(self.state) => res.unwrap(),
_ = self.event_loop.run() => panic!("The event loop should never finish") _ = self.event_loop.run() => panic!("The event loop should never finish")
}; };
self.state = final_state; self.final_state = Some(final_state);
Ok(()) Ok(())
} }

View File

@ -1,31 +1,31 @@
use crate::testutils::init_wallets; use crate::testutils::init_wallets;
use anyhow::Result; use anyhow::Result;
use bitcoin_harness::Bitcoind; use bitcoin_harness::Bitcoind;
use futures::future::{select, Select};
use libp2p::{core::Multiaddr, PeerId}; use libp2p::{core::Multiaddr, PeerId};
use monero_harness::Monero; use monero_harness::Monero;
use rand::rngs::OsRng; use rand::rngs::OsRng;
use std::{pin::Pin, sync::Arc}; use std::sync::Arc;
use swap::{ use swap::{
bitcoin, bitcoin,
config::Config, config::Config,
database::Database, database::Database,
monero, network, monero, network,
network::transport::build, network::transport::build,
protocol::{bob, bob::BobState}, protocol::{
bob,
bob::{swap::BobActor, BobState, EventLoop},
},
seed::Seed, seed::Seed,
SwapAmounts, SwapAmounts,
}; };
use tempfile::tempdir; use tempfile::tempdir;
use tokio::select;
use uuid::Uuid; use uuid::Uuid;
pub struct Bob { pub struct Bob {
state: BobState, actor: BobActor,
event_loop: bob::event_loop::EventLoop, event_loop: EventLoop,
event_loop_handle: bob::event_loop::EventLoopHandle, final_state: Option<BobState>,
bitcoin_wallet: Arc<swap::bitcoin::Wallet>,
monero_wallet: Arc<swap::monero::Wallet>,
db: Database,
} }
impl Bob { impl Bob {
@ -50,38 +50,29 @@ impl Bob {
) )
.await; .await;
let bob_state =
init_bob_state(btc_to_swap, xmr_to_swap, bob_btc_wallet.clone(), config).await;
let (event_loop, event_loop_handle) = init_bob_event_loop(alice_peer_id, alice_multiaddr); let (event_loop, event_loop_handle) = init_bob_event_loop(alice_peer_id, alice_multiaddr);
let bob_db_dir = tempdir().unwrap(); let bob_db_dir = tempdir().unwrap();
let bob_db = Database::open(bob_db_dir.path()).unwrap(); let bob_db = Database::open(bob_db_dir.path()).unwrap();
let bob_actor = BobActor::new(event_loop_handle, bob_btc_wallet, bob_xmr_wallet, bob_db);
let bob_state =
init_bob_state(btc_to_swap, xmr_to_swap, bob_btc_wallet.clone(), config).await;
Bob { Bob {
state: bob_state, final_state: Some(bob_state),
actor: bob_actor,
event_loop, event_loop,
event_loop_handle,
bitcoin_wallet: bob_btc_wallet,
monero_wallet: bob_xmr_wallet,
db: bob_db,
} }
} }
pub async fn swap( pub async fn swap(&mut self) -> Result<()> {
&self, let final_state = select! {
) -> Select<Pin<Box<Result<BobState>>>, Pin<Box<Result<bob::EventLoop>>>> { res = self.actor.swap(bob_state, Uuid::new_v4()) => res.unwrap(),
let bob_swap_fut = bob::swap::swap( _ = self.event_loop.run() => panic!("The event loop should never finish")
self.state.clone(), };
self.event_loop_handle, self.final_state = Some(final_state);
self.db, Ok(())
self.bitcoin_wallet,
self.monero_wallet,
OsRng,
Uuid::new_v4(),
);
let bob_fut = select(Box::pin(bob_swap_fut), Box::pin(self.event_loop.run()));
bob_fut
} }
pub async fn assert_btc_redeemed(&self) {} pub async fn assert_btc_redeemed(&self) {}