diff --git a/swap/src/network/request_response.rs b/swap/src/network/request_response.rs index ab721f84..9a7ac4dd 100644 --- a/swap/src/network/request_response.rs +++ b/swap/src/network/request_response.rs @@ -1,14 +1,10 @@ -use crate::protocol::{ - alice::{SwapResponse, TransferProof}, - bob::{EncryptedSignature, SwapRequest}, -}; use async_trait::async_trait; use futures::prelude::*; use libp2p::{ core::{upgrade, upgrade::ReadOneError}, request_response::{ProtocolName, RequestResponseCodec}, }; -use serde::{Deserialize, Serialize}; +use serde::{de::DeserializeOwned, Serialize}; use std::{fmt::Debug, io, marker::PhantomData}; /// Time to wait for a response back once we send a request. @@ -17,20 +13,6 @@ pub const TIMEOUT: u64 = 3600; // One hour. /// Message receive buffer. pub const BUF_SIZE: usize = 1024 * 1024; -#[derive(Clone, Debug, Serialize, Deserialize)] -pub enum Request { - SwapRequest(Box), - TransferProof(Box), - EncryptedSignature(Box), -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub enum Response { - SwapResponse(Box), - TransferProof, - EncryptedSignature, -} - #[derive(Debug, Clone, Copy, Default)] pub struct Swap; @@ -58,19 +40,29 @@ impl ProtocolName for EncryptedSignatureProtocol { } } -#[derive(Clone, Copy, Debug, Default)] -pub struct CborCodec

{ - phantom: PhantomData

, +#[derive(Clone, Copy, Debug)] +pub struct CborCodec { + phantom: PhantomData<(P, Req, Res)>, +} + +impl Default for CborCodec { + fn default() -> Self { + Self { + phantom: PhantomData::default(), + } + } } #[async_trait] -impl

RequestResponseCodec for CborCodec

+impl RequestResponseCodec for CborCodec where - P: Send + Sync + Clone + ProtocolName, + P: ProtocolName + Send + Sync + Clone, + Req: DeserializeOwned + Serialize + Send, + Res: DeserializeOwned + Serialize + Send, { type Protocol = P; - type Request = Request; - type Response = Response; + type Request = Req; + type Response = Res; async fn read_request(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result where @@ -81,7 +73,7 @@ where e => io::Error::new(io::ErrorKind::Other, e), })?; let mut de = serde_cbor::Deserializer::from_slice(&message); - let msg = Request::deserialize(&mut de).map_err(|e| { + let msg = Req::deserialize(&mut de).map_err(|e| { tracing::debug!("serde read_request error: {:?}", e); io::Error::new(io::ErrorKind::Other, e) })?; @@ -101,7 +93,7 @@ where .await .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; let mut de = serde_cbor::Deserializer::from_slice(&message); - let msg = Response::deserialize(&mut de).map_err(|e| { + let msg = Res::deserialize(&mut de).map_err(|e| { tracing::debug!("serde read_response error: {:?}", e); io::Error::new(io::ErrorKind::InvalidData, e) })?; diff --git a/swap/src/protocol/alice.rs b/swap/src/protocol/alice.rs index dffafc85..392bacfa 100644 --- a/swap/src/protocol/alice.rs +++ b/swap/src/protocol/alice.rs @@ -30,7 +30,6 @@ pub use self::{ swap_response::*, transfer_proof::TransferProof, }; -use crate::network::request_response::Response; pub use execution_setup::Message3; mod encrypted_signature; @@ -280,7 +279,7 @@ impl Behaviour { /// Alice always sends her messages as a response to a request from Bob. pub fn send_swap_response( &mut self, - channel: ResponseChannel, + channel: ResponseChannel, swap_response: SwapResponse, ) -> Result<()> { self.amounts.send(channel, swap_response)?; diff --git a/swap/src/protocol/alice/encrypted_signature.rs b/swap/src/protocol/alice/encrypted_signature.rs index 1c7cf0eb..40192f2a 100644 --- a/swap/src/protocol/alice/encrypted_signature.rs +++ b/swap/src/protocol/alice/encrypted_signature.rs @@ -1,7 +1,5 @@ use crate::{ - network::request_response::{ - CborCodec, EncryptedSignatureProtocol, Request, Response, TIMEOUT, - }, + network::request_response::{CborCodec, EncryptedSignatureProtocol, TIMEOUT}, protocol::bob::EncryptedSignature, }; use libp2p::{ @@ -30,7 +28,7 @@ pub enum OutEvent { #[behaviour(out_event = "OutEvent", poll_method = "poll")] #[allow(missing_debug_implementations)] pub struct Behaviour { - rr: RequestResponse>, + rr: RequestResponse>, #[behaviour(ignore)] events: VecDeque, } @@ -41,7 +39,10 @@ impl Behaviour { _: &mut Context<'_>, _: &mut impl PollParameters, ) -> Poll< - NetworkBehaviourAction>, OutEvent>, + NetworkBehaviourAction< + RequestProtocol>, + OutEvent, + >, > { if let Some(event) = self.events.pop_front() { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); @@ -68,8 +69,8 @@ impl Default for Behaviour { } } -impl NetworkBehaviourEventProcess> for Behaviour { - fn inject_event(&mut self, event: RequestResponseEvent) { +impl NetworkBehaviourEventProcess> for Behaviour { + fn inject_event(&mut self, event: RequestResponseEvent) { match event { RequestResponseEvent::Message { message: @@ -78,14 +79,11 @@ impl NetworkBehaviourEventProcess> for B }, .. } => { - if let Request::EncryptedSignature(msg) = request { - debug!("Received encrypted signature"); - self.events.push_back(OutEvent::Msg(*msg)); - // Send back empty response so that the request/response protocol completes. - if let Err(error) = self.rr.send_response(channel, Response::EncryptedSignature) - { - error!("Failed to send Encrypted Signature ack: {:?}", error); - } + debug!("Received encrypted signature"); + self.events.push_back(OutEvent::Msg(request)); + // Send back empty response so that the request/response protocol completes. + if let Err(error) = self.rr.send_response(channel, ()) { + error!("Failed to send Encrypted Signature ack: {:?}", error); } } RequestResponseEvent::Message { diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index 01af9288..1ee99764 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -1,5 +1,5 @@ use crate::{ - network::{request_response::Response, transport::SwapTransport, TokioExecutor}, + network::{transport::SwapTransport, TokioExecutor}, protocol::{ alice::{Behaviour, OutEvent, State0, State3, SwapResponse, TransferProof}, bob::EncryptedSignature, @@ -37,7 +37,7 @@ pub struct EventLoopHandle { recv_encrypted_signature: Receiver, request: Receiver, conn_established: Receiver, - send_swap_response: Sender<(ResponseChannel, SwapResponse)>, + send_swap_response: Sender<(ResponseChannel, SwapResponse)>, start_execution_setup: Sender<(PeerId, State0)>, send_transfer_proof: Sender<(PeerId, TransferProof)>, recv_transfer_proof_ack: Receiver<()>, @@ -81,7 +81,7 @@ impl EventLoopHandle { pub async fn send_swap_response( &mut self, - channel: ResponseChannel, + channel: ResponseChannel, swap_response: SwapResponse, ) -> Result<()> { let _ = self @@ -110,7 +110,7 @@ pub struct EventLoop { recv_encrypted_signature: Sender, request: Sender, conn_established: Sender, - send_swap_response: Receiver<(ResponseChannel, SwapResponse)>, + send_swap_response: Receiver<(ResponseChannel, SwapResponse)>, send_transfer_proof: Receiver<(PeerId, TransferProof)>, recv_transfer_proof_ack: Sender<()>, } diff --git a/swap/src/protocol/alice/swap_response.rs b/swap/src/protocol/alice/swap_response.rs index be67f7a6..650ba4b0 100644 --- a/swap/src/protocol/alice/swap_response.rs +++ b/swap/src/protocol/alice/swap_response.rs @@ -1,7 +1,7 @@ use crate::{ monero, - network::request_response::{CborCodec, Request, Response, Swap, TIMEOUT}, - protocol::bob, + network::request_response::{CborCodec, Swap, TIMEOUT}, + protocol::bob::SwapRequest, }; use anyhow::{anyhow, Result}; use libp2p::{ @@ -22,8 +22,8 @@ use tracing::{debug, error}; #[derive(Debug)] pub struct OutEvent { - pub msg: bob::SwapRequest, - pub channel: ResponseChannel, + pub msg: SwapRequest, + pub channel: ResponseChannel, } #[derive(Copy, Clone, Debug, Serialize, Deserialize)] @@ -37,15 +37,18 @@ pub struct SwapResponse { #[behaviour(out_event = "OutEvent", poll_method = "poll")] #[allow(missing_debug_implementations)] pub struct Behaviour { - rr: RequestResponse>, + rr: RequestResponse>, #[behaviour(ignore)] events: VecDeque, } impl Behaviour { /// Alice always sends her messages as a response to a request from Bob. - pub fn send(&mut self, channel: ResponseChannel, msg: SwapResponse) -> Result<()> { - let msg = Response::SwapResponse(Box::new(msg)); + pub fn send( + &mut self, + channel: ResponseChannel, + msg: SwapResponse, + ) -> Result<()> { self.rr .send_response(channel, msg) .map_err(|_| anyhow!("Sending swap response failed")) @@ -55,7 +58,12 @@ impl Behaviour { &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll>, OutEvent>> { + ) -> Poll< + NetworkBehaviourAction< + RequestProtocol>, + OutEvent, + >, + > { if let Some(event) = self.events.pop_front() { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); } @@ -82,20 +90,22 @@ impl Default for Behaviour { } } -impl NetworkBehaviourEventProcess> for Behaviour { - fn inject_event(&mut self, event: RequestResponseEvent) { +impl NetworkBehaviourEventProcess> for Behaviour { + fn inject_event(&mut self, event: RequestResponseEvent) { match event { RequestResponseEvent::Message { + peer, message: RequestResponseMessage::Request { request, channel, .. }, .. } => { - if let Request::SwapRequest(msg) = request { - debug!("Received swap request"); - self.events.push_back(OutEvent { msg: *msg, channel }) - } + debug!("Received swap request from {}", peer); + self.events.push_back(OutEvent { + msg: request, + channel, + }) } RequestResponseEvent::Message { message: RequestResponseMessage::Response { .. }, diff --git a/swap/src/protocol/alice/transfer_proof.rs b/swap/src/protocol/alice/transfer_proof.rs index c37b4f0b..184eab25 100644 --- a/swap/src/protocol/alice/transfer_proof.rs +++ b/swap/src/protocol/alice/transfer_proof.rs @@ -1,6 +1,6 @@ use crate::{ monero, - network::request_response::{CborCodec, Request, Response, TransferProofProtocol, TIMEOUT}, + network::request_response::{CborCodec, TransferProofProtocol, TIMEOUT}, }; use libp2p::{ request_response::{ @@ -34,14 +34,13 @@ pub enum OutEvent { #[behaviour(out_event = "OutEvent", poll_method = "poll")] #[allow(missing_debug_implementations)] pub struct Behaviour { - rr: RequestResponse>, + rr: RequestResponse>, #[behaviour(ignore)] events: VecDeque, } impl Behaviour { pub fn send(&mut self, bob: PeerId, msg: TransferProof) { - let msg = Request::TransferProof(Box::new(msg)); let _id = self.rr.send_request(&bob, msg); } @@ -49,8 +48,12 @@ impl Behaviour { &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll>, OutEvent>> - { + ) -> Poll< + NetworkBehaviourAction< + RequestProtocol>, + OutEvent, + >, + > { if let Some(event) = self.events.pop_front() { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); } @@ -76,20 +79,18 @@ impl Default for Behaviour { } } -impl NetworkBehaviourEventProcess> for Behaviour { - fn inject_event(&mut self, event: RequestResponseEvent) { +impl NetworkBehaviourEventProcess> for Behaviour { + fn inject_event(&mut self, event: RequestResponseEvent) { match event { RequestResponseEvent::Message { message: RequestResponseMessage::Request { .. }, .. } => panic!("Alice should never get a transfer proof request from Bob"), RequestResponseEvent::Message { - message: RequestResponseMessage::Response { response, .. }, + message: RequestResponseMessage::Response { .. }, .. } => { - if let Response::TransferProof = response { - self.events.push_back(OutEvent::Acknowledged); - } + self.events.push_back(OutEvent::Acknowledged); } RequestResponseEvent::InboundFailure { error, .. } => { error!("Inbound failure: {:?}", error); diff --git a/swap/src/protocol/bob/encrypted_signature.rs b/swap/src/protocol/bob/encrypted_signature.rs index c304d48a..c8353c4f 100644 --- a/swap/src/protocol/bob/encrypted_signature.rs +++ b/swap/src/protocol/bob/encrypted_signature.rs @@ -1,6 +1,4 @@ -use crate::network::request_response::{ - CborCodec, EncryptedSignatureProtocol, Request, Response, TIMEOUT, -}; +use crate::network::request_response::{CborCodec, EncryptedSignatureProtocol, TIMEOUT}; use libp2p::{ request_response::{ handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig, @@ -32,14 +30,13 @@ pub enum OutEvent { #[behaviour(out_event = "OutEvent", poll_method = "poll")] #[allow(missing_debug_implementations)] pub struct Behaviour { - rr: RequestResponse>, + rr: RequestResponse>, #[behaviour(ignore)] events: VecDeque, } impl Behaviour { pub fn send(&mut self, alice: PeerId, msg: EncryptedSignature) { - let msg = Request::EncryptedSignature(Box::new(msg)); let _id = self.rr.send_request(&alice, msg); } @@ -48,7 +45,10 @@ impl Behaviour { _: &mut Context<'_>, _: &mut impl PollParameters, ) -> Poll< - NetworkBehaviourAction>, OutEvent>, + NetworkBehaviourAction< + RequestProtocol>, + OutEvent, + >, > { if let Some(event) = self.events.pop_front() { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); @@ -75,20 +75,18 @@ impl Default for Behaviour { } } -impl NetworkBehaviourEventProcess> for Behaviour { - fn inject_event(&mut self, event: RequestResponseEvent) { +impl NetworkBehaviourEventProcess> for Behaviour { + fn inject_event(&mut self, event: RequestResponseEvent) { match event { RequestResponseEvent::Message { message: RequestResponseMessage::Request { .. }, .. } => panic!("Bob should never get a request from Alice"), RequestResponseEvent::Message { - message: RequestResponseMessage::Response { response, .. }, + message: RequestResponseMessage::Response { .. }, .. } => { - if let Response::EncryptedSignature = response { - self.events.push_back(OutEvent::Acknowledged); - } + self.events.push_back(OutEvent::Acknowledged); } RequestResponseEvent::InboundFailure { error, .. } => { error!("Inbound failure: {:?}", error); diff --git a/swap/src/protocol/bob/swap_request.rs b/swap/src/protocol/bob/swap_request.rs index c57790d0..95fde3f2 100644 --- a/swap/src/protocol/bob/swap_request.rs +++ b/swap/src/protocol/bob/swap_request.rs @@ -1,5 +1,5 @@ use crate::{ - network::request_response::{CborCodec, Request, Response, Swap, TIMEOUT}, + network::request_response::{CborCodec, Swap, TIMEOUT}, protocol::alice::SwapResponse, }; use anyhow::Result; @@ -35,15 +35,14 @@ pub struct OutEvent { #[behaviour(out_event = "OutEvent", poll_method = "poll")] #[allow(missing_debug_implementations)] pub struct Behaviour { - rr: RequestResponse>, + rr: RequestResponse>, #[behaviour(ignore)] events: VecDeque, } impl Behaviour { pub fn send(&mut self, alice: PeerId, swap_request: SwapRequest) -> Result { - let msg = Request::SwapRequest(Box::new(swap_request)); - let id = self.rr.send_request(&alice, msg); + let id = self.rr.send_request(&alice, swap_request); Ok(id) } @@ -52,7 +51,12 @@ impl Behaviour { &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll>, OutEvent>> { + ) -> Poll< + NetworkBehaviourAction< + RequestProtocol>, + OutEvent, + >, + > { if let Some(event) = self.events.pop_front() { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); } @@ -79,8 +83,8 @@ impl Default for Behaviour { } } -impl NetworkBehaviourEventProcess> for Behaviour { - fn inject_event(&mut self, event: RequestResponseEvent) { +impl NetworkBehaviourEventProcess> for Behaviour { + fn inject_event(&mut self, event: RequestResponseEvent) { match event { RequestResponseEvent::Message { message: RequestResponseMessage::Request { .. }, @@ -90,12 +94,10 @@ impl NetworkBehaviourEventProcess> for B message: RequestResponseMessage::Response { response, .. }, .. } => { - if let Response::SwapResponse(swap_response) = response { - debug!("Received swap response"); - self.events.push_back(OutEvent { - swap_response: *swap_response, - }); - } + debug!("Received swap response"); + self.events.push_back(OutEvent { + swap_response: response, + }); } RequestResponseEvent::InboundFailure { error, .. } => { error!("Inbound failure: {:?}", error); diff --git a/swap/src/protocol/bob/transfer_proof.rs b/swap/src/protocol/bob/transfer_proof.rs index 6746d7cc..8306b040 100644 --- a/swap/src/protocol/bob/transfer_proof.rs +++ b/swap/src/protocol/bob/transfer_proof.rs @@ -1,5 +1,5 @@ use crate::{ - network::request_response::{CborCodec, Request, Response, TransferProofProtocol, TIMEOUT}, + network::request_response::{CborCodec, TransferProofProtocol, TIMEOUT}, protocol::alice::TransferProof, }; use libp2p::{ @@ -28,7 +28,7 @@ pub enum OutEvent { #[behaviour(out_event = "OutEvent", poll_method = "poll")] #[allow(missing_debug_implementations)] pub struct Behaviour { - rr: RequestResponse>, + rr: RequestResponse>, #[behaviour(ignore)] events: VecDeque, } @@ -38,8 +38,12 @@ impl Behaviour { &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll>, OutEvent>> - { + ) -> Poll< + NetworkBehaviourAction< + RequestProtocol>, + OutEvent, + >, + > { if let Some(event) = self.events.pop_front() { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); } @@ -65,8 +69,8 @@ impl Default for Behaviour { } } -impl NetworkBehaviourEventProcess> for Behaviour { - fn inject_event(&mut self, event: RequestResponseEvent) { +impl NetworkBehaviourEventProcess> for Behaviour { + fn inject_event(&mut self, event: RequestResponseEvent) { match event { RequestResponseEvent::Message { message: @@ -75,15 +79,13 @@ impl NetworkBehaviourEventProcess> for B }, .. } => { - if let Request::TransferProof(msg) = request { - debug!("Received Transfer Proof"); - self.events.push_back(OutEvent::Msg(*msg)); - // Send back empty response so that the request/response protocol completes. - let _ = self - .rr - .send_response(channel, Response::TransferProof) - .map_err(|err| error!("Failed to send message 3: {:?}", err)); - } + debug!("Received Transfer Proof"); + self.events.push_back(OutEvent::Msg(request)); + // Send back empty response so that the request/response protocol completes. + let _ = self + .rr + .send_response(channel, ()) + .map_err(|err| error!("Failed to send message 3: {:?}", err)); } RequestResponseEvent::Message { message: RequestResponseMessage::Response { .. },