Merge pull request #71 from comit-network/eventloop-review

Few improvements on event loop
This commit is contained in:
rishflab 2020-12-10 15:19:16 +11:00 committed by GitHub
commit c2956b9d2c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 163 additions and 893 deletions

View File

@ -2,39 +2,22 @@
//! Alice holds XMR and wishes receive BTC. //! Alice holds XMR and wishes receive BTC.
use self::{amounts::*, message0::*, message1::*, message2::*, message3::*}; use self::{amounts::*, message0::*, message1::*, message2::*, message3::*};
use crate::{ use crate::{
bitcoin,
bitcoin::TX_LOCK_MINE_TIMEOUT,
monero,
network::{ network::{
peer_tracker::{self, PeerTracker}, peer_tracker::{self, PeerTracker},
request_response::AliceToBob, request_response::AliceToBob,
transport::SwapTransport, transport::SwapTransport,
TokioExecutor, TokioExecutor,
}, },
state, SwapAmounts,
storage::Database,
SwapAmounts, PUNISH_TIMELOCK, REFUND_TIMELOCK,
}; };
use anyhow::Result; use anyhow::Result;
use async_trait::async_trait;
use backoff::{backoff::Constant as ConstantBackoff, future::FutureOperation as _};
use genawaiter::GeneratorState;
use libp2p::{ use libp2p::{
core::{identity::Keypair, Multiaddr}, core::{identity::Keypair, Multiaddr},
request_response::ResponseChannel, request_response::ResponseChannel,
NetworkBehaviour, PeerId, NetworkBehaviour, PeerId,
}; };
use rand::rngs::OsRng; use tracing::{debug, info};
use std::{sync::Arc, time::Duration}; use xmr_btc::{alice::State0, bob};
use tokio::sync::Mutex;
use tracing::{debug, info, warn};
use uuid::Uuid;
use xmr_btc::{
alice::{self, action_generator, Action, ReceiveBitcoinRedeemEncsig, State0},
bitcoin::BroadcastSignedTransaction,
bob, cross_curve_dleq,
monero::{CreateWalletForOutput, Transfer},
};
mod amounts; mod amounts;
pub mod event_loop; pub mod event_loop;
@ -45,237 +28,6 @@ mod message2;
mod message3; mod message3;
pub mod swap; pub mod swap;
pub async fn swap(
bitcoin_wallet: Arc<bitcoin::Wallet>,
monero_wallet: Arc<monero::Wallet>,
db: Database,
listen: Multiaddr,
transport: SwapTransport,
behaviour: Behaviour,
) -> Result<()> {
struct Network {
swarm: Arc<Mutex<Swarm>>,
channel: Option<ResponseChannel<AliceToBob>>,
}
impl Network {
pub async fn send_message2(&mut self, proof: monero::TransferProof) {
match self.channel.take() {
None => warn!("Channel not found, did you call this twice?"),
Some(channel) => {
let mut guard = self.swarm.lock().await;
guard.send_message2(channel, alice::Message2 {
tx_lock_proof: proof,
});
info!("Sent transfer proof");
}
}
}
}
// TODO: For retry, use `backoff::ExponentialBackoff` in production as opposed
// to `ConstantBackoff`.
#[async_trait]
impl ReceiveBitcoinRedeemEncsig for Network {
async fn receive_bitcoin_redeem_encsig(&mut self) -> bitcoin::EncryptedSignature {
#[derive(Debug)]
struct UnexpectedMessage;
let encsig = (|| async {
let mut guard = self.swarm.lock().await;
let encsig = match guard.next().await {
OutEvent::Message3(msg) => msg.tx_redeem_encsig,
other => {
warn!("Expected Bob's Bitcoin redeem encsig, got: {:?}", other);
return Err(backoff::Error::Transient(UnexpectedMessage));
}
};
Result::<_, backoff::Error<UnexpectedMessage>>::Ok(encsig)
})
.retry(ConstantBackoff::new(Duration::from_secs(1)))
.await
.expect("transient errors to be retried");
info!("Received Bitcoin redeem encsig");
encsig
}
}
let mut swarm = new_swarm(listen, transport, behaviour)?;
let message0: bob::Message0;
let mut state0: Option<alice::State0> = None;
let mut last_amounts: Option<SwapAmounts> = None;
// TODO: This loop is a neat idea for local development, as it allows us to keep
// Alice up and let Bob keep trying to connect, request amounts and/or send the
// first message of the handshake, but it comes at the cost of needing to handle
// mutable state, which has already been the source of a bug at one point. This
// is an obvious candidate for refactoring
loop {
match swarm.next().await {
OutEvent::ConnectionEstablished(bob) => {
info!("Connection established with: {}", bob);
}
OutEvent::Request(amounts::OutEvent { btc, channel }) => {
let amounts = calculate_amounts(btc);
last_amounts = Some(amounts);
swarm.send_amounts(channel, amounts);
let SwapAmounts { btc, xmr } = amounts;
let redeem_address = bitcoin_wallet.as_ref().new_address().await?;
let punish_address = redeem_address.clone();
// TODO: Pass this in using <R: RngCore + CryptoRng>
let rng = &mut OsRng;
let a = bitcoin::SecretKey::new_random(rng);
let s_a = cross_curve_dleq::Scalar::random(rng);
let v_a = monero::PrivateViewKey::new_random(rng);
let state = State0::new(
a,
s_a,
v_a,
btc,
xmr,
REFUND_TIMELOCK,
PUNISH_TIMELOCK,
redeem_address,
punish_address,
);
state0 = Some(state)
}
OutEvent::Message0(msg) => {
// We don't want Bob to be able to crash us by sending an out of
// order message. Keep looping if Bob has not requested amounts.
if last_amounts.is_some() {
// TODO: We should verify the amounts and notify Bob if they have changed.
message0 = msg;
break;
}
}
other => panic!("Unexpected event: {:?}", other),
};
}
let state1 = state0.expect("to be set").receive(message0)?;
let (state2, channel) = match swarm.next().await {
OutEvent::Message1 { msg, channel } => {
let state2 = state1.receive(msg);
(state2, channel)
}
other => panic!("Unexpected event: {:?}", other),
};
let msg = state2.next_message();
swarm.send_message1(channel, msg);
let (state3, channel) = match swarm.next().await {
OutEvent::Message2 { msg, channel } => {
let state3 = state2.receive(msg)?;
(state3, channel)
}
other => panic!("Unexpected event: {:?}", other),
};
let swap_id = Uuid::new_v4();
db.insert_latest_state(swap_id, state::Alice::Negotiated(state3.clone()).into())
.await?;
info!("Handshake complete, we now have State3 for Alice.");
let network = Arc::new(Mutex::new(Network {
swarm: Arc::new(Mutex::new(swarm)),
channel: Some(channel),
}));
let mut action_generator = action_generator(
network.clone(),
bitcoin_wallet.clone(),
state3.clone(),
TX_LOCK_MINE_TIMEOUT,
);
loop {
let state = action_generator.async_resume().await;
tracing::info!("Resumed execution of generator, got: {:?}", state);
match state {
GeneratorState::Yielded(Action::LockXmr {
amount,
public_spend_key,
public_view_key,
}) => {
db.insert_latest_state(swap_id, state::Alice::BtcLocked(state3.clone()).into())
.await?;
let (transfer_proof, _) = monero_wallet
.transfer(public_spend_key, public_view_key, amount)
.await?;
db.insert_latest_state(swap_id, state::Alice::XmrLocked(state3.clone()).into())
.await?;
let mut guard = network.as_ref().lock().await;
guard.send_message2(transfer_proof).await;
info!("Sent transfer proof");
}
GeneratorState::Yielded(Action::RedeemBtc(tx)) => {
db.insert_latest_state(
swap_id,
state::Alice::BtcRedeemable {
state: state3.clone(),
redeem_tx: tx.clone(),
}
.into(),
)
.await?;
let _ = bitcoin_wallet.broadcast_signed_transaction(tx).await?;
}
GeneratorState::Yielded(Action::CancelBtc(tx)) => {
let _ = bitcoin_wallet.broadcast_signed_transaction(tx).await?;
}
GeneratorState::Yielded(Action::PunishBtc(tx)) => {
db.insert_latest_state(swap_id, state::Alice::BtcPunishable(state3.clone()).into())
.await?;
let _ = bitcoin_wallet.broadcast_signed_transaction(tx).await?;
}
GeneratorState::Yielded(Action::CreateMoneroWalletForOutput {
spend_key,
view_key,
}) => {
db.insert_latest_state(
swap_id,
state::Alice::BtcRefunded {
state: state3.clone(),
spend_key,
view_key,
}
.into(),
)
.await?;
monero_wallet
.create_and_load_wallet_for_output(spend_key, view_key)
.await?;
}
GeneratorState::Complete(()) => {
db.insert_latest_state(swap_id, state::Alice::SwapComplete.into())
.await?;
return Ok(());
}
}
}
}
pub type Swarm = libp2p::Swarm<Behaviour>; pub type Swarm = libp2p::Swarm<Behaviour>;
pub fn new_swarm( pub fn new_swarm(
@ -433,31 +185,3 @@ impl Behaviour {
debug!("Sent Message2"); debug!("Sent Message2");
} }
} }
fn calculate_amounts(btc: ::bitcoin::Amount) -> SwapAmounts {
// TODO (Franck): This should instead verify that the received amounts matches
// the command line arguments This value corresponds to 100 XMR per BTC
const PICONERO_PER_SAT: u64 = 1_000_000;
let picos = btc.as_sat() * PICONERO_PER_SAT;
let xmr = monero::Amount::from_piconero(picos);
SwapAmounts { btc, xmr }
}
#[cfg(test)]
mod tests {
use super::*;
const ONE_BTC: u64 = 100_000_000;
const HUNDRED_XMR: u64 = 100_000_000_000_000;
#[test]
fn one_bitcoin_equals_a_hundred_moneroj() {
let btc = ::bitcoin::Amount::from_sat(ONE_BTC);
let want = monero::Amount::from_piconero(HUNDRED_XMR);
let SwapAmounts { xmr: got, .. } = calculate_amounts(btc);
assert_eq!(got, want);
}
}

View File

@ -3,7 +3,7 @@ use crate::{
network::{request_response::AliceToBob, transport::SwapTransport, TokioExecutor}, network::{request_response::AliceToBob, transport::SwapTransport, TokioExecutor},
SwapAmounts, SwapAmounts,
}; };
use anyhow::{Context, Result}; use anyhow::{anyhow, Context, Result};
use futures::FutureExt; use futures::FutureExt;
use libp2p::{ use libp2p::{
core::Multiaddr, futures::StreamExt, request_response::ResponseChannel, PeerId, Swarm, core::Multiaddr, futures::StreamExt, request_response::ResponseChannel, PeerId, Swarm,
@ -30,15 +30,15 @@ impl<T> Default for Channels<T> {
} }
pub struct EventLoopHandle { pub struct EventLoopHandle {
pub msg0: Receiver<bob::Message0>, msg0: Receiver<bob::Message0>,
pub msg1: Receiver<(bob::Message1, ResponseChannel<AliceToBob>)>, msg1: Receiver<(bob::Message1, ResponseChannel<AliceToBob>)>,
pub msg2: Receiver<(bob::Message2, ResponseChannel<AliceToBob>)>, msg2: Receiver<(bob::Message2, ResponseChannel<AliceToBob>)>,
pub msg3: Receiver<bob::Message3>, msg3: Receiver<bob::Message3>,
pub request: Receiver<crate::alice::amounts::OutEvent>, request: Receiver<crate::alice::amounts::OutEvent>,
pub conn_established: Receiver<PeerId>, conn_established: Receiver<PeerId>,
pub send_amounts: Sender<(ResponseChannel<AliceToBob>, SwapAmounts)>, send_amounts: Sender<(ResponseChannel<AliceToBob>, SwapAmounts)>,
pub send_msg1: Sender<(ResponseChannel<AliceToBob>, alice::Message1)>, send_msg1: Sender<(ResponseChannel<AliceToBob>, alice::Message1)>,
pub send_msg2: Sender<(ResponseChannel<AliceToBob>, alice::Message2)>, send_msg2: Sender<(ResponseChannel<AliceToBob>, alice::Message2)>,
} }
impl EventLoopHandle { impl EventLoopHandle {
@ -46,41 +46,42 @@ impl EventLoopHandle {
self.conn_established self.conn_established
.recv() .recv()
.await .await
.ok_or_else(|| anyhow::Error::msg("Failed to receive connection established from Bob")) .ok_or_else(|| anyhow!("Failed to receive connection established from Bob"))
} }
pub async fn recv_message0(&mut self) -> Result<bob::Message0> { pub async fn recv_message0(&mut self) -> Result<bob::Message0> {
self.msg0 self.msg0
.recv() .recv()
.await .await
.ok_or_else(|| anyhow::Error::msg("Failed to receive message 0 from Bob")) .ok_or_else(|| anyhow!("Failed to receive message 0 from Bob"))
} }
pub async fn recv_message1(&mut self) -> Result<(bob::Message1, ResponseChannel<AliceToBob>)> { pub async fn recv_message1(&mut self) -> Result<(bob::Message1, ResponseChannel<AliceToBob>)> {
self.msg1 self.msg1
.recv() .recv()
.await .await
.ok_or_else(|| anyhow::Error::msg("Failed to receive message 1 from Bob")) .ok_or_else(|| anyhow!("Failed to receive message 1 from Bob"))
} }
pub async fn recv_message2(&mut self) -> Result<(bob::Message2, ResponseChannel<AliceToBob>)> { pub async fn recv_message2(&mut self) -> Result<(bob::Message2, ResponseChannel<AliceToBob>)> {
self.msg2 self.msg2
.recv() .recv()
.await .await
.ok_or_else(|| anyhow::Error::msg("Failed o receive message 2 from Bob")) .ok_or_else(|| anyhow!("Failed o receive message 2 from Bob"))
} }
pub async fn recv_message3(&mut self) -> Result<bob::Message3> { pub async fn recv_message3(&mut self) -> Result<bob::Message3> {
self.msg3.recv().await.ok_or_else(|| { self.msg3
anyhow::Error::msg("Failed to receive Bitcoin encrypted signature from Bob") .recv()
}) .await
.ok_or_else(|| anyhow!("Failed to receive Bitcoin encrypted signature from Bob"))
} }
pub async fn recv_request(&mut self) -> Result<crate::alice::amounts::OutEvent> { pub async fn recv_request(&mut self) -> Result<crate::alice::amounts::OutEvent> {
self.request self.request
.recv() .recv()
.await .await
.ok_or_else(|| anyhow::Error::msg("Failed to receive amounts request from Bob")) .ok_or_else(|| anyhow!("Failed to receive amounts request from Bob"))
} }
pub async fn send_amounts( pub async fn send_amounts(
@ -112,16 +113,16 @@ impl EventLoopHandle {
} }
pub struct EventLoop { pub struct EventLoop {
pub swarm: libp2p::Swarm<Behaviour>, swarm: libp2p::Swarm<Behaviour>,
pub msg0: Sender<bob::Message0>, msg0: Sender<bob::Message0>,
pub msg1: Sender<(bob::Message1, ResponseChannel<AliceToBob>)>, msg1: Sender<(bob::Message1, ResponseChannel<AliceToBob>)>,
pub msg2: Sender<(bob::Message2, ResponseChannel<AliceToBob>)>, msg2: Sender<(bob::Message2, ResponseChannel<AliceToBob>)>,
pub msg3: Sender<bob::Message3>, msg3: Sender<bob::Message3>,
pub request: Sender<crate::alice::amounts::OutEvent>, request: Sender<crate::alice::amounts::OutEvent>,
pub conn_established: Sender<PeerId>, conn_established: Sender<PeerId>,
pub send_amounts: Receiver<(ResponseChannel<AliceToBob>, SwapAmounts)>, send_amounts: Receiver<(ResponseChannel<AliceToBob>, SwapAmounts)>,
pub send_msg1: Receiver<(ResponseChannel<AliceToBob>, alice::Message1)>, send_msg1: Receiver<(ResponseChannel<AliceToBob>, alice::Message1)>,
pub send_msg2: Receiver<(ResponseChannel<AliceToBob>, alice::Message2)>, send_msg2: Receiver<(ResponseChannel<AliceToBob>, alice::Message2)>,
} }
impl EventLoop { impl EventLoop {

View File

@ -30,21 +30,20 @@ use xmr_btc::{
pub async fn negotiate( pub async fn negotiate(
state0: xmr_btc::alice::State0, state0: xmr_btc::alice::State0,
amounts: SwapAmounts, amounts: SwapAmounts,
// a: bitcoin::SecretKey, event_loop_handle: &mut EventLoopHandle,
// s_a: cross_curve_dleq::Scalar,
// v_a: monero::PrivateViewKey,
swarm_handle: &mut EventLoopHandle,
// bitcoin_wallet: Arc<bitcoin::Wallet>,
config: Config, config: Config,
) -> Result<(ResponseChannel<AliceToBob>, State3)> { ) -> Result<(ResponseChannel<AliceToBob>, State3)> {
trace!("Starting negotiate"); trace!("Starting negotiate");
// todo: we can move this out, we dont need to timeout here // todo: we can move this out, we dont need to timeout here
let _peer_id = timeout(config.bob_time_to_act, swarm_handle.recv_conn_established()) let _peer_id = timeout(
config.bob_time_to_act,
event_loop_handle.recv_conn_established(),
)
.await .await
.context("Failed to receive dial connection from Bob")??; .context("Failed to receive dial connection from Bob")??;
let event = timeout(config.bob_time_to_act, swarm_handle.recv_request()) let event = timeout(config.bob_time_to_act, event_loop_handle.recv_request())
.await .await
.context("Failed to receive amounts from Bob")??; .context("Failed to receive amounts from Bob")??;
@ -56,23 +55,25 @@ pub async fn negotiate(
); );
} }
swarm_handle.send_amounts(event.channel, amounts).await?; event_loop_handle
.send_amounts(event.channel, amounts)
.await?;
let bob_message0 = timeout(config.bob_time_to_act, swarm_handle.recv_message0()).await??; let bob_message0 = timeout(config.bob_time_to_act, event_loop_handle.recv_message0()).await??;
let state1 = state0.receive(bob_message0)?; let state1 = state0.receive(bob_message0)?;
let (bob_message1, channel) = let (bob_message1, channel) =
timeout(config.bob_time_to_act, swarm_handle.recv_message1()).await??; timeout(config.bob_time_to_act, event_loop_handle.recv_message1()).await??;
let state2 = state1.receive(bob_message1); let state2 = state1.receive(bob_message1);
swarm_handle event_loop_handle
.send_message1(channel, state2.next_message()) .send_message1(channel, state2.next_message())
.await?; .await?;
let (bob_message2, channel) = let (bob_message2, channel) =
timeout(config.bob_time_to_act, swarm_handle.recv_message2()).await??; timeout(config.bob_time_to_act, event_loop_handle.recv_message2()).await??;
let state3 = state2.receive(bob_message2)?; let state3 = state2.receive(bob_message2)?;
@ -107,7 +108,7 @@ pub async fn lock_xmr<W>(
channel: ResponseChannel<AliceToBob>, channel: ResponseChannel<AliceToBob>,
amounts: SwapAmounts, amounts: SwapAmounts,
state3: State3, state3: State3,
swarm: &mut EventLoopHandle, event_loop_handle: &mut EventLoopHandle,
monero_wallet: Arc<W>, monero_wallet: Arc<W>,
) -> Result<()> ) -> Result<()>
where where
@ -126,7 +127,7 @@ where
// TODO(Franck): Wait for Monero to be confirmed once // TODO(Franck): Wait for Monero to be confirmed once
swarm event_loop_handle
.send_message2(channel, alice::Message2 { .send_message2(channel, alice::Message2 {
tx_lock_proof: transfer_proof, tx_lock_proof: transfer_proof,
}) })
@ -136,10 +137,10 @@ where
} }
pub async fn wait_for_bitcoin_encrypted_signature( pub async fn wait_for_bitcoin_encrypted_signature(
swarm: &mut EventLoopHandle, event_loop_handle: &mut EventLoopHandle,
timeout_duration: Duration, timeout_duration: Duration,
) -> Result<EncryptedSignature> { ) -> Result<EncryptedSignature> {
let msg3 = timeout(timeout_duration, swarm.recv_message3()) let msg3 = timeout(timeout_duration, event_loop_handle.recv_message3())
.await .await
.context("Failed to receive Bitcoin encrypted signature from Bob")??; .context("Failed to receive Bitcoin encrypted signature from Bob")??;
Ok(msg3.tx_redeem_encsig) Ok(msg3.tx_redeem_encsig)

View File

@ -76,7 +76,7 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>>
} => { } => {
if let BobToAlice::Message0(msg) = request { if let BobToAlice::Message0(msg) = request {
debug!("Received Message0"); debug!("Received Message0");
// TODO(Franck): Move this business logic out of the network behaviour.
let response = AliceToBob::Message0(self.state.next_message(&mut OsRng)); let response = AliceToBob::Message0(self.state.next_message(&mut OsRng));
self.rr.send_response(channel, response); self.rr.send_response(channel, response);

View File

@ -104,7 +104,7 @@ impl fmt::Display for AliceState {
pub async fn swap( pub async fn swap(
state: AliceState, state: AliceState,
swarm: EventLoopHandle, event_loop_handle: EventLoopHandle,
bitcoin_wallet: Arc<crate::bitcoin::Wallet>, bitcoin_wallet: Arc<crate::bitcoin::Wallet>,
monero_wallet: Arc<crate::monero::Wallet>, monero_wallet: Arc<crate::monero::Wallet>,
config: Config, config: Config,
@ -112,7 +112,7 @@ pub async fn swap(
run_until( run_until(
state, state,
is_complete, is_complete,
swarm, event_loop_handle,
bitcoin_wallet, bitcoin_wallet,
monero_wallet, monero_wallet,
config, config,
@ -142,18 +142,19 @@ pub fn is_xmr_locked(state: &AliceState) -> bool {
pub async fn run_until( pub async fn run_until(
state: AliceState, state: AliceState,
is_target_state: fn(&AliceState) -> bool, is_target_state: fn(&AliceState) -> bool,
mut swarm: EventLoopHandle, mut event_loop_handle: EventLoopHandle,
bitcoin_wallet: Arc<crate::bitcoin::Wallet>, bitcoin_wallet: Arc<crate::bitcoin::Wallet>,
monero_wallet: Arc<crate::monero::Wallet>, monero_wallet: Arc<crate::monero::Wallet>,
config: Config, config: Config,
) -> Result<(AliceState, EventLoopHandle)> { ) -> Result<(AliceState, EventLoopHandle)> {
info!("Current state:{}", state); info!("Current state:{}", state);
if is_target_state(&state) { if is_target_state(&state) {
Ok((state, swarm)) Ok((state, event_loop_handle))
} else { } else {
match state { match state {
AliceState::Started { amounts, state0 } => { AliceState::Started { amounts, state0 } => {
let (channel, state3) = negotiate(state0, amounts, &mut swarm, config).await?; let (channel, state3) =
negotiate(state0, amounts, &mut event_loop_handle, config).await?;
run_until( run_until(
AliceState::Negotiated { AliceState::Negotiated {
@ -162,7 +163,7 @@ pub async fn run_until(
state3, state3,
}, },
is_target_state, is_target_state,
swarm, event_loop_handle,
bitcoin_wallet, bitcoin_wallet,
monero_wallet, monero_wallet,
config, config,
@ -185,7 +186,7 @@ pub async fn run_until(
state3, state3,
}, },
is_target_state, is_target_state,
swarm, event_loop_handle,
bitcoin_wallet, bitcoin_wallet,
monero_wallet, monero_wallet,
config, config,
@ -201,7 +202,7 @@ pub async fn run_until(
channel, channel,
amounts, amounts,
state3.clone(), state3.clone(),
&mut swarm, &mut event_loop_handle,
monero_wallet.clone(), monero_wallet.clone(),
) )
.await?; .await?;
@ -209,7 +210,7 @@ pub async fn run_until(
run_until( run_until(
AliceState::XmrLocked { state3 }, AliceState::XmrLocked { state3 },
is_target_state, is_target_state,
swarm, event_loop_handle,
bitcoin_wallet, bitcoin_wallet,
monero_wallet, monero_wallet,
config, config,
@ -220,7 +221,7 @@ pub async fn run_until(
// Our Monero is locked, we need to go through the cancellation process if this // Our Monero is locked, we need to go through the cancellation process if this
// step fails // step fails
match wait_for_bitcoin_encrypted_signature( match wait_for_bitcoin_encrypted_signature(
&mut swarm, &mut event_loop_handle,
config.monero_max_finality_time, config.monero_max_finality_time,
) )
.await .await
@ -232,7 +233,7 @@ pub async fn run_until(
encrypted_signature, encrypted_signature,
}, },
is_target_state, is_target_state,
swarm, event_loop_handle,
bitcoin_wallet, bitcoin_wallet,
monero_wallet, monero_wallet,
config, config,
@ -243,7 +244,7 @@ pub async fn run_until(
run_until( run_until(
AliceState::WaitingToCancel { state3 }, AliceState::WaitingToCancel { state3 },
is_target_state, is_target_state,
swarm, event_loop_handle,
bitcoin_wallet, bitcoin_wallet,
monero_wallet, monero_wallet,
config, config,
@ -269,7 +270,7 @@ pub async fn run_until(
return run_until( return run_until(
AliceState::WaitingToCancel { state3 }, AliceState::WaitingToCancel { state3 },
is_target_state, is_target_state,
swarm, event_loop_handle,
bitcoin_wallet, bitcoin_wallet,
monero_wallet, monero_wallet,
config, config,
@ -291,7 +292,7 @@ pub async fn run_until(
run_until( run_until(
AliceState::BtcRedeemed, AliceState::BtcRedeemed,
is_target_state, is_target_state,
swarm, event_loop_handle,
bitcoin_wallet, bitcoin_wallet,
monero_wallet, monero_wallet,
config, config,
@ -312,7 +313,7 @@ pub async fn run_until(
run_until( run_until(
AliceState::BtcCancelled { state3, tx_cancel }, AliceState::BtcCancelled { state3, tx_cancel },
is_target_state, is_target_state,
swarm, event_loop_handle,
bitcoin_wallet, bitcoin_wallet,
monero_wallet, monero_wallet,
config, config,
@ -339,7 +340,7 @@ pub async fn run_until(
run_until( run_until(
AliceState::BtcPunishable { tx_refund, state3 }, AliceState::BtcPunishable { tx_refund, state3 },
is_target_state, is_target_state,
swarm, event_loop_handle,
bitcoin_wallet.clone(), bitcoin_wallet.clone(),
monero_wallet, monero_wallet,
config, config,
@ -354,7 +355,7 @@ pub async fn run_until(
state3, state3,
}, },
is_target_state, is_target_state,
swarm, event_loop_handle,
bitcoin_wallet.clone(), bitcoin_wallet.clone(),
monero_wallet, monero_wallet,
config, config,
@ -381,7 +382,7 @@ pub async fn run_until(
.create_and_load_wallet_for_output(spend_key, view_key) .create_and_load_wallet_for_output(spend_key, view_key)
.await?; .await?;
Ok((AliceState::XmrRefunded, swarm)) Ok((AliceState::XmrRefunded, event_loop_handle))
} }
AliceState::BtcPunishable { tx_refund, state3 } => { AliceState::BtcPunishable { tx_refund, state3 } => {
let signed_tx_punish = build_bitcoin_punish_transaction( let signed_tx_punish = build_bitcoin_punish_transaction(
@ -410,7 +411,7 @@ pub async fn run_until(
run_until( run_until(
AliceState::Punished, AliceState::Punished,
is_target_state, is_target_state,
swarm, event_loop_handle,
bitcoin_wallet.clone(), bitcoin_wallet.clone(),
monero_wallet, monero_wallet,
config, config,
@ -425,7 +426,7 @@ pub async fn run_until(
state3, state3,
}, },
is_target_state, is_target_state,
swarm, event_loop_handle,
bitcoin_wallet.clone(), bitcoin_wallet.clone(),
monero_wallet, monero_wallet,
config, config,
@ -434,10 +435,10 @@ pub async fn run_until(
} }
} }
} }
AliceState::XmrRefunded => Ok((AliceState::XmrRefunded, swarm)), AliceState::XmrRefunded => Ok((AliceState::XmrRefunded, event_loop_handle)),
AliceState::BtcRedeemed => Ok((AliceState::BtcRedeemed, swarm)), AliceState::BtcRedeemed => Ok((AliceState::BtcRedeemed, event_loop_handle)),
AliceState::Punished => Ok((AliceState::Punished, swarm)), AliceState::Punished => Ok((AliceState::Punished, event_loop_handle)),
AliceState::SafelyAborted => Ok((AliceState::SafelyAborted, swarm)), AliceState::SafelyAborted => Ok((AliceState::SafelyAborted, event_loop_handle)),
} }
} }
} }

View File

@ -13,275 +13,8 @@
#![forbid(unsafe_code)] #![forbid(unsafe_code)]
use anyhow::Result; use anyhow::Result;
use futures::{channel::mpsc, StreamExt};
use libp2p::Multiaddr;
use prettytable::{row, Table};
use rand::rngs::OsRng;
use std::{io, io::Write, process, sync::Arc};
use structopt::StructOpt;
use swap::{
alice, bitcoin, bob,
cli::Options,
monero,
network::transport::{build, build_tor, SwapTransport},
recover::recover,
storage::Database,
Cmd, Rsp, SwapAmounts, PUNISH_TIMELOCK, REFUND_TIMELOCK,
};
use tracing::info;
use xmr_btc::{alice::State0, cross_curve_dleq};
#[macro_use]
extern crate prettytable;
// TODO: Add root seed file instead of generating new seed each run.
#[tokio::main] #[tokio::main]
async fn main() -> Result<()> { async fn main() -> Result<()> {
let opt = Options::from_args(); unimplemented!()
// This currently creates the directory if it's not there in the first place
let db = Database::open(std::path::Path::new("./.swap-db/")).unwrap();
match opt {
Options::Alice {
bitcoind_url,
monerod_url,
listen_addr,
tor_port,
} => {
info!("running swap node as Alice ...");
let bitcoin_wallet = bitcoin::Wallet::new("alice", bitcoind_url)
.await
.expect("failed to create bitcoin wallet");
let bitcoin_wallet = Arc::new(bitcoin_wallet);
let monero_wallet = Arc::new(monero::Wallet::new(monerod_url));
let rng = &mut OsRng;
let a = bitcoin::SecretKey::new_random(rng);
let s_a = cross_curve_dleq::Scalar::random(rng);
let v_a = xmr_btc::monero::PrivateViewKey::new_random(rng);
let redeem_address = bitcoin_wallet.as_ref().new_address().await?;
let punish_address = redeem_address.clone();
let state0 = State0::new(
a,
s_a,
v_a,
// todo: get from CLI args
bitcoin::Amount::from_sat(100),
// todo: get from CLI args
monero::Amount::from_piconero(1000000),
REFUND_TIMELOCK,
PUNISH_TIMELOCK,
redeem_address,
punish_address,
);
let behaviour = alice::Behaviour::new(state0);
let local_key_pair = behaviour.identity();
let (listen_addr, _ac, transport) = match tor_port {
Some(tor_port) => {
let tor_secret_key = torut::onion::TorSecretKeyV3::generate();
let onion_address = tor_secret_key
.public()
.get_onion_address()
.get_address_without_dot_onion();
let onion_address_string = format!("/onion3/{}:{}", onion_address, tor_port);
let addr: Multiaddr = onion_address_string.parse()?;
let ac = create_tor_service(tor_secret_key, tor_port).await?;
let transport = build_tor(local_key_pair, Some((addr.clone(), tor_port)))?;
(addr, Some(ac), transport)
}
None => {
let transport = build(local_key_pair)?;
(listen_addr, None, transport)
}
};
swap_as_alice(
bitcoin_wallet,
monero_wallet,
db,
listen_addr,
transport,
behaviour,
)
.await?;
}
Options::Bob {
alice_addr,
satoshis,
bitcoind_url,
monerod_url,
tor,
} => {
info!("running swap node as Bob ...");
let behaviour = bob::Behaviour::default();
let local_key_pair = behaviour.identity();
let transport = match tor {
true => build_tor(local_key_pair, None)?,
false => build(local_key_pair)?,
};
let bitcoin_wallet = bitcoin::Wallet::new("bob", bitcoind_url)
.await
.expect("failed to create bitcoin wallet");
let bitcoin_wallet = Arc::new(bitcoin_wallet);
let monero_wallet = Arc::new(monero::Wallet::new(monerod_url));
swap_as_bob(
bitcoin_wallet,
monero_wallet,
db,
satoshis,
alice_addr,
transport,
behaviour,
)
.await?;
}
Options::History => {
let mut table = Table::new();
table.add_row(row!["SWAP ID", "STATE"]);
for (swap_id, state) in db.all()? {
table.add_row(row![swap_id, state]);
}
// Print the table to stdout
table.printstd();
}
Options::Recover {
swap_id,
bitcoind_url,
monerod_url,
} => {
let state = db.get_state(swap_id)?;
let bitcoin_wallet = bitcoin::Wallet::new("bob", bitcoind_url)
.await
.expect("failed to create bitcoin wallet");
let monero_wallet = monero::Wallet::new(monerod_url);
recover(bitcoin_wallet, monero_wallet, state).await?;
}
}
Ok(())
}
async fn create_tor_service(
tor_secret_key: torut::onion::TorSecretKeyV3,
tor_port: u16,
) -> Result<swap::tor::AuthenticatedConnection> {
// TODO use configurable ports for tor connection
let mut authenticated_connection = swap::tor::UnauthenticatedConnection::default()
.init_authenticated_connection()
.await?;
tracing::info!("Tor authenticated.");
authenticated_connection
.add_service(tor_port, &tor_secret_key)
.await?;
tracing::info!("Tor service added.");
Ok(authenticated_connection)
}
async fn swap_as_alice(
bitcoin_wallet: Arc<swap::bitcoin::Wallet>,
monero_wallet: Arc<swap::monero::Wallet>,
db: Database,
addr: Multiaddr,
transport: SwapTransport,
behaviour: alice::Behaviour,
) -> Result<()> {
alice::swap(
bitcoin_wallet,
monero_wallet,
db,
addr,
transport,
behaviour,
)
.await
}
async fn swap_as_bob(
bitcoin_wallet: Arc<swap::bitcoin::Wallet>,
monero_wallet: Arc<swap::monero::Wallet>,
db: Database,
sats: u64,
alice: Multiaddr,
transport: SwapTransport,
behaviour: bob::Behaviour,
) -> Result<()> {
let (cmd_tx, mut cmd_rx) = mpsc::channel(1);
let (mut rsp_tx, rsp_rx) = mpsc::channel(1);
tokio::spawn(bob::swap(
bitcoin_wallet,
monero_wallet,
db,
sats,
alice,
cmd_tx,
rsp_rx,
transport,
behaviour,
));
loop {
let read = cmd_rx.next().await;
match read {
Some(cmd) => match cmd {
Cmd::VerifyAmounts(p) => {
let rsp = verify(p);
rsp_tx.try_send(rsp)?;
if rsp == Rsp::Abort {
process::exit(0);
}
}
},
None => {
info!("Channel closed from other end");
return Ok(());
}
}
}
}
fn verify(amounts: SwapAmounts) -> Rsp {
let mut s = String::new();
println!("Got rate from Alice for XMR/BTC swap\n");
println!("{}", amounts);
print!("Would you like to continue with this swap [y/N]: ");
let _ = io::stdout().flush();
io::stdin()
.read_line(&mut s)
.expect("Did not enter a correct string");
if let Some('\n') = s.chars().next_back() {
s.pop();
}
if let Some('\r') = s.chars().next_back() {
s.pop();
}
if !is_yes(&s) {
println!("No worries, try again later - Alice updates her rate regularly");
return Rsp::Abort;
}
Rsp::VerifiedAmounts
}
fn is_yes(s: &str) -> bool {
matches!(s, "y" | "Y" | "yes" | "YES" | "Yes")
} }

View File

@ -1,20 +1,22 @@
//! Run an XMR/BTC swap in the role of Bob. //! Run an XMR/BTC swap in the role of Bob.
//! Bob holds BTC and wishes receive XMR. //! Bob holds BTC and wishes receive XMR.
use anyhow::Result; use self::{amounts::*, message0::*, message1::*, message2::*, message3::*};
use crate::{
use async_trait::async_trait; network::{
use backoff::{backoff::Constant as ConstantBackoff, future::FutureOperation as _}; peer_tracker::{self, PeerTracker},
use futures::{ transport::SwapTransport,
channel::mpsc::{Receiver, Sender}, TokioExecutor,
FutureExt, StreamExt, },
SwapAmounts,
};
use anyhow::Result;
use libp2p::{core::identity::Keypair, NetworkBehaviour, PeerId};
use tracing::{debug, info};
use xmr_btc::{
alice,
bitcoin::EncryptedSignature,
bob::{self},
}; };
use genawaiter::GeneratorState;
use libp2p::{core::identity::Keypair, Multiaddr, NetworkBehaviour, PeerId};
use rand::rngs::OsRng;
use std::{process, sync::Arc, time::Duration};
use tokio::sync::Mutex;
use tracing::{debug, info, warn};
use uuid::Uuid;
mod amounts; mod amounts;
pub mod event_loop; pub mod event_loop;
@ -25,219 +27,6 @@ mod message2;
mod message3; mod message3;
pub mod swap; pub mod swap;
use self::{amounts::*, message0::*, message1::*, message2::*, message3::*};
use crate::{
bitcoin::{self, TX_LOCK_MINE_TIMEOUT},
monero,
network::{
peer_tracker::{self, PeerTracker},
transport::SwapTransport,
TokioExecutor,
},
state,
storage::Database,
Cmd, Rsp, SwapAmounts, PUNISH_TIMELOCK, REFUND_TIMELOCK,
};
use xmr_btc::{
alice,
bitcoin::{BroadcastSignedTransaction, EncryptedSignature, SignTxLock},
bob::{self, action_generator, ReceiveTransferProof, State0},
monero::CreateWalletForOutput,
};
#[allow(clippy::too_many_arguments)]
pub async fn swap(
bitcoin_wallet: Arc<bitcoin::Wallet>,
monero_wallet: Arc<monero::Wallet>,
db: Database,
btc: u64,
addr: Multiaddr,
mut cmd_tx: Sender<Cmd>,
mut rsp_rx: Receiver<Rsp>,
transport: SwapTransport,
behaviour: Behaviour,
) -> Result<()> {
struct Network(Swarm);
// TODO: For retry, use `backoff::ExponentialBackoff` in production as opposed
// to `ConstantBackoff`.
#[async_trait]
impl ReceiveTransferProof for Network {
async fn receive_transfer_proof(&mut self) -> monero::TransferProof {
#[derive(Debug)]
struct UnexpectedMessage;
let future = self.0.next().shared();
let proof = (|| async {
let proof = match future.clone().await {
OutEvent::Message2(msg) => msg.tx_lock_proof,
other => {
warn!("Expected transfer proof, got: {:?}", other);
return Err(backoff::Error::Transient(UnexpectedMessage));
}
};
Result::<_, backoff::Error<UnexpectedMessage>>::Ok(proof)
})
.retry(ConstantBackoff::new(Duration::from_secs(1)))
.await
.expect("transient errors to be retried");
info!("Received transfer proof");
proof
}
}
let mut swarm = new_swarm(transport, behaviour)?;
libp2p::Swarm::dial_addr(&mut swarm, addr)?;
let alice = match swarm.next().await {
OutEvent::ConnectionEstablished(alice) => alice,
other => panic!("unexpected event: {:?}", other),
};
info!("Connection established with: {}", alice);
swarm.request_amounts(alice.clone(), btc);
let (btc, xmr) = match swarm.next().await {
OutEvent::Amounts(amounts) => {
info!("Got amounts from Alice: {:?}", amounts);
let cmd = Cmd::VerifyAmounts(amounts);
cmd_tx.try_send(cmd)?;
let response = rsp_rx.next().await;
if response == Some(Rsp::Abort) {
info!("User rejected amounts proposed by Alice, aborting...");
process::exit(0);
}
info!("User accepted amounts proposed by Alice");
(amounts.btc, amounts.xmr)
}
other => panic!("unexpected event: {:?}", other),
};
let refund_address = bitcoin_wallet.new_address().await?;
// TODO: Pass this in using <R: RngCore + CryptoRng>
let rng = &mut OsRng;
let state0 = State0::new(
rng,
btc,
xmr,
REFUND_TIMELOCK,
PUNISH_TIMELOCK,
refund_address,
);
info!("Commencing handshake");
swarm.send_message0(alice.clone(), state0.next_message(rng));
let state1 = match swarm.next().await {
OutEvent::Message0(msg) => state0.receive(bitcoin_wallet.as_ref(), msg).await?,
other => panic!("unexpected event: {:?}", other),
};
swarm.send_message1(alice.clone(), state1.next_message());
let state2 = match swarm.next().await {
OutEvent::Message1(msg) => {
state1.receive(msg)? // TODO: Same as above.
}
other => panic!("unexpected event: {:?}", other),
};
let swap_id = Uuid::new_v4();
db.insert_latest_state(swap_id, state::Bob::Handshaken(state2.clone()).into())
.await?;
swarm.send_message2(alice.clone(), state2.next_message());
info!("Handshake complete");
let network = Arc::new(Mutex::new(Network(swarm)));
let mut action_generator = action_generator(
network.clone(),
monero_wallet.clone(),
bitcoin_wallet.clone(),
state2.clone(),
TX_LOCK_MINE_TIMEOUT,
);
loop {
let state = action_generator.async_resume().await;
info!("Resumed execution of generator, got: {:?}", state);
// TODO: Protect against transient errors
// TODO: Ignore transaction-already-in-block-chain errors
match state {
GeneratorState::Yielded(bob::Action::LockBtc(tx_lock)) => {
let signed_tx_lock = bitcoin_wallet.sign_tx_lock(tx_lock).await?;
let _ = bitcoin_wallet
.broadcast_signed_transaction(signed_tx_lock)
.await?;
db.insert_latest_state(swap_id, state::Bob::BtcLocked(state2.clone()).into())
.await?;
}
GeneratorState::Yielded(bob::Action::SendBtcRedeemEncsig(tx_redeem_encsig)) => {
db.insert_latest_state(swap_id, state::Bob::XmrLocked(state2.clone()).into())
.await?;
let mut guard = network.as_ref().lock().await;
guard.0.send_message3(alice.clone(), tx_redeem_encsig);
info!("Sent Bitcoin redeem encsig");
// FIXME: Having to wait for Alice's response here is a big problem, because
// we're stuck if she doesn't send her response back. I believe this is
// currently necessary, so we may have to rework this and/or how we use libp2p
match guard.0.next().shared().await {
OutEvent::Message3 => {
debug!("Got Message3 empty response");
}
other => panic!("unexpected event: {:?}", other),
};
}
GeneratorState::Yielded(bob::Action::CreateXmrWalletForOutput {
spend_key,
view_key,
}) => {
db.insert_latest_state(swap_id, state::Bob::BtcRedeemed(state2.clone()).into())
.await?;
monero_wallet
.create_and_load_wallet_for_output(spend_key, view_key)
.await?;
}
GeneratorState::Yielded(bob::Action::CancelBtc(tx_cancel)) => {
db.insert_latest_state(swap_id, state::Bob::BtcRefundable(state2.clone()).into())
.await?;
let _ = bitcoin_wallet
.broadcast_signed_transaction(tx_cancel)
.await?;
}
GeneratorState::Yielded(bob::Action::RefundBtc(tx_refund)) => {
db.insert_latest_state(swap_id, state::Bob::BtcRefundable(state2.clone()).into())
.await?;
let _ = bitcoin_wallet
.broadcast_signed_transaction(tx_refund)
.await?;
}
GeneratorState::Complete(()) => {
db.insert_latest_state(swap_id, state::Bob::SwapComplete.into())
.await?;
return Ok(());
}
}
}
}
pub type Swarm = libp2p::Swarm<Behaviour>; pub type Swarm = libp2p::Swarm<Behaviour>;
pub fn new_swarm(transport: SwapTransport, behaviour: Behaviour) -> Result<Swarm> { pub fn new_swarm(transport: SwapTransport, behaviour: Behaviour) -> Result<Swarm> {

View File

@ -2,7 +2,7 @@ use crate::{
bob::{Behaviour, OutEvent}, bob::{Behaviour, OutEvent},
network::{transport::SwapTransport, TokioExecutor}, network::{transport::SwapTransport, TokioExecutor},
}; };
use anyhow::Result; use anyhow::{anyhow, Result};
use futures::FutureExt; use futures::FutureExt;
use libp2p::{core::Multiaddr, PeerId}; use libp2p::{core::Multiaddr, PeerId};
use tokio::{ use tokio::{
@ -31,16 +31,16 @@ impl<T> Default for Channels<T> {
} }
pub struct EventLoopHandle { pub struct EventLoopHandle {
pub msg0: Receiver<alice::Message0>, msg0: Receiver<alice::Message0>,
pub msg1: Receiver<alice::Message1>, msg1: Receiver<alice::Message1>,
pub msg2: Receiver<alice::Message2>, msg2: Receiver<alice::Message2>,
pub request_amounts: Sender<(PeerId, ::bitcoin::Amount)>, request_amounts: Sender<(PeerId, ::bitcoin::Amount)>,
pub conn_established: Receiver<PeerId>, conn_established: Receiver<PeerId>,
pub dial_alice: Sender<Multiaddr>, dial_alice: Sender<Multiaddr>,
pub send_msg0: Sender<(PeerId, bob::Message0)>, send_msg0: Sender<(PeerId, bob::Message0)>,
pub send_msg1: Sender<(PeerId, bob::Message1)>, send_msg1: Sender<(PeerId, bob::Message1)>,
pub send_msg2: Sender<(PeerId, bob::Message2)>, send_msg2: Sender<(PeerId, bob::Message2)>,
pub send_msg3: Sender<(PeerId, EncryptedSignature)>, send_msg3: Sender<(PeerId, EncryptedSignature)>,
} }
impl EventLoopHandle { impl EventLoopHandle {
@ -48,28 +48,28 @@ impl EventLoopHandle {
self.conn_established self.conn_established
.recv() .recv()
.await .await
.ok_or_else(|| anyhow::Error::msg("Failed to receive connection established from Bob")) .ok_or_else(|| anyhow!("Failed to receive connection established from Bob"))
} }
pub async fn recv_message0(&mut self) -> Result<alice::Message0> { pub async fn recv_message0(&mut self) -> Result<alice::Message0> {
self.msg0 self.msg0
.recv() .recv()
.await .await
.ok_or_else(|| anyhow::Error::msg("Failed to receive message 0 from Bob")) .ok_or_else(|| anyhow!("Failed to receive message 0 from Bob"))
} }
pub async fn recv_message1(&mut self) -> Result<alice::Message1> { pub async fn recv_message1(&mut self) -> Result<alice::Message1> {
self.msg1 self.msg1
.recv() .recv()
.await .await
.ok_or_else(|| anyhow::Error::msg("Failed to receive message 1 from Bob")) .ok_or_else(|| anyhow!("Failed to receive message 1 from Bob"))
} }
pub async fn recv_message2(&mut self) -> Result<alice::Message2> { pub async fn recv_message2(&mut self) -> Result<alice::Message2> {
self.msg2 self.msg2
.recv() .recv()
.await .await
.ok_or_else(|| anyhow::Error::msg("Failed o receive message 2 from Bob")) .ok_or_else(|| anyhow!("Failed o receive message 2 from Bob"))
} }
pub async fn dial_alice(&mut self, addr: Multiaddr) -> Result<()> { pub async fn dial_alice(&mut self, addr: Multiaddr) -> Result<()> {
@ -113,17 +113,17 @@ impl EventLoopHandle {
} }
pub struct EventLoop { pub struct EventLoop {
pub swarm: libp2p::Swarm<Behaviour>, swarm: libp2p::Swarm<Behaviour>,
pub msg0: Sender<alice::Message0>, msg0: Sender<alice::Message0>,
pub msg1: Sender<alice::Message1>, msg1: Sender<alice::Message1>,
pub msg2: Sender<alice::Message2>, msg2: Sender<alice::Message2>,
pub conn_established: Sender<PeerId>, conn_established: Sender<PeerId>,
pub request_amounts: Receiver<(PeerId, ::bitcoin::Amount)>, request_amounts: Receiver<(PeerId, ::bitcoin::Amount)>,
pub dial_alice: Receiver<Multiaddr>, dial_alice: Receiver<Multiaddr>,
pub send_msg0: Receiver<(PeerId, bob::Message0)>, send_msg0: Receiver<(PeerId, bob::Message0)>,
pub send_msg1: Receiver<(PeerId, bob::Message1)>, send_msg1: Receiver<(PeerId, bob::Message1)>,
pub send_msg2: Receiver<(PeerId, bob::Message2)>, send_msg2: Receiver<(PeerId, bob::Message2)>,
pub send_msg3: Receiver<(PeerId, EncryptedSignature)>, send_msg3: Receiver<(PeerId, EncryptedSignature)>,
} }
impl EventLoop { impl EventLoop {

View File

@ -53,7 +53,7 @@ impl fmt::Display for BobState {
pub async fn swap<R>( pub async fn swap<R>(
state: BobState, state: BobState,
swarm: EventLoopHandle, event_loop_handle: EventLoopHandle,
db: Database, db: Database,
bitcoin_wallet: Arc<crate::bitcoin::Wallet>, bitcoin_wallet: Arc<crate::bitcoin::Wallet>,
monero_wallet: Arc<crate::monero::Wallet>, monero_wallet: Arc<crate::monero::Wallet>,
@ -66,7 +66,7 @@ where
run_until( run_until(
state, state,
is_complete, is_complete,
swarm, event_loop_handle,
db, db,
bitcoin_wallet, bitcoin_wallet,
monero_wallet, monero_wallet,
@ -100,7 +100,7 @@ pub fn is_xmr_locked(state: &BobState) -> bool {
pub async fn run_until<R>( pub async fn run_until<R>(
state: BobState, state: BobState,
is_target_state: fn(&BobState) -> bool, is_target_state: fn(&BobState) -> bool,
mut swarm: EventLoopHandle, mut event_loop_handle: EventLoopHandle,
db: Database, db: Database,
bitcoin_wallet: Arc<crate::bitcoin::Wallet>, bitcoin_wallet: Arc<crate::bitcoin::Wallet>,
monero_wallet: Arc<crate::monero::Wallet>, monero_wallet: Arc<crate::monero::Wallet>,
@ -124,7 +124,7 @@ where
let state2 = negotiate( let state2 = negotiate(
state0, state0,
amounts, amounts,
&mut swarm, &mut event_loop_handle,
addr, addr,
&mut rng, &mut rng,
bitcoin_wallet.clone(), bitcoin_wallet.clone(),
@ -133,7 +133,7 @@ where
run_until( run_until(
BobState::Negotiated(state2, peer_id), BobState::Negotiated(state2, peer_id),
is_target_state, is_target_state,
swarm, event_loop_handle,
db, db,
bitcoin_wallet, bitcoin_wallet,
monero_wallet, monero_wallet,
@ -149,7 +149,7 @@ where
run_until( run_until(
BobState::BtcLocked(state3, alice_peer_id), BobState::BtcLocked(state3, alice_peer_id),
is_target_state, is_target_state,
swarm, event_loop_handle,
db, db,
bitcoin_wallet, bitcoin_wallet,
monero_wallet, monero_wallet,
@ -162,7 +162,7 @@ where
// Watch for Alice to Lock Xmr or for t1 to elapse // Watch for Alice to Lock Xmr or for t1 to elapse
BobState::BtcLocked(state3, alice_peer_id) => { BobState::BtcLocked(state3, alice_peer_id) => {
// todo: watch until t1, not indefinetely // todo: watch until t1, not indefinetely
let msg2 = swarm.recv_message2().await?; let msg2 = event_loop_handle.recv_message2().await?;
let state4 = state3 let state4 = state3
.watch_for_lock_xmr(monero_wallet.as_ref(), msg2) .watch_for_lock_xmr(monero_wallet.as_ref(), msg2)
.await?; .await?;
@ -170,7 +170,7 @@ where
run_until( run_until(
BobState::XmrLocked(state4, alice_peer_id), BobState::XmrLocked(state4, alice_peer_id),
is_target_state, is_target_state,
swarm, event_loop_handle,
db, db,
bitcoin_wallet, bitcoin_wallet,
monero_wallet, monero_wallet,
@ -187,14 +187,14 @@ where
// What if Alice fails to receive this? Should we always resend? // What if Alice fails to receive this? Should we always resend?
// todo: If we cannot dial Alice we should go to EncSigSent. Maybe dialing // todo: If we cannot dial Alice we should go to EncSigSent. Maybe dialing
// should happen in this arm? // should happen in this arm?
swarm event_loop_handle
.send_message3(alice_peer_id.clone(), tx_redeem_encsig) .send_message3(alice_peer_id.clone(), tx_redeem_encsig)
.await?; .await?;
run_until( run_until(
BobState::EncSigSent(state, alice_peer_id), BobState::EncSigSent(state, alice_peer_id),
is_target_state, is_target_state,
swarm, event_loop_handle,
db, db,
bitcoin_wallet, bitcoin_wallet,
monero_wallet, monero_wallet,
@ -213,7 +213,7 @@ where
run_until( run_until(
BobState::BtcRedeemed(val?), BobState::BtcRedeemed(val?),
is_target_state, is_target_state,
swarm, event_loop_handle,
db, db,
bitcoin_wallet, bitcoin_wallet,
monero_wallet, monero_wallet,
@ -232,7 +232,7 @@ where
run_until( run_until(
BobState::Cancelled(state), BobState::Cancelled(state),
is_target_state, is_target_state,
swarm, event_loop_handle,
db, db,
bitcoin_wallet, bitcoin_wallet,
monero_wallet, monero_wallet,
@ -250,7 +250,7 @@ where
run_until( run_until(
BobState::XmrRedeemed, BobState::XmrRedeemed,
is_target_state, is_target_state,
swarm, event_loop_handle,
db, db,
bitcoin_wallet, bitcoin_wallet,
monero_wallet, monero_wallet,

View File

@ -14,9 +14,6 @@ pub mod state;
pub mod storage; pub mod storage;
pub mod tor; pub mod tor;
pub const REFUND_TIMELOCK: u32 = 50; // Relative timelock, this is number of blocks. TODO: What should it be?
pub const PUNISH_TIMELOCK: u32 = 50; // FIXME: What should this be?
pub type Never = std::convert::Infallible; pub type Never = std::convert::Infallible;
/// Commands sent from Bob to the main task. /// Commands sent from Bob to the main task.

View File

@ -6,7 +6,7 @@ use rand::rngs::OsRng;
use std::sync::Arc; use std::sync::Arc;
use swap::{ use swap::{
alice, alice::swap::AliceState, bob, bob::swap::BobState, network::transport::build, alice, alice::swap::AliceState, bob, bob::swap::BobState, network::transport::build,
storage::Database, SwapAmounts, PUNISH_TIMELOCK, REFUND_TIMELOCK, storage::Database, SwapAmounts,
}; };
use tempfile::tempdir; use tempfile::tempdir;
use testcontainers::clients::Cli; use testcontainers::clients::Cli;
@ -45,6 +45,8 @@ async fn happy_path() {
.parse() .parse()
.expect("failed to parse Alice's address"); .expect("failed to parse Alice's address");
let config = Config::regtest();
let ( let (
alice_state, alice_state,
mut alice_swarm_driver, mut alice_swarm_driver,
@ -60,6 +62,7 @@ async fn happy_path() {
xmr_to_swap, xmr_to_swap,
xmr_alice, xmr_alice,
alice_multiaddr.clone(), alice_multiaddr.clone(),
config,
) )
.await; .await;
@ -73,6 +76,7 @@ async fn happy_path() {
btc_bob, btc_bob,
xmr_to_swap, xmr_to_swap,
xmr_bob, xmr_bob,
config,
) )
.await; .await;
@ -81,7 +85,7 @@ async fn happy_path() {
alice_swarm_handle, alice_swarm_handle,
alice_btc_wallet.clone(), alice_btc_wallet.clone(),
alice_xmr_wallet.clone(), alice_xmr_wallet.clone(),
Config::regtest(), config,
); );
let _alice_swarm_fut = tokio::spawn(async move { alice_swarm_driver.run().await }); let _alice_swarm_fut = tokio::spawn(async move { alice_swarm_driver.run().await });
@ -149,6 +153,8 @@ async fn alice_punishes_if_bob_never_acts_after_fund() {
.parse() .parse()
.expect("failed to parse Alice's address"); .expect("failed to parse Alice's address");
let config = Config::regtest();
let ( let (
alice_state, alice_state,
mut alice_swarm, mut alice_swarm,
@ -164,6 +170,7 @@ async fn alice_punishes_if_bob_never_acts_after_fund() {
xmr_to_swap, xmr_to_swap,
alice_xmr_starting_balance, alice_xmr_starting_balance,
alice_multiaddr.clone(), alice_multiaddr.clone(),
config,
) )
.await; .await;
@ -177,6 +184,7 @@ async fn alice_punishes_if_bob_never_acts_after_fund() {
bob_btc_starting_balance, bob_btc_starting_balance,
xmr_to_swap, xmr_to_swap,
bob_xmr_starting_balance, bob_xmr_starting_balance,
config,
) )
.await; .await;
@ -220,6 +228,7 @@ async fn init_alice(
xmr_to_swap: xmr_btc::monero::Amount, xmr_to_swap: xmr_btc::monero::Amount,
xmr_starting_balance: xmr_btc::monero::Amount, xmr_starting_balance: xmr_btc::monero::Amount,
listen: Multiaddr, listen: Multiaddr,
config: Config,
) -> ( ) -> (
AliceState, AliceState,
alice::event_loop::EventLoop, alice::event_loop::EventLoop,
@ -261,8 +270,8 @@ async fn init_alice(
v_a, v_a,
amounts.btc, amounts.btc,
amounts.xmr, amounts.xmr,
REFUND_TIMELOCK, config.bitcoin_refund_timelock,
PUNISH_TIMELOCK, config.bitcoin_punish_timelock,
redeem_address, redeem_address,
punish_address, punish_address,
); );
@ -303,6 +312,7 @@ async fn init_bob(
btc_starting_balance: bitcoin::Amount, btc_starting_balance: bitcoin::Amount,
xmr_to_swap: xmr_btc::monero::Amount, xmr_to_swap: xmr_btc::monero::Amount,
xmr_stating_balance: xmr_btc::monero::Amount, xmr_stating_balance: xmr_btc::monero::Amount,
config: Config,
) -> ( ) -> (
BobState, BobState,
bob::event_loop::EventLoop, bob::event_loop::EventLoop,
@ -346,8 +356,8 @@ async fn init_bob(
&mut OsRng, &mut OsRng,
btc_to_swap, btc_to_swap,
xmr_to_swap, xmr_to_swap,
REFUND_TIMELOCK, config.bitcoin_refund_timelock,
PUNISH_TIMELOCK, config.bitcoin_punish_timelock,
refund_address, refund_address,
); );
let bob_state = BobState::Started { let bob_state = BobState::Started {

View File

@ -7,6 +7,8 @@ pub struct Config {
pub bitcoin_finality_confirmations: u32, pub bitcoin_finality_confirmations: u32,
pub bitcoin_avg_block_time: Duration, pub bitcoin_avg_block_time: Duration,
pub monero_max_finality_time: Duration, pub monero_max_finality_time: Duration,
pub bitcoin_refund_timelock: u32,
pub bitcoin_punish_timelock: u32,
} }
impl Config { impl Config {
@ -19,6 +21,8 @@ impl Config {
// blockchain is slow // blockchain is slow
monero_max_finality_time: (*mainnet::MONERO_AVG_BLOCK_TIME).mul_f64(1.5) monero_max_finality_time: (*mainnet::MONERO_AVG_BLOCK_TIME).mul_f64(1.5)
* mainnet::MONERO_FINALITY_CONFIRMATIONS, * mainnet::MONERO_FINALITY_CONFIRMATIONS,
bitcoin_refund_timelock: mainnet::BITCOIN_REFUND_TIMELOCK,
bitcoin_punish_timelock: mainnet::BITCOIN_PUNISH_TIMELOCK,
} }
} }
@ -31,6 +35,8 @@ impl Config {
// blockchain is slow // blockchain is slow
monero_max_finality_time: (*regtest::MONERO_AVG_BLOCK_TIME).mul_f64(1.5) monero_max_finality_time: (*regtest::MONERO_AVG_BLOCK_TIME).mul_f64(1.5)
* regtest::MONERO_FINALITY_CONFIRMATIONS, * regtest::MONERO_FINALITY_CONFIRMATIONS,
bitcoin_refund_timelock: regtest::BITCOIN_REFUND_TIMELOCK,
bitcoin_punish_timelock: regtest::BITCOIN_PUNISH_TIMELOCK,
} }
} }
} }
@ -48,6 +54,10 @@ mod mainnet {
pub static MONERO_FINALITY_CONFIRMATIONS: u32 = 15; pub static MONERO_FINALITY_CONFIRMATIONS: u32 = 15;
pub static MONERO_AVG_BLOCK_TIME: Lazy<Duration> = Lazy::new(|| Duration::from_secs(2 * 60)); pub static MONERO_AVG_BLOCK_TIME: Lazy<Duration> = Lazy::new(|| Duration::from_secs(2 * 60));
// Set to 12 hours, arbitrary value to be reviewed properly
pub static BITCOIN_REFUND_TIMELOCK: u32 = 72;
pub static BITCOIN_PUNISH_TIMELOCK: u32 = 72;
} }
mod regtest { mod regtest {
@ -63,4 +73,8 @@ mod regtest {
pub static MONERO_FINALITY_CONFIRMATIONS: u32 = 1; pub static MONERO_FINALITY_CONFIRMATIONS: u32 = 1;
pub static MONERO_AVG_BLOCK_TIME: Lazy<Duration> = Lazy::new(|| Duration::from_secs(60)); pub static MONERO_AVG_BLOCK_TIME: Lazy<Duration> = Lazy::new(|| Duration::from_secs(60));
pub static BITCOIN_REFUND_TIMELOCK: u32 = 50;
pub static BITCOIN_PUNISH_TIMELOCK: u32 = 50;
} }