From 0f17ec076c09f064e62dd6ccac6d1c08f57b71a7 Mon Sep 17 00:00:00 2001 From: "Tobin C. Harding" Date: Thu, 22 Oct 2020 13:23:12 +1100 Subject: [PATCH] Add message1 --- swap/src/alice.rs | 49 ++++++++++-- swap/src/alice/message0.rs | 2 +- swap/src/alice/message1.rs | 113 +++++++++++++++++++++++++++ swap/src/bob.rs | 40 ++++++++-- swap/src/bob/amounts.rs | 2 +- swap/src/bob/message0.rs | 2 +- swap/src/bob/message1.rs | 98 +++++++++++++++++++++++ swap/src/network/request_response.rs | 2 + xmr-btc/src/alice/message.rs | 2 +- xmr-btc/src/bob/message.rs | 2 +- 10 files changed, 296 insertions(+), 16 deletions(-) create mode 100644 swap/src/alice/message1.rs create mode 100644 swap/src/bob/message1.rs diff --git a/swap/src/alice.rs b/swap/src/alice.rs index d0cedd23..4ad7f196 100644 --- a/swap/src/alice.rs +++ b/swap/src/alice.rs @@ -12,8 +12,9 @@ use tracing::debug; mod amounts; mod message0; +mod message1; -use self::{amounts::*, message0::*}; +use self::{amounts::*, message0::*, message1::*}; use crate::{ network::{ peer_tracker::{self, PeerTracker}, @@ -47,7 +48,7 @@ pub async fn swap( debug!("Got request from Bob to swap {}", btc); let p = calculate_amounts(btc); last_amounts = Some(p); - swarm.send(channel, AliceToBob::Amounts(p)); + swarm.send_amounts(channel, p); } OutEvent::Message0(msg) => { debug!("Got message0 from Bob"); @@ -55,6 +56,7 @@ pub async fn swap( message0 = Some(msg); break; } + other => panic!("Unexpected event: {:?}", other), }; } @@ -78,11 +80,25 @@ pub async fn swap( ); swarm.set_state0(state0.clone()); - let _state1 = match message0 { - Some(msg) => state0.receive(msg), + let state1 = match message0 { + Some(msg) => state0.receive(msg).expect("failed to receive msg 0"), None => panic!("should have the message by here"), }; + let (state2, channel) = match swarm.next().await { + OutEvent::Message1 { msg, channel } => { + debug!("Got message1 from Bob"); + let state2 = state1.receive(msg); + (state2, channel) + } + other => panic!("Unexpected event: {:?}", other), + }; + + let msg = state2.next_message(); + swarm.send_message1(channel, msg); + + tracing::info!("handshake complete, we now have State2 for Alice."); + tracing::warn!("parking thread ..."); thread::park(); Ok(()) @@ -118,6 +134,10 @@ pub enum OutEvent { ConnectionEstablished(PeerId), Request(amounts::OutEvent), Message0(bob::Message0), + Message1 { + msg: bob::Message1, + channel: ResponseChannel, + }, } impl From for OutEvent { @@ -144,6 +164,14 @@ impl From for OutEvent { } } +impl From for OutEvent { + fn from(event: message1::OutEvent) -> Self { + match event { + message1::OutEvent::Msg { msg, channel } => OutEvent::Message1 { msg, channel }, + } + } +} + /// A `NetworkBehaviour` that represents an XMR/BTC swap node as Alice. #[derive(NetworkBehaviour)] #[behaviour(out_event = "OutEvent", event_process = false)] @@ -152,6 +180,7 @@ pub struct Alice { pt: PeerTracker, amounts: Amounts, message0: Message0, + message1: Message1, #[behaviour(ignore)] identity: Keypair, } @@ -166,13 +195,22 @@ impl Alice { } /// Alice always sends her messages as a response to a request from Bob. - pub fn send(&mut self, channel: ResponseChannel, msg: AliceToBob) { + pub fn send_amounts(&mut self, channel: ResponseChannel, p: SwapParams) { + let msg = AliceToBob::Amounts(p); self.amounts.send(channel, msg); } pub fn set_state0(&mut self, state: State0) { let _ = self.message0.set_state(state); } + + pub fn send_message1( + &mut self, + channel: ResponseChannel, + msg: xmr_btc::alice::Message1, + ) { + self.message1.send(channel, msg) + } } impl Default for Alice { @@ -184,6 +222,7 @@ impl Default for Alice { pt: PeerTracker::default(), amounts: Amounts::new(timeout), message0: Message0::new(timeout), + message1: Message1::new(timeout), identity, } } diff --git a/swap/src/alice/message0.rs b/swap/src/alice/message0.rs index ad62586c..ec85d57a 100644 --- a/swap/src/alice/message0.rs +++ b/swap/src/alice/message0.rs @@ -23,7 +23,7 @@ pub enum OutEvent { Msg(bob::Message0), } -/// A `NetworkBehaviour` that represents getting the amounts of an XMR/BTC swap. +/// A `NetworkBehaviour` that represents send/recv of message 0. #[derive(NetworkBehaviour)] #[behaviour(out_event = "OutEvent", poll_method = "poll")] #[allow(missing_debug_implementations)] diff --git a/swap/src/alice/message1.rs b/swap/src/alice/message1.rs new file mode 100644 index 00000000..4995e99a --- /dev/null +++ b/swap/src/alice/message1.rs @@ -0,0 +1,113 @@ +use libp2p::{ + request_response::{ + handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig, + RequestResponseEvent, RequestResponseMessage, ResponseChannel, + }, + swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}, + NetworkBehaviour, +}; +use std::{ + collections::VecDeque, + task::{Context, Poll}, + time::Duration, +}; +use tracing::error; + +use crate::network::request_response::{AliceToBob, BobToAlice, Codec, Protocol}; +use xmr_btc::bob; + +#[derive(Debug)] +pub enum OutEvent { + Msg { + /// Received message from Bob. + msg: bob::Message1, + /// Channel to send back Alice's message 1. + channel: ResponseChannel, + }, +} + +/// A `NetworkBehaviour` that represents send/recv of message 1. +#[derive(NetworkBehaviour)] +#[behaviour(out_event = "OutEvent", poll_method = "poll")] +#[allow(missing_debug_implementations)] +pub struct Message1 { + rr: RequestResponse, + #[behaviour(ignore)] + events: VecDeque, +} + +impl Message1 { + pub fn new(timeout: Duration) -> Self { + let mut config = RequestResponseConfig::default(); + config.set_request_timeout(timeout); + + Self { + rr: RequestResponse::new( + Codec::default(), + vec![(Protocol, ProtocolSupport::Full)], + config, + ), + events: Default::default(), + } + } + + pub fn send(&mut self, channel: ResponseChannel, msg: xmr_btc::alice::Message1) { + let msg = AliceToBob::Message1(msg); + self.rr.send_response(channel, msg); + } + + fn poll( + &mut self, + _: &mut Context<'_>, + _: &mut impl PollParameters, + ) -> Poll, OutEvent>> { + if let Some(event) = self.events.pop_front() { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + } + + Poll::Pending + } +} + +impl NetworkBehaviourEventProcess> for Message1 { + fn inject_event(&mut self, event: RequestResponseEvent) { + match event { + RequestResponseEvent::Message { + peer: _, + message: + RequestResponseMessage::Request { + request, + request_id: _, + channel, + }, + } => match request { + BobToAlice::Message1(msg) => { + self.events.push_back(OutEvent::Msg { msg, channel }); + } + other => panic!("unexpected request: {:?}", other), + }, + RequestResponseEvent::Message { + peer: _, + message: + RequestResponseMessage::Response { + response: _, + request_id: _, + }, + } => panic!("unexpected response"), + RequestResponseEvent::InboundFailure { + peer: _, + request_id: _, + error, + } => { + error!("Inbound failure: {:?}", error); + } + RequestResponseEvent::OutboundFailure { + peer: _, + request_id: _, + error, + } => { + error!("Outbound failure: {:?}", error); + } + } + } +} diff --git a/swap/src/bob.rs b/swap/src/bob.rs index ed38cf87..53eec7d0 100644 --- a/swap/src/bob.rs +++ b/swap/src/bob.rs @@ -8,12 +8,13 @@ use futures::{ use libp2p::{core::identity::Keypair, Multiaddr, NetworkBehaviour, PeerId}; use rand::rngs::OsRng; use std::{process, thread, time::Duration}; -use tracing::{debug, info, warn}; +use tracing::{debug, info}; mod amounts; mod message0; +mod message1; -use self::{amounts::*, message0::*}; +use self::{amounts::*, message0::*, message1::*}; use crate::{ network::{ peer_tracker::{self, PeerTracker}, @@ -80,15 +81,26 @@ where PUNISH_TIMELOCK, refund_address, ); + swarm.send_message0(alice.clone(), state0.next_message(rng)); - let _state1 = match swarm.next().await { + let state1 = match swarm.next().await { OutEvent::Message0(msg) => { - state0.receive(&wallet, msg) // TODO: More graceful error handling. + state0.receive(&wallet, msg).await? // TODO: More graceful error + // handling. } other => panic!("unexpected event: {:?}", other), }; - warn!("parking thread ..."); + swarm.send_message1(alice.clone(), state1.next_message()); + let _state2 = match swarm.next().await { + OutEvent::Message1(msg) => { + state1.receive(msg) // TODO: More graceful error handling. + } + other => panic!("unexpected event: {:?}", other), + }; + + info!("handshake complete, we now have State2 for Bob."); + thread::park(); Ok(()) } @@ -120,6 +132,7 @@ pub enum OutEvent { ConnectionEstablished(PeerId), Amounts(amounts::OutEvent), Message0(alice::Message0), + Message1(alice::Message1), } impl From for OutEvent { @@ -146,6 +159,14 @@ impl From for OutEvent { } } +impl From for OutEvent { + fn from(event: message1::OutEvent) -> Self { + match event { + message1::OutEvent::Msg(msg) => OutEvent::Message1(msg), + } + } +} + /// A `NetworkBehaviour` that represents an XMR/BTC swap node as Bob. #[derive(NetworkBehaviour)] #[behaviour(out_event = "OutEvent", event_process = false)] @@ -154,6 +175,7 @@ pub struct Bob { pt: PeerTracker, amounts: Amounts, message0: Message0, + message1: Message1, #[behaviour(ignore)] identity: Keypair, } @@ -174,11 +196,16 @@ impl Bob { debug!("Requesting amounts from: {}", alice); } - /// Sends Bob's first state message to Alice. + /// Sends Bob's first message to Alice. pub fn send_message0(&mut self, alice: PeerId, msg: bob::Message0) { self.message0.send(alice, msg) } + /// Sends Bob's second message to Alice. + pub fn send_message1(&mut self, alice: PeerId, msg: bob::Message1) { + self.message1.send(alice, msg) + } + /// Returns Alice's peer id if we are connected. pub fn peer_id_of_alice(&self) -> Option { self.pt.counterparty_peer_id() @@ -194,6 +221,7 @@ impl Default for Bob { pt: PeerTracker::default(), amounts: Amounts::new(timeout), message0: Message0::new(timeout), + message1: Message1::new(timeout), identity, } } diff --git a/swap/src/bob/amounts.rs b/swap/src/bob/amounts.rs index 4443a1e1..6d7ed714 100644 --- a/swap/src/bob/amounts.rs +++ b/swap/src/bob/amounts.rs @@ -85,7 +85,7 @@ impl NetworkBehaviourEventProcess> }, } => match response { AliceToBob::Amounts(p) => self.events.push_back(OutEvent::Amounts(p)), - AliceToBob::Message0(_) => panic!("shouldn't get message0 here"), + other => panic!("unexpected response: {:?}", other), }, RequestResponseEvent::InboundFailure { .. } => { diff --git a/swap/src/bob/message0.rs b/swap/src/bob/message0.rs index 8e3a0ad8..fd20c841 100644 --- a/swap/src/bob/message0.rs +++ b/swap/src/bob/message0.rs @@ -80,7 +80,7 @@ impl NetworkBehaviourEventProcess> }, } => match response { AliceToBob::Message0(msg) => self.events.push_back(OutEvent::Msg(msg)), - AliceToBob::Amounts(_) => panic!("shouldn't get amounts here"), + other => panic!("unexpected response: {:?}", other), }, RequestResponseEvent::InboundFailure { .. } => { diff --git a/swap/src/bob/message1.rs b/swap/src/bob/message1.rs new file mode 100644 index 00000000..43d24850 --- /dev/null +++ b/swap/src/bob/message1.rs @@ -0,0 +1,98 @@ +use libp2p::{ + request_response::{ + handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig, + RequestResponseEvent, RequestResponseMessage, + }, + swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}, + NetworkBehaviour, PeerId, +}; +use std::{ + collections::VecDeque, + task::{Context, Poll}, + time::Duration, +}; +use tracing::error; + +use crate::network::request_response::{AliceToBob, BobToAlice, Codec, Protocol}; +use xmr_btc::{alice, bob}; + +#[derive(Debug)] +pub enum OutEvent { + Msg(alice::Message1), +} + +/// A `NetworkBehaviour` that represents send/recv of message 1. +#[derive(NetworkBehaviour)] +#[behaviour(out_event = "OutEvent", poll_method = "poll")] +#[allow(missing_debug_implementations)] +pub struct Message1 { + rr: RequestResponse, + #[behaviour(ignore)] + events: VecDeque, +} + +impl Message1 { + pub fn new(timeout: Duration) -> Self { + let mut config = RequestResponseConfig::default(); + config.set_request_timeout(timeout); + + Self { + rr: RequestResponse::new( + Codec::default(), + vec![(Protocol, ProtocolSupport::Full)], + config, + ), + events: Default::default(), + } + } + + pub fn send(&mut self, alice: PeerId, msg: bob::Message1) { + let msg = BobToAlice::Message1(msg); + let _id = self.rr.send_request(&alice, msg); + } + + fn poll( + &mut self, + _: &mut Context<'_>, + _: &mut impl PollParameters, + ) -> Poll, OutEvent>> { + if let Some(event) = self.events.pop_front() { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + } + + Poll::Pending + } +} + +impl NetworkBehaviourEventProcess> for Message1 { + fn inject_event(&mut self, event: RequestResponseEvent) { + match event { + RequestResponseEvent::Message { + peer: _, + message: RequestResponseMessage::Request { .. }, + } => panic!("Bob should never get a request from Alice"), + RequestResponseEvent::Message { + peer: _, + message: + RequestResponseMessage::Response { + response, + request_id: _, + }, + } => match response { + AliceToBob::Message1(msg) => self.events.push_back(OutEvent::Msg(msg)), + other => panic!("unexpected response: {:?}", other), + }, + + RequestResponseEvent::InboundFailure { .. } => { + panic!("Bob should never get a request from Alice, so should never get an InboundFailure"); + } + RequestResponseEvent::OutboundFailure { + peer: _, + request_id: _, + error, + } => { + error!("Outbound failure: {:?}", error); + } + } + } +} diff --git a/swap/src/network/request_response.rs b/swap/src/network/request_response.rs index b6e582c5..1b45a9dd 100644 --- a/swap/src/network/request_response.rs +++ b/swap/src/network/request_response.rs @@ -21,6 +21,7 @@ pub enum BobToAlice { AmountsFromBtc(::bitcoin::Amount), AmountsFromXmr(monero::Amount), Message0(bob::Message0), + Message1(bob::Message1), } /// Messages Alice sends to Bob. @@ -29,6 +30,7 @@ pub enum BobToAlice { pub enum AliceToBob { Amounts(SwapParams), Message0(alice::Message0), + Message1(alice::Message1), } #[derive(Debug, Clone, Copy, Default)] diff --git a/xmr-btc/src/alice/message.rs b/xmr-btc/src/alice/message.rs index 79b66986..6bbaa9ac 100644 --- a/xmr-btc/src/alice/message.rs +++ b/xmr-btc/src/alice/message.rs @@ -23,7 +23,7 @@ pub struct Message0 { pub(crate) punish_address: bitcoin::Address, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct Message1 { pub(crate) tx_cancel_sig: Signature, pub(crate) tx_refund_encsig: EncryptedSignature, diff --git a/xmr-btc/src/bob/message.rs b/xmr-btc/src/bob/message.rs index 2826a545..bfd977c6 100644 --- a/xmr-btc/src/bob/message.rs +++ b/xmr-btc/src/bob/message.rs @@ -22,7 +22,7 @@ pub struct Message0 { pub(crate) refund_address: bitcoin::Address, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct Message1 { pub(crate) tx_lock: bitcoin::TxLock, }