diff --git a/swap/src/alice.rs b/swap/src/alice.rs index 09423290..958144f1 100644 --- a/swap/src/alice.rs +++ b/swap/src/alice.rs @@ -8,13 +8,14 @@ use libp2p::{ }; use rand::rngs::OsRng; use std::thread; -use tracing::debug; +use tracing::{debug, info}; mod amounts; mod message0; mod message1; +mod message2; -use self::{amounts::*, message0::*, message1::*}; +use self::{amounts::*, message0::*, message1::*, message2::*}; use crate::{ network::{ peer_tracker::{self, PeerTracker}, @@ -42,7 +43,7 @@ pub async fn swap( loop { match swarm.next().await { OutEvent::ConnectionEstablished(id) => { - tracing::info!("Connection established with: {}", id); + info!("Connection established with: {}", id); } OutEvent::Request(amounts::OutEvent::Btc { btc, channel }) => { debug!("Got request from Bob to swap {}", btc); @@ -98,9 +99,13 @@ pub async fn swap( let msg = state2.next_message(); swarm.send_message1(channel, msg); - tracing::info!("handshake complete, we now have State2 for Alice."); + let _state3 = match swarm.next().await { + OutEvent::Message2(msg) => state2.receive(msg)?, + other => panic!("Unexpected event: {:?}", other), + }; + + info!("Handshake complete, we now have State3 for Alice."); - tracing::warn!("parking thread ..."); thread::park(); Ok(()) } @@ -124,7 +129,7 @@ fn new_swarm(listen: Multiaddr) -> Result { Swarm::listen_on(&mut swarm, listen.clone()) .with_context(|| format!("Address is not supported: {:#}", listen))?; - tracing::info!("Initialized swarm: {}", local_peer_id); + info!("Initialized swarm: {}", local_peer_id); Ok(swarm) } @@ -139,6 +144,7 @@ pub enum OutEvent { msg: bob::Message1, channel: ResponseChannel, }, + Message2(bob::Message2), } impl From for OutEvent { @@ -173,6 +179,14 @@ impl From for OutEvent { } } +impl From for OutEvent { + fn from(event: message2::OutEvent) -> Self { + match event { + message2::OutEvent::Msg(msg) => OutEvent::Message2(msg), + } + } +} + /// A `NetworkBehaviour` that represents an XMR/BTC swap node as Alice. #[derive(NetworkBehaviour)] #[behaviour(out_event = "OutEvent", event_process = false)] @@ -182,6 +196,7 @@ pub struct Alice { amounts: Amounts, message0: Message0, message1: Message1, + message2: Message2, #[behaviour(ignore)] identity: Keypair, } @@ -225,6 +240,7 @@ impl Default for Alice { amounts: Amounts::default(), message0: Message0::default(), message1: Message1::default(), + message2: Message2::default(), identity, } } diff --git a/swap/src/alice/message2.rs b/swap/src/alice/message2.rs new file mode 100644 index 00000000..57b43bd4 --- /dev/null +++ b/swap/src/alice/message2.rs @@ -0,0 +1,94 @@ +use libp2p::{ + request_response::{ + handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig, + RequestResponseEvent, RequestResponseMessage, ResponseChannel, + }, + swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}, + NetworkBehaviour, +}; +use std::{ + collections::VecDeque, + task::{Context, Poll}, + time::Duration, +}; +use tracing::{debug, error}; + +use crate::network::request_response::{AliceToBob, BobToAlice, Codec, Protocol, TIMEOUT}; +use xmr_btc::bob; + +#[derive(Debug)] +pub enum OutEvent { + Msg(bob::Message2), +} + +/// A `NetworkBehaviour` that represents receiving of message 2 from Bob. +#[derive(NetworkBehaviour)] +#[behaviour(out_event = "OutEvent", poll_method = "poll")] +#[allow(missing_debug_implementations)] +pub struct Message2 { + rr: RequestResponse, + #[behaviour(ignore)] + events: VecDeque, +} + +impl Message2 { + pub fn send(&mut self, channel: ResponseChannel, msg: xmr_btc::alice::Message2) { + let msg = AliceToBob::Message2(msg); + self.rr.send_response(channel, msg); + } + + fn poll( + &mut self, + _: &mut Context<'_>, + _: &mut impl PollParameters, + ) -> Poll, OutEvent>> { + if let Some(event) = self.events.pop_front() { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + } + + Poll::Pending + } +} + +impl Default for Message2 { + fn default() -> Self { + let timeout = Duration::from_secs(TIMEOUT); + let mut config = RequestResponseConfig::default(); + config.set_request_timeout(timeout); + + Self { + rr: RequestResponse::new( + Codec::default(), + vec![(Protocol, ProtocolSupport::Full)], + config, + ), + events: Default::default(), + } + } +} + +impl NetworkBehaviourEventProcess> for Message2 { + fn inject_event(&mut self, event: RequestResponseEvent) { + match event { + RequestResponseEvent::Message { + message: RequestResponseMessage::Request { request, .. }, + .. + } => match request { + BobToAlice::Message2(msg) => { + self.events.push_back(OutEvent::Msg(msg)); + } + other => debug!("got request: {:?}", other), + }, + RequestResponseEvent::Message { + message: RequestResponseMessage::Response { .. }, + .. + } => panic!("Alice should not get a Response"), + RequestResponseEvent::InboundFailure { error, .. } => { + error!("Inbound failure: {:?}", error); + } + RequestResponseEvent::OutboundFailure { error, .. } => { + error!("Outbound failure: {:?}", error); + } + } + } +} diff --git a/swap/src/bob.rs b/swap/src/bob.rs index 22c339ee..91e88cc1 100644 --- a/swap/src/bob.rs +++ b/swap/src/bob.rs @@ -13,8 +13,9 @@ use tracing::{debug, info}; mod amounts; mod message0; mod message1; +mod message2; -use self::{amounts::*, message0::*, message1::*}; +use self::{amounts::*, message0::*, message1::*, message2::*}; use crate::{ network::{ peer_tracker::{self, PeerTracker}, @@ -24,7 +25,7 @@ use crate::{ }; use xmr_btc::{ alice, - bitcoin::BuildTxLockPsbt, + bitcoin::{BroadcastSignedTransaction, BuildTxLockPsbt, SignTxLock}, bob::{self, State0}, }; @@ -38,7 +39,7 @@ pub async fn swap( wallet: W, ) -> Result<()> where - W: BuildTxLockPsbt + Send + Sync + 'static, + W: BuildTxLockPsbt + SignTxLock + BroadcastSignedTransaction + Send + Sync + 'static, { let mut swarm = new_swarm()?; @@ -93,14 +94,16 @@ where }; swarm.send_message1(alice.clone(), state1.next_message()); - let _state2 = match swarm.next().await { + let state2 = match swarm.next().await { OutEvent::Message1(msg) => { - state1.receive(msg) // TODO: More graceful error handling. + state1.receive(msg)? // TODO: More graceful error handling. } other => panic!("unexpected event: {:?}", other), }; - info!("handshake complete, we now have State2 for Bob."); + swarm.send_message2(alice.clone(), state2.next_message()); + + info!("Handshake complete, we now have State2 for Bob."); thread::park(); Ok(()) @@ -134,6 +137,7 @@ pub enum OutEvent { Amounts(SwapAmounts), Message0(alice::Message0), Message1(alice::Message1), + Message2(alice::Message2), } impl From for OutEvent { @@ -170,6 +174,14 @@ impl From for OutEvent { } } +impl From for OutEvent { + fn from(event: message2::OutEvent) -> Self { + match event { + message2::OutEvent::Msg(msg) => OutEvent::Message2(msg), + } + } +} + /// A `NetworkBehaviour` that represents an XMR/BTC swap node as Bob. #[derive(NetworkBehaviour)] #[behaviour(out_event = "OutEvent", event_process = false)] @@ -179,6 +191,7 @@ pub struct Bob { amounts: Amounts, message0: Message0, message1: Message1, + message2: Message2, #[behaviour(ignore)] identity: Keypair, } @@ -209,6 +222,11 @@ impl Bob { self.message1.send(alice, msg) } + /// Sends Bob's third message to Alice. + pub fn send_message2(&mut self, alice: PeerId, msg: bob::Message2) { + self.message2.send(alice, msg) + } + /// Returns Alice's peer id if we are connected. pub fn peer_id_of_alice(&self) -> Option { self.pt.counterparty_peer_id() @@ -224,6 +242,7 @@ impl Default for Bob { amounts: Amounts::default(), message0: Message0::default(), message1: Message1::default(), + message2: Message2::default(), identity, } } diff --git a/swap/src/bob/message2.rs b/swap/src/bob/message2.rs new file mode 100644 index 00000000..aa8be6bc --- /dev/null +++ b/swap/src/bob/message2.rs @@ -0,0 +1,92 @@ +use libp2p::{ + request_response::{ + handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig, + RequestResponseEvent, RequestResponseMessage, + }, + swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}, + NetworkBehaviour, PeerId, +}; +use std::{ + collections::VecDeque, + task::{Context, Poll}, + time::Duration, +}; +use tracing::{debug, error}; + +use crate::network::request_response::{AliceToBob, BobToAlice, Codec, Protocol, TIMEOUT}; +use xmr_btc::{alice, bob}; + +#[derive(Debug)] +pub enum OutEvent { + Msg(alice::Message2), +} + +/// A `NetworkBehaviour` that represents sending message 2 to Alice. +#[derive(NetworkBehaviour)] +#[behaviour(out_event = "OutEvent", poll_method = "poll")] +#[allow(missing_debug_implementations)] +pub struct Message2 { + rr: RequestResponse, + #[behaviour(ignore)] + events: VecDeque, +} + +impl Message2 { + pub fn send(&mut self, alice: PeerId, msg: bob::Message2) { + let msg = BobToAlice::Message2(msg); + let _id = self.rr.send_request(&alice, msg); + } + + fn poll( + &mut self, + _: &mut Context<'_>, + _: &mut impl PollParameters, + ) -> Poll, OutEvent>> { + if let Some(event) = self.events.pop_front() { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + } + + Poll::Pending + } +} + +impl Default for Message2 { + fn default() -> Self { + let timeout = Duration::from_secs(TIMEOUT); + let mut config = RequestResponseConfig::default(); + config.set_request_timeout(timeout); + + Self { + rr: RequestResponse::new( + Codec::default(), + vec![(Protocol, ProtocolSupport::Full)], + config, + ), + events: Default::default(), + } + } +} + +impl NetworkBehaviourEventProcess> for Message2 { + fn inject_event(&mut self, event: RequestResponseEvent) { + match event { + RequestResponseEvent::Message { + message: RequestResponseMessage::Request { .. }, + .. + } => panic!("Bob should never get a request from Alice"), + RequestResponseEvent::Message { + message: RequestResponseMessage::Response { response, .. }, + .. + } => match response { + AliceToBob::Message2(msg) => self.events.push_back(OutEvent::Msg(msg)), + other => debug!("got response: {:?}", other), + }, + RequestResponseEvent::InboundFailure { error, .. } => { + error!("Inbound failure: {:?}", error); + } + RequestResponseEvent::OutboundFailure { error, .. } => { + error!("Outbound failure: {:?}", error); + } + } + } +} diff --git a/swap/src/main.rs b/swap/src/main.rs index 5a62120d..b405f6de 100644 --- a/swap/src/main.rs +++ b/swap/src/main.rs @@ -26,7 +26,7 @@ mod trace; use cli::Options; use swap::{alice, bitcoin::Wallet, bob, Cmd, Rsp, SwapAmounts}; -use xmr_btc::bitcoin::BuildTxLockPsbt; +use xmr_btc::bitcoin::{BroadcastSignedTransaction, BuildTxLockPsbt, SignTxLock}; // TODO: Add root seed file instead of generating new seed each run. // TODO: Remove all instances of the todo! macro @@ -109,7 +109,7 @@ async fn swap_as_bob( wallet: W, ) -> Result<()> where - W: BuildTxLockPsbt + Send + Sync + 'static, + W: BuildTxLockPsbt + SignTxLock + BroadcastSignedTransaction + Send + Sync + 'static, { let (cmd_tx, mut cmd_rx) = mpsc::channel(1); let (mut rsp_tx, rsp_rx) = mpsc::channel(1); diff --git a/swap/src/network/request_response.rs b/swap/src/network/request_response.rs index b0061a9b..709013b6 100644 --- a/swap/src/network/request_response.rs +++ b/swap/src/network/request_response.rs @@ -25,6 +25,7 @@ pub enum BobToAlice { AmountsFromXmr(monero::Amount), Message0(bob::Message0), Message1(bob::Message1), + Message2(bob::Message2), } /// Messages Alice sends to Bob. @@ -34,6 +35,7 @@ pub enum AliceToBob { Amounts(SwapAmounts), Message0(alice::Message0), Message1(alice::Message1), + Message2(alice::Message2), } #[derive(Debug, Clone, Copy, Default)] diff --git a/xmr-btc/src/alice/message.rs b/xmr-btc/src/alice/message.rs index 6bbaa9ac..30958a83 100644 --- a/xmr-btc/src/alice/message.rs +++ b/xmr-btc/src/alice/message.rs @@ -29,7 +29,7 @@ pub struct Message1 { pub(crate) tx_refund_encsig: EncryptedSignature, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct Message2 { pub(crate) tx_lock_proof: monero::TransferProof, } diff --git a/xmr-btc/src/bob/message.rs b/xmr-btc/src/bob/message.rs index bfd977c6..b6bed872 100644 --- a/xmr-btc/src/bob/message.rs +++ b/xmr-btc/src/bob/message.rs @@ -27,7 +27,7 @@ pub struct Message1 { pub(crate) tx_lock: bitcoin::TxLock, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct Message2 { pub(crate) tx_punish_sig: Signature, pub(crate) tx_cancel_sig: Signature,