diff --git a/swap/src/alice.rs b/swap/src/alice.rs index 64ff4660..1b7a2505 100644 --- a/swap/src/alice.rs +++ b/swap/src/alice.rs @@ -18,10 +18,13 @@ mod amounts; mod message0; mod message1; mod message2; +mod message3; -use self::{amounts::*, message0::*, message1::*, message2::*}; +use self::{amounts::*, message0::*, message1::*, message2::*, message3::*}; use crate::{ - bitcoin, monero, + bitcoin, + bitcoin::TX_LOCK_MINE_TIMEOUT, + monero, network::{ peer_tracker::{self, PeerTracker}, request_response::AliceToBob, @@ -117,7 +120,6 @@ pub async fn swap( ); swarm.set_state0(state0.clone()); - // TODO: Can we verify message 0 before calling this so we never fail? let state1 = state0.receive(message0).expect("failed to receive msg 0"); let (state2, channel) = match swarm.next().await { @@ -146,8 +148,12 @@ pub async fn swap( channel: Some(channel), })); - let mut action_generator = - action_generator(network.clone(), bitcoin_wallet.clone(), state3, 3600); + let mut action_generator = action_generator( + network.clone(), + bitcoin_wallet.clone(), + state3, + TX_LOCK_MINE_TIMEOUT, + ); loop { let state = action_generator.async_resume().await; @@ -242,6 +248,7 @@ pub enum OutEvent { msg: bob::Message2, channel: ResponseChannel, }, + Message3(bob::Message3), } impl From for OutEvent { @@ -284,6 +291,14 @@ impl From for OutEvent { } } +impl From for OutEvent { + fn from(event: message3::OutEvent) -> Self { + match event { + message3::OutEvent::Msg(msg) => OutEvent::Message3(msg), + } + } +} + /// A `NetworkBehaviour` that represents an XMR/BTC swap node as Alice. #[derive(NetworkBehaviour)] #[behaviour(out_event = "OutEvent", event_process = false)] @@ -294,6 +309,7 @@ pub struct Alice { message0: Message0, message1: Message1, message2: Message2, + message3: Message3, #[behaviour(ignore)] identity: Keypair, } @@ -347,6 +363,7 @@ impl Default for Alice { message0: Message0::default(), message1: Message1::default(), message2: Message2::default(), + message3: Message3::default(), identity, } } diff --git a/swap/src/alice/message3.rs b/swap/src/alice/message3.rs new file mode 100644 index 00000000..42e1a600 --- /dev/null +++ b/swap/src/alice/message3.rs @@ -0,0 +1,94 @@ +use libp2p::{ + request_response::{ + handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig, + RequestResponseEvent, RequestResponseMessage, + }, + swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}, + NetworkBehaviour, +}; +use std::{ + collections::VecDeque, + task::{Context, Poll}, + time::Duration, +}; +use tracing::{debug, error}; + +use crate::network::request_response::{AliceToBob, BobToAlice, Codec, Protocol, TIMEOUT}; +use xmr_btc::bob; + +#[derive(Debug)] +pub enum OutEvent { + Msg(bob::Message3), +} + +/// A `NetworkBehaviour` that represents receiving of message 3 from Bob. +#[derive(NetworkBehaviour)] +#[behaviour(out_event = "OutEvent", poll_method = "poll")] +#[allow(missing_debug_implementations)] +pub struct Message3 { + rr: RequestResponse, + #[behaviour(ignore)] + events: VecDeque, +} + +impl Message3 { + fn poll( + &mut self, + _: &mut Context<'_>, + _: &mut impl PollParameters, + ) -> Poll, OutEvent>> { + if let Some(event) = self.events.pop_front() { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + } + + Poll::Pending + } +} + +impl Default for Message3 { + fn default() -> Self { + let timeout = Duration::from_secs(TIMEOUT); + let mut config = RequestResponseConfig::default(); + config.set_request_timeout(timeout); + + Self { + rr: RequestResponse::new( + Codec::default(), + vec![(Protocol, ProtocolSupport::Full)], + config, + ), + events: Default::default(), + } + } +} + +impl NetworkBehaviourEventProcess> for Message3 { + fn inject_event(&mut self, event: RequestResponseEvent) { + match event { + RequestResponseEvent::Message { + message: + RequestResponseMessage::Request { + request, channel, .. + }, + .. + } => match request { + BobToAlice::Message3(msg) => { + self.events.push_back(OutEvent::Msg(msg)); + // Send back empty response so that the request/response protocol completes. + self.rr.send_response(channel, AliceToBob::Message3); + } + other => debug!("got request: {:?}", other), + }, + RequestResponseEvent::Message { + message: RequestResponseMessage::Response { .. }, + .. + } => panic!("Alice should not get a Response"), + RequestResponseEvent::InboundFailure { error, .. } => { + error!("Inbound failure: {:?}", error); + } + RequestResponseEvent::OutboundFailure { error, .. } => { + error!("Outbound failure: {:?}", error); + } + } + } +} diff --git a/swap/src/bitcoin.rs b/swap/src/bitcoin.rs index fb0f63e5..f59c5eda 100644 --- a/swap/src/bitcoin.rs +++ b/swap/src/bitcoin.rs @@ -12,6 +12,8 @@ use xmr_btc::bitcoin::{ TransactionBlockHeight, TxLock, Txid, WatchForRawTransaction, }; +pub const TX_LOCK_MINE_TIMEOUT: u64 = 3600; + // This is cut'n'paste from xmr_btc/tests/harness/wallet/bitcoin.rs #[derive(Debug)] diff --git a/swap/src/bob.rs b/swap/src/bob.rs index 9bcb302e..97327448 100644 --- a/swap/src/bob.rs +++ b/swap/src/bob.rs @@ -1,46 +1,61 @@ //! Run an XMR/BTC swap in the role of Bob. //! Bob holds BTC and wishes receive XMR. use anyhow::Result; +use async_trait::async_trait; use futures::{ channel::mpsc::{Receiver, Sender}, StreamExt, }; +use genawaiter::GeneratorState; use libp2p::{core::identity::Keypair, Multiaddr, NetworkBehaviour, PeerId}; use rand::rngs::OsRng; -use std::{process, thread}; +use std::{process, sync::Arc}; +use tokio::sync::Mutex; use tracing::{debug, info}; mod amounts; mod message0; mod message1; mod message2; +mod message3; -use self::{amounts::*, message0::*, message1::*, message2::*}; +use self::{amounts::*, message0::*, message1::*, message2::*, message3::*}; use crate::{ + bitcoin, + bitcoin::TX_LOCK_MINE_TIMEOUT, + monero, network::{ peer_tracker::{self, PeerTracker}, transport, TokioExecutor, }, - Cmd, Rsp, SwapAmounts, PUNISH_TIMELOCK, REFUND_TIMELOCK, + Cmd, Never, Rsp, SwapAmounts, PUNISH_TIMELOCK, REFUND_TIMELOCK, }; use xmr_btc::{ alice, - bitcoin::{BroadcastSignedTransaction, BuildTxLockPsbt, SignTxLock}, - bob::{self, State0}, + bitcoin::{BroadcastSignedTransaction, EncryptedSignature, SignTxLock}, + bob::{self, action_generator, ReceiveTransferProof, State0}, + monero::CreateWalletForOutput, }; // FIXME: This whole function is horrible, needs total re-write. -pub async fn swap( +pub async fn swap( + bitcoin_wallet: Arc, + monero_wallet: Arc, btc: u64, addr: Multiaddr, mut cmd_tx: Sender, mut rsp_rx: Receiver, refund_address: ::bitcoin::Address, - wallet: W, -) -> Result<()> -where - W: BuildTxLockPsbt + SignTxLock + BroadcastSignedTransaction + Send + Sync + 'static, -{ +) -> Result<()> { + struct Network(Swarm); + + #[async_trait] + impl ReceiveTransferProof for Network { + async fn receive_transfer_proof(&mut self) -> monero::TransferProof { + todo!() + } + } + let mut swarm = new_swarm()?; libp2p::Swarm::dial_addr(&mut swarm, addr)?; @@ -82,11 +97,7 @@ where swarm.send_message0(alice.clone(), state0.next_message(rng)); let state1 = match swarm.next().await { - OutEvent::Message0(msg) => { - // TODO: Verify the response message before calling receive() and handle any - // error gracefully. - state0.receive(&wallet, msg).await? - } + OutEvent::Message0(msg) => state0.receive(bitcoin_wallet.as_ref(), msg).await?, other => panic!("unexpected event: {:?}", other), }; @@ -102,8 +113,53 @@ where info!("Handshake complete, we now have State2 for Bob."); - thread::park(); - Ok(()) + let network = Arc::new(Mutex::new(Network(swarm))); + + let mut action_generator = action_generator( + network.clone(), + monero_wallet.clone(), + bitcoin_wallet.clone(), + state2, + TX_LOCK_MINE_TIMEOUT, + ); + + loop { + let state = action_generator.async_resume().await; + + info!("resumed execution of bob generator, got: {:?}", state); + + match state { + GeneratorState::Yielded(bob::Action::LockBtc(tx_lock)) => { + let signed_tx_lock = bitcoin_wallet.sign_tx_lock(tx_lock).await?; + let _ = bitcoin_wallet + .broadcast_signed_transaction(signed_tx_lock) + .await?; + } + GeneratorState::Yielded(bob::Action::SendBtcRedeemEncsig(tx_redeem_encsig)) => { + let mut guard = network.as_ref().lock().await; + guard.0.send_message3(alice.clone(), tx_redeem_encsig); + } + GeneratorState::Yielded(bob::Action::CreateXmrWalletForOutput { + spend_key, + view_key, + }) => { + monero_wallet + .create_and_load_wallet_for_output(spend_key, view_key) + .await?; + } + GeneratorState::Yielded(bob::Action::CancelBtc(tx_cancel)) => { + let _ = bitcoin_wallet + .broadcast_signed_transaction(tx_cancel) + .await?; + } + GeneratorState::Yielded(bob::Action::RefundBtc(tx_refund)) => { + let _ = bitcoin_wallet + .broadcast_signed_transaction(tx_refund) + .await?; + } + GeneratorState::Complete(()) => return Ok(()), + } + } } pub type Swarm = libp2p::Swarm; @@ -188,6 +244,12 @@ impl From for OutEvent { } } +impl From for OutEvent { + fn from(_: Never) -> Self { + panic!("not ever") + } +} + /// A `NetworkBehaviour` that represents an XMR/BTC swap node as Bob. #[derive(NetworkBehaviour)] #[behaviour(out_event = "OutEvent", event_process = false)] @@ -198,6 +260,7 @@ pub struct Bob { message0: Message0, message1: Message1, message2: Message2, + message3: Message3, #[behaviour(ignore)] identity: Keypair, } @@ -233,6 +296,12 @@ impl Bob { self.message2.send(alice, msg) } + /// Sends Bob's fourth message to Alice. + pub fn send_message3(&mut self, alice: PeerId, tx_redeem_encsig: EncryptedSignature) { + let msg = bob::Message3 { tx_redeem_encsig }; + self.message3.send(alice, msg) + } + /// Returns Alice's peer id if we are connected. pub fn peer_id_of_alice(&self) -> Option { self.pt.counterparty_peer_id() @@ -249,6 +318,7 @@ impl Default for Bob { message0: Message0::default(), message1: Message1::default(), message2: Message2::default(), + message3: Message3::default(), identity, } } diff --git a/swap/src/bob/message3.rs b/swap/src/bob/message3.rs new file mode 100644 index 00000000..b9567e81 --- /dev/null +++ b/swap/src/bob/message3.rs @@ -0,0 +1,84 @@ +use libp2p::{ + request_response::{ + handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig, + RequestResponseEvent, RequestResponseMessage, + }, + swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}, + NetworkBehaviour, PeerId, +}; +use std::{ + task::{Context, Poll}, + time::Duration, +}; +use tracing::{debug, error}; + +use crate::{ + network::request_response::{AliceToBob, BobToAlice, Codec, Protocol, TIMEOUT}, + Never, +}; +use xmr_btc::bob; + +/// A `NetworkBehaviour` that represents sending message 3 to Alice. +#[derive(NetworkBehaviour)] +#[behaviour(out_event = "Never", poll_method = "poll")] +#[allow(missing_debug_implementations)] +pub struct Message3 { + rr: RequestResponse, +} + +impl Message3 { + pub fn send(&mut self, alice: PeerId, msg: bob::Message3) { + let msg = BobToAlice::Message3(msg); + let _id = self.rr.send_request(&alice, msg); + } + + // TODO: Do we need a custom implementation if we are not bubbling any out + // events? + fn poll( + &mut self, + _: &mut Context<'_>, + _: &mut impl PollParameters, + ) -> Poll, Never>> { + Poll::Pending + } +} + +impl Default for Message3 { + fn default() -> Self { + let timeout = Duration::from_secs(TIMEOUT); + let mut config = RequestResponseConfig::default(); + config.set_request_timeout(timeout); + + Self { + rr: RequestResponse::new( + Codec::default(), + vec![(Protocol, ProtocolSupport::Full)], + config, + ), + } + } +} + +impl NetworkBehaviourEventProcess> for Message3 { + fn inject_event(&mut self, event: RequestResponseEvent) { + match event { + RequestResponseEvent::Message { + message: RequestResponseMessage::Request { .. }, + .. + } => panic!("Bob should never get a request from Alice"), + RequestResponseEvent::Message { + message: RequestResponseMessage::Response { response, .. }, + .. + } => match response { + AliceToBob::Message3 => debug!("Alice correctly responded to message 3"), + other => debug!("unexpected response: {:?}", other), + }, + RequestResponseEvent::InboundFailure { error, .. } => { + error!("Inbound failure: {:?}", error); + } + RequestResponseEvent::OutboundFailure { error, .. } => { + error!("Outbound failure: {:?}", error); + } + } + } +} diff --git a/swap/src/main.rs b/swap/src/main.rs index 6dc1410c..7c93159d 100644 --- a/swap/src/main.rs +++ b/swap/src/main.rs @@ -26,7 +26,6 @@ mod trace; use cli::Options; use swap::{alice, bitcoin, bob, monero, Cmd, Rsp, SwapAmounts}; -use xmr_btc::bitcoin::{BroadcastSignedTransaction, BuildTxLockPsbt, SignTxLock}; // TODO: Add root seed file instead of generating new seed each run. // TODO: Remove all instances of the todo! macro @@ -92,28 +91,31 @@ async fn main() -> Result<()> { } else { info!("running swap node as Bob ..."); - let alice_address = match opt.alice_address { + let alice = match opt.alice_address { Some(addr) => addr, None => bail!("Address required to dial"), }; - let alice_address = multiaddr(&alice_address)?; + let alice = multiaddr(&alice)?; let url = Url::parse(BITCOIND_JSON_RPC_URL).expect("failed to parse url"); let bitcoin_wallet = bitcoin::Wallet::new("bob", &url) .await .expect("failed to create bitcoin wallet"); + let monero_wallet = Arc::new(monero::Wallet::localhost(MONERO_WALLET_RPC_PORT)); + let refund = bitcoin_wallet .new_address() .await .expect("failed to get new address"); + let bitcoin_wallet = Arc::new(bitcoin_wallet); match (opt.piconeros, opt.satoshis) { (Some(_), Some(_)) => bail!("Please supply only a single amount to swap"), (None, None) => bail!("Please supply an amount to swap"), (Some(_picos), _) => todo!("support starting with picos"), (None, Some(sats)) => { - swap_as_bob(sats, alice_address, refund, bitcoin_wallet).await?; + swap_as_bob(bitcoin_wallet, monero_wallet, sats, alice, refund).await?; } }; } @@ -164,18 +166,24 @@ async fn swap_as_alice( } } -async fn swap_as_bob( +async fn swap_as_bob( + bitcoin_wallet: Arc, + monero_wallet: Arc, sats: u64, alice: Multiaddr, refund: ::bitcoin::Address, - wallet: W, -) -> Result<()> -where - W: BuildTxLockPsbt + SignTxLock + BroadcastSignedTransaction + Send + Sync + 'static, -{ +) -> Result<()> { let (cmd_tx, mut cmd_rx) = mpsc::channel(1); let (mut rsp_tx, rsp_rx) = mpsc::channel(1); - tokio::spawn(bob::swap(sats, alice, cmd_tx, rsp_rx, refund, wallet)); + tokio::spawn(bob::swap( + bitcoin_wallet, + monero_wallet, + sats, + alice, + cmd_tx, + rsp_rx, + refund, + )); loop { let read = cmd_rx.next().await; diff --git a/swap/src/network/request_response.rs b/swap/src/network/request_response.rs index 709013b6..697fa052 100644 --- a/swap/src/network/request_response.rs +++ b/swap/src/network/request_response.rs @@ -26,6 +26,7 @@ pub enum BobToAlice { Message0(bob::Message0), Message1(bob::Message1), Message2(bob::Message2), + Message3(bob::Message3), } /// Messages Alice sends to Bob. @@ -36,6 +37,7 @@ pub enum AliceToBob { Message0(alice::Message0), Message1(alice::Message1), Message2(alice::Message2), + Message3, // empty response } #[derive(Debug, Clone, Copy, Default)] diff --git a/xmr-btc/src/bob.rs b/xmr-btc/src/bob.rs index ac1180c9..f57d062c 100644 --- a/xmr-btc/src/bob.rs +++ b/xmr-btc/src/bob.rs @@ -28,7 +28,7 @@ use std::{ sync::Arc, time::Duration, }; -use tokio::time::timeout; +use tokio::{sync::Mutex, time::timeout}; use tracing::error; pub mod message; @@ -62,7 +62,7 @@ pub trait ReceiveTransferProof { /// The argument `bitcoin_tx_lock_timeout` is used to determine how long we will /// wait for Bob, the caller of this function, to lock up the bitcoin. pub fn action_generator( - mut network: N, + network: Arc>, monero_client: Arc, bitcoin_client: Arc, // TODO: Replace this with a new, slimmer struct? @@ -85,7 +85,7 @@ pub fn action_generator( bitcoin_tx_lock_timeout: u64, ) -> GenBoxed where - N: ReceiveTransferProof + Send + Sync + 'static, + N: ReceiveTransferProof + Send + 'static, M: monero::WatchForTransfer + Send + Sync + 'static, B: bitcoin::BlockHeight + bitcoin::TransactionBlockHeight @@ -140,14 +140,19 @@ where .shared(); pin_mut!(poll_until_btc_has_expired); - let transfer_proof = match select( - network.receive_transfer_proof(), - poll_until_btc_has_expired.clone(), - ) - .await - { - Either::Left((proof, _)) => proof, - Either::Right(_) => return Err(SwapFailed::AfterBtcLock(Reason::BtcExpired)), + let transfer_proof = { + let mut guard = network.as_ref().lock().await; + let transfer_proof = match select( + guard.receive_transfer_proof(), + poll_until_btc_has_expired.clone(), + ) + .await + { + Either::Left((proof, _)) => proof, + Either::Right(_) => return Err(SwapFailed::AfterBtcLock(Reason::BtcExpired)), + }; + + transfer_proof }; let S_b_monero = monero::PublicKey::from_private_key(&monero::PrivateKey::from_scalar( diff --git a/xmr-btc/src/bob/message.rs b/xmr-btc/src/bob/message.rs index b6bed872..178c0218 100644 --- a/xmr-btc/src/bob/message.rs +++ b/xmr-btc/src/bob/message.rs @@ -33,9 +33,9 @@ pub struct Message2 { pub(crate) tx_cancel_sig: Signature, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct Message3 { - pub(crate) tx_redeem_encsig: EncryptedSignature, + pub tx_redeem_encsig: EncryptedSignature, } impl_try_from_parent_enum!(Message0, Message);