Add test for recursive executor

This commit is contained in:
rishflab 2020-11-30 13:25:11 +11:00
parent dca15b6872
commit 24631d464d
9 changed files with 175 additions and 127 deletions

View File

@ -280,7 +280,11 @@ pub async fn swap(
pub type Swarm = libp2p::Swarm<Behaviour>;
fn new_swarm(listen: Multiaddr, transport: SwapTransport, behaviour: Behaviour) -> Result<Swarm> {
pub fn new_swarm(
listen: Multiaddr,
transport: SwapTransport,
behaviour: Behaviour,
) -> Result<Swarm> {
use anyhow::Context as _;
let local_peer_id = behaviour.peer_id();

View File

@ -13,13 +13,11 @@ use crate::{
};
use anyhow::{anyhow, Context, Result};
use async_recursion::async_recursion;
use ecdsa_fun::{adaptor::Adaptor, nonce::Deterministic};
use futures::{
future::{select, Either},
pin_mut,
};
use libp2p::request_response::ResponseChannel;
use rand::{CryptoRng, RngCore};
use sha2::Sha256;

View File

@ -19,9 +19,7 @@ use prettytable::{row, Table};
use std::{io, io::Write, process, sync::Arc};
use structopt::StructOpt;
use swap::{
alice::{self, Behaviour},
bitcoin,
bob::{self, Bob},
alice, bitcoin, bob,
cli::Options,
monero,
network::transport::{build, build_tor, SwapTransport},
@ -52,7 +50,7 @@ async fn main() -> Result<()> {
} => {
info!("running swap node as Alice ...");
let behaviour = Behaviour::default();
let behaviour = alice::Behaviour::default();
let local_key_pair = behaviour.identity();
let (listen_addr, _ac, transport) = match tor_port {
@ -100,7 +98,7 @@ async fn main() -> Result<()> {
} => {
info!("running swap node as Bob ...");
let behaviour = Bob::default();
let behaviour = bob::Behaviour::default();
let local_key_pair = behaviour.identity();
let transport = match tor {
@ -180,7 +178,7 @@ async fn swap_as_alice(
db: Database,
addr: Multiaddr,
transport: SwapTransport,
behaviour: Behaviour,
behaviour: alice::Behaviour,
) -> Result<()> {
alice::swap(
bitcoin_wallet,
@ -200,7 +198,7 @@ async fn swap_as_bob(
sats: u64,
alice: Multiaddr,
transport: SwapTransport,
behaviour: Bob,
behaviour: bob::Behaviour,
) -> Result<()> {
let (cmd_tx, mut cmd_rx) = mpsc::channel(1);
let (mut rsp_tx, rsp_rx) = mpsc::channel(1);

View File

@ -17,6 +17,7 @@ use tracing::{debug, info, warn};
use uuid::Uuid;
mod amounts;
mod execution;
mod message0;
mod message1;
mod message2;
@ -53,7 +54,7 @@ pub async fn swap(
mut cmd_tx: Sender<Cmd>,
mut rsp_rx: Receiver<Rsp>,
transport: SwapTransport,
behaviour: Bob,
behaviour: Behaviour,
) -> Result<()> {
struct Network(Swarm);
@ -236,9 +237,9 @@ pub async fn swap(
}
}
pub type Swarm = libp2p::Swarm<Bob>;
pub type Swarm = libp2p::Swarm<Behaviour>;
fn new_swarm(transport: SwapTransport, behaviour: Bob) -> Result<Swarm> {
pub fn new_swarm(transport: SwapTransport, behaviour: Behaviour) -> Result<Swarm> {
let local_peer_id = behaviour.peer_id();
let swarm = libp2p::swarm::SwarmBuilder::new(transport, behaviour, local_peer_id.clone())
@ -317,7 +318,7 @@ impl From<message3::OutEvent> for OutEvent {
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", event_process = false)]
#[allow(missing_debug_implementations)]
pub struct Bob {
pub struct Behaviour {
pt: PeerTracker,
amounts: Amounts,
message0: Message0,
@ -328,7 +329,7 @@ pub struct Bob {
identity: Keypair,
}
impl Bob {
impl Behaviour {
pub fn identity(&self) -> Keypair {
self.identity.clone()
}
@ -375,8 +376,8 @@ impl Bob {
}
}
impl Default for Bob {
fn default() -> Bob {
impl Default for Behaviour {
fn default() -> Behaviour {
let identity = Keypair::generate_ed25519();
Self {

65
swap/src/bob/execution.rs Normal file
View File

@ -0,0 +1,65 @@
use crate::{
bob::{OutEvent, Swarm},
Cmd, Rsp, SwapAmounts, PUNISH_TIMELOCK, REFUND_TIMELOCK,
};
use anyhow::Result;
use rand::{CryptoRng, RngCore};
use std::sync::Arc;
use tokio::{stream::StreamExt, sync::mpsc};
use xmr_btc::bob::{State0, State2};
pub async fn negotiate<R>(
state0: xmr_btc::bob::State0,
amounts: SwapAmounts,
swarm: &mut Swarm,
mut rng: R,
bitcoin_wallet: Arc<crate::bitcoin::Wallet>,
) -> Result<(SwapAmounts, State2)>
where
R: RngCore + CryptoRng + Send,
{
let (mut cmd_tx, _cmd_rx) = mpsc::channel(1);
let (_rsp_tx, mut rsp_rx) = mpsc::channel(1);
// todo: dial the swarm outside
// libp2p::Swarm::dial_addr(&mut swarm, addr)?;
let alice = match swarm.next().await {
OutEvent::ConnectionEstablished(alice) => alice,
other => panic!("unexpected event: {:?}", other),
};
swarm.request_amounts(alice.clone(), amounts.btc.as_sat());
// todo: remove/refactor mspc channel
let (btc, xmr) = match swarm.next().await {
OutEvent::Amounts(amounts) => {
let cmd = Cmd::VerifyAmounts(amounts);
cmd_tx.try_send(cmd)?;
let response = rsp_rx.next().await;
if response == Some(Rsp::Abort) {
panic!("abort response");
}
(amounts.btc, amounts.xmr)
}
other => panic!("unexpected event: {:?}", other),
};
let refund_address = bitcoin_wallet.as_ref().new_address().await?;
swarm.send_message0(alice.clone(), state0.next_message(&mut rng));
let state1 = match swarm.next().await {
OutEvent::Message0(msg) => state0.receive(bitcoin_wallet.as_ref(), msg).await?,
other => panic!("unexpected event: {:?}", other),
};
swarm.send_message1(alice.clone(), state1.next_message());
let state2 = match swarm.next().await {
OutEvent::Message1(msg) => state1.receive(msg)?,
other => panic!("unexpected event: {:?}", other),
};
swarm.send_message2(alice.clone(), state2.next_message());
Ok((amounts, state2))
}

View File

@ -1,29 +1,24 @@
use crate::{
bob::{OutEvent, Swarm},
state,
bob::{execution::negotiate, OutEvent, Swarm},
storage::Database,
Cmd, Rsp, PUNISH_TIMELOCK, REFUND_TIMELOCK,
SwapAmounts,
};
use anyhow::Result;
use async_recursion::async_recursion;
use futures::{
channel::mpsc::{Receiver, Sender},
StreamExt,
};
use libp2p::PeerId;
use rand::rngs::OsRng;
use std::{process, sync::Arc};
use tracing::info;
use rand::{CryptoRng, RngCore};
use std::sync::Arc;
use uuid::Uuid;
use xmr_btc::bob::{self, State0};
use xmr_btc::bob::{self};
// The same data structure is used for swap execution and recovery.
// This allows for a seamless transition from a failed swap to recovery.
pub enum BobState {
Started(Sender<Cmd>, Receiver<Rsp>, u64, PeerId),
Started {
state0: bob::State0,
amounts: SwapAmounts,
peer_id: PeerId,
},
Negotiated(bob::State2, PeerId),
BtcLocked(bob::State3, PeerId),
XmrLocked(bob::State4, PeerId),
@ -38,80 +33,34 @@ pub enum BobState {
// State machine driver for swap execution
#[async_recursion]
pub async fn swap(
pub async fn swap<R>(
state: BobState,
mut swarm: Swarm,
db: Database,
bitcoin_wallet: Arc<crate::bitcoin::Wallet>,
monero_wallet: Arc<crate::monero::Wallet>,
mut rng: OsRng,
mut rng: R,
swap_id: Uuid,
) -> Result<BobState> {
) -> Result<BobState>
where
R: RngCore + CryptoRng + Send,
{
match state {
BobState::Started(mut cmd_tx, mut rsp_rx, btc, alice_peer_id) => {
// todo: dial the swarm outside
// libp2p::Swarm::dial_addr(&mut swarm, addr)?;
let alice = match swarm.next().await {
OutEvent::ConnectionEstablished(alice) => alice,
other => panic!("unexpected event: {:?}", other),
};
info!("Connection established with: {}", alice);
swarm.request_amounts(alice.clone(), btc);
// todo: remove mspc channel
let (btc, xmr) = match swarm.next().await {
OutEvent::Amounts(amounts) => {
info!("Got amounts from Alice: {:?}", amounts);
let cmd = Cmd::VerifyAmounts(amounts);
cmd_tx.try_send(cmd)?;
let response = rsp_rx.next().await;
if response == Some(Rsp::Abort) {
info!("User rejected amounts proposed by Alice, aborting...");
process::exit(0);
}
info!("User accepted amounts proposed by Alice");
(amounts.btc, amounts.xmr)
}
other => panic!("unexpected event: {:?}", other),
};
let refund_address = bitcoin_wallet.new_address().await?;
let state0 = State0::new(
BobState::Started {
state0,
amounts,
peer_id,
} => {
let (swap_amounts, state2) = negotiate(
state0,
amounts,
&mut swarm,
&mut rng,
btc,
xmr,
REFUND_TIMELOCK,
PUNISH_TIMELOCK,
refund_address,
);
info!("Commencing handshake");
swarm.send_message0(alice.clone(), state0.next_message(&mut rng));
let state1 = match swarm.next().await {
OutEvent::Message0(msg) => state0.receive(bitcoin_wallet.as_ref(), msg).await?,
other => panic!("unexpected event: {:?}", other),
};
swarm.send_message1(alice.clone(), state1.next_message());
let state2 = match swarm.next().await {
OutEvent::Message1(msg) => state1.receive(msg)?,
other => panic!("unexpected event: {:?}", other),
};
let swap_id = Uuid::new_v4();
db.insert_latest_state(swap_id, state::Bob::Handshaken(state2.clone()).into())
.await?;
swarm.send_message2(alice.clone(), state2.next_message());
info!("Handshake complete");
bitcoin_wallet.clone(),
)
.await?;
swap(
BobState::Negotiated(state2, alice_peer_id),
BobState::Negotiated(state2, peer_id),
swarm,
db,
bitcoin_wallet,
@ -162,7 +111,6 @@ pub async fn swap(
BobState::XmrLocked(state, alice_peer_id) => {
// Alice has locked Xmr
// Bob sends Alice his key
// let cloned = state.clone();
let tx_redeem_encsig = state.tx_redeem_encsig();
// Do we have to wait for a response?
// What if Alice fails to receive this? Should we always resend?

View File

@ -14,8 +14,8 @@ pub mod state;
pub mod storage;
pub mod tor;
const REFUND_TIMELOCK: u32 = 10; // Relative timelock, this is number of blocks. TODO: What should it be?
const PUNISH_TIMELOCK: u32 = 10; // FIXME: What should this be?
pub const REFUND_TIMELOCK: u32 = 10; // Relative timelock, this is number of blocks. TODO: What should it be?
pub const PUNISH_TIMELOCK: u32 = 10; // FIXME: What should this be?
pub type Never = std::convert::Infallible;

View File

@ -1,22 +1,23 @@
use bitcoin_harness::Bitcoind;
use futures::{channel::mpsc, future::try_join};
use futures::{
channel::{
mpsc,
mpsc::{Receiver, Sender},
},
future::try_join,
};
use libp2p::Multiaddr;
use monero_harness::Monero;
use rand::rngs::OsRng;
use std::sync::Arc;
use swap::{
alice, bob, bob::new_swarm, bob_simple, bob_simple::BobState, network::transport::build,
storage::Database,
alice, alice::swap::AliceState, bob, bob::swap::BobState, network::transport::build,
storage::Database, SwapAmounts, PUNISH_TIMELOCK, REFUND_TIMELOCK,
};
use tempfile::tempdir;
use testcontainers::clients::Cli;
use uuid::Uuid;
use xmr_btc::bitcoin;
// NOTE: For some reason running these tests overflows the stack. In order to
// mitigate this run them with:
//
// RUST_MIN_STACK=100000000 cargo test
use xmr_btc::{bitcoin, cross_curve_dleq};
#[tokio::test]
async fn swap() {
@ -91,7 +92,7 @@ async fn swap() {
let db = Database::open(db_dir.path()).unwrap();
let (cmd_tx, mut _cmd_rx) = mpsc::channel(1);
let (mut rsp_tx, rsp_rx) = mpsc::channel(1);
let bob_behaviour = bob::Bob::default();
let bob_behaviour = bob::Behaviour::default();
let bob_transport = build(bob_behaviour.identity()).unwrap();
let bob_swap = bob::swap(
bob_btc_wallet.clone(),
@ -184,33 +185,66 @@ async fn simple_swap_happy_path() {
));
let bob_xmr_wallet = Arc::new(swap::monero::Wallet(monero.wallet("bob").unwrap().client()));
let alice_behaviour = alice::Alice::default();
// let redeem_address = bitcoin_wallet.as_ref().new_address().await?;
// let punish_address = redeem_address.clone();
let amounts = SwapAmounts {
btc,
xmr: xmr_btc::monero::Amount::from_piconero(xmr),
};
let alice_behaviour = alice::Behaviour::default();
let alice_peer_id = alice_behaviour.peer_id().clone();
let alice_transport = build(alice_behaviour.identity()).unwrap();
let rng = &mut OsRng;
let alice_state = {
let a = bitcoin::SecretKey::new_random(rng);
let s_a = cross_curve_dleq::Scalar::random(rng);
let v_a = xmr_btc::monero::PrivateViewKey::new_random(rng);
AliceState::Started {
amounts,
a,
s_a,
v_a,
}
};
let alice_swarm = alice::new_swarm(alice_multiaddr, alice_transport, alice_behaviour).unwrap();
let alice_swap = alice::swap::swap(
alice_state,
alice_swarm,
alice_btc_wallet.clone(),
alice_xmr_wallet.clone(),
);
let db = Database::open(std::path::Path::new("../.swap-db/")).unwrap();
let alice_swap = todo!();
let db_dir = tempdir().unwrap();
let db = Database::open(db_dir.path()).unwrap();
let (cmd_tx, mut _cmd_rx) = mpsc::channel(1);
let (mut rsp_tx, rsp_rx) = mpsc::channel(1);
let bob_behaviour = bob::Bob::default();
let bob_db_dir = tempdir().unwrap();
let bob_db = Database::open(bob_db_dir.path()).unwrap();
let bob_behaviour = bob::Behaviour::default();
let bob_transport = build(bob_behaviour.identity()).unwrap();
let bob_state = BobState::Started(cmd_tx, rsp_rx, btc_bob.as_sat(), alice_behaviour.peer_id());
let bob_swarm = new_swarm(bob_transport, bob_behaviour).unwrap();
let bob_swap = bob_simple::simple_swap(
let refund_address = bob_btc_wallet.new_address().await.unwrap();
let state0 = xmr_btc::bob::State0::new(
rng,
btc,
xmr_btc::monero::Amount::from_piconero(xmr),
REFUND_TIMELOCK,
PUNISH_TIMELOCK,
refund_address,
);
let bob_state = BobState::Started {
state0,
amounts,
peer_id: alice_peer_id,
};
let bob_swarm = bob::new_swarm(bob_transport, bob_behaviour).unwrap();
let bob_swap = bob::swap::swap(
bob_state,
bob_swarm,
db,
bob_db,
bob_btc_wallet.clone(),
bob_xmr_wallet.clone(),
OsRng,
Uuid::new_v4(),
);
// automate the verification step by accepting any amounts sent over by Alice
rsp_tx.try_send(swap::Rsp::VerifiedAmounts).unwrap();
try_join(alice_swap, bob_swap).await.unwrap();
let btc_alice_final = alice_btc_wallet.as_ref().balance().await.unwrap();

View File

@ -823,7 +823,7 @@ impl State4 {
where
W: WatchForRawTransaction + TransactionBlockHeight + BlockHeight,
{
let tx_id = self.tx_lock.txid().clone();
let tx_id = self.tx_lock.txid();
let tx_lock_height = bitcoin_wallet.transaction_block_height(tx_id).await;
let t1_timeout =