mirror of
https://github.com/comit-network/xmr-btc-swap.git
synced 2025-01-27 07:47:08 -05:00
Use event_process = false for Alice
As we do not process the event, we can just implement the needed `From` traits.
This commit is contained in:
parent
554ae6c00e
commit
a7b89e2fe4
@ -128,7 +128,7 @@ where
|
|||||||
T: AsyncWrite + Unpin + Send,
|
T: AsyncWrite + Unpin + Send,
|
||||||
{
|
{
|
||||||
let bytes = serde_cbor::to_vec(&res).map_err(|e| {
|
let bytes = serde_cbor::to_vec(&res).map_err(|e| {
|
||||||
tracing::debug!("serde write_reponse error: {:?}", e);
|
tracing::debug!("serde write_response error: {:?}", e);
|
||||||
io::Error::new(io::ErrorKind::InvalidData, e)
|
io::Error::new(io::ErrorKind::InvalidData, e)
|
||||||
})?;
|
})?;
|
||||||
upgrade::write_one(io, &bytes).await?;
|
upgrade::write_one(io, &bytes).await?;
|
||||||
|
@ -13,7 +13,7 @@ use crate::{
|
|||||||
protocol::{bob::EncryptedSignature, SwapAmounts},
|
protocol::{bob::EncryptedSignature, SwapAmounts},
|
||||||
seed::Seed,
|
seed::Seed,
|
||||||
};
|
};
|
||||||
use anyhow::{bail, Result};
|
use anyhow::{bail, Error, Result};
|
||||||
use libp2p::{
|
use libp2p::{
|
||||||
core::Multiaddr, identity::Keypair, request_response::ResponseChannel, NetworkBehaviour, PeerId,
|
core::Multiaddr, identity::Keypair, request_response::ResponseChannel, NetworkBehaviour, PeerId,
|
||||||
};
|
};
|
||||||
@ -30,6 +30,7 @@ pub use self::{
|
|||||||
swap_response::*,
|
swap_response::*,
|
||||||
transfer_proof::TransferProof,
|
transfer_proof::TransferProof,
|
||||||
};
|
};
|
||||||
|
use crate::protocol::bob::SwapRequest;
|
||||||
pub use execution_setup::Message3;
|
pub use execution_setup::Message3;
|
||||||
|
|
||||||
mod encrypted_signature;
|
mod encrypted_signature;
|
||||||
@ -217,10 +218,18 @@ impl Builder {
|
|||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum OutEvent {
|
pub enum OutEvent {
|
||||||
ConnectionEstablished(PeerId),
|
ConnectionEstablished(PeerId),
|
||||||
Request(Box<swap_response::OutEvent>),
|
SwapRequest {
|
||||||
|
msg: SwapRequest,
|
||||||
|
channel: ResponseChannel<SwapResponse>,
|
||||||
|
},
|
||||||
ExecutionSetupDone(Result<Box<State3>>),
|
ExecutionSetupDone(Result<Box<State3>>),
|
||||||
TransferProofAcknowledged,
|
TransferProofAcknowledged,
|
||||||
EncryptedSignature(Box<EncryptedSignature>),
|
EncryptedSignature {
|
||||||
|
msg: Box<EncryptedSignature>,
|
||||||
|
channel: ResponseChannel<()>,
|
||||||
|
},
|
||||||
|
ResponseSent, // Same variant is used for all messages as no processing is done
|
||||||
|
Failure(Error),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<peer_tracker::OutEvent> for OutEvent {
|
impl From<peer_tracker::OutEvent> for OutEvent {
|
||||||
@ -235,7 +244,12 @@ impl From<peer_tracker::OutEvent> for OutEvent {
|
|||||||
|
|
||||||
impl From<swap_response::OutEvent> for OutEvent {
|
impl From<swap_response::OutEvent> for OutEvent {
|
||||||
fn from(event: swap_response::OutEvent) -> Self {
|
fn from(event: swap_response::OutEvent) -> Self {
|
||||||
OutEvent::Request(Box::new(event))
|
use swap_response::OutEvent::*;
|
||||||
|
match event {
|
||||||
|
MsgReceived { msg, channel } => OutEvent::SwapRequest { msg, channel },
|
||||||
|
ResponseSent => OutEvent::ResponseSent,
|
||||||
|
Failure(err) => OutEvent::Failure(err.context("Swap Request/Response failure")),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -249,16 +263,24 @@ impl From<execution_setup::OutEvent> for OutEvent {
|
|||||||
|
|
||||||
impl From<transfer_proof::OutEvent> for OutEvent {
|
impl From<transfer_proof::OutEvent> for OutEvent {
|
||||||
fn from(event: transfer_proof::OutEvent) -> Self {
|
fn from(event: transfer_proof::OutEvent) -> Self {
|
||||||
|
use transfer_proof::OutEvent::*;
|
||||||
match event {
|
match event {
|
||||||
transfer_proof::OutEvent::Acknowledged => OutEvent::TransferProofAcknowledged,
|
Acknowledged => OutEvent::TransferProofAcknowledged,
|
||||||
|
Failure(err) => OutEvent::Failure(err.context("Failure with Transfer Proof")),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<encrypted_signature::OutEvent> for OutEvent {
|
impl From<encrypted_signature::OutEvent> for OutEvent {
|
||||||
fn from(event: encrypted_signature::OutEvent) -> Self {
|
fn from(event: encrypted_signature::OutEvent) -> Self {
|
||||||
|
use encrypted_signature::OutEvent::*;
|
||||||
match event {
|
match event {
|
||||||
encrypted_signature::OutEvent::Msg(msg) => OutEvent::EncryptedSignature(Box::new(msg)),
|
MsgReceived { msg, channel } => OutEvent::EncryptedSignature {
|
||||||
|
msg: Box::new(msg),
|
||||||
|
channel,
|
||||||
|
},
|
||||||
|
AckSent => OutEvent::ResponseSent,
|
||||||
|
Failure(err) => OutEvent::Failure(err.context("Failure with Encrypted Signature")),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2,53 +2,41 @@ use crate::{
|
|||||||
network::request_response::{CborCodec, EncryptedSignatureProtocol, TIMEOUT},
|
network::request_response::{CborCodec, EncryptedSignatureProtocol, TIMEOUT},
|
||||||
protocol::bob::EncryptedSignature,
|
protocol::bob::EncryptedSignature,
|
||||||
};
|
};
|
||||||
|
use anyhow::{anyhow, Error, Result};
|
||||||
use libp2p::{
|
use libp2p::{
|
||||||
request_response::{
|
request_response::{
|
||||||
handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig,
|
ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent,
|
||||||
RequestResponseEvent, RequestResponseMessage,
|
RequestResponseMessage, ResponseChannel,
|
||||||
},
|
},
|
||||||
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters},
|
|
||||||
NetworkBehaviour,
|
NetworkBehaviour,
|
||||||
};
|
};
|
||||||
use std::{
|
use std::time::Duration;
|
||||||
collections::VecDeque,
|
use tracing::debug;
|
||||||
task::{Context, Poll},
|
|
||||||
time::Duration,
|
|
||||||
};
|
|
||||||
use tracing::{debug, error};
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum OutEvent {
|
pub enum OutEvent {
|
||||||
Msg(EncryptedSignature),
|
MsgReceived {
|
||||||
|
msg: EncryptedSignature,
|
||||||
|
channel: ResponseChannel<()>,
|
||||||
|
},
|
||||||
|
AckSent,
|
||||||
|
Failure(Error),
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A `NetworkBehaviour` that represents receiving the Bitcoin encrypted
|
/// A `NetworkBehaviour` that represents receiving the Bitcoin encrypted
|
||||||
/// signature from Bob.
|
/// signature from Bob.
|
||||||
#[derive(NetworkBehaviour)]
|
#[derive(NetworkBehaviour)]
|
||||||
#[behaviour(out_event = "OutEvent", poll_method = "poll")]
|
#[behaviour(out_event = "OutEvent", event_process = false)]
|
||||||
#[allow(missing_debug_implementations)]
|
#[allow(missing_debug_implementations)]
|
||||||
pub struct Behaviour {
|
pub struct Behaviour {
|
||||||
rr: RequestResponse<CborCodec<EncryptedSignatureProtocol, EncryptedSignature, ()>>,
|
rr: RequestResponse<CborCodec<EncryptedSignatureProtocol, EncryptedSignature, ()>>,
|
||||||
#[behaviour(ignore)]
|
|
||||||
events: VecDeque<OutEvent>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Behaviour {
|
impl Behaviour {
|
||||||
fn poll(
|
pub fn send_ack(&mut self, channel: ResponseChannel<()>) -> Result<()> {
|
||||||
&mut self,
|
self.rr
|
||||||
_: &mut Context<'_>,
|
.send_response(channel, ())
|
||||||
_: &mut impl PollParameters,
|
.map_err(|err| anyhow!("Failed to ack encrypted signature: {:?}", err))
|
||||||
) -> Poll<
|
|
||||||
NetworkBehaviourAction<
|
|
||||||
RequestProtocol<CborCodec<EncryptedSignatureProtocol, EncryptedSignature, ()>>,
|
|
||||||
OutEvent,
|
|
||||||
>,
|
|
||||||
> {
|
|
||||||
if let Some(event) = self.events.pop_front() {
|
|
||||||
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
|
|
||||||
}
|
|
||||||
|
|
||||||
Poll::Pending
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -64,41 +52,38 @@ impl Default for Behaviour {
|
|||||||
vec![(EncryptedSignatureProtocol, ProtocolSupport::Inbound)],
|
vec![(EncryptedSignatureProtocol, ProtocolSupport::Inbound)],
|
||||||
config,
|
config,
|
||||||
),
|
),
|
||||||
events: Default::default(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl NetworkBehaviourEventProcess<RequestResponseEvent<EncryptedSignature, ()>> for Behaviour {
|
impl From<RequestResponseEvent<EncryptedSignature, ()>> for OutEvent {
|
||||||
fn inject_event(&mut self, event: RequestResponseEvent<EncryptedSignature, ()>) {
|
fn from(event: RequestResponseEvent<EncryptedSignature, ()>) -> Self {
|
||||||
match event {
|
match event {
|
||||||
RequestResponseEvent::Message {
|
RequestResponseEvent::Message {
|
||||||
|
peer,
|
||||||
message:
|
message:
|
||||||
RequestResponseMessage::Request {
|
RequestResponseMessage::Request {
|
||||||
request, channel, ..
|
request, channel, ..
|
||||||
},
|
},
|
||||||
..
|
..
|
||||||
} => {
|
} => {
|
||||||
debug!("Received encrypted signature");
|
debug!("Received encrypted signature from {}", peer);
|
||||||
self.events.push_back(OutEvent::Msg(request));
|
OutEvent::MsgReceived {
|
||||||
// Send back empty response so that the request/response protocol completes.
|
msg: request,
|
||||||
if let Err(error) = self.rr.send_response(channel, ()) {
|
channel,
|
||||||
error!("Failed to send Encrypted Signature ack: {:?}", error);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
RequestResponseEvent::Message {
|
RequestResponseEvent::Message {
|
||||||
message: RequestResponseMessage::Response { .. },
|
message: RequestResponseMessage::Response { .. },
|
||||||
..
|
..
|
||||||
} => panic!("Alice should not get a Response"),
|
} => OutEvent::Failure(anyhow!("Alice should not get a Response")),
|
||||||
RequestResponseEvent::InboundFailure { error, .. } => {
|
RequestResponseEvent::InboundFailure { error, .. } => {
|
||||||
error!("Inbound failure: {:?}", error);
|
OutEvent::Failure(anyhow!("Inbound failure: {:?}", error))
|
||||||
}
|
}
|
||||||
RequestResponseEvent::OutboundFailure { error, .. } => {
|
RequestResponseEvent::OutboundFailure { error, .. } => {
|
||||||
error!("Outbound failure: {:?}", error);
|
OutEvent::Failure(anyhow!("Outbound failure: {:?}", error))
|
||||||
}
|
|
||||||
RequestResponseEvent::ResponseSent { .. } => {
|
|
||||||
debug!("Alice has sent an Message3 response to Bob");
|
|
||||||
}
|
}
|
||||||
|
RequestResponseEvent::ResponseSent { .. } => OutEvent::AckSent,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2,7 +2,7 @@ use crate::{
|
|||||||
network::{transport::SwapTransport, TokioExecutor},
|
network::{transport::SwapTransport, TokioExecutor},
|
||||||
protocol::{
|
protocol::{
|
||||||
alice::{Behaviour, OutEvent, State0, State3, SwapResponse, TransferProof},
|
alice::{Behaviour, OutEvent, State0, State3, SwapResponse, TransferProof},
|
||||||
bob::EncryptedSignature,
|
bob::{EncryptedSignature, SwapRequest},
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
use anyhow::{anyhow, Context, Result};
|
use anyhow::{anyhow, Context, Result};
|
||||||
@ -35,7 +35,7 @@ impl<T> Default for Channels<T> {
|
|||||||
pub struct EventLoopHandle {
|
pub struct EventLoopHandle {
|
||||||
done_execution_setup: Receiver<Result<State3>>,
|
done_execution_setup: Receiver<Result<State3>>,
|
||||||
recv_encrypted_signature: Receiver<EncryptedSignature>,
|
recv_encrypted_signature: Receiver<EncryptedSignature>,
|
||||||
request: Receiver<crate::protocol::alice::swap_response::OutEvent>,
|
recv_swap_request: Receiver<(SwapRequest, ResponseChannel<SwapResponse>)>,
|
||||||
conn_established: Receiver<PeerId>,
|
conn_established: Receiver<PeerId>,
|
||||||
send_swap_response: Sender<(ResponseChannel<SwapResponse>, SwapResponse)>,
|
send_swap_response: Sender<(ResponseChannel<SwapResponse>, SwapResponse)>,
|
||||||
start_execution_setup: Sender<(PeerId, State0)>,
|
start_execution_setup: Sender<(PeerId, State0)>,
|
||||||
@ -70,10 +70,10 @@ impl EventLoopHandle {
|
|||||||
.ok_or_else(|| anyhow!("Failed to receive Bitcoin encrypted signature from Bob"))
|
.ok_or_else(|| anyhow!("Failed to receive Bitcoin encrypted signature from Bob"))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn recv_request(
|
pub async fn recv_swap_request(
|
||||||
&mut self,
|
&mut self,
|
||||||
) -> Result<crate::protocol::alice::swap_response::OutEvent> {
|
) -> Result<(SwapRequest, ResponseChannel<SwapResponse>)> {
|
||||||
self.request
|
self.recv_swap_request
|
||||||
.recv()
|
.recv()
|
||||||
.await
|
.await
|
||||||
.ok_or_else(|| anyhow!("Failed to receive amounts request from Bob"))
|
.ok_or_else(|| anyhow!("Failed to receive amounts request from Bob"))
|
||||||
@ -108,7 +108,7 @@ pub struct EventLoop {
|
|||||||
start_execution_setup: Receiver<(PeerId, State0)>,
|
start_execution_setup: Receiver<(PeerId, State0)>,
|
||||||
done_execution_setup: Sender<Result<State3>>,
|
done_execution_setup: Sender<Result<State3>>,
|
||||||
recv_encrypted_signature: Sender<EncryptedSignature>,
|
recv_encrypted_signature: Sender<EncryptedSignature>,
|
||||||
request: Sender<crate::protocol::alice::swap_response::OutEvent>,
|
recv_swap_request: Sender<(SwapRequest, ResponseChannel<SwapResponse>)>,
|
||||||
conn_established: Sender<PeerId>,
|
conn_established: Sender<PeerId>,
|
||||||
send_swap_response: Receiver<(ResponseChannel<SwapResponse>, SwapResponse)>,
|
send_swap_response: Receiver<(ResponseChannel<SwapResponse>, SwapResponse)>,
|
||||||
send_transfer_proof: Receiver<(PeerId, TransferProof)>,
|
send_transfer_proof: Receiver<(PeerId, TransferProof)>,
|
||||||
@ -145,7 +145,7 @@ impl EventLoop {
|
|||||||
start_execution_setup: start_execution_setup.receiver,
|
start_execution_setup: start_execution_setup.receiver,
|
||||||
done_execution_setup: done_execution_setup.sender,
|
done_execution_setup: done_execution_setup.sender,
|
||||||
recv_encrypted_signature: recv_encrypted_signature.sender,
|
recv_encrypted_signature: recv_encrypted_signature.sender,
|
||||||
request: request.sender,
|
recv_swap_request: request.sender,
|
||||||
conn_established: conn_established.sender,
|
conn_established: conn_established.sender,
|
||||||
send_swap_response: send_swap_response.receiver,
|
send_swap_response: send_swap_response.receiver,
|
||||||
send_transfer_proof: send_transfer_proof.receiver,
|
send_transfer_proof: send_transfer_proof.receiver,
|
||||||
@ -156,7 +156,7 @@ impl EventLoop {
|
|||||||
start_execution_setup: start_execution_setup.sender,
|
start_execution_setup: start_execution_setup.sender,
|
||||||
done_execution_setup: done_execution_setup.receiver,
|
done_execution_setup: done_execution_setup.receiver,
|
||||||
recv_encrypted_signature: recv_encrypted_signature.receiver,
|
recv_encrypted_signature: recv_encrypted_signature.receiver,
|
||||||
request: request.receiver,
|
recv_swap_request: request.receiver,
|
||||||
conn_established: conn_established.receiver,
|
conn_established: conn_established.receiver,
|
||||||
send_swap_response: send_swap_response.sender,
|
send_swap_response: send_swap_response.sender,
|
||||||
send_transfer_proof: send_transfer_proof.sender,
|
send_transfer_proof: send_transfer_proof.sender,
|
||||||
@ -174,6 +174,9 @@ impl EventLoop {
|
|||||||
OutEvent::ConnectionEstablished(alice) => {
|
OutEvent::ConnectionEstablished(alice) => {
|
||||||
let _ = self.conn_established.send(alice).await;
|
let _ = self.conn_established.send(alice).await;
|
||||||
}
|
}
|
||||||
|
OutEvent::SwapRequest { msg, channel } => {
|
||||||
|
let _ = self.recv_swap_request.send((msg, channel)).await;
|
||||||
|
}
|
||||||
OutEvent::ExecutionSetupDone(res) => {
|
OutEvent::ExecutionSetupDone(res) => {
|
||||||
let _ = self.done_execution_setup.send(res.map(|state|*state)).await;
|
let _ = self.done_execution_setup.send(res.map(|state|*state)).await;
|
||||||
}
|
}
|
||||||
@ -181,11 +184,16 @@ impl EventLoop {
|
|||||||
trace!("Bob acknowledged transfer proof");
|
trace!("Bob acknowledged transfer proof");
|
||||||
let _ = self.recv_transfer_proof_ack.send(()).await;
|
let _ = self.recv_transfer_proof_ack.send(()).await;
|
||||||
}
|
}
|
||||||
OutEvent::EncryptedSignature(msg) => {
|
OutEvent::EncryptedSignature{ msg, channel } => {
|
||||||
let _ = self.recv_encrypted_signature.send(*msg).await;
|
let _ = self.recv_encrypted_signature.send(*msg).await;
|
||||||
|
// Send back empty response so that the request/response protocol completes.
|
||||||
|
if let Err(error) = self.swarm.encrypted_signature.send_ack(channel) {
|
||||||
|
error!("Failed to send Encrypted Signature ack: {:?}", error);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
OutEvent::Request(event) => {
|
OutEvent::ResponseSent => {}
|
||||||
let _ = self.request.send(*event).await;
|
OutEvent::Failure(err) => {
|
||||||
|
error!("Communication error: {:#}", err);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
@ -46,13 +46,13 @@ pub async fn negotiate(
|
|||||||
|
|
||||||
let event = timeout(
|
let event = timeout(
|
||||||
execution_params.bob_time_to_act,
|
execution_params.bob_time_to_act,
|
||||||
event_loop_handle.recv_request(),
|
event_loop_handle.recv_swap_request(),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.context("Failed to receive swap request from Bob")??;
|
.context("Failed to receive swap request from Bob")??;
|
||||||
|
|
||||||
event_loop_handle
|
event_loop_handle
|
||||||
.send_swap_response(event.channel, SwapResponse { xmr_amount })
|
.send_swap_response(event.1, SwapResponse { xmr_amount })
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let state3 = timeout(
|
let state3 = timeout(
|
||||||
|
@ -3,27 +3,26 @@ use crate::{
|
|||||||
network::request_response::{CborCodec, Swap, TIMEOUT},
|
network::request_response::{CborCodec, Swap, TIMEOUT},
|
||||||
protocol::bob::SwapRequest,
|
protocol::bob::SwapRequest,
|
||||||
};
|
};
|
||||||
use anyhow::{anyhow, Result};
|
use anyhow::{anyhow, Error, Result};
|
||||||
use libp2p::{
|
use libp2p::{
|
||||||
request_response::{
|
request_response::{
|
||||||
handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig,
|
ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent,
|
||||||
RequestResponseEvent, RequestResponseMessage, ResponseChannel,
|
RequestResponseMessage, ResponseChannel,
|
||||||
},
|
},
|
||||||
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters},
|
|
||||||
NetworkBehaviour,
|
NetworkBehaviour,
|
||||||
};
|
};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::{
|
use std::time::Duration;
|
||||||
collections::VecDeque,
|
use tracing::debug;
|
||||||
task::{Context, Poll},
|
|
||||||
time::Duration,
|
|
||||||
};
|
|
||||||
use tracing::{debug, error};
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct OutEvent {
|
pub enum OutEvent {
|
||||||
pub msg: SwapRequest,
|
MsgReceived {
|
||||||
pub channel: ResponseChannel<SwapResponse>,
|
msg: SwapRequest,
|
||||||
|
channel: ResponseChannel<SwapResponse>,
|
||||||
|
},
|
||||||
|
ResponseSent,
|
||||||
|
Failure(Error),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
|
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
|
||||||
@ -31,15 +30,45 @@ pub struct SwapResponse {
|
|||||||
pub xmr_amount: monero::Amount,
|
pub xmr_amount: monero::Amount,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl From<RequestResponseEvent<SwapRequest, SwapResponse>> for OutEvent {
|
||||||
|
fn from(event: RequestResponseEvent<SwapRequest, SwapResponse>) -> Self {
|
||||||
|
match event {
|
||||||
|
RequestResponseEvent::Message {
|
||||||
|
peer,
|
||||||
|
message:
|
||||||
|
RequestResponseMessage::Request {
|
||||||
|
request, channel, ..
|
||||||
|
},
|
||||||
|
..
|
||||||
|
} => {
|
||||||
|
debug!("Received swap request from {}", peer);
|
||||||
|
OutEvent::MsgReceived {
|
||||||
|
msg: request,
|
||||||
|
channel,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
RequestResponseEvent::Message {
|
||||||
|
message: RequestResponseMessage::Response { .. },
|
||||||
|
..
|
||||||
|
} => OutEvent::Failure(anyhow!("Alice should not get a Response")),
|
||||||
|
RequestResponseEvent::InboundFailure { error, .. } => {
|
||||||
|
OutEvent::Failure(anyhow!("Inbound failure: {:?}", error))
|
||||||
|
}
|
||||||
|
RequestResponseEvent::OutboundFailure { error, .. } => {
|
||||||
|
OutEvent::Failure(anyhow!("Outbound failure: {:?}", error))
|
||||||
|
}
|
||||||
|
RequestResponseEvent::ResponseSent { .. } => OutEvent::ResponseSent,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// A `NetworkBehaviour` that represents negotiate a swap using Swap
|
/// A `NetworkBehaviour` that represents negotiate a swap using Swap
|
||||||
/// request/response.
|
/// request/response.
|
||||||
#[derive(NetworkBehaviour)]
|
#[derive(NetworkBehaviour)]
|
||||||
#[behaviour(out_event = "OutEvent", poll_method = "poll")]
|
#[behaviour(out_event = "OutEvent", event_process = false)]
|
||||||
#[allow(missing_debug_implementations)]
|
#[allow(missing_debug_implementations)]
|
||||||
pub struct Behaviour {
|
pub struct Behaviour {
|
||||||
rr: RequestResponse<CborCodec<Swap, SwapRequest, SwapResponse>>,
|
rr: RequestResponse<CborCodec<Swap, SwapRequest, SwapResponse>>,
|
||||||
#[behaviour(ignore)]
|
|
||||||
events: VecDeque<OutEvent>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Behaviour {
|
impl Behaviour {
|
||||||
@ -53,23 +82,6 @@ impl Behaviour {
|
|||||||
.send_response(channel, msg)
|
.send_response(channel, msg)
|
||||||
.map_err(|_| anyhow!("Sending swap response failed"))
|
.map_err(|_| anyhow!("Sending swap response failed"))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn poll(
|
|
||||||
&mut self,
|
|
||||||
_: &mut Context<'_>,
|
|
||||||
_: &mut impl PollParameters,
|
|
||||||
) -> Poll<
|
|
||||||
NetworkBehaviourAction<
|
|
||||||
RequestProtocol<CborCodec<Swap, SwapRequest, SwapResponse>>,
|
|
||||||
OutEvent,
|
|
||||||
>,
|
|
||||||
> {
|
|
||||||
if let Some(event) = self.events.pop_front() {
|
|
||||||
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
|
|
||||||
}
|
|
||||||
|
|
||||||
Poll::Pending
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for Behaviour {
|
impl Default for Behaviour {
|
||||||
@ -85,41 +97,6 @@ impl Default for Behaviour {
|
|||||||
vec![(Swap, ProtocolSupport::Inbound)],
|
vec![(Swap, ProtocolSupport::Inbound)],
|
||||||
config,
|
config,
|
||||||
),
|
),
|
||||||
events: Default::default(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl NetworkBehaviourEventProcess<RequestResponseEvent<SwapRequest, SwapResponse>> for Behaviour {
|
|
||||||
fn inject_event(&mut self, event: RequestResponseEvent<SwapRequest, SwapResponse>) {
|
|
||||||
match event {
|
|
||||||
RequestResponseEvent::Message {
|
|
||||||
peer,
|
|
||||||
message:
|
|
||||||
RequestResponseMessage::Request {
|
|
||||||
request, channel, ..
|
|
||||||
},
|
|
||||||
..
|
|
||||||
} => {
|
|
||||||
debug!("Received swap request from {}", peer);
|
|
||||||
self.events.push_back(OutEvent {
|
|
||||||
msg: request,
|
|
||||||
channel,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
RequestResponseEvent::Message {
|
|
||||||
message: RequestResponseMessage::Response { .. },
|
|
||||||
..
|
|
||||||
} => panic!("Alice should not get a Response"),
|
|
||||||
RequestResponseEvent::InboundFailure { error, .. } => {
|
|
||||||
error!("Inbound failure: {:?}", error);
|
|
||||||
}
|
|
||||||
RequestResponseEvent::OutboundFailure { error, .. } => {
|
|
||||||
error!("Outbound failure: {:?}", error);
|
|
||||||
}
|
|
||||||
RequestResponseEvent::ResponseSent { .. } => {
|
|
||||||
debug!("Alice has sent a swap response to Bob");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2,64 +2,41 @@ use crate::{
|
|||||||
monero,
|
monero,
|
||||||
network::request_response::{CborCodec, TransferProofProtocol, TIMEOUT},
|
network::request_response::{CborCodec, TransferProofProtocol, TIMEOUT},
|
||||||
};
|
};
|
||||||
|
use anyhow::{anyhow, Error};
|
||||||
use libp2p::{
|
use libp2p::{
|
||||||
request_response::{
|
request_response::{
|
||||||
handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig,
|
ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent,
|
||||||
RequestResponseEvent, RequestResponseMessage,
|
RequestResponseMessage,
|
||||||
},
|
},
|
||||||
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters},
|
|
||||||
NetworkBehaviour, PeerId,
|
NetworkBehaviour, PeerId,
|
||||||
};
|
};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::{
|
use std::time::Duration;
|
||||||
collections::VecDeque,
|
|
||||||
task::{Context, Poll},
|
|
||||||
time::Duration,
|
|
||||||
};
|
|
||||||
use tracing::error;
|
|
||||||
|
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct TransferProof {
|
pub struct TransferProof {
|
||||||
pub tx_lock_proof: monero::TransferProof,
|
pub tx_lock_proof: monero::TransferProof,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Copy, Clone)]
|
#[derive(Debug)]
|
||||||
pub enum OutEvent {
|
pub enum OutEvent {
|
||||||
Acknowledged,
|
Acknowledged,
|
||||||
|
Failure(Error),
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A `NetworkBehaviour` that represents sending the Monero transfer proof to
|
/// A `NetworkBehaviour` that represents sending the Monero transfer proof to
|
||||||
/// Bob.
|
/// Bob.
|
||||||
#[derive(NetworkBehaviour)]
|
#[derive(NetworkBehaviour)]
|
||||||
#[behaviour(out_event = "OutEvent", poll_method = "poll")]
|
#[behaviour(out_event = "OutEvent", event_process = false)]
|
||||||
#[allow(missing_debug_implementations)]
|
#[allow(missing_debug_implementations)]
|
||||||
pub struct Behaviour {
|
pub struct Behaviour {
|
||||||
rr: RequestResponse<CborCodec<TransferProofProtocol, TransferProof, ()>>,
|
rr: RequestResponse<CborCodec<TransferProofProtocol, TransferProof, ()>>,
|
||||||
#[behaviour(ignore)]
|
|
||||||
events: VecDeque<OutEvent>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Behaviour {
|
impl Behaviour {
|
||||||
pub fn send(&mut self, bob: PeerId, msg: TransferProof) {
|
pub fn send(&mut self, bob: PeerId, msg: TransferProof) {
|
||||||
let _id = self.rr.send_request(&bob, msg);
|
let _id = self.rr.send_request(&bob, msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn poll(
|
|
||||||
&mut self,
|
|
||||||
_: &mut Context<'_>,
|
|
||||||
_: &mut impl PollParameters,
|
|
||||||
) -> Poll<
|
|
||||||
NetworkBehaviourAction<
|
|
||||||
RequestProtocol<CborCodec<TransferProofProtocol, TransferProof, ()>>,
|
|
||||||
OutEvent,
|
|
||||||
>,
|
|
||||||
> {
|
|
||||||
if let Some(event) = self.events.pop_front() {
|
|
||||||
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
|
|
||||||
}
|
|
||||||
|
|
||||||
Poll::Pending
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for Behaviour {
|
impl Default for Behaviour {
|
||||||
@ -74,31 +51,32 @@ impl Default for Behaviour {
|
|||||||
vec![(TransferProofProtocol, ProtocolSupport::Outbound)],
|
vec![(TransferProofProtocol, ProtocolSupport::Outbound)],
|
||||||
config,
|
config,
|
||||||
),
|
),
|
||||||
events: Default::default(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl NetworkBehaviourEventProcess<RequestResponseEvent<TransferProof, ()>> for Behaviour {
|
impl From<RequestResponseEvent<TransferProof, ()>> for OutEvent {
|
||||||
fn inject_event(&mut self, event: RequestResponseEvent<TransferProof, ()>) {
|
fn from(event: RequestResponseEvent<TransferProof, ()>) -> Self {
|
||||||
match event {
|
match event {
|
||||||
RequestResponseEvent::Message {
|
RequestResponseEvent::Message {
|
||||||
message: RequestResponseMessage::Request { .. },
|
message: RequestResponseMessage::Request { .. },
|
||||||
..
|
..
|
||||||
} => panic!("Alice should never get a transfer proof request from Bob"),
|
} => OutEvent::Failure(anyhow!(
|
||||||
|
"Alice should never get a transfer proof request from Bob"
|
||||||
|
)),
|
||||||
RequestResponseEvent::Message {
|
RequestResponseEvent::Message {
|
||||||
message: RequestResponseMessage::Response { .. },
|
message: RequestResponseMessage::Response { .. },
|
||||||
..
|
..
|
||||||
} => {
|
} => OutEvent::Acknowledged,
|
||||||
self.events.push_back(OutEvent::Acknowledged);
|
|
||||||
}
|
|
||||||
RequestResponseEvent::InboundFailure { error, .. } => {
|
RequestResponseEvent::InboundFailure { error, .. } => {
|
||||||
error!("Inbound failure: {:?}", error);
|
OutEvent::Failure(anyhow!("Inbound failure: {:?}", error))
|
||||||
}
|
}
|
||||||
RequestResponseEvent::OutboundFailure { error, .. } => {
|
RequestResponseEvent::OutboundFailure { error, .. } => {
|
||||||
error!("Outbound failure: {:?}", error);
|
OutEvent::Failure(anyhow!("Outbound failure: {:?}", error))
|
||||||
|
}
|
||||||
|
RequestResponseEvent::ResponseSent { .. } => {
|
||||||
|
OutEvent::Failure(anyhow!("Alice should not send a response"))
|
||||||
}
|
}
|
||||||
RequestResponseEvent::ResponseSent { .. } => {}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user