From 9a5e35c1bd21c49d4256615dea1aa38909929f9d Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Fri, 22 Jan 2021 10:12:36 +1100 Subject: [PATCH 01/10] Rename `bob::Message2` to `Message5` As per the proposed changed in the sequence diagram. The aim is to have a unique terminology per message instead of having the same name for 2 consequent messages that share the same behaviour. Note that the aim is to remove the shared `RequestResponse` behaviours. --- swap/src/network/request_response.rs | 12 +++++------ swap/src/protocol/alice.rs | 14 ++++++------- swap/src/protocol/alice/event_loop.rs | 19 +++++++++--------- .../alice/{message3.rs => message5.rs} | 20 +++++++++---------- swap/src/protocol/alice/state.rs | 4 ++-- swap/src/protocol/alice/steps.rs | 2 +- swap/src/protocol/bob.rs | 18 ++++++++--------- swap/src/protocol/bob/event_loop.rs | 2 +- .../protocol/bob/{message3.rs => message5.rs} | 18 ++++++++--------- swap/src/protocol/bob/state.rs | 6 +++--- 10 files changed, 58 insertions(+), 57 deletions(-) rename swap/src/protocol/alice/{message3.rs => message5.rs} (86%) rename swap/src/protocol/bob/{message3.rs => message5.rs} (86%) diff --git a/swap/src/network/request_response.rs b/swap/src/network/request_response.rs index 1fd46224..3d9c5b9d 100644 --- a/swap/src/network/request_response.rs +++ b/swap/src/network/request_response.rs @@ -1,4 +1,4 @@ -use crate::protocol::{alice, bob}; +use crate::protocol::{alice, bob, bob::Message5}; use async_trait::async_trait; use futures::prelude::*; use libp2p::{ @@ -25,7 +25,7 @@ pub enum BobToAlice { Message0(Box), Message1(bob::Message1), Message2(bob::Message2), - Message3(bob::Message3), + Message5(Message5), } /// Messages Alice sends to Bob. @@ -35,7 +35,7 @@ pub enum AliceToBob { Message0(Box), Message1(Box), Message2(alice::Message2), - Message3, // empty response + Message5, // empty response } #[derive(Debug, Clone, Copy, Default)] @@ -51,7 +51,7 @@ pub struct Message1Protocol; pub struct Message2Protocol; #[derive(Debug, Clone, Copy, Default)] -pub struct Message3Protocol; +pub struct Message5Protocol; impl ProtocolName for Swap { fn protocol_name(&self) -> &[u8] { @@ -77,9 +77,9 @@ impl ProtocolName for Message2Protocol { } } -impl ProtocolName for Message3Protocol { +impl ProtocolName for Message5Protocol { fn protocol_name(&self) -> &[u8] { - b"/xmr/btc/message3/1.0.0" + b"/xmr/btc/message5/1.0.0" } } diff --git a/swap/src/protocol/alice.rs b/swap/src/protocol/alice.rs index 51c7134b..a640b854 100644 --- a/swap/src/protocol/alice.rs +++ b/swap/src/protocol/alice.rs @@ -21,7 +21,7 @@ use crate::{ transport::build, Seed as NetworkSeed, }, - protocol::{bob, SwapAmounts}, + protocol::{bob, bob::Message5, SwapAmounts}, seed::Seed, }; use anyhow::{bail, Result}; @@ -37,7 +37,7 @@ pub mod event_loop; mod message0; mod message1; mod message2; -mod message3; +mod message5; pub mod state; mod steps; pub mod swap; @@ -235,7 +235,7 @@ pub enum OutEvent { msg: bob::Message2, channel: ResponseChannel, }, - Message3(bob::Message3), + Message5(Message5), } impl From for OutEvent { @@ -281,10 +281,10 @@ impl From for OutEvent { } } -impl From for OutEvent { - fn from(event: message3::OutEvent) -> Self { +impl From for OutEvent { + fn from(event: message5::OutEvent) -> Self { match event { - message3::OutEvent::Msg(msg) => OutEvent::Message3(msg), + message5::OutEvent::Msg(msg) => OutEvent::Message5(msg), } } } @@ -299,7 +299,7 @@ pub struct Behaviour { message0: message0::Behaviour, message1: message1::Behaviour, message2: message2::Behaviour, - message3: message3::Behaviour, + message5: message5::Behaviour, } impl Behaviour { diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index a648d1a6..bdd626bf 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -4,6 +4,7 @@ use crate::{ alice, alice::{Behaviour, OutEvent, SwapResponse}, bob, + bob::Message5, }, }; use anyhow::{anyhow, Context, Result}; @@ -37,7 +38,7 @@ pub struct EventLoopHandle { msg0: Receiver<(bob::Message0, ResponseChannel)>, msg1: Receiver<(bob::Message1, ResponseChannel)>, msg2: Receiver<(bob::Message2, ResponseChannel)>, - msg3: Receiver, + msg5: Receiver, request: Receiver, conn_established: Receiver, send_swap_response: Sender<(ResponseChannel, SwapResponse)>, @@ -75,8 +76,8 @@ impl EventLoopHandle { .ok_or_else(|| anyhow!("Failed o receive message 2 from Bob")) } - pub async fn recv_message3(&mut self) -> Result { - self.msg3 + pub async fn recv_message5(&mut self) -> Result { + self.msg5 .recv() .await .ok_or_else(|| anyhow!("Failed to receive Bitcoin encrypted signature from Bob")) @@ -137,7 +138,7 @@ pub struct EventLoop { msg0: Sender<(bob::Message0, ResponseChannel)>, msg1: Sender<(bob::Message1, ResponseChannel)>, msg2: Sender<(bob::Message2, ResponseChannel)>, - msg3: Sender, + msg5: Sender, request: Sender, conn_established: Sender, send_swap_response: Receiver<(ResponseChannel, SwapResponse)>, @@ -165,7 +166,7 @@ impl EventLoop { let msg0 = Channels::new(); let msg1 = Channels::new(); let msg2 = Channels::new(); - let msg3 = Channels::new(); + let msg5 = Channels::new(); let request = Channels::new(); let conn_established = Channels::new(); let send_swap_response = Channels::new(); @@ -178,7 +179,7 @@ impl EventLoop { msg0: msg0.sender, msg1: msg1.sender, msg2: msg2.sender, - msg3: msg3.sender, + msg5: msg5.sender, request: request.sender, conn_established: conn_established.sender, send_swap_response: send_swap_response.receiver, @@ -191,7 +192,7 @@ impl EventLoop { msg0: msg0.receiver, msg1: msg1.receiver, msg2: msg2.receiver, - msg3: msg3.receiver, + msg5: msg5.receiver, request: request.receiver, conn_established: conn_established.receiver, send_swap_response: send_swap_response.sender, @@ -220,8 +221,8 @@ impl EventLoop { OutEvent::Message2 { msg, channel } => { let _ = self.msg2.send((msg, channel)).await; } - OutEvent::Message3(msg) => { - let _ = self.msg3.send(msg).await; + OutEvent::Message5(msg) => { + let _ = self.msg5.send(msg).await; } OutEvent::Request(event) => { let _ = self.request.send(*event).await; diff --git a/swap/src/protocol/alice/message3.rs b/swap/src/protocol/alice/message5.rs similarity index 86% rename from swap/src/protocol/alice/message3.rs rename to swap/src/protocol/alice/message5.rs index 9c85cc96..fd6d40d4 100644 --- a/swap/src/protocol/alice/message3.rs +++ b/swap/src/protocol/alice/message5.rs @@ -1,6 +1,6 @@ use crate::{ - network::request_response::{AliceToBob, BobToAlice, Codec, Message3Protocol, TIMEOUT}, - protocol::bob, + network::request_response::{AliceToBob, BobToAlice, Codec, Message5Protocol, TIMEOUT}, + protocol::bob::Message5, }; use libp2p::{ request_response::{ @@ -19,15 +19,15 @@ use tracing::{debug, error}; #[derive(Debug)] pub enum OutEvent { - Msg(bob::Message3), + Msg(Message5), } -/// A `NetworkBehaviour` that represents receiving of message 3 from Bob. +/// A `NetworkBehaviour` that represents receiving of message 5 from Bob. #[derive(NetworkBehaviour)] #[behaviour(out_event = "OutEvent", poll_method = "poll")] #[allow(missing_debug_implementations)] pub struct Behaviour { - rr: RequestResponse>, + rr: RequestResponse>, #[behaviour(ignore)] events: VecDeque, } @@ -37,7 +37,7 @@ impl Behaviour { &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll>, OutEvent>> { + ) -> Poll>, OutEvent>> { if let Some(event) = self.events.pop_front() { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); } @@ -55,7 +55,7 @@ impl Default for Behaviour { Self { rr: RequestResponse::new( Codec::default(), - vec![(Message3Protocol, ProtocolSupport::Full)], + vec![(Message5Protocol, ProtocolSupport::Full)], config, ), events: Default::default(), @@ -73,11 +73,11 @@ impl NetworkBehaviourEventProcess> }, .. } => { - if let BobToAlice::Message3(msg) = request { - debug!("Received Message3"); + if let BobToAlice::Message5(msg) = request { + debug!("Received message 5"); self.events.push_back(OutEvent::Msg(msg)); // Send back empty response so that the request/response protocol completes. - self.rr.send_response(channel, AliceToBob::Message3); + self.rr.send_response(channel, AliceToBob::Message5); } } RequestResponseEvent::Message { diff --git a/swap/src/protocol/alice/state.rs b/swap/src/protocol/alice/state.rs index e838568f..118a3791 100644 --- a/swap/src/protocol/alice/state.rs +++ b/swap/src/protocol/alice/state.rs @@ -9,7 +9,7 @@ use crate::{ monero, monero::CreateWalletForOutput, network::request_response::AliceToBob, - protocol::{alice, bob, SwapAmounts}, + protocol::{alice, bob, bob::Message5, SwapAmounts}, }; use anyhow::{anyhow, Context, Result}; use ecdsa_fun::{ @@ -482,7 +482,7 @@ impl State5 { } } - pub fn receive(self, msg: bob::Message3) -> State6 { + pub fn receive(self, msg: Message5) -> State6 { State6 { a: self.a, B: self.B, diff --git a/swap/src/protocol/alice/steps.rs b/swap/src/protocol/alice/steps.rs index d6be5ae8..c2a89415 100644 --- a/swap/src/protocol/alice/steps.rs +++ b/swap/src/protocol/alice/steps.rs @@ -146,7 +146,7 @@ pub async fn wait_for_bitcoin_encrypted_signature( event_loop_handle: &mut EventLoopHandle, ) -> Result { let msg3 = event_loop_handle - .recv_message3() + .recv_message5() .await .context("Failed to receive Bitcoin encrypted signature from Bob")?; diff --git a/swap/src/protocol/bob.rs b/swap/src/protocol/bob.rs index 20e8d2a2..7431209e 100644 --- a/swap/src/protocol/bob.rs +++ b/swap/src/protocol/bob.rs @@ -26,7 +26,7 @@ pub use self::{ message0::Message0, message1::Message1, message2::Message2, - message3::Message3, + message5::Message5, state::*, swap::{run, run_until}, swap_request::*, @@ -36,7 +36,7 @@ pub mod event_loop; mod message0; mod message1; mod message2; -mod message3; +mod message5; pub mod state; pub mod swap; mod swap_request; @@ -211,7 +211,7 @@ pub enum OutEvent { Message0(Box), Message1(Box), Message2(alice::Message2), - Message3, + Message5, } impl From for OutEvent { @@ -254,10 +254,10 @@ impl From for OutEvent { } } -impl From for OutEvent { - fn from(event: message3::OutEvent) -> Self { +impl From for OutEvent { + fn from(event: message5::OutEvent) -> Self { match event { - message3::OutEvent::Msg => OutEvent::Message3, + message5::OutEvent::Msg => OutEvent::Message5, } } } @@ -272,7 +272,7 @@ pub struct Behaviour { message0: message0::Behaviour, message1: message1::Behaviour, message2: message2::Behaviour, - message3: message3::Behaviour, + message5: message5::Behaviour, } impl Behaviour { @@ -302,8 +302,8 @@ impl Behaviour { /// Sends Bob's fourth message to Alice. pub fn send_message3(&mut self, alice: PeerId, tx_redeem_encsig: EncryptedSignature) { - let msg = bob::Message3 { tx_redeem_encsig }; - self.message3.send(alice, msg); + let msg = Message5 { tx_redeem_encsig }; + self.message5.send(alice, msg); debug!("Sent Message3"); } diff --git a/swap/src/protocol/bob/event_loop.rs b/swap/src/protocol/bob/event_loop.rs index 99130973..45cbac17 100644 --- a/swap/src/protocol/bob/event_loop.rs +++ b/swap/src/protocol/bob/event_loop.rs @@ -217,7 +217,7 @@ impl EventLoop { OutEvent::Message2(msg) => { let _ = self.msg2.send(msg).await; } - OutEvent::Message3 => info!("Alice acknowledged message 3 received"), + OutEvent::Message5 => info!("Alice acknowledged message 5 received"), } }, option = self.dial_alice.next().fuse() => { diff --git a/swap/src/protocol/bob/message3.rs b/swap/src/protocol/bob/message5.rs similarity index 86% rename from swap/src/protocol/bob/message3.rs rename to swap/src/protocol/bob/message5.rs index f9703ad1..801e6532 100644 --- a/swap/src/protocol/bob/message3.rs +++ b/swap/src/protocol/bob/message5.rs @@ -1,6 +1,6 @@ use crate::{ bitcoin::EncryptedSignature, - network::request_response::{AliceToBob, BobToAlice, Codec, Message3Protocol, TIMEOUT}, + network::request_response::{AliceToBob, BobToAlice, Codec, Message5Protocol, TIMEOUT}, }; use libp2p::{ request_response::{ @@ -19,7 +19,7 @@ use std::{ use tracing::error; #[derive(Clone, Debug, Serialize, Deserialize)] -pub struct Message3 { +pub struct Message5 { pub tx_redeem_encsig: EncryptedSignature, } @@ -28,19 +28,19 @@ pub enum OutEvent { Msg, } -/// A `NetworkBehaviour` that represents sending message 3 to Alice. +/// A `NetworkBehaviour` that represents sending message 5 to Alice. #[derive(NetworkBehaviour)] #[behaviour(out_event = "OutEvent", poll_method = "poll")] #[allow(missing_debug_implementations)] pub struct Behaviour { - rr: RequestResponse>, + rr: RequestResponse>, #[behaviour(ignore)] events: VecDeque, } impl Behaviour { - pub fn send(&mut self, alice: PeerId, msg: Message3) { - let msg = BobToAlice::Message3(msg); + pub fn send(&mut self, alice: PeerId, msg: Message5) { + let msg = BobToAlice::Message5(msg); let _id = self.rr.send_request(&alice, msg); } @@ -48,7 +48,7 @@ impl Behaviour { &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll>, OutEvent>> { + ) -> Poll>, OutEvent>> { if let Some(event) = self.events.pop_front() { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); } @@ -66,7 +66,7 @@ impl Default for Behaviour { Self { rr: RequestResponse::new( Codec::default(), - vec![(Message3Protocol, ProtocolSupport::Full)], + vec![(Message5Protocol, ProtocolSupport::Full)], config, ), events: Default::default(), @@ -85,7 +85,7 @@ impl NetworkBehaviourEventProcess> message: RequestResponseMessage::Response { response, .. }, .. } => { - if let AliceToBob::Message3 = response { + if let AliceToBob::Message5 = response { self.events.push_back(OutEvent::Msg); } } diff --git a/swap/src/protocol/bob/state.rs b/swap/src/protocol/bob/state.rs index f06cfd17..0498e81e 100644 --- a/swap/src/protocol/bob/state.rs +++ b/swap/src/protocol/bob/state.rs @@ -9,7 +9,7 @@ use crate::{ config::Config, monero, monero::{monero_private_key, TransferProof}, - protocol::{alice, bob, SwapAmounts}, + protocol::{alice, bob, bob::Message5, SwapAmounts}, }; use anyhow::{anyhow, Result}; use ecdsa_fun::{ @@ -438,11 +438,11 @@ pub struct State4 { } impl State4 { - pub fn next_message(&self) -> bob::Message3 { + pub fn next_message(&self) -> Message5 { let tx_redeem = bitcoin::TxRedeem::new(&self.tx_lock, &self.redeem_address); let tx_redeem_encsig = self.b.encsign(self.S_a_bitcoin, tx_redeem.digest()); - bob::Message3 { tx_redeem_encsig } + Message5 { tx_redeem_encsig } } pub fn tx_redeem_encsig(&self) -> EncryptedSignature { From edb93624f39f6a8f37d53db746a94ae5d4491859 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Fri, 22 Jan 2021 11:05:46 +1100 Subject: [PATCH 02/10] Introduce one shot code To allow alice to be the requester for message 4. --- swap/src/network/request_response.rs | 106 ++++++++++++++++++++++++++- swap/src/protocol/alice/message5.rs | 24 +++--- swap/src/protocol/bob/message5.rs | 20 ++--- 3 files changed, 126 insertions(+), 24 deletions(-) diff --git a/swap/src/network/request_response.rs b/swap/src/network/request_response.rs index 3d9c5b9d..68e9d803 100644 --- a/swap/src/network/request_response.rs +++ b/swap/src/network/request_response.rs @@ -19,13 +19,14 @@ const BUF_SIZE: usize = 1024 * 1024; // Codec for each Message and a macro that implements them. /// Messages Bob sends to Alice. +// TODO: Remove this once network changes are done +#[allow(clippy::large_enum_variant)] #[derive(Clone, Debug, Serialize, Deserialize)] pub enum BobToAlice { SwapRequest(bob::SwapRequest), Message0(Box), Message1(bob::Message1), Message2(bob::Message2), - Message5(Message5), } /// Messages Alice sends to Bob. @@ -35,7 +36,19 @@ pub enum AliceToBob { Message0(Box), Message1(Box), Message2(alice::Message2), - Message5, // empty response +} + +/// Messages sent from one party to the other. +/// All responses are empty +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum Request { + Message5(Message5), +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +/// Response are only used for acknowledgement purposes. +pub enum Response { + Message5, } #[derive(Debug, Clone, Copy, Default)] @@ -171,3 +184,92 @@ where Ok(()) } } + +#[derive(Clone, Copy, Debug, Default)] +pub struct OneShotCodec

{ + phantom: PhantomData

, +} + +#[async_trait] +impl

RequestResponseCodec for OneShotCodec

+where + P: Send + Sync + Clone + ProtocolName, +{ + type Protocol = P; + type Request = Request; + type Response = Response; + + async fn read_request(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + debug!("enter read_request"); + let message = upgrade::read_one(io, BUF_SIZE) + .await + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + let mut de = serde_cbor::Deserializer::from_slice(&message); + let msg = Request::deserialize(&mut de).map_err(|e| { + tracing::debug!("serde read_request error: {:?}", e); + io::Error::new(io::ErrorKind::Other, e) + })?; + + Ok(msg) + } + + async fn read_response( + &mut self, + _: &Self::Protocol, + io: &mut T, + ) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + debug!("enter read_response"); + let message = upgrade::read_one(io, BUF_SIZE) + .await + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + let mut de = serde_cbor::Deserializer::from_slice(&message); + let msg = Response::deserialize(&mut de).map_err(|e| { + tracing::debug!("serde read_response error: {:?}", e); + io::Error::new(io::ErrorKind::InvalidData, e) + })?; + + Ok(msg) + } + + async fn write_request( + &mut self, + _: &Self::Protocol, + io: &mut T, + req: Self::Request, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + let bytes = + serde_cbor::to_vec(&req).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + + upgrade::write_one(io, &bytes).await?; + + Ok(()) + } + + async fn write_response( + &mut self, + _: &Self::Protocol, + io: &mut T, + res: Self::Response, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + debug!("enter write_response"); + let bytes = serde_cbor::to_vec(&res).map_err(|e| { + tracing::debug!("serde write_reponse error: {:?}", e); + io::Error::new(io::ErrorKind::InvalidData, e) + })?; + upgrade::write_one(io, &bytes).await?; + + Ok(()) + } +} diff --git a/swap/src/protocol/alice/message5.rs b/swap/src/protocol/alice/message5.rs index fd6d40d4..00cc7d8f 100644 --- a/swap/src/protocol/alice/message5.rs +++ b/swap/src/protocol/alice/message5.rs @@ -1,5 +1,5 @@ use crate::{ - network::request_response::{AliceToBob, BobToAlice, Codec, Message5Protocol, TIMEOUT}, + network::request_response::{Message5Protocol, OneShotCodec, Request, Response, TIMEOUT}, protocol::bob::Message5, }; use libp2p::{ @@ -27,7 +27,7 @@ pub enum OutEvent { #[behaviour(out_event = "OutEvent", poll_method = "poll")] #[allow(missing_debug_implementations)] pub struct Behaviour { - rr: RequestResponse>, + rr: RequestResponse>, #[behaviour(ignore)] events: VecDeque, } @@ -37,7 +37,8 @@ impl Behaviour { &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll>, OutEvent>> { + ) -> Poll>, OutEvent>> + { if let Some(event) = self.events.pop_front() { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); } @@ -54,7 +55,7 @@ impl Default for Behaviour { Self { rr: RequestResponse::new( - Codec::default(), + OneShotCodec::default(), vec![(Message5Protocol, ProtocolSupport::Full)], config, ), @@ -63,8 +64,8 @@ impl Default for Behaviour { } } -impl NetworkBehaviourEventProcess> for Behaviour { - fn inject_event(&mut self, event: RequestResponseEvent) { +impl NetworkBehaviourEventProcess> for Behaviour { + fn inject_event(&mut self, event: RequestResponseEvent) { match event { RequestResponseEvent::Message { message: @@ -73,12 +74,11 @@ impl NetworkBehaviourEventProcess> }, .. } => { - if let BobToAlice::Message5(msg) = request { - debug!("Received message 5"); - self.events.push_back(OutEvent::Msg(msg)); - // Send back empty response so that the request/response protocol completes. - self.rr.send_response(channel, AliceToBob::Message5); - } + let Request::Message5(msg) = request; + debug!("Received message 5"); + self.events.push_back(OutEvent::Msg(msg)); + // Send back empty response so that the request/response protocol completes. + let _ = self.rr.send_response(channel, Response::Message5); } RequestResponseEvent::Message { message: RequestResponseMessage::Response { .. }, diff --git a/swap/src/protocol/bob/message5.rs b/swap/src/protocol/bob/message5.rs index 801e6532..ee2104da 100644 --- a/swap/src/protocol/bob/message5.rs +++ b/swap/src/protocol/bob/message5.rs @@ -1,6 +1,6 @@ use crate::{ bitcoin::EncryptedSignature, - network::request_response::{AliceToBob, BobToAlice, Codec, Message5Protocol, TIMEOUT}, + network::request_response::{Message5Protocol, OneShotCodec, Request, Response, TIMEOUT}, }; use libp2p::{ request_response::{ @@ -33,14 +33,14 @@ pub enum OutEvent { #[behaviour(out_event = "OutEvent", poll_method = "poll")] #[allow(missing_debug_implementations)] pub struct Behaviour { - rr: RequestResponse>, + rr: RequestResponse>, #[behaviour(ignore)] events: VecDeque, } impl Behaviour { pub fn send(&mut self, alice: PeerId, msg: Message5) { - let msg = BobToAlice::Message5(msg); + let msg = Request::Message5(msg); let _id = self.rr.send_request(&alice, msg); } @@ -48,7 +48,8 @@ impl Behaviour { &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll>, OutEvent>> { + ) -> Poll>, OutEvent>> + { if let Some(event) = self.events.pop_front() { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); } @@ -65,7 +66,7 @@ impl Default for Behaviour { Self { rr: RequestResponse::new( - Codec::default(), + OneShotCodec::default(), vec![(Message5Protocol, ProtocolSupport::Full)], config, ), @@ -74,8 +75,8 @@ impl Default for Behaviour { } } -impl NetworkBehaviourEventProcess> for Behaviour { - fn inject_event(&mut self, event: RequestResponseEvent) { +impl NetworkBehaviourEventProcess> for Behaviour { + fn inject_event(&mut self, event: RequestResponseEvent) { match event { RequestResponseEvent::Message { message: RequestResponseMessage::Request { .. }, @@ -85,9 +86,8 @@ impl NetworkBehaviourEventProcess> message: RequestResponseMessage::Response { response, .. }, .. } => { - if let AliceToBob::Message5 = response { - self.events.push_back(OutEvent::Msg); - } + let Response::Message5 = response; + self.events.push_back(OutEvent::Msg); } RequestResponseEvent::InboundFailure { error, .. } => { error!("Inbound failure: {:?}", error); From 124d6f1ebb7f5947556f1fe8ea039f1686789c44 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Fri, 22 Jan 2021 11:13:28 +1100 Subject: [PATCH 03/10] Introduced Message 4 sent by Alice to replace message 2 response --- swap/src/network/request_response.rs | 17 ++++- swap/src/protocol/alice.rs | 2 + swap/src/protocol/alice/message4.rs | 101 +++++++++++++++++++++++++++ swap/src/protocol/alice/message5.rs | 11 +-- swap/src/protocol/bob/message5.rs | 7 +- 5 files changed, 127 insertions(+), 11 deletions(-) create mode 100644 swap/src/protocol/alice/message4.rs diff --git a/swap/src/network/request_response.rs b/swap/src/network/request_response.rs index 68e9d803..7eeaa08e 100644 --- a/swap/src/network/request_response.rs +++ b/swap/src/network/request_response.rs @@ -1,4 +1,4 @@ -use crate::protocol::{alice, bob, bob::Message5}; +use crate::protocol::{alice, alice::Message4, bob, bob::Message5}; use async_trait::async_trait; use futures::prelude::*; use libp2p::{ @@ -42,12 +42,14 @@ pub enum AliceToBob { /// All responses are empty #[derive(Clone, Debug, Serialize, Deserialize)] pub enum Request { - Message5(Message5), + Message4(Box), + Message5(Box), } -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Copy, Debug, Serialize, Deserialize)] /// Response are only used for acknowledgement purposes. pub enum Response { + Message4, Message5, } @@ -63,6 +65,9 @@ pub struct Message1Protocol; #[derive(Debug, Clone, Copy, Default)] pub struct Message2Protocol; +#[derive(Debug, Clone, Copy, Default)] +pub struct Message4Protocol; + #[derive(Debug, Clone, Copy, Default)] pub struct Message5Protocol; @@ -90,6 +95,12 @@ impl ProtocolName for Message2Protocol { } } +impl ProtocolName for Message4Protocol { + fn protocol_name(&self) -> &[u8] { + b"/xmr/btc/message4/1.0.0" + } +} + impl ProtocolName for Message5Protocol { fn protocol_name(&self) -> &[u8] { b"/xmr/btc/message5/1.0.0" diff --git a/swap/src/protocol/alice.rs b/swap/src/protocol/alice.rs index a640b854..940aa92b 100644 --- a/swap/src/protocol/alice.rs +++ b/swap/src/protocol/alice.rs @@ -5,6 +5,7 @@ pub use self::{ message0::Message0, message1::Message1, message2::Message2, + message4::Message4, state::*, swap::{run, run_until}, swap_response::*, @@ -37,6 +38,7 @@ pub mod event_loop; mod message0; mod message1; mod message2; +mod message4; mod message5; pub mod state; mod steps; diff --git a/swap/src/protocol/alice/message4.rs b/swap/src/protocol/alice/message4.rs new file mode 100644 index 00000000..040a3f65 --- /dev/null +++ b/swap/src/protocol/alice/message4.rs @@ -0,0 +1,101 @@ +use crate::{ + monero, + network::request_response::{Message4Protocol, OneShotCodec, Request, Response, TIMEOUT}, +}; +use libp2p::{ + request_response::{ + handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig, + RequestResponseEvent, RequestResponseMessage, + }, + swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}, + NetworkBehaviour, PeerId, +}; +use serde::{Deserialize, Serialize}; +use std::{ + collections::VecDeque, + task::{Context, Poll}, + time::Duration, +}; +use tracing::error; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Message4 { + pub tx_lock_proof: monero::TransferProof, +} + +#[derive(Debug, Copy, Clone)] +pub enum OutEvent { + Msg, +} + +/// A `NetworkBehaviour` that represents sending message 4 to Bob. +#[derive(NetworkBehaviour)] +#[behaviour(out_event = "OutEvent", poll_method = "poll")] +#[allow(missing_debug_implementations)] +pub struct Behaviour { + rr: RequestResponse>, + #[behaviour(ignore)] + events: VecDeque, +} + +impl Behaviour { + pub fn send(&mut self, bob: PeerId, msg: Message4) { + let msg = Request::Message4(Box::new(msg)); + let _id = self.rr.send_request(&bob, msg); + } + + fn poll( + &mut self, + _: &mut Context<'_>, + _: &mut impl PollParameters, + ) -> Poll>, OutEvent>> + { + if let Some(event) = self.events.pop_front() { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + } + + Poll::Pending + } +} + +impl Default for Behaviour { + fn default() -> Self { + let timeout = Duration::from_secs(TIMEOUT); + let mut config = RequestResponseConfig::default(); + config.set_request_timeout(timeout); + + Self { + rr: RequestResponse::new( + OneShotCodec::default(), + vec![(Message4Protocol, ProtocolSupport::Full)], + config, + ), + events: Default::default(), + } + } +} + +impl NetworkBehaviourEventProcess> for Behaviour { + fn inject_event(&mut self, event: RequestResponseEvent) { + match event { + RequestResponseEvent::Message { + message: RequestResponseMessage::Request { .. }, + .. + } => panic!("Alice should never get a message 4 request from Bob"), + RequestResponseEvent::Message { + message: RequestResponseMessage::Response { response, .. }, + .. + } => { + if let Response::Message4 = response { + self.events.push_back(OutEvent::Msg); + } + } + RequestResponseEvent::InboundFailure { error, .. } => { + error!("Inbound failure: {:?}", error); + } + RequestResponseEvent::OutboundFailure { error, .. } => { + error!("Outbound failure: {:?}", error); + } + } + } +} diff --git a/swap/src/protocol/alice/message5.rs b/swap/src/protocol/alice/message5.rs index 00cc7d8f..f285de7c 100644 --- a/swap/src/protocol/alice/message5.rs +++ b/swap/src/protocol/alice/message5.rs @@ -74,11 +74,12 @@ impl NetworkBehaviourEventProcess> for B }, .. } => { - let Request::Message5(msg) = request; - debug!("Received message 5"); - self.events.push_back(OutEvent::Msg(msg)); - // Send back empty response so that the request/response protocol completes. - let _ = self.rr.send_response(channel, Response::Message5); + if let Request::Message5(msg) = request { + debug!("Received message 5"); + self.events.push_back(OutEvent::Msg(*msg)); + // Send back empty response so that the request/response protocol completes. + let _ = self.rr.send_response(channel, Response::Message5); + } } RequestResponseEvent::Message { message: RequestResponseMessage::Response { .. }, diff --git a/swap/src/protocol/bob/message5.rs b/swap/src/protocol/bob/message5.rs index ee2104da..385fcfae 100644 --- a/swap/src/protocol/bob/message5.rs +++ b/swap/src/protocol/bob/message5.rs @@ -40,7 +40,7 @@ pub struct Behaviour { impl Behaviour { pub fn send(&mut self, alice: PeerId, msg: Message5) { - let msg = Request::Message5(msg); + let msg = Request::Message5(Box::new(msg)); let _id = self.rr.send_request(&alice, msg); } @@ -86,8 +86,9 @@ impl NetworkBehaviourEventProcess> for B message: RequestResponseMessage::Response { response, .. }, .. } => { - let Response::Message5 = response; - self.events.push_back(OutEvent::Msg); + if let Response::Message5 = response { + self.events.push_back(OutEvent::Msg); + } } RequestResponseEvent::InboundFailure { error, .. } => { error!("Inbound failure: {:?}", error); From d2a1937f519c9b9a4432803ff90d1f1ab7110a8b Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Fri, 22 Jan 2021 14:21:27 +1100 Subject: [PATCH 04/10] Use `Message4` --- swap/src/database/alice.rs | 4 +- swap/src/network/request_response.rs | 2 +- swap/src/protocol/alice.rs | 28 +++++--- swap/src/protocol/alice/event_loop.rs | 40 ++++++----- swap/src/protocol/alice/message2.rs | 31 +++------ swap/src/protocol/alice/state.rs | 15 +++-- swap/src/protocol/alice/steps.rs | 18 +++-- swap/src/protocol/alice/swap.rs | 20 +++--- swap/src/protocol/bob.rs | 16 ++++- swap/src/protocol/bob/event_loop.rs | 23 ++++--- swap/src/protocol/bob/message2.rs | 15 ++--- swap/src/protocol/bob/message4.rs | 96 +++++++++++++++++++++++++++ swap/src/protocol/bob/swap.rs | 10 ++- 13 files changed, 211 insertions(+), 107 deletions(-) create mode 100644 swap/src/protocol/bob/message4.rs diff --git a/swap/src/database/alice.rs b/swap/src/database/alice.rs index 939ff550..6829d7e8 100644 --- a/swap/src/database/alice.rs +++ b/swap/src/database/alice.rs @@ -83,7 +83,7 @@ impl From for AliceState { match db_state { Alice::Started { amounts, state0 } => AliceState::Started { amounts, state0 }, Alice::Negotiated(state3) => AliceState::Negotiated { - channel: None, + bob_peer_id: None, amounts: SwapAmounts { btc: state3.btc, xmr: state3.xmr, @@ -91,7 +91,7 @@ impl From for AliceState { state3: Box::new(state3), }, Alice::BtcLocked(state3) => AliceState::BtcLocked { - channel: None, + bob_peer_id: None, amounts: SwapAmounts { btc: state3.btc, xmr: state3.xmr, diff --git a/swap/src/network/request_response.rs b/swap/src/network/request_response.rs index 7eeaa08e..1e8cbae8 100644 --- a/swap/src/network/request_response.rs +++ b/swap/src/network/request_response.rs @@ -35,7 +35,7 @@ pub enum AliceToBob { SwapResponse(alice::SwapResponse), Message0(Box), Message1(Box), - Message2(alice::Message2), + Message2, } /// Messages sent from one party to the other. diff --git a/swap/src/protocol/alice.rs b/swap/src/protocol/alice.rs index 940aa92b..34827dc8 100644 --- a/swap/src/protocol/alice.rs +++ b/swap/src/protocol/alice.rs @@ -4,7 +4,6 @@ pub use self::{ event_loop::{EventLoop, EventLoopHandle}, message0::Message0, message1::Message1, - message2::Message2, message4::Message4, state::*, swap::{run, run_until}, @@ -234,9 +233,10 @@ pub enum OutEvent { channel: ResponseChannel, }, Message2 { - msg: bob::Message2, - channel: ResponseChannel, + msg: Box, + bob_peer_id: PeerId, }, + Message4, Message5(Message5), } @@ -278,7 +278,18 @@ impl From for OutEvent { impl From for OutEvent { fn from(event: message2::OutEvent) -> Self { match event { - message2::OutEvent::Msg { msg, channel } => OutEvent::Message2 { msg, channel }, + message2::OutEvent::Msg { msg, bob_peer_id } => OutEvent::Message2 { + msg: Box::new(msg), + bob_peer_id, + }, + } + } +} + +impl From for OutEvent { + fn from(event: message4::OutEvent) -> Self { + match event { + message4::OutEvent::Msg => OutEvent::Message4, } } } @@ -301,6 +312,7 @@ pub struct Behaviour { message0: message0::Behaviour, message1: message1::Behaviour, message2: message2::Behaviour, + message4: message4::Behaviour, message5: message5::Behaviour, } @@ -327,9 +339,9 @@ impl Behaviour { debug!("Sent Message1"); } - /// Send Message2 to Bob in response to receiving his Message2. - pub fn send_message2(&mut self, channel: ResponseChannel, msg: Message2) { - self.message2.send(channel, msg); - debug!("Sent Message2"); + /// Send Message4 to Bob. + pub fn send_message4(&mut self, bob: PeerId, msg: Message4) { + self.message4.send(bob, msg); + debug!("Sent Message 4"); } } diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index bdd626bf..2d0c0a7b 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -2,7 +2,7 @@ use crate::{ network::{request_response::AliceToBob, transport::SwapTransport, TokioExecutor}, protocol::{ alice, - alice::{Behaviour, OutEvent, SwapResponse}, + alice::{Behaviour, Message4, OutEvent, SwapResponse}, bob, bob::Message5, }, @@ -13,6 +13,7 @@ use libp2p::{ core::Multiaddr, futures::StreamExt, request_response::ResponseChannel, PeerId, Swarm, }; use tokio::sync::mpsc::{Receiver, Sender}; +use tracing::trace; #[allow(missing_debug_implementations)] pub struct Channels { @@ -37,14 +38,14 @@ impl Default for Channels { pub struct EventLoopHandle { msg0: Receiver<(bob::Message0, ResponseChannel)>, msg1: Receiver<(bob::Message1, ResponseChannel)>, - msg2: Receiver<(bob::Message2, ResponseChannel)>, + msg2: Receiver, msg5: Receiver, request: Receiver, conn_established: Receiver, send_swap_response: Sender<(ResponseChannel, SwapResponse)>, send_msg0: Sender<(ResponseChannel, alice::Message0)>, send_msg1: Sender<(ResponseChannel, alice::Message1)>, - send_msg2: Sender<(ResponseChannel, alice::Message2)>, + send_msg4: Sender<(PeerId, Message4)>, } impl EventLoopHandle { @@ -69,11 +70,11 @@ impl EventLoopHandle { .ok_or_else(|| anyhow!("Failed to receive message 1 from Bob")) } - pub async fn recv_message2(&mut self) -> Result<(bob::Message2, ResponseChannel)> { + pub async fn recv_message2(&mut self) -> Result { self.msg2 .recv() .await - .ok_or_else(|| anyhow!("Failed o receive message 2 from Bob")) + .ok_or_else(|| anyhow!("Failed to receive message 2 from Bob")) } pub async fn recv_message5(&mut self) -> Result { @@ -122,12 +123,8 @@ impl EventLoopHandle { Ok(()) } - pub async fn send_message2( - &mut self, - channel: ResponseChannel, - msg: alice::Message2, - ) -> Result<()> { - let _ = self.send_msg2.send((channel, msg)).await?; + pub async fn send_message4(&mut self, bob: PeerId, msg: Message4) -> Result<()> { + let _ = self.send_msg4.send((bob, msg)).await?; Ok(()) } } @@ -137,14 +134,14 @@ pub struct EventLoop { swarm: libp2p::Swarm, msg0: Sender<(bob::Message0, ResponseChannel)>, msg1: Sender<(bob::Message1, ResponseChannel)>, - msg2: Sender<(bob::Message2, ResponseChannel)>, + msg2: Sender, msg5: Sender, request: Sender, conn_established: Sender, send_swap_response: Receiver<(ResponseChannel, SwapResponse)>, send_msg0: Receiver<(ResponseChannel, alice::Message0)>, send_msg1: Receiver<(ResponseChannel, alice::Message1)>, - send_msg2: Receiver<(ResponseChannel, alice::Message2)>, + send_msg4: Receiver<(PeerId, Message4)>, } impl EventLoop { @@ -172,7 +169,7 @@ impl EventLoop { let send_swap_response = Channels::new(); let send_msg0 = Channels::new(); let send_msg1 = Channels::new(); - let send_msg2 = Channels::new(); + let send_msg4 = Channels::new(); let driver = EventLoop { swarm, @@ -185,7 +182,7 @@ impl EventLoop { send_swap_response: send_swap_response.receiver, send_msg0: send_msg0.receiver, send_msg1: send_msg1.receiver, - send_msg2: send_msg2.receiver, + send_msg4: send_msg4.receiver, }; let handle = EventLoopHandle { @@ -198,7 +195,7 @@ impl EventLoop { send_swap_response: send_swap_response.sender, send_msg0: send_msg0.sender, send_msg1: send_msg1.sender, - send_msg2: send_msg2.sender, + send_msg4: send_msg4.sender, }; Ok((driver, handle)) @@ -218,9 +215,10 @@ impl EventLoop { OutEvent::Message1 { msg, channel } => { let _ = self.msg1.send((msg, channel)).await; } - OutEvent::Message2 { msg, channel } => { - let _ = self.msg2.send((msg, channel)).await; + OutEvent::Message2 { msg, bob_peer_id : _} => { + let _ = self.msg2.send(*msg).await; } + OutEvent::Message4 => trace!("Bob ack'd message 4"), OutEvent::Message5(msg) => { let _ = self.msg5.send(msg).await; } @@ -244,9 +242,9 @@ impl EventLoop { self.swarm.send_message1(channel, msg); } }, - msg2 = self.send_msg2.next().fuse() => { - if let Some((channel, msg)) = msg2 { - self.swarm.send_message2(channel, msg); + msg4 = self.send_msg4.next().fuse() => { + if let Some((bob_peer_id, msg)) = msg4 { + self.swarm.send_message4(bob_peer_id, msg); } }, } diff --git a/swap/src/protocol/alice/message2.rs b/swap/src/protocol/alice/message2.rs index d383ff11..6e54f8f4 100644 --- a/swap/src/protocol/alice/message2.rs +++ b/swap/src/protocol/alice/message2.rs @@ -1,17 +1,15 @@ use crate::{ - monero, network::request_response::{AliceToBob, BobToAlice, Codec, Message2Protocol, TIMEOUT}, protocol::bob, }; use libp2p::{ request_response::{ handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig, - RequestResponseEvent, RequestResponseMessage, ResponseChannel, + RequestResponseEvent, RequestResponseMessage, }, swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}, - NetworkBehaviour, + NetworkBehaviour, PeerId, }; -use serde::{Deserialize, Serialize}; use std::{ collections::VecDeque, task::{Context, Poll}, @@ -22,18 +20,11 @@ use tracing::{debug, error}; #[derive(Debug)] pub enum OutEvent { Msg { - /// Received message from Bob. msg: bob::Message2, - /// Channel to send back Alice's message 2. - channel: ResponseChannel, + bob_peer_id: PeerId, }, } -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct Message2 { - pub tx_lock_proof: monero::TransferProof, -} - /// A `NetworkBehaviour` that represents receiving of message 2 from Bob. #[derive(NetworkBehaviour)] #[behaviour(out_event = "OutEvent", poll_method = "poll")] @@ -45,11 +36,6 @@ pub struct Behaviour { } impl Behaviour { - pub fn send(&mut self, channel: ResponseChannel, msg: Message2) { - let msg = AliceToBob::Message2(msg); - self.rr.send_response(channel, msg); - } - fn poll( &mut self, _: &mut Context<'_>, @@ -84,15 +70,20 @@ impl NetworkBehaviourEventProcess> fn inject_event(&mut self, event: RequestResponseEvent) { match event { RequestResponseEvent::Message { + peer, message: RequestResponseMessage::Request { request, channel, .. }, - .. } => { if let BobToAlice::Message2(msg) = request { - debug!("Received Message2"); - self.events.push_back(OutEvent::Msg { msg, channel }); + debug!("Received Message 2"); + self.events.push_back(OutEvent::Msg { + msg, + bob_peer_id: peer, + }); + // Send back empty response so that the request/response protocol completes. + let _ = self.rr.send_response(channel, AliceToBob::Message2); } } RequestResponseEvent::Message { diff --git a/swap/src/protocol/alice/state.rs b/swap/src/protocol/alice/state.rs index 118a3791..76f89804 100644 --- a/swap/src/protocol/alice/state.rs +++ b/swap/src/protocol/alice/state.rs @@ -8,15 +8,14 @@ use crate::{ }, monero, monero::CreateWalletForOutput, - network::request_response::AliceToBob, - protocol::{alice, bob, bob::Message5, SwapAmounts}, + protocol::{alice, alice::Message4, bob, bob::Message5, SwapAmounts}, }; use anyhow::{anyhow, Context, Result}; use ecdsa_fun::{ adaptor::{Adaptor, EncryptedSignature}, nonce::Deterministic, }; -use libp2p::request_response::ResponseChannel; +use libp2p::PeerId; use rand::{CryptoRng, RngCore}; use serde::{Deserialize, Serialize}; use sha2::Sha256; @@ -30,12 +29,14 @@ pub enum AliceState { state0: State0, }, Negotiated { - channel: Option>, + // TODO: Remove option + bob_peer_id: Option, amounts: SwapAmounts, state3: Box, }, BtcLocked { - channel: Option>, + // TODO: Remove option + bob_peer_id: Option, amounts: SwapAmounts, state3: Box, }, @@ -476,8 +477,8 @@ pub struct State5 { } impl State5 { - pub fn next_message(&self) -> alice::Message2 { - alice::Message2 { + pub fn next_message(&self) -> Message4 { + Message4 { tx_lock_proof: self.tx_lock_proof.clone(), } } diff --git a/swap/src/protocol/alice/steps.rs b/swap/src/protocol/alice/steps.rs index c2a89415..ea0274d6 100644 --- a/swap/src/protocol/alice/steps.rs +++ b/swap/src/protocol/alice/steps.rs @@ -10,10 +10,9 @@ use crate::{ config::Config, monero, monero::Transfer, - network::request_response::AliceToBob, protocol::{ alice, - alice::{event_loop::EventLoopHandle, SwapResponse}, + alice::{event_loop::EventLoopHandle, Message4, SwapResponse}, SwapAmounts, }, }; @@ -23,7 +22,7 @@ use futures::{ future::{select, Either}, pin_mut, }; -use libp2p::request_response::ResponseChannel; +use libp2p::PeerId; use rand::rngs::OsRng; use sha2::Sha256; use std::sync::Arc; @@ -35,11 +34,11 @@ pub async fn negotiate( xmr_amount: monero::Amount, event_loop_handle: &mut EventLoopHandle, config: Config, -) -> Result<(ResponseChannel, alice::State3)> { +) -> Result<(PeerId, alice::State3)> { trace!("Starting negotiate"); // todo: we can move this out, we dont need to timeout here - let _peer_id = timeout( + let bob_peer_id = timeout( config.bob_time_to_act, event_loop_handle.recv_conn_established(), ) @@ -73,12 +72,11 @@ pub async fn negotiate( .send_message1(channel, state2.next_message()) .await?; - let (bob_message2, channel) = - timeout(config.bob_time_to_act, event_loop_handle.recv_message2()).await??; + let bob_message2 = timeout(config.bob_time_to_act, event_loop_handle.recv_message2()).await??; let state3 = state2.receive(bob_message2)?; - Ok((channel, state3)) + Ok((bob_peer_id, state3)) } // TODO(Franck): Use helper functions from xmr-btc instead of re-writing them @@ -108,7 +106,7 @@ where } pub async fn lock_xmr( - channel: ResponseChannel, + bob_peer_id: PeerId, amounts: SwapAmounts, state3: alice::State3, event_loop_handle: &mut EventLoopHandle, @@ -134,7 +132,7 @@ where // Otherwise Alice might publish the lock tx twice! event_loop_handle - .send_message2(channel, alice::Message2 { + .send_message4(bob_peer_id, Message4 { tx_lock_proof: transfer_proof, }) .await?; diff --git a/swap/src/protocol/alice/swap.rs b/swap/src/protocol/alice/swap.rs index ecda7c63..870917c2 100644 --- a/swap/src/protocol/alice/swap.rs +++ b/swap/src/protocol/alice/swap.rs @@ -91,11 +91,11 @@ async fn run_until_internal( } else { match state { AliceState::Started { amounts, state0 } => { - let (channel, state3) = + let (peer_id, state3) = negotiate(state0, amounts.xmr, &mut event_loop_handle, config).await?; let state = AliceState::Negotiated { - channel: Some(channel), + bob_peer_id: Some(peer_id), amounts, state3: Box::new(state3), }; @@ -117,11 +117,11 @@ async fn run_until_internal( } AliceState::Negotiated { state3, - channel, + bob_peer_id, amounts, } => { - let state = match channel { - Some(channel) => { + let state = match bob_peer_id { + Some(bob_peer_id) => { let _ = wait_for_locked_bitcoin( state3.tx_lock.txid(), bitcoin_wallet.clone(), @@ -130,7 +130,7 @@ async fn run_until_internal( .await?; AliceState::BtcLocked { - channel: Some(channel), + bob_peer_id: Some(bob_peer_id), amounts, state3, } @@ -159,14 +159,14 @@ async fn run_until_internal( .await } AliceState::BtcLocked { - channel, + bob_peer_id, amounts, state3, } => { - let state = match channel { - Some(channel) => { + let state = match bob_peer_id { + Some(bob_peer_id) => { lock_xmr( - channel, + bob_peer_id, amounts, *state3.clone(), &mut event_loop_handle, diff --git a/swap/src/protocol/bob.rs b/swap/src/protocol/bob.rs index 7431209e..9361b02b 100644 --- a/swap/src/protocol/bob.rs +++ b/swap/src/protocol/bob.rs @@ -31,11 +31,13 @@ pub use self::{ swap::{run, run_until}, swap_request::*, }; +use crate::protocol::alice::Message4; pub mod event_loop; mod message0; mod message1; mod message2; +mod message4; mod message5; pub mod state; pub mod swap; @@ -210,7 +212,8 @@ pub enum OutEvent { SwapResponse(alice::SwapResponse), Message0(Box), Message1(Box), - Message2(alice::Message2), + Message2, + Message4(Box), Message5, } @@ -249,7 +252,15 @@ impl From for OutEvent { impl From for OutEvent { fn from(event: message2::OutEvent) -> Self { match event { - message2::OutEvent::Msg(msg) => OutEvent::Message2(msg), + message2::OutEvent::Msg => OutEvent::Message2, + } + } +} + +impl From for OutEvent { + fn from(event: message4::OutEvent) -> Self { + match event { + message4::OutEvent::Msg(msg) => OutEvent::Message4(Box::new(msg)), } } } @@ -272,6 +283,7 @@ pub struct Behaviour { message0: message0::Behaviour, message1: message1::Behaviour, message2: message2::Behaviour, + message4: message4::Behaviour, message5: message5::Behaviour, } diff --git a/swap/src/protocol/bob/event_loop.rs b/swap/src/protocol/bob/event_loop.rs index 45cbac17..16409ea9 100644 --- a/swap/src/protocol/bob/event_loop.rs +++ b/swap/src/protocol/bob/event_loop.rs @@ -3,7 +3,7 @@ use crate::{ network::{transport::SwapTransport, TokioExecutor}, protocol::{ alice, - alice::SwapResponse, + alice::{Message4, SwapResponse}, bob::{self, Behaviour, OutEvent, SwapRequest}, }, }; @@ -40,7 +40,7 @@ pub struct EventLoopHandle { swap_response: Receiver, msg0: Receiver, msg1: Receiver, - msg2: Receiver, + msg4: Receiver, conn_established: Receiver, dial_alice: Sender<()>, send_swap_request: Sender, @@ -72,11 +72,11 @@ impl EventLoopHandle { .ok_or_else(|| anyhow!("Failed to receive message 1 from Alice")) } - pub async fn recv_message2(&mut self) -> Result { - self.msg2 + pub async fn recv_message4(&mut self) -> Result { + self.msg4 .recv() .await - .ok_or_else(|| anyhow!("Failed o receive message 2 from Alice")) + .ok_or_else(|| anyhow!("Failed to receive message 4 from Alice")) } /// Dials other party and wait for the connection to be established. @@ -126,7 +126,7 @@ pub struct EventLoop { swap_response: Sender, msg0: Sender, msg1: Sender, - msg2: Sender, + msg4: Sender, conn_established: Sender, dial_alice: Receiver<()>, send_swap_request: Receiver, @@ -155,7 +155,7 @@ impl EventLoop { let swap_response = Channels::new(); let msg0 = Channels::new(); let msg1 = Channels::new(); - let msg2 = Channels::new(); + let msg4 = Channels::new(); let conn_established = Channels::new(); let dial_alice = Channels::new(); let send_swap_request = Channels::new(); @@ -170,7 +170,7 @@ impl EventLoop { swap_response: swap_response.sender, msg0: msg0.sender, msg1: msg1.sender, - msg2: msg2.sender, + msg4: msg4.sender, conn_established: conn_established.sender, dial_alice: dial_alice.receiver, send_swap_request: send_swap_request.receiver, @@ -184,7 +184,7 @@ impl EventLoop { swap_response: swap_response.receiver, msg0: msg0.receiver, msg1: msg1.receiver, - msg2: msg2.receiver, + msg4: msg4.receiver, conn_established: conn_established.receiver, dial_alice: dial_alice.sender, send_swap_request: send_swap_request.sender, @@ -214,8 +214,9 @@ impl EventLoop { OutEvent::Message1(msg) => { let _ = self.msg1.send(*msg).await; } - OutEvent::Message2(msg) => { - let _ = self.msg2.send(msg).await; + OutEvent::Message2 => info!("Alice acknowledged message 2 received"), + OutEvent::Message4(msg) => { + let _ = self.msg4.send(*msg).await; } OutEvent::Message5 => info!("Alice acknowledged message 5 received"), } diff --git a/swap/src/protocol/bob/message2.rs b/swap/src/protocol/bob/message2.rs index edda820e..eeb5f849 100644 --- a/swap/src/protocol/bob/message2.rs +++ b/swap/src/protocol/bob/message2.rs @@ -1,7 +1,4 @@ -use crate::{ - network::request_response::{AliceToBob, BobToAlice, Codec, Message2Protocol, TIMEOUT}, - protocol::alice, -}; +use crate::network::request_response::{AliceToBob, BobToAlice, Codec, Message2Protocol, TIMEOUT}; use ecdsa_fun::Signature; use libp2p::{ request_response::{ @@ -25,9 +22,9 @@ pub struct Message2 { pub(crate) tx_cancel_sig: Signature, } -#[derive(Debug)] +#[derive(Clone, Copy, Debug)] pub enum OutEvent { - Msg(alice::Message2), + Msg, } /// A `NetworkBehaviour` that represents sending message 2 to Alice. @@ -87,9 +84,9 @@ impl NetworkBehaviourEventProcess> message: RequestResponseMessage::Response { response, .. }, .. } => { - if let AliceToBob::Message2(msg) = response { - debug!("Received Message2"); - self.events.push_back(OutEvent::Msg(msg)); + if let AliceToBob::Message2 = response { + debug!("Received Message 2 acknowledgement"); + self.events.push_back(OutEvent::Msg); } } RequestResponseEvent::InboundFailure { error, .. } => { diff --git a/swap/src/protocol/bob/message4.rs b/swap/src/protocol/bob/message4.rs new file mode 100644 index 00000000..80a1866d --- /dev/null +++ b/swap/src/protocol/bob/message4.rs @@ -0,0 +1,96 @@ +use crate::{ + network::request_response::{Message4Protocol, OneShotCodec, Request, Response, TIMEOUT}, + protocol::alice::Message4, +}; +use libp2p::{ + request_response::{ + handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig, + RequestResponseEvent, RequestResponseMessage, + }, + swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}, + NetworkBehaviour, +}; +use std::{ + collections::VecDeque, + task::{Context, Poll}, + time::Duration, +}; +use tracing::{debug, error}; + +#[derive(Debug)] +pub enum OutEvent { + Msg(Message4), +} + +/// A `NetworkBehaviour` that represents receiving of message 4 from Alice. +#[derive(NetworkBehaviour)] +#[behaviour(out_event = "OutEvent", poll_method = "poll")] +#[allow(missing_debug_implementations)] +pub struct Behaviour { + rr: RequestResponse>, + #[behaviour(ignore)] + events: VecDeque, +} + +impl Behaviour { + fn poll( + &mut self, + _: &mut Context<'_>, + _: &mut impl PollParameters, + ) -> Poll>, OutEvent>> + { + if let Some(event) = self.events.pop_front() { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + } + + Poll::Pending + } +} + +impl Default for Behaviour { + fn default() -> Self { + let timeout = Duration::from_secs(TIMEOUT); + let mut config = RequestResponseConfig::default(); + config.set_request_timeout(timeout); + + Self { + rr: RequestResponse::new( + OneShotCodec::default(), + vec![(Message4Protocol, ProtocolSupport::Full)], + config, + ), + events: Default::default(), + } + } +} + +impl NetworkBehaviourEventProcess> for Behaviour { + fn inject_event(&mut self, event: RequestResponseEvent) { + match event { + RequestResponseEvent::Message { + message: + RequestResponseMessage::Request { + request, channel, .. + }, + .. + } => { + if let Request::Message4(msg) = request { + debug!("Received message 4"); + self.events.push_back(OutEvent::Msg(*msg)); + // Send back empty response so that the request/response protocol completes. + let _ = self.rr.send_response(channel, Response::Message4); + } + } + RequestResponseEvent::Message { + message: RequestResponseMessage::Response { .. }, + .. + } => panic!("Bob should not get a Response"), + RequestResponseEvent::InboundFailure { error, .. } => { + error!("Inbound failure: {:?}", error); + } + RequestResponseEvent::OutboundFailure { error, .. } => { + error!("Outbound failure: {:?}", error); + } + } + } +} diff --git a/swap/src/protocol/bob/swap.rs b/swap/src/protocol/bob/swap.rs index 049ac52f..2f1e21bd 100644 --- a/swap/src/protocol/bob/swap.rs +++ b/swap/src/protocol/bob/swap.rs @@ -130,7 +130,7 @@ where { event_loop_handle.dial().await?; - let msg2_watcher = event_loop_handle.recv_message2(); + let msg4_watcher = event_loop_handle.recv_message4(); let cancel_timelock_expires = state3.wait_for_cancel_timelock_to_expire(bitcoin_wallet.as_ref()); @@ -143,13 +143,11 @@ where monero_wallet.inner.block_height().await?; select! { - msg2 = msg2_watcher => { - - let msg2 = msg2?; - + msg4 = msg4_watcher => { + let msg4 = msg4?; BobState::XmrLockProofReceived { state: state3, - lock_transfer_proof: msg2.tx_lock_proof, + lock_transfer_proof: msg4.tx_lock_proof, monero_wallet_restore_blockheight } }, From a910bc204636cf5c85f520168aab3c5061ba31d7 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Fri, 22 Jan 2021 14:38:16 +1100 Subject: [PATCH 05/10] Box all messages in enum The messages are very different, best to box them and save size on the stack as an enum takes as much space on the stack than its bigger variant. --- swap/src/network/request_response.rs | 10 ++++------ swap/src/protocol/alice/message1.rs | 2 +- swap/src/protocol/alice/message2.rs | 2 +- swap/src/protocol/alice/swap_response.rs | 4 ++-- swap/src/protocol/bob/message1.rs | 2 +- swap/src/protocol/bob/message2.rs | 2 +- swap/src/protocol/bob/swap_request.rs | 6 ++++-- 7 files changed, 14 insertions(+), 14 deletions(-) diff --git a/swap/src/network/request_response.rs b/swap/src/network/request_response.rs index 1e8cbae8..420e6594 100644 --- a/swap/src/network/request_response.rs +++ b/swap/src/network/request_response.rs @@ -19,20 +19,18 @@ const BUF_SIZE: usize = 1024 * 1024; // Codec for each Message and a macro that implements them. /// Messages Bob sends to Alice. -// TODO: Remove this once network changes are done -#[allow(clippy::large_enum_variant)] #[derive(Clone, Debug, Serialize, Deserialize)] pub enum BobToAlice { - SwapRequest(bob::SwapRequest), + SwapRequest(Box), Message0(Box), - Message1(bob::Message1), - Message2(bob::Message2), + Message1(Box), + Message2(Box), } /// Messages Alice sends to Bob. #[derive(Clone, Debug, Serialize, Deserialize)] pub enum AliceToBob { - SwapResponse(alice::SwapResponse), + SwapResponse(Box), Message0(Box), Message1(Box), Message2, diff --git a/swap/src/protocol/alice/message1.rs b/swap/src/protocol/alice/message1.rs index 95050905..497c8625 100644 --- a/swap/src/protocol/alice/message1.rs +++ b/swap/src/protocol/alice/message1.rs @@ -93,7 +93,7 @@ impl NetworkBehaviourEventProcess> } => { if let BobToAlice::Message1(msg) = request { debug!("Received Message1"); - self.events.push_back(OutEvent::Msg { msg, channel }); + self.events.push_back(OutEvent::Msg { msg: *msg, channel }); } } RequestResponseEvent::Message { diff --git a/swap/src/protocol/alice/message2.rs b/swap/src/protocol/alice/message2.rs index 6e54f8f4..306b9be5 100644 --- a/swap/src/protocol/alice/message2.rs +++ b/swap/src/protocol/alice/message2.rs @@ -79,7 +79,7 @@ impl NetworkBehaviourEventProcess> if let BobToAlice::Message2(msg) = request { debug!("Received Message 2"); self.events.push_back(OutEvent::Msg { - msg, + msg: *msg, bob_peer_id: peer, }); // Send back empty response so that the request/response protocol completes. diff --git a/swap/src/protocol/alice/swap_response.rs b/swap/src/protocol/alice/swap_response.rs index 47f6ceab..031bd8dd 100644 --- a/swap/src/protocol/alice/swap_response.rs +++ b/swap/src/protocol/alice/swap_response.rs @@ -45,7 +45,7 @@ pub struct Behaviour { impl Behaviour { /// Alice always sends her messages as a response to a request from Bob. pub fn send(&mut self, channel: ResponseChannel, msg: SwapResponse) { - let msg = AliceToBob::SwapResponse(msg); + let msg = AliceToBob::SwapResponse(Box::new(msg)); self.rr.send_response(channel, msg); } @@ -92,7 +92,7 @@ impl NetworkBehaviourEventProcess> } => { if let BobToAlice::SwapRequest(msg) = request { debug!("Received swap request"); - self.events.push_back(OutEvent { msg, channel }) + self.events.push_back(OutEvent { msg: *msg, channel }) } } RequestResponseEvent::Message { diff --git a/swap/src/protocol/bob/message1.rs b/swap/src/protocol/bob/message1.rs index f783af1d..dea01ae3 100644 --- a/swap/src/protocol/bob/message1.rs +++ b/swap/src/protocol/bob/message1.rs @@ -41,7 +41,7 @@ pub struct Behaviour { impl Behaviour { pub fn send(&mut self, alice: PeerId, msg: Message1) { - let msg = BobToAlice::Message1(msg); + let msg = BobToAlice::Message1(Box::new(msg)); let _id = self.rr.send_request(&alice, msg); } diff --git a/swap/src/protocol/bob/message2.rs b/swap/src/protocol/bob/message2.rs index eeb5f849..20333d95 100644 --- a/swap/src/protocol/bob/message2.rs +++ b/swap/src/protocol/bob/message2.rs @@ -39,7 +39,7 @@ pub struct Behaviour { impl Behaviour { pub fn send(&mut self, alice: PeerId, msg: Message2) { - let msg = BobToAlice::Message2(msg); + let msg = BobToAlice::Message2(Box::new(msg)); let _id = self.rr.send_request(&alice, msg); } diff --git a/swap/src/protocol/bob/swap_request.rs b/swap/src/protocol/bob/swap_request.rs index 121eb6de..5f032081 100644 --- a/swap/src/protocol/bob/swap_request.rs +++ b/swap/src/protocol/bob/swap_request.rs @@ -42,7 +42,7 @@ pub struct Behaviour { impl Behaviour { pub fn send(&mut self, alice: PeerId, swap_request: SwapRequest) -> Result { - let msg = BobToAlice::SwapRequest(swap_request); + let msg = BobToAlice::SwapRequest(Box::new(swap_request)); let id = self.rr.send_request(&alice, msg); Ok(id) @@ -92,7 +92,9 @@ impl NetworkBehaviourEventProcess> } => { if let AliceToBob::SwapResponse(swap_response) = response { debug!("Received swap response"); - self.events.push_back(OutEvent { swap_response }); + self.events.push_back(OutEvent { + swap_response: *swap_response, + }); } } RequestResponseEvent::InboundFailure { error, .. } => { From 33db688e3a940e6d83750642a36f7e5dbf4a938f Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Fri, 22 Jan 2021 14:56:11 +1100 Subject: [PATCH 06/10] Bob peer id can be retrieved from the DB This remove branches where Alice resumes from the DB but cannot contact Bob. --- swap/src/database/alice.rs | 49 +++++++++++++++++++++------ swap/src/lib.rs | 1 + swap/src/main.rs | 6 ++-- swap/src/protocol/alice/state.rs | 6 ++-- swap/src/protocol/alice/swap.rs | 57 ++++++++++---------------------- swap/src/serde_peer_id.rs | 48 +++++++++++++++++++++++++++ 6 files changed, 111 insertions(+), 56 deletions(-) create mode 100644 swap/src/serde_peer_id.rs diff --git a/swap/src/database/alice.rs b/swap/src/database/alice.rs index 6829d7e8..cc004bcf 100644 --- a/swap/src/database/alice.rs +++ b/swap/src/database/alice.rs @@ -5,6 +5,7 @@ use crate::{ protocol::{alice, alice::AliceState, SwapAmounts}, }; use ::bitcoin::hashes::core::fmt::Display; +use libp2p::PeerId; use serde::{Deserialize, Serialize}; // Large enum variant is fine because this is only used for database @@ -16,8 +17,16 @@ pub enum Alice { amounts: SwapAmounts, state0: alice::State0, }, - Negotiated(alice::State3), - BtcLocked(alice::State3), + Negotiated { + state3: alice::State3, + #[serde(with = "crate::serde_peer_id")] + bob_peer_id: PeerId, + }, + BtcLocked { + state3: alice::State3, + #[serde(with = "crate::serde_peer_id")] + bob_peer_id: PeerId, + }, XmrLocked(alice::State3), EncSigLearned { encrypted_signature: EncryptedSignature, @@ -45,8 +54,22 @@ pub enum AliceEndState { impl From<&AliceState> for Alice { fn from(alice_state: &AliceState) -> Self { match alice_state { - AliceState::Negotiated { state3, .. } => Alice::Negotiated(state3.as_ref().clone()), - AliceState::BtcLocked { state3, .. } => Alice::BtcLocked(state3.as_ref().clone()), + AliceState::Negotiated { + state3, + bob_peer_id, + .. + } => Alice::Negotiated { + state3: state3.as_ref().clone(), + bob_peer_id: bob_peer_id.clone(), + }, + AliceState::BtcLocked { + state3, + bob_peer_id, + .. + } => Alice::BtcLocked { + state3: state3.as_ref().clone(), + bob_peer_id: bob_peer_id.clone(), + }, AliceState::XmrLocked { state3 } => Alice::XmrLocked(state3.as_ref().clone()), AliceState::EncSigLearned { state3, @@ -82,16 +105,22 @@ impl From for AliceState { fn from(db_state: Alice) -> Self { match db_state { Alice::Started { amounts, state0 } => AliceState::Started { amounts, state0 }, - Alice::Negotiated(state3) => AliceState::Negotiated { - bob_peer_id: None, + Alice::Negotiated { + state3, + bob_peer_id, + } => AliceState::Negotiated { + bob_peer_id, amounts: SwapAmounts { btc: state3.btc, xmr: state3.xmr, }, state3: Box::new(state3), }, - Alice::BtcLocked(state3) => AliceState::BtcLocked { - bob_peer_id: None, + Alice::BtcLocked { + state3, + bob_peer_id, + } => AliceState::BtcLocked { + bob_peer_id, amounts: SwapAmounts { btc: state3.btc, xmr: state3.xmr, @@ -157,8 +186,8 @@ impl Display for Alice { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Alice::Started { .. } => write!(f, "Started"), - Alice::Negotiated(_) => f.write_str("Negotiated"), - Alice::BtcLocked(_) => f.write_str("Bitcoin locked"), + Alice::Negotiated { .. } => f.write_str("Negotiated"), + Alice::BtcLocked { .. } => f.write_str("Bitcoin locked"), Alice::XmrLocked(_) => f.write_str("Monero locked"), Alice::CancelTimelockExpired(_) => f.write_str("Cancel timelock is expired"), Alice::BtcCancelled(_) => f.write_str("Bitcoin cancel transaction published"), diff --git a/swap/src/lib.rs b/swap/src/lib.rs index d85355a7..5e51f8d5 100644 --- a/swap/src/lib.rs +++ b/swap/src/lib.rs @@ -26,3 +26,4 @@ pub mod seed; pub mod trace; mod fs; +mod serde_peer_id; diff --git a/swap/src/main.rs b/swap/src/main.rs index 06fd7c00..e5c23fad 100644 --- a/swap/src/main.rs +++ b/swap/src/main.rs @@ -25,16 +25,18 @@ use tracing::{info, log::LevelFilter}; use uuid::Uuid; pub mod bitcoin; -mod cli; pub mod config; pub mod database; -mod fs; pub mod monero; pub mod network; pub mod protocol; pub mod seed; pub mod trace; +mod cli; +mod fs; +mod serde_peer_id; + #[macro_use] extern crate prettytable; diff --git a/swap/src/protocol/alice/state.rs b/swap/src/protocol/alice/state.rs index 76f89804..13145eb2 100644 --- a/swap/src/protocol/alice/state.rs +++ b/swap/src/protocol/alice/state.rs @@ -29,14 +29,12 @@ pub enum AliceState { state0: State0, }, Negotiated { - // TODO: Remove option - bob_peer_id: Option, + bob_peer_id: PeerId, amounts: SwapAmounts, state3: Box, }, BtcLocked { - // TODO: Remove option - bob_peer_id: Option, + bob_peer_id: PeerId, amounts: SwapAmounts, state3: Box, }, diff --git a/swap/src/protocol/alice/swap.rs b/swap/src/protocol/alice/swap.rs index 870917c2..2abe8e67 100644 --- a/swap/src/protocol/alice/swap.rs +++ b/swap/src/protocol/alice/swap.rs @@ -91,11 +91,11 @@ async fn run_until_internal( } else { match state { AliceState::Started { amounts, state0 } => { - let (peer_id, state3) = + let (bob_peer_id, state3) = negotiate(state0, amounts.xmr, &mut event_loop_handle, config).await?; let state = AliceState::Negotiated { - bob_peer_id: Some(peer_id), + bob_peer_id, amounts, state3: Box::new(state3), }; @@ -120,27 +120,14 @@ async fn run_until_internal( bob_peer_id, amounts, } => { - let state = match bob_peer_id { - Some(bob_peer_id) => { - let _ = wait_for_locked_bitcoin( - state3.tx_lock.txid(), - bitcoin_wallet.clone(), - config, - ) + let _ = + wait_for_locked_bitcoin(state3.tx_lock.txid(), bitcoin_wallet.clone(), config) .await?; - AliceState::BtcLocked { - bob_peer_id: Some(bob_peer_id), - amounts, - state3, - } - } - None => { - tracing::info!("Cannot resume swap from negotiated state, aborting"); - - // Alice did not lock Xmr yet - AliceState::SafelyAborted - } + let state = AliceState::BtcLocked { + bob_peer_id, + amounts, + state3, }; let db_state = (&state).into(); @@ -163,26 +150,16 @@ async fn run_until_internal( amounts, state3, } => { - let state = match bob_peer_id { - Some(bob_peer_id) => { - lock_xmr( - bob_peer_id, - amounts, - *state3.clone(), - &mut event_loop_handle, - monero_wallet.clone(), - ) - .await?; + lock_xmr( + bob_peer_id, + amounts, + *state3.clone(), + &mut event_loop_handle, + monero_wallet.clone(), + ) + .await?; - AliceState::XmrLocked { state3 } - } - None => { - tracing::info!("Cannot resume swap from BTC locked state, aborting"); - - // Alice did not lock Xmr yet - AliceState::SafelyAborted - } - }; + let state = AliceState::XmrLocked { state3 }; let db_state = (&state).into(); db.insert_latest_state(swap_id, database::Swap::Alice(db_state)) diff --git a/swap/src/serde_peer_id.rs b/swap/src/serde_peer_id.rs new file mode 100644 index 00000000..b0da3ac2 --- /dev/null +++ b/swap/src/serde_peer_id.rs @@ -0,0 +1,48 @@ +//! A serde module that defines how we want to serialize PeerIds on the +//! HTTP-API. + +use libp2p::PeerId; +use serde::{de::Error, Deserialize, Deserializer, Serializer}; + +pub fn serialize(peer_id: &PeerId, serializer: S) -> Result +where + S: Serializer, +{ + let string = peer_id.to_string(); + serializer.serialize_str(&string) +} + +#[allow(dead_code)] +pub fn deserialize<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + let string = String::deserialize(deserializer)?; + let peer_id = string.parse().map_err(D::Error::custom)?; + + Ok(peer_id) +} + +#[cfg(test)] +mod tests { + use super::*; + use serde::Serialize; + use spectral::prelude::*; + + #[derive(Serialize)] + struct SerializablePeerId(#[serde(with = "super")] PeerId); + + #[test] + fn maker_id_serializes_as_expected() { + let peer_id = SerializablePeerId( + "QmfUfpC2frwFvcDzpspnfZitHt5wct6n4kpG5jzgRdsxkY" + .parse() + .unwrap(), + ); + + let got = serde_json::to_string(&peer_id).expect("failed to serialize peer id"); + + assert_that(&got) + .is_equal_to(r#""QmfUfpC2frwFvcDzpspnfZitHt5wct6n4kpG5jzgRdsxkY""#.to_string()); + } +} From 8fd2620b83eac808cea62733190a9ca538695493 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Wed, 27 Jan 2021 14:14:20 +1100 Subject: [PATCH 07/10] Improve names for messages 4 and 5 --- docs/sequence.puml | 4 +- swap/src/network/request_response.rs | 22 +++++----- swap/src/protocol/alice.rs | 36 +++++++-------- .../{message5.rs => encrypted_signature.rs} | 26 ++++++----- swap/src/protocol/alice/event_loop.rs | 44 +++++++++---------- swap/src/protocol/alice/state.rs | 17 +++---- swap/src/protocol/alice/steps.rs | 6 +-- .../alice/{message4.rs => transfer_proof.rs} | 21 ++++----- swap/src/protocol/bob.rs | 39 ++++++++-------- .../{message5.rs => encrypted_signature.rs} | 26 +++++------ swap/src/protocol/bob/event_loop.rs | 42 +++++++++--------- swap/src/protocol/bob/state.rs | 22 ++++------ swap/src/protocol/bob/swap.rs | 8 ++-- .../bob/{message4.rs => transfer_proof.rs} | 21 ++++----- 14 files changed, 168 insertions(+), 166 deletions(-) rename swap/src/protocol/alice/{message5.rs => encrypted_signature.rs} (78%) rename swap/src/protocol/alice/{message4.rs => transfer_proof.rs} (79%) rename swap/src/protocol/bob/{message5.rs => encrypted_signature.rs} (76%) rename swap/src/protocol/bob/{message4.rs => transfer_proof.rs} (82%) diff --git a/docs/sequence.puml b/docs/sequence.puml index 189856db..11638d4b 100644 --- a/docs/sequence.puml +++ b/docs/sequence.puml @@ -46,10 +46,10 @@ group Execution Alice ->> Monero: Lock - Alice -> Bob: Message4 + Alice -> Bob: TransferProof note right: xmr lock tx transfer proof\nThis can be removed if Bob watches the blockchain. - Bob -> Alice: Message5 + Bob -> Alice: EncryptedSignature note left: redeem tx enc sig S_a Alice ->> Bitcoin: Redeem diff --git a/swap/src/network/request_response.rs b/swap/src/network/request_response.rs index 420e6594..fa5158d5 100644 --- a/swap/src/network/request_response.rs +++ b/swap/src/network/request_response.rs @@ -1,4 +1,4 @@ -use crate::protocol::{alice, alice::Message4, bob, bob::Message5}; +use crate::protocol::{alice, alice::TransferProof, bob, bob::EncryptedSignature}; use async_trait::async_trait; use futures::prelude::*; use libp2p::{ @@ -40,15 +40,15 @@ pub enum AliceToBob { /// All responses are empty #[derive(Clone, Debug, Serialize, Deserialize)] pub enum Request { - Message4(Box), - Message5(Box), + TransferProof(Box), + EncryptedSignature(Box), } #[derive(Clone, Copy, Debug, Serialize, Deserialize)] /// Response are only used for acknowledgement purposes. pub enum Response { - Message4, - Message5, + TransferProof, + EncryptedSignature, } #[derive(Debug, Clone, Copy, Default)] @@ -64,10 +64,10 @@ pub struct Message1Protocol; pub struct Message2Protocol; #[derive(Debug, Clone, Copy, Default)] -pub struct Message4Protocol; +pub struct TransferProofProtocol; #[derive(Debug, Clone, Copy, Default)] -pub struct Message5Protocol; +pub struct EncryptedSignatureProtocol; impl ProtocolName for Swap { fn protocol_name(&self) -> &[u8] { @@ -93,15 +93,15 @@ impl ProtocolName for Message2Protocol { } } -impl ProtocolName for Message4Protocol { +impl ProtocolName for TransferProofProtocol { fn protocol_name(&self) -> &[u8] { - b"/xmr/btc/message4/1.0.0" + b"/xmr/btc/transfer_proof/1.0.0" } } -impl ProtocolName for Message5Protocol { +impl ProtocolName for EncryptedSignatureProtocol { fn protocol_name(&self) -> &[u8] { - b"/xmr/btc/message5/1.0.0" + b"/xmr/btc/encrypted_signature/1.0.0" } } diff --git a/swap/src/protocol/alice.rs b/swap/src/protocol/alice.rs index 34827dc8..22af28df 100644 --- a/swap/src/protocol/alice.rs +++ b/swap/src/protocol/alice.rs @@ -4,10 +4,10 @@ pub use self::{ event_loop::{EventLoop, EventLoopHandle}, message0::Message0, message1::Message1, - message4::Message4, state::*, swap::{run, run_until}, swap_response::*, + transfer_proof::TransferProof, }; use crate::{ bitcoin, @@ -21,7 +21,7 @@ use crate::{ transport::build, Seed as NetworkSeed, }, - protocol::{bob, bob::Message5, SwapAmounts}, + protocol::{bob, bob::EncryptedSignature, SwapAmounts}, seed::Seed, }; use anyhow::{bail, Result}; @@ -33,16 +33,16 @@ use std::{path::PathBuf, sync::Arc}; use tracing::{debug, info}; use uuid::Uuid; +mod encrypted_signature; pub mod event_loop; mod message0; mod message1; mod message2; -mod message4; -mod message5; pub mod state; mod steps; pub mod swap; mod swap_response; +mod transfer_proof; pub struct Swap { pub state: AliceState, @@ -236,8 +236,8 @@ pub enum OutEvent { msg: Box, bob_peer_id: PeerId, }, - Message4, - Message5(Message5), + TransferProof, + EncryptedSignature(EncryptedSignature), } impl From for OutEvent { @@ -286,18 +286,18 @@ impl From for OutEvent { } } -impl From for OutEvent { - fn from(event: message4::OutEvent) -> Self { +impl From for OutEvent { + fn from(event: transfer_proof::OutEvent) -> Self { match event { - message4::OutEvent::Msg => OutEvent::Message4, + transfer_proof::OutEvent::Msg => OutEvent::TransferProof, } } } -impl From for OutEvent { - fn from(event: message5::OutEvent) -> Self { +impl From for OutEvent { + fn from(event: encrypted_signature::OutEvent) -> Self { match event { - message5::OutEvent::Msg(msg) => OutEvent::Message5(msg), + encrypted_signature::OutEvent::Msg(msg) => OutEvent::EncryptedSignature(msg), } } } @@ -312,8 +312,8 @@ pub struct Behaviour { message0: message0::Behaviour, message1: message1::Behaviour, message2: message2::Behaviour, - message4: message4::Behaviour, - message5: message5::Behaviour, + transfer_proof: transfer_proof::Behaviour, + encrypted_signature: encrypted_signature::Behaviour, } impl Behaviour { @@ -339,9 +339,9 @@ impl Behaviour { debug!("Sent Message1"); } - /// Send Message4 to Bob. - pub fn send_message4(&mut self, bob: PeerId, msg: Message4) { - self.message4.send(bob, msg); - debug!("Sent Message 4"); + /// Send Transfer Proof to Bob. + pub fn send_transfer_proof(&mut self, bob: PeerId, msg: TransferProof) { + self.transfer_proof.send(bob, msg); + debug!("Sent Transfer Proof"); } } diff --git a/swap/src/protocol/alice/message5.rs b/swap/src/protocol/alice/encrypted_signature.rs similarity index 78% rename from swap/src/protocol/alice/message5.rs rename to swap/src/protocol/alice/encrypted_signature.rs index f285de7c..b62cf1de 100644 --- a/swap/src/protocol/alice/message5.rs +++ b/swap/src/protocol/alice/encrypted_signature.rs @@ -1,6 +1,8 @@ use crate::{ - network::request_response::{Message5Protocol, OneShotCodec, Request, Response, TIMEOUT}, - protocol::bob::Message5, + network::request_response::{ + EncryptedSignatureProtocol, OneShotCodec, Request, Response, TIMEOUT, + }, + protocol::bob::EncryptedSignature, }; use libp2p::{ request_response::{ @@ -19,15 +21,16 @@ use tracing::{debug, error}; #[derive(Debug)] pub enum OutEvent { - Msg(Message5), + Msg(EncryptedSignature), } -/// A `NetworkBehaviour` that represents receiving of message 5 from Bob. +/// A `NetworkBehaviour` that represents receiving the Bitcoin encrypted +/// signature from Bob. #[derive(NetworkBehaviour)] #[behaviour(out_event = "OutEvent", poll_method = "poll")] #[allow(missing_debug_implementations)] pub struct Behaviour { - rr: RequestResponse>, + rr: RequestResponse>, #[behaviour(ignore)] events: VecDeque, } @@ -37,8 +40,9 @@ impl Behaviour { &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll>, OutEvent>> - { + ) -> Poll< + NetworkBehaviourAction>, OutEvent>, + > { if let Some(event) = self.events.pop_front() { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); } @@ -56,7 +60,7 @@ impl Default for Behaviour { Self { rr: RequestResponse::new( OneShotCodec::default(), - vec![(Message5Protocol, ProtocolSupport::Full)], + vec![(EncryptedSignatureProtocol, ProtocolSupport::Full)], config, ), events: Default::default(), @@ -74,11 +78,11 @@ impl NetworkBehaviourEventProcess> for B }, .. } => { - if let Request::Message5(msg) = request { - debug!("Received message 5"); + if let Request::EncryptedSignature(msg) = request { + debug!("Received encrypted signature"); self.events.push_back(OutEvent::Msg(*msg)); // Send back empty response so that the request/response protocol completes. - let _ = self.rr.send_response(channel, Response::Message5); + let _ = self.rr.send_response(channel, Response::EncryptedSignature); } } RequestResponseEvent::Message { diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index 2d0c0a7b..424a3c74 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -2,9 +2,9 @@ use crate::{ network::{request_response::AliceToBob, transport::SwapTransport, TokioExecutor}, protocol::{ alice, - alice::{Behaviour, Message4, OutEvent, SwapResponse}, + alice::{Behaviour, OutEvent, SwapResponse, TransferProof}, bob, - bob::Message5, + bob::EncryptedSignature, }, }; use anyhow::{anyhow, Context, Result}; @@ -39,13 +39,13 @@ pub struct EventLoopHandle { msg0: Receiver<(bob::Message0, ResponseChannel)>, msg1: Receiver<(bob::Message1, ResponseChannel)>, msg2: Receiver, - msg5: Receiver, + r_encrypted_signature: Receiver, request: Receiver, conn_established: Receiver, send_swap_response: Sender<(ResponseChannel, SwapResponse)>, send_msg0: Sender<(ResponseChannel, alice::Message0)>, send_msg1: Sender<(ResponseChannel, alice::Message1)>, - send_msg4: Sender<(PeerId, Message4)>, + s_transfer_proof: Sender<(PeerId, TransferProof)>, } impl EventLoopHandle { @@ -77,8 +77,8 @@ impl EventLoopHandle { .ok_or_else(|| anyhow!("Failed to receive message 2 from Bob")) } - pub async fn recv_message5(&mut self) -> Result { - self.msg5 + pub async fn recv_encrypted_signature(&mut self) -> Result { + self.r_encrypted_signature .recv() .await .ok_or_else(|| anyhow!("Failed to receive Bitcoin encrypted signature from Bob")) @@ -123,8 +123,8 @@ impl EventLoopHandle { Ok(()) } - pub async fn send_message4(&mut self, bob: PeerId, msg: Message4) -> Result<()> { - let _ = self.send_msg4.send((bob, msg)).await?; + pub async fn send_transfer_proof(&mut self, bob: PeerId, msg: TransferProof) -> Result<()> { + let _ = self.s_transfer_proof.send((bob, msg)).await?; Ok(()) } } @@ -135,13 +135,13 @@ pub struct EventLoop { msg0: Sender<(bob::Message0, ResponseChannel)>, msg1: Sender<(bob::Message1, ResponseChannel)>, msg2: Sender, - msg5: Sender, + r_encrypted_signature: Sender, request: Sender, conn_established: Sender, send_swap_response: Receiver<(ResponseChannel, SwapResponse)>, send_msg0: Receiver<(ResponseChannel, alice::Message0)>, send_msg1: Receiver<(ResponseChannel, alice::Message1)>, - send_msg4: Receiver<(PeerId, Message4)>, + s_transfer_proof: Receiver<(PeerId, TransferProof)>, } impl EventLoop { @@ -163,39 +163,39 @@ impl EventLoop { let msg0 = Channels::new(); let msg1 = Channels::new(); let msg2 = Channels::new(); - let msg5 = Channels::new(); + let r_encrypted_signature = Channels::new(); let request = Channels::new(); let conn_established = Channels::new(); let send_swap_response = Channels::new(); let send_msg0 = Channels::new(); let send_msg1 = Channels::new(); - let send_msg4 = Channels::new(); + let s_transfer_proof = Channels::new(); let driver = EventLoop { swarm, msg0: msg0.sender, msg1: msg1.sender, msg2: msg2.sender, - msg5: msg5.sender, + r_encrypted_signature: r_encrypted_signature.sender, request: request.sender, conn_established: conn_established.sender, send_swap_response: send_swap_response.receiver, send_msg0: send_msg0.receiver, send_msg1: send_msg1.receiver, - send_msg4: send_msg4.receiver, + s_transfer_proof: s_transfer_proof.receiver, }; let handle = EventLoopHandle { msg0: msg0.receiver, msg1: msg1.receiver, msg2: msg2.receiver, - msg5: msg5.receiver, + r_encrypted_signature: r_encrypted_signature.receiver, request: request.receiver, conn_established: conn_established.receiver, send_swap_response: send_swap_response.sender, send_msg0: send_msg0.sender, send_msg1: send_msg1.sender, - send_msg4: send_msg4.sender, + s_transfer_proof: s_transfer_proof.sender, }; Ok((driver, handle)) @@ -218,9 +218,9 @@ impl EventLoop { OutEvent::Message2 { msg, bob_peer_id : _} => { let _ = self.msg2.send(*msg).await; } - OutEvent::Message4 => trace!("Bob ack'd message 4"), - OutEvent::Message5(msg) => { - let _ = self.msg5.send(msg).await; + OutEvent::TransferProof => trace!("Bob ack'd receiving the transfer proof"), + OutEvent::EncryptedSignature(msg) => { + let _ = self.r_encrypted_signature.send(msg).await; } OutEvent::Request(event) => { let _ = self.request.send(*event).await; @@ -242,9 +242,9 @@ impl EventLoop { self.swarm.send_message1(channel, msg); } }, - msg4 = self.send_msg4.next().fuse() => { - if let Some((bob_peer_id, msg)) = msg4 { - self.swarm.send_message4(bob_peer_id, msg); + transfer_proof = self.s_transfer_proof.next().fuse() => { + if let Some((bob_peer_id, msg)) = transfer_proof { + self.swarm.send_transfer_proof(bob_peer_id, msg); } }, } diff --git a/swap/src/protocol/alice/state.rs b/swap/src/protocol/alice/state.rs index 13145eb2..c88e23c0 100644 --- a/swap/src/protocol/alice/state.rs +++ b/swap/src/protocol/alice/state.rs @@ -8,13 +8,10 @@ use crate::{ }, monero, monero::CreateWalletForOutput, - protocol::{alice, alice::Message4, bob, bob::Message5, SwapAmounts}, + protocol::{alice, alice::TransferProof, bob, bob::EncryptedSignature, SwapAmounts}, }; use anyhow::{anyhow, Context, Result}; -use ecdsa_fun::{ - adaptor::{Adaptor, EncryptedSignature}, - nonce::Deterministic, -}; +use ecdsa_fun::{adaptor::Adaptor, nonce::Deterministic}; use libp2p::PeerId; use rand::{CryptoRng, RngCore}; use serde::{Deserialize, Serialize}; @@ -42,7 +39,7 @@ pub enum AliceState { state3: Box, }, EncSigLearned { - encrypted_signature: EncryptedSignature, + encrypted_signature: bitcoin::EncryptedSignature, state3: Box, }, BtcRedeemed, @@ -475,13 +472,13 @@ pub struct State5 { } impl State5 { - pub fn next_message(&self) -> Message4 { - Message4 { + pub fn next_message(&self) -> TransferProof { + TransferProof { tx_lock_proof: self.tx_lock_proof.clone(), } } - pub fn receive(self, msg: Message5) -> State6 { + pub fn receive(self, msg: EncryptedSignature) -> State6 { State6 { a: self.a, B: self.B, @@ -553,7 +550,7 @@ pub struct State6 { tx_lock: bitcoin::TxLock, tx_punish_sig_bob: bitcoin::Signature, - tx_redeem_encsig: EncryptedSignature, + tx_redeem_encsig: bitcoin::EncryptedSignature, lock_xmr_fee: monero::Amount, } diff --git a/swap/src/protocol/alice/steps.rs b/swap/src/protocol/alice/steps.rs index ea0274d6..37de2b34 100644 --- a/swap/src/protocol/alice/steps.rs +++ b/swap/src/protocol/alice/steps.rs @@ -12,7 +12,7 @@ use crate::{ monero::Transfer, protocol::{ alice, - alice::{event_loop::EventLoopHandle, Message4, SwapResponse}, + alice::{event_loop::EventLoopHandle, SwapResponse, TransferProof}, SwapAmounts, }, }; @@ -132,7 +132,7 @@ where // Otherwise Alice might publish the lock tx twice! event_loop_handle - .send_message4(bob_peer_id, Message4 { + .send_transfer_proof(bob_peer_id, TransferProof { tx_lock_proof: transfer_proof, }) .await?; @@ -144,7 +144,7 @@ pub async fn wait_for_bitcoin_encrypted_signature( event_loop_handle: &mut EventLoopHandle, ) -> Result { let msg3 = event_loop_handle - .recv_message5() + .recv_encrypted_signature() .await .context("Failed to receive Bitcoin encrypted signature from Bob")?; diff --git a/swap/src/protocol/alice/message4.rs b/swap/src/protocol/alice/transfer_proof.rs similarity index 79% rename from swap/src/protocol/alice/message4.rs rename to swap/src/protocol/alice/transfer_proof.rs index 040a3f65..a9bc32b4 100644 --- a/swap/src/protocol/alice/message4.rs +++ b/swap/src/protocol/alice/transfer_proof.rs @@ -1,6 +1,6 @@ use crate::{ monero, - network::request_response::{Message4Protocol, OneShotCodec, Request, Response, TIMEOUT}, + network::request_response::{OneShotCodec, Request, Response, TransferProofProtocol, TIMEOUT}, }; use libp2p::{ request_response::{ @@ -19,7 +19,7 @@ use std::{ use tracing::error; #[derive(Clone, Debug, Serialize, Deserialize)] -pub struct Message4 { +pub struct TransferProof { pub tx_lock_proof: monero::TransferProof, } @@ -28,19 +28,20 @@ pub enum OutEvent { Msg, } -/// A `NetworkBehaviour` that represents sending message 4 to Bob. +/// A `NetworkBehaviour` that represents sending the Monero transfer proof to +/// Bob. #[derive(NetworkBehaviour)] #[behaviour(out_event = "OutEvent", poll_method = "poll")] #[allow(missing_debug_implementations)] pub struct Behaviour { - rr: RequestResponse>, + rr: RequestResponse>, #[behaviour(ignore)] events: VecDeque, } impl Behaviour { - pub fn send(&mut self, bob: PeerId, msg: Message4) { - let msg = Request::Message4(Box::new(msg)); + pub fn send(&mut self, bob: PeerId, msg: TransferProof) { + let msg = Request::TransferProof(Box::new(msg)); let _id = self.rr.send_request(&bob, msg); } @@ -48,7 +49,7 @@ impl Behaviour { &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll>, OutEvent>> + ) -> Poll>, OutEvent>> { if let Some(event) = self.events.pop_front() { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); @@ -67,7 +68,7 @@ impl Default for Behaviour { Self { rr: RequestResponse::new( OneShotCodec::default(), - vec![(Message4Protocol, ProtocolSupport::Full)], + vec![(TransferProofProtocol, ProtocolSupport::Full)], config, ), events: Default::default(), @@ -81,12 +82,12 @@ impl NetworkBehaviourEventProcess> for B RequestResponseEvent::Message { message: RequestResponseMessage::Request { .. }, .. - } => panic!("Alice should never get a message 4 request from Bob"), + } => panic!("Alice should never get a transfer proof request from Bob"), RequestResponseEvent::Message { message: RequestResponseMessage::Response { response, .. }, .. } => { - if let Response::Message4 = response { + if let Response::TransferProof = response { self.events.push_back(OutEvent::Msg); } } diff --git a/swap/src/protocol/bob.rs b/swap/src/protocol/bob.rs index 9361b02b..f4ca2e52 100644 --- a/swap/src/protocol/bob.rs +++ b/swap/src/protocol/bob.rs @@ -2,7 +2,6 @@ //! Bob holds BTC and wishes receive XMR. use crate::{ bitcoin, - bitcoin::EncryptedSignature, config::Config, database, database::Database, @@ -22,26 +21,26 @@ use tracing::{debug, info}; use uuid::Uuid; pub use self::{ + encrypted_signature::EncryptedSignature, event_loop::{EventLoop, EventLoopHandle}, message0::Message0, message1::Message1, message2::Message2, - message5::Message5, state::*, swap::{run, run_until}, swap_request::*, }; -use crate::protocol::alice::Message4; +use crate::protocol::alice::TransferProof; +mod encrypted_signature; pub mod event_loop; mod message0; mod message1; mod message2; -mod message4; -mod message5; pub mod state; pub mod swap; mod swap_request; +mod transfer_proof; pub struct Swap { pub state: BobState, @@ -213,8 +212,8 @@ pub enum OutEvent { Message0(Box), Message1(Box), Message2, - Message4(Box), - Message5, + TransferProof(Box), + EncryptedSignature, } impl From for OutEvent { @@ -257,18 +256,18 @@ impl From for OutEvent { } } -impl From for OutEvent { - fn from(event: message4::OutEvent) -> Self { +impl From for OutEvent { + fn from(event: transfer_proof::OutEvent) -> Self { match event { - message4::OutEvent::Msg(msg) => OutEvent::Message4(Box::new(msg)), + transfer_proof::OutEvent::Msg(msg) => OutEvent::TransferProof(Box::new(msg)), } } } -impl From for OutEvent { - fn from(event: message5::OutEvent) -> Self { +impl From for OutEvent { + fn from(event: encrypted_signature::OutEvent) -> Self { match event { - message5::OutEvent::Msg => OutEvent::Message5, + encrypted_signature::OutEvent::Msg => OutEvent::EncryptedSignature, } } } @@ -283,8 +282,8 @@ pub struct Behaviour { message0: message0::Behaviour, message1: message1::Behaviour, message2: message2::Behaviour, - message4: message4::Behaviour, - message5: message5::Behaviour, + transfer_proof: transfer_proof::Behaviour, + encrypted_signature: encrypted_signature::Behaviour, } impl Behaviour { @@ -313,9 +312,13 @@ impl Behaviour { } /// Sends Bob's fourth message to Alice. - pub fn send_message3(&mut self, alice: PeerId, tx_redeem_encsig: EncryptedSignature) { - let msg = Message5 { tx_redeem_encsig }; - self.message5.send(alice, msg); + pub fn send_encrypted_signature( + &mut self, + alice: PeerId, + tx_redeem_encsig: bitcoin::EncryptedSignature, + ) { + let msg = EncryptedSignature { tx_redeem_encsig }; + self.encrypted_signature.send(alice, msg); debug!("Sent Message3"); } diff --git a/swap/src/protocol/bob/message5.rs b/swap/src/protocol/bob/encrypted_signature.rs similarity index 76% rename from swap/src/protocol/bob/message5.rs rename to swap/src/protocol/bob/encrypted_signature.rs index 385fcfae..ab891feb 100644 --- a/swap/src/protocol/bob/message5.rs +++ b/swap/src/protocol/bob/encrypted_signature.rs @@ -1,6 +1,5 @@ -use crate::{ - bitcoin::EncryptedSignature, - network::request_response::{Message5Protocol, OneShotCodec, Request, Response, TIMEOUT}, +use crate::network::request_response::{ + EncryptedSignatureProtocol, OneShotCodec, Request, Response, TIMEOUT, }; use libp2p::{ request_response::{ @@ -19,8 +18,8 @@ use std::{ use tracing::error; #[derive(Clone, Debug, Serialize, Deserialize)] -pub struct Message5 { - pub tx_redeem_encsig: EncryptedSignature, +pub struct EncryptedSignature { + pub tx_redeem_encsig: crate::bitcoin::EncryptedSignature, } #[derive(Debug, Copy, Clone)] @@ -28,19 +27,19 @@ pub enum OutEvent { Msg, } -/// A `NetworkBehaviour` that represents sending message 5 to Alice. +/// A `NetworkBehaviour` that represents sending encrypted signature to Alice. #[derive(NetworkBehaviour)] #[behaviour(out_event = "OutEvent", poll_method = "poll")] #[allow(missing_debug_implementations)] pub struct Behaviour { - rr: RequestResponse>, + rr: RequestResponse>, #[behaviour(ignore)] events: VecDeque, } impl Behaviour { - pub fn send(&mut self, alice: PeerId, msg: Message5) { - let msg = Request::Message5(Box::new(msg)); + pub fn send(&mut self, alice: PeerId, msg: EncryptedSignature) { + let msg = Request::EncryptedSignature(Box::new(msg)); let _id = self.rr.send_request(&alice, msg); } @@ -48,8 +47,9 @@ impl Behaviour { &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll>, OutEvent>> - { + ) -> Poll< + NetworkBehaviourAction>, OutEvent>, + > { if let Some(event) = self.events.pop_front() { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); } @@ -67,7 +67,7 @@ impl Default for Behaviour { Self { rr: RequestResponse::new( OneShotCodec::default(), - vec![(Message5Protocol, ProtocolSupport::Full)], + vec![(EncryptedSignatureProtocol, ProtocolSupport::Full)], config, ), events: Default::default(), @@ -86,7 +86,7 @@ impl NetworkBehaviourEventProcess> for B message: RequestResponseMessage::Response { response, .. }, .. } => { - if let Response::Message5 = response { + if let Response::EncryptedSignature = response { self.events.push_back(OutEvent::Msg); } } diff --git a/swap/src/protocol/bob/event_loop.rs b/swap/src/protocol/bob/event_loop.rs index 16409ea9..d379d03f 100644 --- a/swap/src/protocol/bob/event_loop.rs +++ b/swap/src/protocol/bob/event_loop.rs @@ -3,7 +3,7 @@ use crate::{ network::{transport::SwapTransport, TokioExecutor}, protocol::{ alice, - alice::{Message4, SwapResponse}, + alice::{SwapResponse, TransferProof}, bob::{self, Behaviour, OutEvent, SwapRequest}, }, }; @@ -40,14 +40,14 @@ pub struct EventLoopHandle { swap_response: Receiver, msg0: Receiver, msg1: Receiver, - msg4: Receiver, + r_transfer_proof: Receiver, conn_established: Receiver, dial_alice: Sender<()>, send_swap_request: Sender, send_msg0: Sender, send_msg1: Sender, send_msg2: Sender, - send_msg3: Sender, + s_encrypted_signature: Sender, } impl EventLoopHandle { @@ -72,11 +72,11 @@ impl EventLoopHandle { .ok_or_else(|| anyhow!("Failed to receive message 1 from Alice")) } - pub async fn recv_message4(&mut self) -> Result { - self.msg4 + pub async fn recv_transfer_proof(&mut self) -> Result { + self.r_transfer_proof .recv() .await - .ok_or_else(|| anyhow!("Failed to receive message 4 from Alice")) + .ok_or_else(|| anyhow!("Failed to receive transfer proof from Alice")) } /// Dials other party and wait for the connection to be established. @@ -114,7 +114,7 @@ impl EventLoopHandle { } pub async fn send_message3(&mut self, tx_redeem_encsig: EncryptedSignature) -> Result<()> { - let _ = self.send_msg3.send(tx_redeem_encsig).await?; + let _ = self.s_encrypted_signature.send(tx_redeem_encsig).await?; Ok(()) } } @@ -126,14 +126,14 @@ pub struct EventLoop { swap_response: Sender, msg0: Sender, msg1: Sender, - msg4: Sender, + r_transfer_proof: Sender, conn_established: Sender, dial_alice: Receiver<()>, send_swap_request: Receiver, send_msg0: Receiver, send_msg1: Receiver, send_msg2: Receiver, - send_msg3: Receiver, + s_encrypted_signature: Receiver, } impl EventLoop { @@ -155,14 +155,14 @@ impl EventLoop { let swap_response = Channels::new(); let msg0 = Channels::new(); let msg1 = Channels::new(); - let msg4 = Channels::new(); + let r_transfer_proof = Channels::new(); let conn_established = Channels::new(); let dial_alice = Channels::new(); let send_swap_request = Channels::new(); let send_msg0 = Channels::new(); let send_msg1 = Channels::new(); let send_msg2 = Channels::new(); - let send_msg3 = Channels::new(); + let s_encrypted_signature = Channels::new(); let event_loop = EventLoop { swarm, @@ -170,28 +170,28 @@ impl EventLoop { swap_response: swap_response.sender, msg0: msg0.sender, msg1: msg1.sender, - msg4: msg4.sender, + r_transfer_proof: r_transfer_proof.sender, conn_established: conn_established.sender, dial_alice: dial_alice.receiver, send_swap_request: send_swap_request.receiver, send_msg0: send_msg0.receiver, send_msg1: send_msg1.receiver, send_msg2: send_msg2.receiver, - send_msg3: send_msg3.receiver, + s_encrypted_signature: s_encrypted_signature.receiver, }; let handle = EventLoopHandle { swap_response: swap_response.receiver, msg0: msg0.receiver, msg1: msg1.receiver, - msg4: msg4.receiver, + r_transfer_proof: r_transfer_proof.receiver, conn_established: conn_established.receiver, dial_alice: dial_alice.sender, send_swap_request: send_swap_request.sender, send_msg0: send_msg0.sender, send_msg1: send_msg1.sender, send_msg2: send_msg2.sender, - send_msg3: send_msg3.sender, + s_encrypted_signature: s_encrypted_signature.sender, }; Ok((event_loop, handle)) @@ -215,10 +215,10 @@ impl EventLoop { let _ = self.msg1.send(*msg).await; } OutEvent::Message2 => info!("Alice acknowledged message 2 received"), - OutEvent::Message4(msg) => { - let _ = self.msg4.send(*msg).await; + OutEvent::TransferProof(msg) => { + let _ = self.r_transfer_proof.send(*msg).await; } - OutEvent::Message5 => info!("Alice acknowledged message 5 received"), + OutEvent::EncryptedSignature => info!("Alice acknowledged encrypted signature received"), } }, option = self.dial_alice.next().fuse() => { @@ -259,9 +259,9 @@ impl EventLoop { self.swarm.send_message2(self.alice_peer_id.clone(), msg); } }, - msg3 = self.send_msg3.next().fuse() => { - if let Some(tx_redeem_encsig) = msg3 { - self.swarm.send_message3(self.alice_peer_id.clone(), tx_redeem_encsig); + encrypted_signature = self.s_encrypted_signature.next().fuse() => { + if let Some(tx_redeem_encsig) = encrypted_signature { + self.swarm.send_encrypted_signature(self.alice_peer_id.clone(), tx_redeem_encsig); } } } diff --git a/swap/src/protocol/bob/state.rs b/swap/src/protocol/bob/state.rs index 0498e81e..74f09ca2 100644 --- a/swap/src/protocol/bob/state.rs +++ b/swap/src/protocol/bob/state.rs @@ -9,14 +9,10 @@ use crate::{ config::Config, monero, monero::{monero_private_key, TransferProof}, - protocol::{alice, bob, bob::Message5, SwapAmounts}, + protocol::{alice, bob, bob::EncryptedSignature, SwapAmounts}, }; use anyhow::{anyhow, Result}; -use ecdsa_fun::{ - adaptor::{Adaptor, EncryptedSignature}, - nonce::Deterministic, - Signature, -}; +use ecdsa_fun::{adaptor::Adaptor, nonce::Deterministic, Signature}; use monero_harness::rpc::wallet::BlockHeight; use rand::{CryptoRng, RngCore}; use serde::{Deserialize, Serialize}; @@ -244,7 +240,7 @@ pub struct State2 { pub punish_address: bitcoin::Address, pub tx_lock: bitcoin::TxLock, pub tx_cancel_sig_a: Signature, - pub tx_refund_encsig: EncryptedSignature, + pub tx_refund_encsig: bitcoin::EncryptedSignature, pub min_monero_confirmations: u32, } @@ -313,7 +309,7 @@ pub struct State3 { punish_address: bitcoin::Address, pub tx_lock: bitcoin::TxLock, pub tx_cancel_sig_a: Signature, - pub tx_refund_encsig: EncryptedSignature, + pub tx_refund_encsig: bitcoin::EncryptedSignature, pub min_monero_confirmations: u32, } @@ -433,19 +429,19 @@ pub struct State4 { punish_address: bitcoin::Address, pub tx_lock: bitcoin::TxLock, pub tx_cancel_sig_a: Signature, - pub tx_refund_encsig: EncryptedSignature, + pub tx_refund_encsig: bitcoin::EncryptedSignature, pub monero_wallet_restore_blockheight: u32, } impl State4 { - pub fn next_message(&self) -> Message5 { + pub fn next_message(&self) -> EncryptedSignature { let tx_redeem = bitcoin::TxRedeem::new(&self.tx_lock, &self.redeem_address); let tx_redeem_encsig = self.b.encsign(self.S_a_bitcoin, tx_redeem.digest()); - Message5 { tx_redeem_encsig } + EncryptedSignature { tx_redeem_encsig } } - pub fn tx_redeem_encsig(&self) -> EncryptedSignature { + pub fn tx_redeem_encsig(&self) -> bitcoin::EncryptedSignature { let tx_redeem = bitcoin::TxRedeem::new(&self.tx_lock, &self.redeem_address); self.b.encsign(self.S_a_bitcoin, tx_redeem.digest()) } @@ -615,7 +611,7 @@ pub struct State5 { pub redeem_address: bitcoin::Address, punish_address: bitcoin::Address, pub tx_lock: bitcoin::TxLock, - tx_refund_encsig: EncryptedSignature, + tx_refund_encsig: bitcoin::EncryptedSignature, tx_cancel_sig: Signature, pub monero_wallet_restore_blockheight: u32, } diff --git a/swap/src/protocol/bob/swap.rs b/swap/src/protocol/bob/swap.rs index 2f1e21bd..5e13fad9 100644 --- a/swap/src/protocol/bob/swap.rs +++ b/swap/src/protocol/bob/swap.rs @@ -130,7 +130,7 @@ where { event_loop_handle.dial().await?; - let msg4_watcher = event_loop_handle.recv_message4(); + let transfer_proof_watcher = event_loop_handle.recv_transfer_proof(); let cancel_timelock_expires = state3.wait_for_cancel_timelock_to_expire(bitcoin_wallet.as_ref()); @@ -143,11 +143,11 @@ where monero_wallet.inner.block_height().await?; select! { - msg4 = msg4_watcher => { - let msg4 = msg4?; + transfer_proof = transfer_proof_watcher => { + let transfer_proof = transfer_proof?; BobState::XmrLockProofReceived { state: state3, - lock_transfer_proof: msg4.tx_lock_proof, + lock_transfer_proof: transfer_proof.tx_lock_proof, monero_wallet_restore_blockheight } }, diff --git a/swap/src/protocol/bob/message4.rs b/swap/src/protocol/bob/transfer_proof.rs similarity index 82% rename from swap/src/protocol/bob/message4.rs rename to swap/src/protocol/bob/transfer_proof.rs index 80a1866d..ee6fc147 100644 --- a/swap/src/protocol/bob/message4.rs +++ b/swap/src/protocol/bob/transfer_proof.rs @@ -1,6 +1,6 @@ use crate::{ - network::request_response::{Message4Protocol, OneShotCodec, Request, Response, TIMEOUT}, - protocol::alice::Message4, + network::request_response::{OneShotCodec, Request, Response, TransferProofProtocol, TIMEOUT}, + protocol::alice::TransferProof, }; use libp2p::{ request_response::{ @@ -19,15 +19,16 @@ use tracing::{debug, error}; #[derive(Debug)] pub enum OutEvent { - Msg(Message4), + Msg(TransferProof), } -/// A `NetworkBehaviour` that represents receiving of message 4 from Alice. +/// A `NetworkBehaviour` that represents receiving the transfer proof from +/// Alice. #[derive(NetworkBehaviour)] #[behaviour(out_event = "OutEvent", poll_method = "poll")] #[allow(missing_debug_implementations)] pub struct Behaviour { - rr: RequestResponse>, + rr: RequestResponse>, #[behaviour(ignore)] events: VecDeque, } @@ -37,7 +38,7 @@ impl Behaviour { &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll>, OutEvent>> + ) -> Poll>, OutEvent>> { if let Some(event) = self.events.pop_front() { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); @@ -56,7 +57,7 @@ impl Default for Behaviour { Self { rr: RequestResponse::new( OneShotCodec::default(), - vec![(Message4Protocol, ProtocolSupport::Full)], + vec![(TransferProofProtocol, ProtocolSupport::Full)], config, ), events: Default::default(), @@ -74,11 +75,11 @@ impl NetworkBehaviourEventProcess> for B }, .. } => { - if let Request::Message4(msg) = request { - debug!("Received message 4"); + if let Request::TransferProof(msg) = request { + debug!("Received Transfer Proof"); self.events.push_back(OutEvent::Msg(*msg)); // Send back empty response so that the request/response protocol completes. - let _ = self.rr.send_response(channel, Response::Message4); + let _ = self.rr.send_response(channel, Response::TransferProof); } } RequestResponseEvent::Message { From b62ef9c2d93afab6fe4992368da9af7992c10a01 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Wed, 27 Jan 2021 14:25:45 +1100 Subject: [PATCH 08/10] Harmonizing naming --- swap/src/protocol/alice/event_loop.rs | 98 ++++++++++----------- swap/src/protocol/bob/event_loop.rs | 119 +++++++++++++------------- swap/src/protocol/bob/swap.rs | 3 +- 3 files changed, 112 insertions(+), 108 deletions(-) diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index 424a3c74..b4168788 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -36,16 +36,16 @@ impl Default for Channels { #[derive(Debug)] pub struct EventLoopHandle { - msg0: Receiver<(bob::Message0, ResponseChannel)>, - msg1: Receiver<(bob::Message1, ResponseChannel)>, - msg2: Receiver, - r_encrypted_signature: Receiver, + recv_message0: Receiver<(bob::Message0, ResponseChannel)>, + recv_message1: Receiver<(bob::Message1, ResponseChannel)>, + recv_message2: Receiver, + recv_encrypted_signature: Receiver, request: Receiver, conn_established: Receiver, send_swap_response: Sender<(ResponseChannel, SwapResponse)>, - send_msg0: Sender<(ResponseChannel, alice::Message0)>, - send_msg1: Sender<(ResponseChannel, alice::Message1)>, - s_transfer_proof: Sender<(PeerId, TransferProof)>, + send_message0: Sender<(ResponseChannel, alice::Message0)>, + send_message1: Sender<(ResponseChannel, alice::Message1)>, + send_transfer_proof: Sender<(PeerId, TransferProof)>, } impl EventLoopHandle { @@ -57,28 +57,28 @@ impl EventLoopHandle { } pub async fn recv_message0(&mut self) -> Result<(bob::Message0, ResponseChannel)> { - self.msg0 + self.recv_message0 .recv() .await .ok_or_else(|| anyhow!("Failed to receive message 0 from Bob")) } pub async fn recv_message1(&mut self) -> Result<(bob::Message1, ResponseChannel)> { - self.msg1 + self.recv_message1 .recv() .await .ok_or_else(|| anyhow!("Failed to receive message 1 from Bob")) } pub async fn recv_message2(&mut self) -> Result { - self.msg2 + self.recv_message2 .recv() .await .ok_or_else(|| anyhow!("Failed to receive message 2 from Bob")) } pub async fn recv_encrypted_signature(&mut self) -> Result { - self.r_encrypted_signature + self.recv_encrypted_signature .recv() .await .ok_or_else(|| anyhow!("Failed to receive Bitcoin encrypted signature from Bob")) @@ -110,7 +110,7 @@ impl EventLoopHandle { channel: ResponseChannel, msg: alice::Message0, ) -> Result<()> { - let _ = self.send_msg0.send((channel, msg)).await?; + let _ = self.send_message0.send((channel, msg)).await?; Ok(()) } @@ -119,12 +119,12 @@ impl EventLoopHandle { channel: ResponseChannel, msg: alice::Message1, ) -> Result<()> { - let _ = self.send_msg1.send((channel, msg)).await?; + let _ = self.send_message1.send((channel, msg)).await?; Ok(()) } pub async fn send_transfer_proof(&mut self, bob: PeerId, msg: TransferProof) -> Result<()> { - let _ = self.s_transfer_proof.send((bob, msg)).await?; + let _ = self.send_transfer_proof.send((bob, msg)).await?; Ok(()) } } @@ -132,16 +132,16 @@ impl EventLoopHandle { #[allow(missing_debug_implementations)] pub struct EventLoop { swarm: libp2p::Swarm, - msg0: Sender<(bob::Message0, ResponseChannel)>, - msg1: Sender<(bob::Message1, ResponseChannel)>, - msg2: Sender, - r_encrypted_signature: Sender, + recv_message0: Sender<(bob::Message0, ResponseChannel)>, + recv_message1: Sender<(bob::Message1, ResponseChannel)>, + recv_message2: Sender, + recv_encrypted_signature: Sender, request: Sender, conn_established: Sender, send_swap_response: Receiver<(ResponseChannel, SwapResponse)>, - send_msg0: Receiver<(ResponseChannel, alice::Message0)>, - send_msg1: Receiver<(ResponseChannel, alice::Message1)>, - s_transfer_proof: Receiver<(PeerId, TransferProof)>, + send_message0: Receiver<(ResponseChannel, alice::Message0)>, + send_message1: Receiver<(ResponseChannel, alice::Message1)>, + send_transfer_proof: Receiver<(PeerId, TransferProof)>, } impl EventLoop { @@ -160,42 +160,42 @@ impl EventLoop { Swarm::listen_on(&mut swarm, listen.clone()) .with_context(|| format!("Address is not supported: {:#}", listen))?; - let msg0 = Channels::new(); - let msg1 = Channels::new(); - let msg2 = Channels::new(); - let r_encrypted_signature = Channels::new(); + let recv_message0 = Channels::new(); + let recv_message1 = Channels::new(); + let recv_message2 = Channels::new(); + let recv_encrypted_signature = Channels::new(); let request = Channels::new(); let conn_established = Channels::new(); let send_swap_response = Channels::new(); - let send_msg0 = Channels::new(); - let send_msg1 = Channels::new(); - let s_transfer_proof = Channels::new(); + let send_message0 = Channels::new(); + let send_message1 = Channels::new(); + let send_transfer_proof = Channels::new(); let driver = EventLoop { swarm, - msg0: msg0.sender, - msg1: msg1.sender, - msg2: msg2.sender, - r_encrypted_signature: r_encrypted_signature.sender, + recv_message0: recv_message0.sender, + recv_message1: recv_message1.sender, + recv_message2: recv_message2.sender, + recv_encrypted_signature: recv_encrypted_signature.sender, request: request.sender, conn_established: conn_established.sender, send_swap_response: send_swap_response.receiver, - send_msg0: send_msg0.receiver, - send_msg1: send_msg1.receiver, - s_transfer_proof: s_transfer_proof.receiver, + send_message0: send_message0.receiver, + send_message1: send_message1.receiver, + send_transfer_proof: send_transfer_proof.receiver, }; let handle = EventLoopHandle { - msg0: msg0.receiver, - msg1: msg1.receiver, - msg2: msg2.receiver, - r_encrypted_signature: r_encrypted_signature.receiver, + recv_message0: recv_message0.receiver, + recv_message1: recv_message1.receiver, + recv_message2: recv_message2.receiver, + recv_encrypted_signature: recv_encrypted_signature.receiver, request: request.receiver, conn_established: conn_established.receiver, send_swap_response: send_swap_response.sender, - send_msg0: send_msg0.sender, - send_msg1: send_msg1.sender, - s_transfer_proof: s_transfer_proof.sender, + send_message0: send_message0.sender, + send_message1: send_message1.sender, + send_transfer_proof: send_transfer_proof.sender, }; Ok((driver, handle)) @@ -210,17 +210,17 @@ impl EventLoop { let _ = self.conn_established.send(alice).await; } OutEvent::Message0 { msg, channel } => { - let _ = self.msg0.send((*msg, channel)).await; + let _ = self.recv_message0.send((*msg, channel)).await; } OutEvent::Message1 { msg, channel } => { - let _ = self.msg1.send((msg, channel)).await; + let _ = self.recv_message1.send((msg, channel)).await; } OutEvent::Message2 { msg, bob_peer_id : _} => { - let _ = self.msg2.send(*msg).await; + let _ = self.recv_message2.send(*msg).await; } OutEvent::TransferProof => trace!("Bob ack'd receiving the transfer proof"), OutEvent::EncryptedSignature(msg) => { - let _ = self.r_encrypted_signature.send(msg).await; + let _ = self.recv_encrypted_signature.send(msg).await; } OutEvent::Request(event) => { let _ = self.request.send(*event).await; @@ -232,17 +232,17 @@ impl EventLoop { self.swarm.send_swap_response(channel, swap_response); } }, - msg0 = self.send_msg0.next().fuse() => { + msg0 = self.send_message0.next().fuse() => { if let Some((channel, msg)) = msg0 { self.swarm.send_message0(channel, msg); } }, - msg1 = self.send_msg1.next().fuse() => { + msg1 = self.send_message1.next().fuse() => { if let Some((channel, msg)) = msg1 { self.swarm.send_message1(channel, msg); } }, - transfer_proof = self.s_transfer_proof.next().fuse() => { + transfer_proof = self.send_transfer_proof.next().fuse() => { if let Some((bob_peer_id, msg)) = transfer_proof { self.swarm.send_transfer_proof(bob_peer_id, msg); } diff --git a/swap/src/protocol/bob/event_loop.rs b/swap/src/protocol/bob/event_loop.rs index d379d03f..eb117432 100644 --- a/swap/src/protocol/bob/event_loop.rs +++ b/swap/src/protocol/bob/event_loop.rs @@ -37,43 +37,43 @@ impl Default for Channels { #[derive(Debug)] pub struct EventLoopHandle { - swap_response: Receiver, - msg0: Receiver, - msg1: Receiver, - r_transfer_proof: Receiver, + recv_swap_response: Receiver, + recv_message0: Receiver, + recv_message1: Receiver, + recv_transfer_proof: Receiver, conn_established: Receiver, dial_alice: Sender<()>, send_swap_request: Sender, - send_msg0: Sender, - send_msg1: Sender, - send_msg2: Sender, - s_encrypted_signature: Sender, + send_message0: Sender, + send_message1: Sender, + send_message2: Sender, + send_encrypted_signature: Sender, } impl EventLoopHandle { pub async fn recv_swap_response(&mut self) -> Result { - self.swap_response + self.recv_swap_response .recv() .await .ok_or_else(|| anyhow!("Failed to receive swap response from Alice")) } pub async fn recv_message0(&mut self) -> Result { - self.msg0 + self.recv_message0 .recv() .await .ok_or_else(|| anyhow!("Failed to receive message 0 from Alice")) } pub async fn recv_message1(&mut self) -> Result { - self.msg1 + self.recv_message1 .recv() .await .ok_or_else(|| anyhow!("Failed to receive message 1 from Alice")) } pub async fn recv_transfer_proof(&mut self) -> Result { - self.r_transfer_proof + self.recv_transfer_proof .recv() .await .ok_or_else(|| anyhow!("Failed to receive transfer proof from Alice")) @@ -99,22 +99,25 @@ impl EventLoopHandle { } pub async fn send_message0(&mut self, msg: bob::Message0) -> Result<()> { - let _ = self.send_msg0.send(msg).await?; + let _ = self.send_message0.send(msg).await?; Ok(()) } pub async fn send_message1(&mut self, msg: bob::Message1) -> Result<()> { - let _ = self.send_msg1.send(msg).await?; + let _ = self.send_message1.send(msg).await?; Ok(()) } pub async fn send_message2(&mut self, msg: bob::Message2) -> Result<()> { - let _ = self.send_msg2.send(msg).await?; + let _ = self.send_message2.send(msg).await?; Ok(()) } - pub async fn send_message3(&mut self, tx_redeem_encsig: EncryptedSignature) -> Result<()> { - let _ = self.s_encrypted_signature.send(tx_redeem_encsig).await?; + pub async fn send_encrypted_signature( + &mut self, + tx_redeem_encsig: EncryptedSignature, + ) -> Result<()> { + let _ = self.send_encrypted_signature.send(tx_redeem_encsig).await?; Ok(()) } } @@ -123,17 +126,17 @@ impl EventLoopHandle { pub struct EventLoop { swarm: libp2p::Swarm, alice_peer_id: PeerId, - swap_response: Sender, - msg0: Sender, - msg1: Sender, - r_transfer_proof: Sender, - conn_established: Sender, + recv_swap_response: Sender, + recv_message0: Sender, + recv_message1: Sender, + recv_transfer_proof: Sender, dial_alice: Receiver<()>, + conn_established: Sender, send_swap_request: Receiver, - send_msg0: Receiver, - send_msg1: Receiver, - send_msg2: Receiver, - s_encrypted_signature: Receiver, + send_message0: Receiver, + send_message1: Receiver, + send_message2: Receiver, + send_encrypted_signature: Receiver, } impl EventLoop { @@ -153,45 +156,45 @@ impl EventLoop { swarm.add_address(alice_peer_id.clone(), alice_addr); let swap_response = Channels::new(); - let msg0 = Channels::new(); - let msg1 = Channels::new(); - let r_transfer_proof = Channels::new(); - let conn_established = Channels::new(); + let recv_message0 = Channels::new(); + let recv_message1 = Channels::new(); + let recv_transfer_proof = Channels::new(); let dial_alice = Channels::new(); + let conn_established = Channels::new(); let send_swap_request = Channels::new(); - let send_msg0 = Channels::new(); - let send_msg1 = Channels::new(); - let send_msg2 = Channels::new(); - let s_encrypted_signature = Channels::new(); + let send_message0 = Channels::new(); + let send_message1 = Channels::new(); + let send_message2 = Channels::new(); + let send_encrypted_signature = Channels::new(); let event_loop = EventLoop { swarm, alice_peer_id, - swap_response: swap_response.sender, - msg0: msg0.sender, - msg1: msg1.sender, - r_transfer_proof: r_transfer_proof.sender, + recv_swap_response: swap_response.sender, + recv_message0: recv_message0.sender, + recv_message1: recv_message1.sender, + recv_transfer_proof: recv_transfer_proof.sender, conn_established: conn_established.sender, dial_alice: dial_alice.receiver, send_swap_request: send_swap_request.receiver, - send_msg0: send_msg0.receiver, - send_msg1: send_msg1.receiver, - send_msg2: send_msg2.receiver, - s_encrypted_signature: s_encrypted_signature.receiver, + send_message0: send_message0.receiver, + send_message1: send_message1.receiver, + send_message2: send_message2.receiver, + send_encrypted_signature: send_encrypted_signature.receiver, }; let handle = EventLoopHandle { - swap_response: swap_response.receiver, - msg0: msg0.receiver, - msg1: msg1.receiver, - r_transfer_proof: r_transfer_proof.receiver, + recv_swap_response: swap_response.receiver, + recv_message0: recv_message0.receiver, + recv_message1: recv_message1.receiver, + recv_transfer_proof: recv_transfer_proof.receiver, conn_established: conn_established.receiver, dial_alice: dial_alice.sender, send_swap_request: send_swap_request.sender, - send_msg0: send_msg0.sender, - send_msg1: send_msg1.sender, - send_msg2: send_msg2.sender, - s_encrypted_signature: s_encrypted_signature.sender, + send_message0: send_message0.sender, + send_message1: send_message1.sender, + send_message2: send_message2.sender, + send_encrypted_signature: send_encrypted_signature.sender, }; Ok((event_loop, handle)) @@ -206,17 +209,17 @@ impl EventLoop { let _ = self.conn_established.send(peer_id).await; } OutEvent::SwapResponse(msg) => { - let _ = self.swap_response.send(msg).await; + let _ = self.recv_swap_response.send(msg).await; }, OutEvent::Message0(msg) => { - let _ = self.msg0.send(*msg).await; + let _ = self.recv_message0.send(*msg).await; } OutEvent::Message1(msg) => { - let _ = self.msg1.send(*msg).await; + let _ = self.recv_message1.send(*msg).await; } OutEvent::Message2 => info!("Alice acknowledged message 2 received"), OutEvent::TransferProof(msg) => { - let _ = self.r_transfer_proof.send(*msg).await; + let _ = self.recv_transfer_proof.send(*msg).await; } OutEvent::EncryptedSignature => info!("Alice acknowledged encrypted signature received"), } @@ -243,23 +246,23 @@ impl EventLoop { } }, - msg0 = self.send_msg0.next().fuse() => { + msg0 = self.send_message0.next().fuse() => { if let Some(msg) = msg0 { self.swarm.send_message0(self.alice_peer_id.clone(), msg); } } - msg1 = self.send_msg1.next().fuse() => { + msg1 = self.send_message1.next().fuse() => { if let Some(msg) = msg1 { self.swarm.send_message1(self.alice_peer_id.clone(), msg); } }, - msg2 = self.send_msg2.next().fuse() => { + msg2 = self.send_message2.next().fuse() => { if let Some(msg) = msg2 { self.swarm.send_message2(self.alice_peer_id.clone(), msg); } }, - encrypted_signature = self.s_encrypted_signature.next().fuse() => { + encrypted_signature = self.send_encrypted_signature.next().fuse() => { if let Some(tx_redeem_encsig) = encrypted_signature { self.swarm.send_encrypted_signature(self.alice_peer_id.clone(), tx_redeem_encsig); } diff --git a/swap/src/protocol/bob/swap.rs b/swap/src/protocol/bob/swap.rs index 5e13fad9..767372be 100644 --- a/swap/src/protocol/bob/swap.rs +++ b/swap/src/protocol/bob/swap.rs @@ -233,7 +233,8 @@ where let state4_clone = state.clone(); - let enc_sig_sent_watcher = event_loop_handle.send_message3(tx_redeem_encsig); + let enc_sig_sent_watcher = + event_loop_handle.send_encrypted_signature(tx_redeem_encsig); let bitcoin_wallet = bitcoin_wallet.clone(); let cancel_timelock_expires = state4_clone.wait_for_cancel_timelock_to_expire(bitcoin_wallet.as_ref()); From 2929a8f1013518b23097121f658aebb832935b6d Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Wed, 27 Jan 2021 14:27:59 +1100 Subject: [PATCH 09/10] Use the correct protocol support --- swap/src/protocol/alice/encrypted_signature.rs | 2 +- swap/src/protocol/alice/transfer_proof.rs | 2 +- swap/src/protocol/bob/encrypted_signature.rs | 2 +- swap/src/protocol/bob/swap_request.rs | 2 +- swap/src/protocol/bob/transfer_proof.rs | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/swap/src/protocol/alice/encrypted_signature.rs b/swap/src/protocol/alice/encrypted_signature.rs index b62cf1de..ca1ea7f3 100644 --- a/swap/src/protocol/alice/encrypted_signature.rs +++ b/swap/src/protocol/alice/encrypted_signature.rs @@ -60,7 +60,7 @@ impl Default for Behaviour { Self { rr: RequestResponse::new( OneShotCodec::default(), - vec![(EncryptedSignatureProtocol, ProtocolSupport::Full)], + vec![(EncryptedSignatureProtocol, ProtocolSupport::Inbound)], config, ), events: Default::default(), diff --git a/swap/src/protocol/alice/transfer_proof.rs b/swap/src/protocol/alice/transfer_proof.rs index a9bc32b4..b9f379a9 100644 --- a/swap/src/protocol/alice/transfer_proof.rs +++ b/swap/src/protocol/alice/transfer_proof.rs @@ -68,7 +68,7 @@ impl Default for Behaviour { Self { rr: RequestResponse::new( OneShotCodec::default(), - vec![(TransferProofProtocol, ProtocolSupport::Full)], + vec![(TransferProofProtocol, ProtocolSupport::Outbound)], config, ), events: Default::default(), diff --git a/swap/src/protocol/bob/encrypted_signature.rs b/swap/src/protocol/bob/encrypted_signature.rs index ab891feb..b3d2815a 100644 --- a/swap/src/protocol/bob/encrypted_signature.rs +++ b/swap/src/protocol/bob/encrypted_signature.rs @@ -67,7 +67,7 @@ impl Default for Behaviour { Self { rr: RequestResponse::new( OneShotCodec::default(), - vec![(EncryptedSignatureProtocol, ProtocolSupport::Full)], + vec![(EncryptedSignatureProtocol, ProtocolSupport::Outbound)], config, ), events: Default::default(), diff --git a/swap/src/protocol/bob/swap_request.rs b/swap/src/protocol/bob/swap_request.rs index 5f032081..b6a16c97 100644 --- a/swap/src/protocol/bob/swap_request.rs +++ b/swap/src/protocol/bob/swap_request.rs @@ -71,7 +71,7 @@ impl Default for Behaviour { Self { rr: RequestResponse::new( Codec::default(), - vec![(Swap, ProtocolSupport::Full)], + vec![(Swap, ProtocolSupport::Outbound)], config, ), events: Default::default(), diff --git a/swap/src/protocol/bob/transfer_proof.rs b/swap/src/protocol/bob/transfer_proof.rs index ee6fc147..7f9d9565 100644 --- a/swap/src/protocol/bob/transfer_proof.rs +++ b/swap/src/protocol/bob/transfer_proof.rs @@ -57,7 +57,7 @@ impl Default for Behaviour { Self { rr: RequestResponse::new( OneShotCodec::default(), - vec![(TransferProofProtocol, ProtocolSupport::Full)], + vec![(TransferProofProtocol, ProtocolSupport::Inbound)], config, ), events: Default::default(), From 2073e88683fb6df78b772d1ff349602ca62474c6 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Wed, 27 Jan 2021 14:33:43 +1100 Subject: [PATCH 10/10] Remove wrapping if `ReadOneError` when unnecessary --- swap/src/network/request_response.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/swap/src/network/request_response.rs b/swap/src/network/request_response.rs index fa5158d5..29681595 100644 --- a/swap/src/network/request_response.rs +++ b/swap/src/network/request_response.rs @@ -2,7 +2,7 @@ use crate::protocol::{alice, alice::TransferProof, bob, bob::EncryptedSignature} use async_trait::async_trait; use futures::prelude::*; use libp2p::{ - core::upgrade, + core::{upgrade, upgrade::ReadOneError}, request_response::{ProtocolName, RequestResponseCodec}, }; use serde::{Deserialize, Serialize}; @@ -213,9 +213,10 @@ where T: AsyncRead + Unpin + Send, { debug!("enter read_request"); - let message = upgrade::read_one(io, BUF_SIZE) - .await - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + let message = upgrade::read_one(io, BUF_SIZE).await.map_err(|e| match e { + ReadOneError::Io(err) => err, + e => io::Error::new(io::ErrorKind::Other, e), + })?; let mut de = serde_cbor::Deserializer::from_slice(&message); let msg = Request::deserialize(&mut de).map_err(|e| { tracing::debug!("serde read_request error: {:?}", e);