From edb93624f39f6a8f37d53db746a94ae5d4491859 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Fri, 22 Jan 2021 11:05:46 +1100 Subject: [PATCH] Introduce one shot code To allow alice to be the requester for message 4. --- swap/src/network/request_response.rs | 106 ++++++++++++++++++++++++++- swap/src/protocol/alice/message5.rs | 24 +++--- swap/src/protocol/bob/message5.rs | 20 ++--- 3 files changed, 126 insertions(+), 24 deletions(-) diff --git a/swap/src/network/request_response.rs b/swap/src/network/request_response.rs index 3d9c5b9d..68e9d803 100644 --- a/swap/src/network/request_response.rs +++ b/swap/src/network/request_response.rs @@ -19,13 +19,14 @@ const BUF_SIZE: usize = 1024 * 1024; // Codec for each Message and a macro that implements them. /// Messages Bob sends to Alice. +// TODO: Remove this once network changes are done +#[allow(clippy::large_enum_variant)] #[derive(Clone, Debug, Serialize, Deserialize)] pub enum BobToAlice { SwapRequest(bob::SwapRequest), Message0(Box), Message1(bob::Message1), Message2(bob::Message2), - Message5(Message5), } /// Messages Alice sends to Bob. @@ -35,7 +36,19 @@ pub enum AliceToBob { Message0(Box), Message1(Box), Message2(alice::Message2), - Message5, // empty response +} + +/// Messages sent from one party to the other. +/// All responses are empty +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum Request { + Message5(Message5), +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +/// Response are only used for acknowledgement purposes. +pub enum Response { + Message5, } #[derive(Debug, Clone, Copy, Default)] @@ -171,3 +184,92 @@ where Ok(()) } } + +#[derive(Clone, Copy, Debug, Default)] +pub struct OneShotCodec

{ + phantom: PhantomData

, +} + +#[async_trait] +impl

RequestResponseCodec for OneShotCodec

+where + P: Send + Sync + Clone + ProtocolName, +{ + type Protocol = P; + type Request = Request; + type Response = Response; + + async fn read_request(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + debug!("enter read_request"); + let message = upgrade::read_one(io, BUF_SIZE) + .await + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + let mut de = serde_cbor::Deserializer::from_slice(&message); + let msg = Request::deserialize(&mut de).map_err(|e| { + tracing::debug!("serde read_request error: {:?}", e); + io::Error::new(io::ErrorKind::Other, e) + })?; + + Ok(msg) + } + + async fn read_response( + &mut self, + _: &Self::Protocol, + io: &mut T, + ) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + debug!("enter read_response"); + let message = upgrade::read_one(io, BUF_SIZE) + .await + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + let mut de = serde_cbor::Deserializer::from_slice(&message); + let msg = Response::deserialize(&mut de).map_err(|e| { + tracing::debug!("serde read_response error: {:?}", e); + io::Error::new(io::ErrorKind::InvalidData, e) + })?; + + Ok(msg) + } + + async fn write_request( + &mut self, + _: &Self::Protocol, + io: &mut T, + req: Self::Request, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + let bytes = + serde_cbor::to_vec(&req).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + + upgrade::write_one(io, &bytes).await?; + + Ok(()) + } + + async fn write_response( + &mut self, + _: &Self::Protocol, + io: &mut T, + res: Self::Response, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + debug!("enter write_response"); + let bytes = serde_cbor::to_vec(&res).map_err(|e| { + tracing::debug!("serde write_reponse error: {:?}", e); + io::Error::new(io::ErrorKind::InvalidData, e) + })?; + upgrade::write_one(io, &bytes).await?; + + Ok(()) + } +} diff --git a/swap/src/protocol/alice/message5.rs b/swap/src/protocol/alice/message5.rs index fd6d40d4..00cc7d8f 100644 --- a/swap/src/protocol/alice/message5.rs +++ b/swap/src/protocol/alice/message5.rs @@ -1,5 +1,5 @@ use crate::{ - network::request_response::{AliceToBob, BobToAlice, Codec, Message5Protocol, TIMEOUT}, + network::request_response::{Message5Protocol, OneShotCodec, Request, Response, TIMEOUT}, protocol::bob::Message5, }; use libp2p::{ @@ -27,7 +27,7 @@ pub enum OutEvent { #[behaviour(out_event = "OutEvent", poll_method = "poll")] #[allow(missing_debug_implementations)] pub struct Behaviour { - rr: RequestResponse>, + rr: RequestResponse>, #[behaviour(ignore)] events: VecDeque, } @@ -37,7 +37,8 @@ impl Behaviour { &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll>, OutEvent>> { + ) -> Poll>, OutEvent>> + { if let Some(event) = self.events.pop_front() { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); } @@ -54,7 +55,7 @@ impl Default for Behaviour { Self { rr: RequestResponse::new( - Codec::default(), + OneShotCodec::default(), vec![(Message5Protocol, ProtocolSupport::Full)], config, ), @@ -63,8 +64,8 @@ impl Default for Behaviour { } } -impl NetworkBehaviourEventProcess> for Behaviour { - fn inject_event(&mut self, event: RequestResponseEvent) { +impl NetworkBehaviourEventProcess> for Behaviour { + fn inject_event(&mut self, event: RequestResponseEvent) { match event { RequestResponseEvent::Message { message: @@ -73,12 +74,11 @@ impl NetworkBehaviourEventProcess> }, .. } => { - if let BobToAlice::Message5(msg) = request { - debug!("Received message 5"); - self.events.push_back(OutEvent::Msg(msg)); - // Send back empty response so that the request/response protocol completes. - self.rr.send_response(channel, AliceToBob::Message5); - } + let Request::Message5(msg) = request; + debug!("Received message 5"); + self.events.push_back(OutEvent::Msg(msg)); + // Send back empty response so that the request/response protocol completes. + let _ = self.rr.send_response(channel, Response::Message5); } RequestResponseEvent::Message { message: RequestResponseMessage::Response { .. }, diff --git a/swap/src/protocol/bob/message5.rs b/swap/src/protocol/bob/message5.rs index 801e6532..ee2104da 100644 --- a/swap/src/protocol/bob/message5.rs +++ b/swap/src/protocol/bob/message5.rs @@ -1,6 +1,6 @@ use crate::{ bitcoin::EncryptedSignature, - network::request_response::{AliceToBob, BobToAlice, Codec, Message5Protocol, TIMEOUT}, + network::request_response::{Message5Protocol, OneShotCodec, Request, Response, TIMEOUT}, }; use libp2p::{ request_response::{ @@ -33,14 +33,14 @@ pub enum OutEvent { #[behaviour(out_event = "OutEvent", poll_method = "poll")] #[allow(missing_debug_implementations)] pub struct Behaviour { - rr: RequestResponse>, + rr: RequestResponse>, #[behaviour(ignore)] events: VecDeque, } impl Behaviour { pub fn send(&mut self, alice: PeerId, msg: Message5) { - let msg = BobToAlice::Message5(msg); + let msg = Request::Message5(msg); let _id = self.rr.send_request(&alice, msg); } @@ -48,7 +48,8 @@ impl Behaviour { &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll>, OutEvent>> { + ) -> Poll>, OutEvent>> + { if let Some(event) = self.events.pop_front() { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); } @@ -65,7 +66,7 @@ impl Default for Behaviour { Self { rr: RequestResponse::new( - Codec::default(), + OneShotCodec::default(), vec![(Message5Protocol, ProtocolSupport::Full)], config, ), @@ -74,8 +75,8 @@ impl Default for Behaviour { } } -impl NetworkBehaviourEventProcess> for Behaviour { - fn inject_event(&mut self, event: RequestResponseEvent) { +impl NetworkBehaviourEventProcess> for Behaviour { + fn inject_event(&mut self, event: RequestResponseEvent) { match event { RequestResponseEvent::Message { message: RequestResponseMessage::Request { .. }, @@ -85,9 +86,8 @@ impl NetworkBehaviourEventProcess> message: RequestResponseMessage::Response { response, .. }, .. } => { - if let AliceToBob::Message5 = response { - self.events.push_back(OutEvent::Msg); - } + let Response::Message5 = response; + self.events.push_back(OutEvent::Msg); } RequestResponseEvent::InboundFailure { error, .. } => { error!("Inbound failure: {:?}", error);