Use event_process = false for Bob

As we do not process the event, we can just implement the needed `From`
traits.
This commit is contained in:
Franck Royer 2021-02-05 16:30:43 +11:00
parent a7b89e2fe4
commit eefb1b3b16
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
5 changed files with 95 additions and 134 deletions

View File

@ -12,7 +12,7 @@ use crate::{
protocol::{alice, alice::TransferProof, bob, SwapAmounts}, protocol::{alice, alice::TransferProof, bob, SwapAmounts},
seed::Seed, seed::Seed,
}; };
use anyhow::{bail, Result}; use anyhow::{bail, Error, Result};
use libp2p::{core::Multiaddr, identity::Keypair, NetworkBehaviour, PeerId}; use libp2p::{core::Multiaddr, identity::Keypair, NetworkBehaviour, PeerId};
use rand::rngs::OsRng; use rand::rngs::OsRng;
use std::{path::PathBuf, sync::Arc}; use std::{path::PathBuf, sync::Arc};
@ -27,6 +27,7 @@ pub use self::{
swap_request::*, swap_request::*,
}; };
pub use execution_setup::{Message0, Message2, Message4}; pub use execution_setup::{Message0, Message2, Message4};
use libp2p::request_response::ResponseChannel;
mod encrypted_signature; mod encrypted_signature;
pub mod event_loop; pub mod event_loop;
@ -206,8 +207,13 @@ pub enum OutEvent {
ConnectionEstablished(PeerId), ConnectionEstablished(PeerId),
SwapResponse(alice::SwapResponse), SwapResponse(alice::SwapResponse),
ExecutionSetupDone(Result<Box<State2>>), ExecutionSetupDone(Result<Box<State2>>),
TransferProof(Box<TransferProof>), TransferProof {
msg: Box<TransferProof>,
channel: ResponseChannel<()>,
},
EncryptedSignatureAcknowledged, EncryptedSignatureAcknowledged,
ResponseSent, // Same variant is used for all messages as no processing is done
Failure(Error),
} }
impl From<peer_tracker::OutEvent> for OutEvent { impl From<peer_tracker::OutEvent> for OutEvent {
@ -222,7 +228,11 @@ impl From<peer_tracker::OutEvent> for OutEvent {
impl From<swap_request::OutEvent> for OutEvent { impl From<swap_request::OutEvent> for OutEvent {
fn from(event: swap_request::OutEvent) -> Self { fn from(event: swap_request::OutEvent) -> Self {
OutEvent::SwapResponse(event.swap_response) use swap_request::OutEvent::*;
match event {
MsgReceived(swap_response) => OutEvent::SwapResponse(swap_response),
Failure(err) => OutEvent::Failure(err.context("Failre with Swap Request")),
}
} }
} }
@ -236,16 +246,24 @@ impl From<execution_setup::OutEvent> for OutEvent {
impl From<transfer_proof::OutEvent> for OutEvent { impl From<transfer_proof::OutEvent> for OutEvent {
fn from(event: transfer_proof::OutEvent) -> Self { fn from(event: transfer_proof::OutEvent) -> Self {
use transfer_proof::OutEvent::*;
match event { match event {
transfer_proof::OutEvent::Msg(msg) => OutEvent::TransferProof(Box::new(msg)), MsgReceived { msg, channel } => OutEvent::TransferProof {
msg: Box::new(msg),
channel,
},
AckSent => OutEvent::ResponseSent,
Failure(err) => OutEvent::Failure(err.context("Failure with Transfer Proof")),
} }
} }
} }
impl From<encrypted_signature::OutEvent> for OutEvent { impl From<encrypted_signature::OutEvent> for OutEvent {
fn from(event: encrypted_signature::OutEvent) -> Self { fn from(event: encrypted_signature::OutEvent) -> Self {
use encrypted_signature::OutEvent::*;
match event { match event {
encrypted_signature::OutEvent::Acknowledged => OutEvent::EncryptedSignatureAcknowledged, Acknowledged => OutEvent::EncryptedSignatureAcknowledged,
Failure(err) => OutEvent::Failure(err.context("Failure with Encrypted Signature")),
} }
} }
} }

View File

@ -1,61 +1,38 @@
use crate::network::request_response::{CborCodec, EncryptedSignatureProtocol, TIMEOUT}; use crate::network::request_response::{CborCodec, EncryptedSignatureProtocol, TIMEOUT};
use anyhow::{anyhow, Error};
use libp2p::{ use libp2p::{
request_response::{ request_response::{
handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig, ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent,
RequestResponseEvent, RequestResponseMessage, RequestResponseMessage,
}, },
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters},
NetworkBehaviour, PeerId, NetworkBehaviour, PeerId,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{ use std::time::Duration;
collections::VecDeque,
task::{Context, Poll},
time::Duration,
};
use tracing::error;
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EncryptedSignature { pub struct EncryptedSignature {
pub tx_redeem_encsig: crate::bitcoin::EncryptedSignature, pub tx_redeem_encsig: crate::bitcoin::EncryptedSignature,
} }
#[derive(Debug, Copy, Clone)] #[derive(Debug)]
pub enum OutEvent { pub enum OutEvent {
Acknowledged, Acknowledged,
Failure(Error),
} }
/// A `NetworkBehaviour` that represents sending encrypted signature to Alice. /// A `NetworkBehaviour` that represents sending encrypted signature to Alice.
#[derive(NetworkBehaviour)] #[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", poll_method = "poll")] #[behaviour(out_event = "OutEvent", event_process = false)]
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct Behaviour { pub struct Behaviour {
rr: RequestResponse<CborCodec<EncryptedSignatureProtocol, EncryptedSignature, ()>>, rr: RequestResponse<CborCodec<EncryptedSignatureProtocol, EncryptedSignature, ()>>,
#[behaviour(ignore)]
events: VecDeque<OutEvent>,
} }
impl Behaviour { impl Behaviour {
pub fn send(&mut self, alice: PeerId, msg: EncryptedSignature) { pub fn send(&mut self, alice: PeerId, msg: EncryptedSignature) {
let _id = self.rr.send_request(&alice, msg); let _id = self.rr.send_request(&alice, msg);
} }
fn poll(
&mut self,
_: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<
NetworkBehaviourAction<
RequestProtocol<CborCodec<EncryptedSignatureProtocol, EncryptedSignature, ()>>,
OutEvent,
>,
> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
Poll::Pending
}
} }
impl Default for Behaviour { impl Default for Behaviour {
@ -70,33 +47,30 @@ impl Default for Behaviour {
vec![(EncryptedSignatureProtocol, ProtocolSupport::Outbound)], vec![(EncryptedSignatureProtocol, ProtocolSupport::Outbound)],
config, config,
), ),
events: Default::default(),
} }
} }
} }
impl NetworkBehaviourEventProcess<RequestResponseEvent<EncryptedSignature, ()>> for Behaviour { impl From<RequestResponseEvent<EncryptedSignature, ()>> for OutEvent {
fn inject_event(&mut self, event: RequestResponseEvent<EncryptedSignature, ()>) { fn from(event: RequestResponseEvent<EncryptedSignature, ()>) -> Self {
match event { match event {
RequestResponseEvent::Message { RequestResponseEvent::Message {
message: RequestResponseMessage::Request { .. }, message: RequestResponseMessage::Request { .. },
.. ..
} => panic!("Bob should never get a request from Alice"), } => OutEvent::Failure(anyhow!("Bob should never get a request from Alice")),
RequestResponseEvent::Message { RequestResponseEvent::Message {
message: RequestResponseMessage::Response { .. }, message: RequestResponseMessage::Response { .. },
.. ..
} => { } => OutEvent::Acknowledged,
self.events.push_back(OutEvent::Acknowledged);
}
RequestResponseEvent::InboundFailure { error, .. } => { RequestResponseEvent::InboundFailure { error, .. } => {
error!("Inbound failure: {:?}", error); OutEvent::Failure(anyhow!("Inbound failure: {:?}", error))
} }
RequestResponseEvent::OutboundFailure { error, .. } => { RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error); OutEvent::Failure(anyhow!("Outbound failure: {:?}", error))
}
RequestResponseEvent::ResponseSent { .. } => {
unreachable!("Bob does not send the encrypted signature response to Alice");
} }
RequestResponseEvent::ResponseSent { .. } => OutEvent::Failure(anyhow!(
"Bob does not send the encrypted signature response to Alice"
)),
} }
} }
} }

View File

@ -190,13 +190,21 @@ impl EventLoop {
OutEvent::ExecutionSetupDone(res) => { OutEvent::ExecutionSetupDone(res) => {
let _ = self.done_execution_setup.send(res.map(|state|*state)).await; let _ = self.done_execution_setup.send(res.map(|state|*state)).await;
} }
OutEvent::TransferProof(msg) => { OutEvent::TransferProof{ msg, channel }=> {
let _ = self.recv_transfer_proof.send(*msg).await; let _ = self.recv_transfer_proof.send(*msg).await;
// Send back empty response so that the request/response protocol completes.
if let Err(error) = self.swarm.transfer_proof.send_ack(channel) {
error!("Failed to send Transfer Proof ack: {:?}", error);
}
} }
OutEvent::EncryptedSignatureAcknowledged => { OutEvent::EncryptedSignatureAcknowledged => {
debug!("Alice acknowledged encrypted signature"); debug!("Alice acknowledged encrypted signature");
let _ = self.recv_encrypted_signature_ack.send(()).await; let _ = self.recv_encrypted_signature_ack.send(()).await;
} }
OutEvent::ResponseSent => {}
OutEvent::Failure(err) => {
error!("Communication error: {:#}", err)
}
} }
}, },
option = self.dial_alice.recv().fuse() => { option = self.dial_alice.recv().fuse() => {

View File

@ -2,22 +2,17 @@ use crate::{
network::request_response::{CborCodec, Swap, TIMEOUT}, network::request_response::{CborCodec, Swap, TIMEOUT},
protocol::alice::SwapResponse, protocol::alice::SwapResponse,
}; };
use anyhow::Result; use anyhow::{anyhow, Error, Result};
use libp2p::{ use libp2p::{
request_response::{ request_response::{
handler::RequestProtocol, ProtocolSupport, RequestId, RequestResponse, ProtocolSupport, RequestId, RequestResponse, RequestResponseConfig, RequestResponseEvent,
RequestResponseConfig, RequestResponseEvent, RequestResponseMessage, RequestResponseMessage,
}, },
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters},
NetworkBehaviour, PeerId, NetworkBehaviour, PeerId,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{ use std::time::Duration;
collections::VecDeque, use tracing::debug;
task::{Context, Poll},
time::Duration,
};
use tracing::{debug, error};
#[derive(Clone, Copy, Debug, Serialize, Deserialize)] #[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub struct SwapRequest { pub struct SwapRequest {
@ -25,19 +20,18 @@ pub struct SwapRequest {
pub btc_amount: bitcoin::Amount, pub btc_amount: bitcoin::Amount,
} }
#[derive(Copy, Clone, Debug)] #[derive(Debug)]
pub struct OutEvent { pub enum OutEvent {
pub swap_response: SwapResponse, MsgReceived(SwapResponse),
Failure(Error),
} }
/// A `NetworkBehaviour` that represents doing the negotiation of a swap. /// A `NetworkBehaviour` that represents doing the negotiation of a swap.
#[derive(NetworkBehaviour)] #[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", poll_method = "poll")] #[behaviour(out_event = "OutEvent", event_process = false)]
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct Behaviour { pub struct Behaviour {
rr: RequestResponse<CborCodec<Swap, SwapRequest, SwapResponse>>, rr: RequestResponse<CborCodec<Swap, SwapRequest, SwapResponse>>,
#[behaviour(ignore)]
events: VecDeque<OutEvent>,
} }
impl Behaviour { impl Behaviour {
@ -46,23 +40,6 @@ impl Behaviour {
Ok(id) Ok(id)
} }
fn poll(
&mut self,
_: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<
NetworkBehaviourAction<
RequestProtocol<CborCodec<Swap, SwapRequest, SwapResponse>>,
OutEvent,
>,
> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
Poll::Pending
}
} }
impl Default for Behaviour { impl Default for Behaviour {
@ -78,35 +55,33 @@ impl Default for Behaviour {
vec![(Swap, ProtocolSupport::Outbound)], vec![(Swap, ProtocolSupport::Outbound)],
config, config,
), ),
events: Default::default(),
} }
} }
} }
impl NetworkBehaviourEventProcess<RequestResponseEvent<SwapRequest, SwapResponse>> for Behaviour { impl From<RequestResponseEvent<SwapRequest, SwapResponse>> for OutEvent {
fn inject_event(&mut self, event: RequestResponseEvent<SwapRequest, SwapResponse>) { fn from(event: RequestResponseEvent<SwapRequest, SwapResponse>) -> Self {
match event { match event {
RequestResponseEvent::Message { RequestResponseEvent::Message {
message: RequestResponseMessage::Request { .. }, message: RequestResponseMessage::Request { .. },
.. ..
} => panic!("Bob should never get a request from Alice"), } => OutEvent::Failure(anyhow!("Bob should never get a request from Alice")),
RequestResponseEvent::Message { RequestResponseEvent::Message {
peer,
message: RequestResponseMessage::Response { response, .. }, message: RequestResponseMessage::Response { response, .. },
.. ..
} => { } => {
debug!("Received swap response"); debug!("Received swap response from {}", peer);
self.events.push_back(OutEvent { OutEvent::MsgReceived(response)
swap_response: response,
});
} }
RequestResponseEvent::InboundFailure { error, .. } => { RequestResponseEvent::InboundFailure { error, .. } => {
error!("Inbound failure: {:?}", error); OutEvent::Failure(anyhow!("Inbound failure: {:?}", error))
} }
RequestResponseEvent::OutboundFailure { error, .. } => { RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error); OutEvent::Failure(anyhow!("Outbound failure: {:?}", error))
} }
RequestResponseEvent::ResponseSent { .. } => { RequestResponseEvent::ResponseSent { .. } => {
error!("Bob does not send a swap response to Alice"); OutEvent::Failure(anyhow!("Bob does not send a swap response to Alice"))
} }
} }
} }

View File

@ -2,53 +2,41 @@ use crate::{
network::request_response::{CborCodec, TransferProofProtocol, TIMEOUT}, network::request_response::{CborCodec, TransferProofProtocol, TIMEOUT},
protocol::alice::TransferProof, protocol::alice::TransferProof,
}; };
use anyhow::{anyhow, Error, Result};
use libp2p::{ use libp2p::{
request_response::{ request_response::{
handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig, ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent,
RequestResponseEvent, RequestResponseMessage, RequestResponseMessage, ResponseChannel,
}, },
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters},
NetworkBehaviour, NetworkBehaviour,
}; };
use std::{ use std::time::Duration;
collections::VecDeque, use tracing::debug;
task::{Context, Poll},
time::Duration,
};
use tracing::{debug, error};
#[derive(Debug)] #[derive(Debug)]
pub enum OutEvent { pub enum OutEvent {
Msg(TransferProof), MsgReceived {
msg: TransferProof,
channel: ResponseChannel<()>,
},
AckSent,
Failure(Error),
} }
/// A `NetworkBehaviour` that represents receiving the transfer proof from /// A `NetworkBehaviour` that represents receiving the transfer proof from
/// Alice. /// Alice.
#[derive(NetworkBehaviour)] #[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", poll_method = "poll")] #[behaviour(out_event = "OutEvent", event_process = false)]
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct Behaviour { pub struct Behaviour {
rr: RequestResponse<CborCodec<TransferProofProtocol, TransferProof, ()>>, rr: RequestResponse<CborCodec<TransferProofProtocol, TransferProof, ()>>,
#[behaviour(ignore)]
events: VecDeque<OutEvent>,
} }
impl Behaviour { impl Behaviour {
fn poll( pub fn send_ack(&mut self, channel: ResponseChannel<()>) -> Result<()> {
&mut self, self.rr
_: &mut Context<'_>, .send_response(channel, ())
_: &mut impl PollParameters, .map_err(|err| anyhow!("Failed to ack transfer proof: {:?}", err))
) -> Poll<
NetworkBehaviourAction<
RequestProtocol<CborCodec<TransferProofProtocol, TransferProof, ()>>,
OutEvent,
>,
> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
Poll::Pending
} }
} }
@ -64,40 +52,38 @@ impl Default for Behaviour {
vec![(TransferProofProtocol, ProtocolSupport::Inbound)], vec![(TransferProofProtocol, ProtocolSupport::Inbound)],
config, config,
), ),
events: Default::default(),
} }
} }
} }
impl NetworkBehaviourEventProcess<RequestResponseEvent<TransferProof, ()>> for Behaviour { impl From<RequestResponseEvent<TransferProof, ()>> for OutEvent {
fn inject_event(&mut self, event: RequestResponseEvent<TransferProof, ()>) { fn from(event: RequestResponseEvent<TransferProof, ()>) -> Self {
match event { match event {
RequestResponseEvent::Message { RequestResponseEvent::Message {
peer,
message: message:
RequestResponseMessage::Request { RequestResponseMessage::Request {
request, channel, .. request, channel, ..
}, },
.. ..
} => { } => {
debug!("Received Transfer Proof"); debug!("Received Transfer Proof from {}", peer);
self.events.push_back(OutEvent::Msg(request)); OutEvent::MsgReceived {
// Send back empty response so that the request/response protocol completes. msg: request,
let _ = self channel,
.rr }
.send_response(channel, ())
.map_err(|err| error!("Failed to send message 3: {:?}", err));
} }
RequestResponseEvent::Message { RequestResponseEvent::Message {
message: RequestResponseMessage::Response { .. }, message: RequestResponseMessage::Response { .. },
.. ..
} => panic!("Bob should not get a Response"), } => OutEvent::Failure(anyhow!("Bob should not get a Response")),
RequestResponseEvent::InboundFailure { error, .. } => { RequestResponseEvent::InboundFailure { error, .. } => {
error!("Inbound failure: {:?}", error); OutEvent::Failure(anyhow!("Inbound failure: {:?}", error))
} }
RequestResponseEvent::OutboundFailure { error, .. } => { RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error); OutEvent::Failure(anyhow!("Outbound failure: {:?}", error))
} }
RequestResponseEvent::ResponseSent { .. } => debug!("Bob ack'd transfer proof message"), RequestResponseEvent::ResponseSent { .. } => OutEvent::AckSent,
} }
} }
} }