Execute Bob's on-chain protocol after handshake

Co-authored-by: Tobin C. Harding <tobin@coblox.tech>
This commit is contained in:
Lucas Soriano del Pino 2020-10-27 13:26:40 +11:00
parent 4ee82a5a2a
commit a4e4c27bee
9 changed files with 329 additions and 47 deletions

View File

@ -18,10 +18,13 @@ mod amounts;
mod message0; mod message0;
mod message1; mod message1;
mod message2; mod message2;
mod message3;
use self::{amounts::*, message0::*, message1::*, message2::*}; use self::{amounts::*, message0::*, message1::*, message2::*, message3::*};
use crate::{ use crate::{
bitcoin, monero, bitcoin,
bitcoin::TX_LOCK_MINE_TIMEOUT,
monero,
network::{ network::{
peer_tracker::{self, PeerTracker}, peer_tracker::{self, PeerTracker},
request_response::AliceToBob, request_response::AliceToBob,
@ -117,7 +120,6 @@ pub async fn swap(
); );
swarm.set_state0(state0.clone()); swarm.set_state0(state0.clone());
// TODO: Can we verify message 0 before calling this so we never fail?
let state1 = state0.receive(message0).expect("failed to receive msg 0"); let state1 = state0.receive(message0).expect("failed to receive msg 0");
let (state2, channel) = match swarm.next().await { let (state2, channel) = match swarm.next().await {
@ -146,8 +148,12 @@ pub async fn swap(
channel: Some(channel), channel: Some(channel),
})); }));
let mut action_generator = let mut action_generator = action_generator(
action_generator(network.clone(), bitcoin_wallet.clone(), state3, 3600); network.clone(),
bitcoin_wallet.clone(),
state3,
TX_LOCK_MINE_TIMEOUT,
);
loop { loop {
let state = action_generator.async_resume().await; let state = action_generator.async_resume().await;
@ -242,6 +248,7 @@ pub enum OutEvent {
msg: bob::Message2, msg: bob::Message2,
channel: ResponseChannel<AliceToBob>, channel: ResponseChannel<AliceToBob>,
}, },
Message3(bob::Message3),
} }
impl From<peer_tracker::OutEvent> for OutEvent { impl From<peer_tracker::OutEvent> for OutEvent {
@ -284,6 +291,14 @@ impl From<message2::OutEvent> for OutEvent {
} }
} }
impl From<message3::OutEvent> for OutEvent {
fn from(event: message3::OutEvent) -> Self {
match event {
message3::OutEvent::Msg(msg) => OutEvent::Message3(msg),
}
}
}
/// A `NetworkBehaviour` that represents an XMR/BTC swap node as Alice. /// A `NetworkBehaviour` that represents an XMR/BTC swap node as Alice.
#[derive(NetworkBehaviour)] #[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", event_process = false)] #[behaviour(out_event = "OutEvent", event_process = false)]
@ -294,6 +309,7 @@ pub struct Alice {
message0: Message0, message0: Message0,
message1: Message1, message1: Message1,
message2: Message2, message2: Message2,
message3: Message3,
#[behaviour(ignore)] #[behaviour(ignore)]
identity: Keypair, identity: Keypair,
} }
@ -347,6 +363,7 @@ impl Default for Alice {
message0: Message0::default(), message0: Message0::default(),
message1: Message1::default(), message1: Message1::default(),
message2: Message2::default(), message2: Message2::default(),
message3: Message3::default(),
identity, identity,
} }
} }

View File

@ -0,0 +1,94 @@
use libp2p::{
request_response::{
handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig,
RequestResponseEvent, RequestResponseMessage,
},
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters},
NetworkBehaviour,
};
use std::{
collections::VecDeque,
task::{Context, Poll},
time::Duration,
};
use tracing::{debug, error};
use crate::network::request_response::{AliceToBob, BobToAlice, Codec, Protocol, TIMEOUT};
use xmr_btc::bob;
#[derive(Debug)]
pub enum OutEvent {
Msg(bob::Message3),
}
/// A `NetworkBehaviour` that represents receiving of message 3 from Bob.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", poll_method = "poll")]
#[allow(missing_debug_implementations)]
pub struct Message3 {
rr: RequestResponse<Codec>,
#[behaviour(ignore)]
events: VecDeque<OutEvent>,
}
impl Message3 {
fn poll(
&mut self,
_: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<RequestProtocol<Codec>, OutEvent>> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
Poll::Pending
}
}
impl Default for Message3 {
fn default() -> Self {
let timeout = Duration::from_secs(TIMEOUT);
let mut config = RequestResponseConfig::default();
config.set_request_timeout(timeout);
Self {
rr: RequestResponse::new(
Codec::default(),
vec![(Protocol, ProtocolSupport::Full)],
config,
),
events: Default::default(),
}
}
}
impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>> for Message3 {
fn inject_event(&mut self, event: RequestResponseEvent<BobToAlice, AliceToBob>) {
match event {
RequestResponseEvent::Message {
message:
RequestResponseMessage::Request {
request, channel, ..
},
..
} => match request {
BobToAlice::Message3(msg) => {
self.events.push_back(OutEvent::Msg(msg));
// Send back empty response so that the request/response protocol completes.
self.rr.send_response(channel, AliceToBob::Message3);
}
other => debug!("got request: {:?}", other),
},
RequestResponseEvent::Message {
message: RequestResponseMessage::Response { .. },
..
} => panic!("Alice should not get a Response"),
RequestResponseEvent::InboundFailure { error, .. } => {
error!("Inbound failure: {:?}", error);
}
RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error);
}
}
}
}

View File

@ -12,6 +12,8 @@ use xmr_btc::bitcoin::{
TransactionBlockHeight, TxLock, Txid, WatchForRawTransaction, TransactionBlockHeight, TxLock, Txid, WatchForRawTransaction,
}; };
pub const TX_LOCK_MINE_TIMEOUT: u64 = 3600;
// This is cut'n'paste from xmr_btc/tests/harness/wallet/bitcoin.rs // This is cut'n'paste from xmr_btc/tests/harness/wallet/bitcoin.rs
#[derive(Debug)] #[derive(Debug)]

View File

@ -1,46 +1,61 @@
//! Run an XMR/BTC swap in the role of Bob. //! Run an XMR/BTC swap in the role of Bob.
//! Bob holds BTC and wishes receive XMR. //! Bob holds BTC and wishes receive XMR.
use anyhow::Result; use anyhow::Result;
use async_trait::async_trait;
use futures::{ use futures::{
channel::mpsc::{Receiver, Sender}, channel::mpsc::{Receiver, Sender},
StreamExt, StreamExt,
}; };
use genawaiter::GeneratorState;
use libp2p::{core::identity::Keypair, Multiaddr, NetworkBehaviour, PeerId}; use libp2p::{core::identity::Keypair, Multiaddr, NetworkBehaviour, PeerId};
use rand::rngs::OsRng; use rand::rngs::OsRng;
use std::{process, thread}; use std::{process, sync::Arc};
use tokio::sync::Mutex;
use tracing::{debug, info}; use tracing::{debug, info};
mod amounts; mod amounts;
mod message0; mod message0;
mod message1; mod message1;
mod message2; mod message2;
mod message3;
use self::{amounts::*, message0::*, message1::*, message2::*}; use self::{amounts::*, message0::*, message1::*, message2::*, message3::*};
use crate::{ use crate::{
bitcoin,
bitcoin::TX_LOCK_MINE_TIMEOUT,
monero,
network::{ network::{
peer_tracker::{self, PeerTracker}, peer_tracker::{self, PeerTracker},
transport, TokioExecutor, transport, TokioExecutor,
}, },
Cmd, Rsp, SwapAmounts, PUNISH_TIMELOCK, REFUND_TIMELOCK, Cmd, Never, Rsp, SwapAmounts, PUNISH_TIMELOCK, REFUND_TIMELOCK,
}; };
use xmr_btc::{ use xmr_btc::{
alice, alice,
bitcoin::{BroadcastSignedTransaction, BuildTxLockPsbt, SignTxLock}, bitcoin::{BroadcastSignedTransaction, EncryptedSignature, SignTxLock},
bob::{self, State0}, bob::{self, action_generator, ReceiveTransferProof, State0},
monero::CreateWalletForOutput,
}; };
// FIXME: This whole function is horrible, needs total re-write. // FIXME: This whole function is horrible, needs total re-write.
pub async fn swap<W>( pub async fn swap(
bitcoin_wallet: Arc<bitcoin::Wallet>,
monero_wallet: Arc<monero::Wallet>,
btc: u64, btc: u64,
addr: Multiaddr, addr: Multiaddr,
mut cmd_tx: Sender<Cmd>, mut cmd_tx: Sender<Cmd>,
mut rsp_rx: Receiver<Rsp>, mut rsp_rx: Receiver<Rsp>,
refund_address: ::bitcoin::Address, refund_address: ::bitcoin::Address,
wallet: W, ) -> Result<()> {
) -> Result<()> struct Network(Swarm);
where
W: BuildTxLockPsbt + SignTxLock + BroadcastSignedTransaction + Send + Sync + 'static, #[async_trait]
{ impl ReceiveTransferProof for Network {
async fn receive_transfer_proof(&mut self) -> monero::TransferProof {
todo!()
}
}
let mut swarm = new_swarm()?; let mut swarm = new_swarm()?;
libp2p::Swarm::dial_addr(&mut swarm, addr)?; libp2p::Swarm::dial_addr(&mut swarm, addr)?;
@ -82,11 +97,7 @@ where
swarm.send_message0(alice.clone(), state0.next_message(rng)); swarm.send_message0(alice.clone(), state0.next_message(rng));
let state1 = match swarm.next().await { let state1 = match swarm.next().await {
OutEvent::Message0(msg) => { OutEvent::Message0(msg) => state0.receive(bitcoin_wallet.as_ref(), msg).await?,
// TODO: Verify the response message before calling receive() and handle any
// error gracefully.
state0.receive(&wallet, msg).await?
}
other => panic!("unexpected event: {:?}", other), other => panic!("unexpected event: {:?}", other),
}; };
@ -102,8 +113,53 @@ where
info!("Handshake complete, we now have State2 for Bob."); info!("Handshake complete, we now have State2 for Bob.");
thread::park(); let network = Arc::new(Mutex::new(Network(swarm)));
Ok(())
let mut action_generator = action_generator(
network.clone(),
monero_wallet.clone(),
bitcoin_wallet.clone(),
state2,
TX_LOCK_MINE_TIMEOUT,
);
loop {
let state = action_generator.async_resume().await;
info!("resumed execution of bob generator, got: {:?}", state);
match state {
GeneratorState::Yielded(bob::Action::LockBtc(tx_lock)) => {
let signed_tx_lock = bitcoin_wallet.sign_tx_lock(tx_lock).await?;
let _ = bitcoin_wallet
.broadcast_signed_transaction(signed_tx_lock)
.await?;
}
GeneratorState::Yielded(bob::Action::SendBtcRedeemEncsig(tx_redeem_encsig)) => {
let mut guard = network.as_ref().lock().await;
guard.0.send_message3(alice.clone(), tx_redeem_encsig);
}
GeneratorState::Yielded(bob::Action::CreateXmrWalletForOutput {
spend_key,
view_key,
}) => {
monero_wallet
.create_and_load_wallet_for_output(spend_key, view_key)
.await?;
}
GeneratorState::Yielded(bob::Action::CancelBtc(tx_cancel)) => {
let _ = bitcoin_wallet
.broadcast_signed_transaction(tx_cancel)
.await?;
}
GeneratorState::Yielded(bob::Action::RefundBtc(tx_refund)) => {
let _ = bitcoin_wallet
.broadcast_signed_transaction(tx_refund)
.await?;
}
GeneratorState::Complete(()) => return Ok(()),
}
}
} }
pub type Swarm = libp2p::Swarm<Bob>; pub type Swarm = libp2p::Swarm<Bob>;
@ -188,6 +244,12 @@ impl From<message2::OutEvent> for OutEvent {
} }
} }
impl From<Never> for OutEvent {
fn from(_: Never) -> Self {
panic!("not ever")
}
}
/// A `NetworkBehaviour` that represents an XMR/BTC swap node as Bob. /// A `NetworkBehaviour` that represents an XMR/BTC swap node as Bob.
#[derive(NetworkBehaviour)] #[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", event_process = false)] #[behaviour(out_event = "OutEvent", event_process = false)]
@ -198,6 +260,7 @@ pub struct Bob {
message0: Message0, message0: Message0,
message1: Message1, message1: Message1,
message2: Message2, message2: Message2,
message3: Message3,
#[behaviour(ignore)] #[behaviour(ignore)]
identity: Keypair, identity: Keypair,
} }
@ -233,6 +296,12 @@ impl Bob {
self.message2.send(alice, msg) self.message2.send(alice, msg)
} }
/// Sends Bob's fourth message to Alice.
pub fn send_message3(&mut self, alice: PeerId, tx_redeem_encsig: EncryptedSignature) {
let msg = bob::Message3 { tx_redeem_encsig };
self.message3.send(alice, msg)
}
/// Returns Alice's peer id if we are connected. /// Returns Alice's peer id if we are connected.
pub fn peer_id_of_alice(&self) -> Option<PeerId> { pub fn peer_id_of_alice(&self) -> Option<PeerId> {
self.pt.counterparty_peer_id() self.pt.counterparty_peer_id()
@ -249,6 +318,7 @@ impl Default for Bob {
message0: Message0::default(), message0: Message0::default(),
message1: Message1::default(), message1: Message1::default(),
message2: Message2::default(), message2: Message2::default(),
message3: Message3::default(),
identity, identity,
} }
} }

84
swap/src/bob/message3.rs Normal file
View File

@ -0,0 +1,84 @@
use libp2p::{
request_response::{
handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig,
RequestResponseEvent, RequestResponseMessage,
},
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters},
NetworkBehaviour, PeerId,
};
use std::{
task::{Context, Poll},
time::Duration,
};
use tracing::{debug, error};
use crate::{
network::request_response::{AliceToBob, BobToAlice, Codec, Protocol, TIMEOUT},
Never,
};
use xmr_btc::bob;
/// A `NetworkBehaviour` that represents sending message 3 to Alice.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "Never", poll_method = "poll")]
#[allow(missing_debug_implementations)]
pub struct Message3 {
rr: RequestResponse<Codec>,
}
impl Message3 {
pub fn send(&mut self, alice: PeerId, msg: bob::Message3) {
let msg = BobToAlice::Message3(msg);
let _id = self.rr.send_request(&alice, msg);
}
// TODO: Do we need a custom implementation if we are not bubbling any out
// events?
fn poll(
&mut self,
_: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<RequestProtocol<Codec>, Never>> {
Poll::Pending
}
}
impl Default for Message3 {
fn default() -> Self {
let timeout = Duration::from_secs(TIMEOUT);
let mut config = RequestResponseConfig::default();
config.set_request_timeout(timeout);
Self {
rr: RequestResponse::new(
Codec::default(),
vec![(Protocol, ProtocolSupport::Full)],
config,
),
}
}
}
impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>> for Message3 {
fn inject_event(&mut self, event: RequestResponseEvent<BobToAlice, AliceToBob>) {
match event {
RequestResponseEvent::Message {
message: RequestResponseMessage::Request { .. },
..
} => panic!("Bob should never get a request from Alice"),
RequestResponseEvent::Message {
message: RequestResponseMessage::Response { response, .. },
..
} => match response {
AliceToBob::Message3 => debug!("Alice correctly responded to message 3"),
other => debug!("unexpected response: {:?}", other),
},
RequestResponseEvent::InboundFailure { error, .. } => {
error!("Inbound failure: {:?}", error);
}
RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error);
}
}
}
}

View File

@ -26,7 +26,6 @@ mod trace;
use cli::Options; use cli::Options;
use swap::{alice, bitcoin, bob, monero, Cmd, Rsp, SwapAmounts}; use swap::{alice, bitcoin, bob, monero, Cmd, Rsp, SwapAmounts};
use xmr_btc::bitcoin::{BroadcastSignedTransaction, BuildTxLockPsbt, SignTxLock};
// TODO: Add root seed file instead of generating new seed each run. // TODO: Add root seed file instead of generating new seed each run.
// TODO: Remove all instances of the todo! macro // TODO: Remove all instances of the todo! macro
@ -92,28 +91,31 @@ async fn main() -> Result<()> {
} else { } else {
info!("running swap node as Bob ..."); info!("running swap node as Bob ...");
let alice_address = match opt.alice_address { let alice = match opt.alice_address {
Some(addr) => addr, Some(addr) => addr,
None => bail!("Address required to dial"), None => bail!("Address required to dial"),
}; };
let alice_address = multiaddr(&alice_address)?; let alice = multiaddr(&alice)?;
let url = Url::parse(BITCOIND_JSON_RPC_URL).expect("failed to parse url"); let url = Url::parse(BITCOIND_JSON_RPC_URL).expect("failed to parse url");
let bitcoin_wallet = bitcoin::Wallet::new("bob", &url) let bitcoin_wallet = bitcoin::Wallet::new("bob", &url)
.await .await
.expect("failed to create bitcoin wallet"); .expect("failed to create bitcoin wallet");
let monero_wallet = Arc::new(monero::Wallet::localhost(MONERO_WALLET_RPC_PORT));
let refund = bitcoin_wallet let refund = bitcoin_wallet
.new_address() .new_address()
.await .await
.expect("failed to get new address"); .expect("failed to get new address");
let bitcoin_wallet = Arc::new(bitcoin_wallet);
match (opt.piconeros, opt.satoshis) { match (opt.piconeros, opt.satoshis) {
(Some(_), Some(_)) => bail!("Please supply only a single amount to swap"), (Some(_), Some(_)) => bail!("Please supply only a single amount to swap"),
(None, None) => bail!("Please supply an amount to swap"), (None, None) => bail!("Please supply an amount to swap"),
(Some(_picos), _) => todo!("support starting with picos"), (Some(_picos), _) => todo!("support starting with picos"),
(None, Some(sats)) => { (None, Some(sats)) => {
swap_as_bob(sats, alice_address, refund, bitcoin_wallet).await?; swap_as_bob(bitcoin_wallet, monero_wallet, sats, alice, refund).await?;
} }
}; };
} }
@ -164,18 +166,24 @@ async fn swap_as_alice(
} }
} }
async fn swap_as_bob<W>( async fn swap_as_bob(
bitcoin_wallet: Arc<swap::bitcoin::Wallet>,
monero_wallet: Arc<swap::monero::Wallet>,
sats: u64, sats: u64,
alice: Multiaddr, alice: Multiaddr,
refund: ::bitcoin::Address, refund: ::bitcoin::Address,
wallet: W, ) -> Result<()> {
) -> Result<()>
where
W: BuildTxLockPsbt + SignTxLock + BroadcastSignedTransaction + Send + Sync + 'static,
{
let (cmd_tx, mut cmd_rx) = mpsc::channel(1); let (cmd_tx, mut cmd_rx) = mpsc::channel(1);
let (mut rsp_tx, rsp_rx) = mpsc::channel(1); let (mut rsp_tx, rsp_rx) = mpsc::channel(1);
tokio::spawn(bob::swap(sats, alice, cmd_tx, rsp_rx, refund, wallet)); tokio::spawn(bob::swap(
bitcoin_wallet,
monero_wallet,
sats,
alice,
cmd_tx,
rsp_rx,
refund,
));
loop { loop {
let read = cmd_rx.next().await; let read = cmd_rx.next().await;

View File

@ -26,6 +26,7 @@ pub enum BobToAlice {
Message0(bob::Message0), Message0(bob::Message0),
Message1(bob::Message1), Message1(bob::Message1),
Message2(bob::Message2), Message2(bob::Message2),
Message3(bob::Message3),
} }
/// Messages Alice sends to Bob. /// Messages Alice sends to Bob.
@ -36,6 +37,7 @@ pub enum AliceToBob {
Message0(alice::Message0), Message0(alice::Message0),
Message1(alice::Message1), Message1(alice::Message1),
Message2(alice::Message2), Message2(alice::Message2),
Message3, // empty response
} }
#[derive(Debug, Clone, Copy, Default)] #[derive(Debug, Clone, Copy, Default)]

View File

@ -28,7 +28,7 @@ use std::{
sync::Arc, sync::Arc,
time::Duration, time::Duration,
}; };
use tokio::time::timeout; use tokio::{sync::Mutex, time::timeout};
use tracing::error; use tracing::error;
pub mod message; pub mod message;
@ -62,7 +62,7 @@ pub trait ReceiveTransferProof {
/// The argument `bitcoin_tx_lock_timeout` is used to determine how long we will /// The argument `bitcoin_tx_lock_timeout` is used to determine how long we will
/// wait for Bob, the caller of this function, to lock up the bitcoin. /// wait for Bob, the caller of this function, to lock up the bitcoin.
pub fn action_generator<N, M, B>( pub fn action_generator<N, M, B>(
mut network: N, network: Arc<Mutex<N>>,
monero_client: Arc<M>, monero_client: Arc<M>,
bitcoin_client: Arc<B>, bitcoin_client: Arc<B>,
// TODO: Replace this with a new, slimmer struct? // TODO: Replace this with a new, slimmer struct?
@ -85,7 +85,7 @@ pub fn action_generator<N, M, B>(
bitcoin_tx_lock_timeout: u64, bitcoin_tx_lock_timeout: u64,
) -> GenBoxed<Action, (), ()> ) -> GenBoxed<Action, (), ()>
where where
N: ReceiveTransferProof + Send + Sync + 'static, N: ReceiveTransferProof + Send + 'static,
M: monero::WatchForTransfer + Send + Sync + 'static, M: monero::WatchForTransfer + Send + Sync + 'static,
B: bitcoin::BlockHeight B: bitcoin::BlockHeight
+ bitcoin::TransactionBlockHeight + bitcoin::TransactionBlockHeight
@ -140,14 +140,19 @@ where
.shared(); .shared();
pin_mut!(poll_until_btc_has_expired); pin_mut!(poll_until_btc_has_expired);
let transfer_proof = match select( let transfer_proof = {
network.receive_transfer_proof(), let mut guard = network.as_ref().lock().await;
poll_until_btc_has_expired.clone(), let transfer_proof = match select(
) guard.receive_transfer_proof(),
.await poll_until_btc_has_expired.clone(),
{ )
Either::Left((proof, _)) => proof, .await
Either::Right(_) => return Err(SwapFailed::AfterBtcLock(Reason::BtcExpired)), {
Either::Left((proof, _)) => proof,
Either::Right(_) => return Err(SwapFailed::AfterBtcLock(Reason::BtcExpired)),
};
transfer_proof
}; };
let S_b_monero = monero::PublicKey::from_private_key(&monero::PrivateKey::from_scalar( let S_b_monero = monero::PublicKey::from_private_key(&monero::PrivateKey::from_scalar(

View File

@ -33,9 +33,9 @@ pub struct Message2 {
pub(crate) tx_cancel_sig: Signature, pub(crate) tx_cancel_sig: Signature,
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Message3 { pub struct Message3 {
pub(crate) tx_redeem_encsig: EncryptedSignature, pub tx_redeem_encsig: EncryptedSignature,
} }
impl_try_from_parent_enum!(Message0, Message); impl_try_from_parent_enum!(Message0, Message);