diff --git a/swap/src/protocol/bob.rs b/swap/src/protocol/bob.rs index 84e118cb..94afa7fa 100644 --- a/swap/src/protocol/bob.rs +++ b/swap/src/protocol/bob.rs @@ -12,7 +12,7 @@ use crate::{ protocol::{alice, alice::TransferProof, bob, SwapAmounts}, seed::Seed, }; -use anyhow::{bail, Result}; +use anyhow::{bail, Error, Result}; use libp2p::{core::Multiaddr, identity::Keypair, NetworkBehaviour, PeerId}; use rand::rngs::OsRng; use std::{path::PathBuf, sync::Arc}; @@ -27,6 +27,7 @@ pub use self::{ swap_request::*, }; pub use execution_setup::{Message0, Message2, Message4}; +use libp2p::request_response::ResponseChannel; mod encrypted_signature; pub mod event_loop; @@ -206,8 +207,13 @@ pub enum OutEvent { ConnectionEstablished(PeerId), SwapResponse(alice::SwapResponse), ExecutionSetupDone(Result>), - TransferProof(Box), + TransferProof { + msg: Box, + channel: ResponseChannel<()>, + }, EncryptedSignatureAcknowledged, + ResponseSent, // Same variant is used for all messages as no processing is done + Failure(Error), } impl From for OutEvent { @@ -222,7 +228,11 @@ impl From for OutEvent { impl From for OutEvent { fn from(event: swap_request::OutEvent) -> Self { - OutEvent::SwapResponse(event.swap_response) + use swap_request::OutEvent::*; + match event { + MsgReceived(swap_response) => OutEvent::SwapResponse(swap_response), + Failure(err) => OutEvent::Failure(err.context("Failre with Swap Request")), + } } } @@ -236,16 +246,24 @@ impl From for OutEvent { impl From for OutEvent { fn from(event: transfer_proof::OutEvent) -> Self { + use transfer_proof::OutEvent::*; match event { - transfer_proof::OutEvent::Msg(msg) => OutEvent::TransferProof(Box::new(msg)), + MsgReceived { msg, channel } => OutEvent::TransferProof { + msg: Box::new(msg), + channel, + }, + AckSent => OutEvent::ResponseSent, + Failure(err) => OutEvent::Failure(err.context("Failure with Transfer Proof")), } } } impl From for OutEvent { fn from(event: encrypted_signature::OutEvent) -> Self { + use encrypted_signature::OutEvent::*; match event { - encrypted_signature::OutEvent::Acknowledged => OutEvent::EncryptedSignatureAcknowledged, + Acknowledged => OutEvent::EncryptedSignatureAcknowledged, + Failure(err) => OutEvent::Failure(err.context("Failure with Encrypted Signature")), } } } diff --git a/swap/src/protocol/bob/encrypted_signature.rs b/swap/src/protocol/bob/encrypted_signature.rs index c8353c4f..96a253a2 100644 --- a/swap/src/protocol/bob/encrypted_signature.rs +++ b/swap/src/protocol/bob/encrypted_signature.rs @@ -1,61 +1,38 @@ use crate::network::request_response::{CborCodec, EncryptedSignatureProtocol, TIMEOUT}; +use anyhow::{anyhow, Error}; use libp2p::{ request_response::{ - handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig, - RequestResponseEvent, RequestResponseMessage, + ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent, + RequestResponseMessage, }, - swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}, NetworkBehaviour, PeerId, }; use serde::{Deserialize, Serialize}; -use std::{ - collections::VecDeque, - task::{Context, Poll}, - time::Duration, -}; -use tracing::error; +use std::time::Duration; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct EncryptedSignature { pub tx_redeem_encsig: crate::bitcoin::EncryptedSignature, } -#[derive(Debug, Copy, Clone)] +#[derive(Debug)] pub enum OutEvent { Acknowledged, + Failure(Error), } /// A `NetworkBehaviour` that represents sending encrypted signature to Alice. #[derive(NetworkBehaviour)] -#[behaviour(out_event = "OutEvent", poll_method = "poll")] +#[behaviour(out_event = "OutEvent", event_process = false)] #[allow(missing_debug_implementations)] pub struct Behaviour { rr: RequestResponse>, - #[behaviour(ignore)] - events: VecDeque, } impl Behaviour { pub fn send(&mut self, alice: PeerId, msg: EncryptedSignature) { let _id = self.rr.send_request(&alice, msg); } - - fn poll( - &mut self, - _: &mut Context<'_>, - _: &mut impl PollParameters, - ) -> Poll< - NetworkBehaviourAction< - RequestProtocol>, - OutEvent, - >, - > { - if let Some(event) = self.events.pop_front() { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); - } - - Poll::Pending - } } impl Default for Behaviour { @@ -70,33 +47,30 @@ impl Default for Behaviour { vec![(EncryptedSignatureProtocol, ProtocolSupport::Outbound)], config, ), - events: Default::default(), } } } -impl NetworkBehaviourEventProcess> for Behaviour { - fn inject_event(&mut self, event: RequestResponseEvent) { +impl From> for OutEvent { + fn from(event: RequestResponseEvent) -> Self { match event { RequestResponseEvent::Message { message: RequestResponseMessage::Request { .. }, .. - } => panic!("Bob should never get a request from Alice"), + } => OutEvent::Failure(anyhow!("Bob should never get a request from Alice")), RequestResponseEvent::Message { message: RequestResponseMessage::Response { .. }, .. - } => { - self.events.push_back(OutEvent::Acknowledged); - } + } => OutEvent::Acknowledged, RequestResponseEvent::InboundFailure { error, .. } => { - error!("Inbound failure: {:?}", error); + OutEvent::Failure(anyhow!("Inbound failure: {:?}", error)) } RequestResponseEvent::OutboundFailure { error, .. } => { - error!("Outbound failure: {:?}", error); - } - RequestResponseEvent::ResponseSent { .. } => { - unreachable!("Bob does not send the encrypted signature response to Alice"); + OutEvent::Failure(anyhow!("Outbound failure: {:?}", error)) } + RequestResponseEvent::ResponseSent { .. } => OutEvent::Failure(anyhow!( + "Bob does not send the encrypted signature response to Alice" + )), } } } diff --git a/swap/src/protocol/bob/event_loop.rs b/swap/src/protocol/bob/event_loop.rs index d86536a0..4d8229d6 100644 --- a/swap/src/protocol/bob/event_loop.rs +++ b/swap/src/protocol/bob/event_loop.rs @@ -190,13 +190,21 @@ impl EventLoop { OutEvent::ExecutionSetupDone(res) => { let _ = self.done_execution_setup.send(res.map(|state|*state)).await; } - OutEvent::TransferProof(msg) => { + OutEvent::TransferProof{ msg, channel }=> { let _ = self.recv_transfer_proof.send(*msg).await; + // Send back empty response so that the request/response protocol completes. + if let Err(error) = self.swarm.transfer_proof.send_ack(channel) { + error!("Failed to send Transfer Proof ack: {:?}", error); + } } OutEvent::EncryptedSignatureAcknowledged => { debug!("Alice acknowledged encrypted signature"); let _ = self.recv_encrypted_signature_ack.send(()).await; } + OutEvent::ResponseSent => {} + OutEvent::Failure(err) => { + error!("Communication error: {:#}", err) + } } }, option = self.dial_alice.recv().fuse() => { diff --git a/swap/src/protocol/bob/swap_request.rs b/swap/src/protocol/bob/swap_request.rs index 95fde3f2..b7d50dad 100644 --- a/swap/src/protocol/bob/swap_request.rs +++ b/swap/src/protocol/bob/swap_request.rs @@ -2,22 +2,17 @@ use crate::{ network::request_response::{CborCodec, Swap, TIMEOUT}, protocol::alice::SwapResponse, }; -use anyhow::Result; +use anyhow::{anyhow, Error, Result}; use libp2p::{ request_response::{ - handler::RequestProtocol, ProtocolSupport, RequestId, RequestResponse, - RequestResponseConfig, RequestResponseEvent, RequestResponseMessage, + ProtocolSupport, RequestId, RequestResponse, RequestResponseConfig, RequestResponseEvent, + RequestResponseMessage, }, - swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}, NetworkBehaviour, PeerId, }; use serde::{Deserialize, Serialize}; -use std::{ - collections::VecDeque, - task::{Context, Poll}, - time::Duration, -}; -use tracing::{debug, error}; +use std::time::Duration; +use tracing::debug; #[derive(Clone, Copy, Debug, Serialize, Deserialize)] pub struct SwapRequest { @@ -25,19 +20,18 @@ pub struct SwapRequest { pub btc_amount: bitcoin::Amount, } -#[derive(Copy, Clone, Debug)] -pub struct OutEvent { - pub swap_response: SwapResponse, +#[derive(Debug)] +pub enum OutEvent { + MsgReceived(SwapResponse), + Failure(Error), } /// A `NetworkBehaviour` that represents doing the negotiation of a swap. #[derive(NetworkBehaviour)] -#[behaviour(out_event = "OutEvent", poll_method = "poll")] +#[behaviour(out_event = "OutEvent", event_process = false)] #[allow(missing_debug_implementations)] pub struct Behaviour { rr: RequestResponse>, - #[behaviour(ignore)] - events: VecDeque, } impl Behaviour { @@ -46,23 +40,6 @@ impl Behaviour { Ok(id) } - - fn poll( - &mut self, - _: &mut Context<'_>, - _: &mut impl PollParameters, - ) -> Poll< - NetworkBehaviourAction< - RequestProtocol>, - OutEvent, - >, - > { - if let Some(event) = self.events.pop_front() { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); - } - - Poll::Pending - } } impl Default for Behaviour { @@ -78,35 +55,33 @@ impl Default for Behaviour { vec![(Swap, ProtocolSupport::Outbound)], config, ), - events: Default::default(), } } } -impl NetworkBehaviourEventProcess> for Behaviour { - fn inject_event(&mut self, event: RequestResponseEvent) { +impl From> for OutEvent { + fn from(event: RequestResponseEvent) -> Self { match event { RequestResponseEvent::Message { message: RequestResponseMessage::Request { .. }, .. - } => panic!("Bob should never get a request from Alice"), + } => OutEvent::Failure(anyhow!("Bob should never get a request from Alice")), RequestResponseEvent::Message { + peer, message: RequestResponseMessage::Response { response, .. }, .. } => { - debug!("Received swap response"); - self.events.push_back(OutEvent { - swap_response: response, - }); + debug!("Received swap response from {}", peer); + OutEvent::MsgReceived(response) } RequestResponseEvent::InboundFailure { error, .. } => { - error!("Inbound failure: {:?}", error); + OutEvent::Failure(anyhow!("Inbound failure: {:?}", error)) } RequestResponseEvent::OutboundFailure { error, .. } => { - error!("Outbound failure: {:?}", error); + OutEvent::Failure(anyhow!("Outbound failure: {:?}", error)) } RequestResponseEvent::ResponseSent { .. } => { - error!("Bob does not send a swap response to Alice"); + OutEvent::Failure(anyhow!("Bob does not send a swap response to Alice")) } } } diff --git a/swap/src/protocol/bob/transfer_proof.rs b/swap/src/protocol/bob/transfer_proof.rs index 8306b040..67eeb9c4 100644 --- a/swap/src/protocol/bob/transfer_proof.rs +++ b/swap/src/protocol/bob/transfer_proof.rs @@ -2,53 +2,41 @@ use crate::{ network::request_response::{CborCodec, TransferProofProtocol, TIMEOUT}, protocol::alice::TransferProof, }; +use anyhow::{anyhow, Error, Result}; use libp2p::{ request_response::{ - handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig, - RequestResponseEvent, RequestResponseMessage, + ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent, + RequestResponseMessage, ResponseChannel, }, - swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}, NetworkBehaviour, }; -use std::{ - collections::VecDeque, - task::{Context, Poll}, - time::Duration, -}; -use tracing::{debug, error}; +use std::time::Duration; +use tracing::debug; #[derive(Debug)] pub enum OutEvent { - Msg(TransferProof), + MsgReceived { + msg: TransferProof, + channel: ResponseChannel<()>, + }, + AckSent, + Failure(Error), } /// A `NetworkBehaviour` that represents receiving the transfer proof from /// Alice. #[derive(NetworkBehaviour)] -#[behaviour(out_event = "OutEvent", poll_method = "poll")] +#[behaviour(out_event = "OutEvent", event_process = false)] #[allow(missing_debug_implementations)] pub struct Behaviour { rr: RequestResponse>, - #[behaviour(ignore)] - events: VecDeque, } impl Behaviour { - fn poll( - &mut self, - _: &mut Context<'_>, - _: &mut impl PollParameters, - ) -> Poll< - NetworkBehaviourAction< - RequestProtocol>, - OutEvent, - >, - > { - if let Some(event) = self.events.pop_front() { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); - } - - Poll::Pending + pub fn send_ack(&mut self, channel: ResponseChannel<()>) -> Result<()> { + self.rr + .send_response(channel, ()) + .map_err(|err| anyhow!("Failed to ack transfer proof: {:?}", err)) } } @@ -64,40 +52,38 @@ impl Default for Behaviour { vec![(TransferProofProtocol, ProtocolSupport::Inbound)], config, ), - events: Default::default(), } } } -impl NetworkBehaviourEventProcess> for Behaviour { - fn inject_event(&mut self, event: RequestResponseEvent) { +impl From> for OutEvent { + fn from(event: RequestResponseEvent) -> Self { match event { RequestResponseEvent::Message { + peer, message: RequestResponseMessage::Request { request, channel, .. }, .. } => { - debug!("Received Transfer Proof"); - self.events.push_back(OutEvent::Msg(request)); - // Send back empty response so that the request/response protocol completes. - let _ = self - .rr - .send_response(channel, ()) - .map_err(|err| error!("Failed to send message 3: {:?}", err)); + debug!("Received Transfer Proof from {}", peer); + OutEvent::MsgReceived { + msg: request, + channel, + } } RequestResponseEvent::Message { message: RequestResponseMessage::Response { .. }, .. - } => panic!("Bob should not get a Response"), + } => OutEvent::Failure(anyhow!("Bob should not get a Response")), RequestResponseEvent::InboundFailure { error, .. } => { - error!("Inbound failure: {:?}", error); + OutEvent::Failure(anyhow!("Inbound failure: {:?}", error)) } RequestResponseEvent::OutboundFailure { error, .. } => { - error!("Outbound failure: {:?}", error); + OutEvent::Failure(anyhow!("Outbound failure: {:?}", error)) } - RequestResponseEvent::ResponseSent { .. } => debug!("Bob ack'd transfer proof message"), + RequestResponseEvent::ResponseSent { .. } => OutEvent::AckSent, } } }