Use Message4

This commit is contained in:
Franck Royer 2021-01-22 14:21:27 +11:00
parent 124d6f1ebb
commit d2a1937f51
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
13 changed files with 211 additions and 107 deletions

View File

@ -83,7 +83,7 @@ impl From<Alice> for AliceState {
match db_state { match db_state {
Alice::Started { amounts, state0 } => AliceState::Started { amounts, state0 }, Alice::Started { amounts, state0 } => AliceState::Started { amounts, state0 },
Alice::Negotiated(state3) => AliceState::Negotiated { Alice::Negotiated(state3) => AliceState::Negotiated {
channel: None, bob_peer_id: None,
amounts: SwapAmounts { amounts: SwapAmounts {
btc: state3.btc, btc: state3.btc,
xmr: state3.xmr, xmr: state3.xmr,
@ -91,7 +91,7 @@ impl From<Alice> for AliceState {
state3: Box::new(state3), state3: Box::new(state3),
}, },
Alice::BtcLocked(state3) => AliceState::BtcLocked { Alice::BtcLocked(state3) => AliceState::BtcLocked {
channel: None, bob_peer_id: None,
amounts: SwapAmounts { amounts: SwapAmounts {
btc: state3.btc, btc: state3.btc,
xmr: state3.xmr, xmr: state3.xmr,

View File

@ -35,7 +35,7 @@ pub enum AliceToBob {
SwapResponse(alice::SwapResponse), SwapResponse(alice::SwapResponse),
Message0(Box<alice::Message0>), Message0(Box<alice::Message0>),
Message1(Box<alice::Message1>), Message1(Box<alice::Message1>),
Message2(alice::Message2), Message2,
} }
/// Messages sent from one party to the other. /// Messages sent from one party to the other.

View File

@ -4,7 +4,6 @@ pub use self::{
event_loop::{EventLoop, EventLoopHandle}, event_loop::{EventLoop, EventLoopHandle},
message0::Message0, message0::Message0,
message1::Message1, message1::Message1,
message2::Message2,
message4::Message4, message4::Message4,
state::*, state::*,
swap::{run, run_until}, swap::{run, run_until},
@ -234,9 +233,10 @@ pub enum OutEvent {
channel: ResponseChannel<AliceToBob>, channel: ResponseChannel<AliceToBob>,
}, },
Message2 { Message2 {
msg: bob::Message2, msg: Box<bob::Message2>,
channel: ResponseChannel<AliceToBob>, bob_peer_id: PeerId,
}, },
Message4,
Message5(Message5), Message5(Message5),
} }
@ -278,7 +278,18 @@ impl From<message1::OutEvent> for OutEvent {
impl From<message2::OutEvent> for OutEvent { impl From<message2::OutEvent> for OutEvent {
fn from(event: message2::OutEvent) -> Self { fn from(event: message2::OutEvent) -> Self {
match event { match event {
message2::OutEvent::Msg { msg, channel } => OutEvent::Message2 { msg, channel }, message2::OutEvent::Msg { msg, bob_peer_id } => OutEvent::Message2 {
msg: Box::new(msg),
bob_peer_id,
},
}
}
}
impl From<message4::OutEvent> for OutEvent {
fn from(event: message4::OutEvent) -> Self {
match event {
message4::OutEvent::Msg => OutEvent::Message4,
} }
} }
} }
@ -301,6 +312,7 @@ pub struct Behaviour {
message0: message0::Behaviour, message0: message0::Behaviour,
message1: message1::Behaviour, message1: message1::Behaviour,
message2: message2::Behaviour, message2: message2::Behaviour,
message4: message4::Behaviour,
message5: message5::Behaviour, message5: message5::Behaviour,
} }
@ -327,9 +339,9 @@ impl Behaviour {
debug!("Sent Message1"); debug!("Sent Message1");
} }
/// Send Message2 to Bob in response to receiving his Message2. /// Send Message4 to Bob.
pub fn send_message2(&mut self, channel: ResponseChannel<AliceToBob>, msg: Message2) { pub fn send_message4(&mut self, bob: PeerId, msg: Message4) {
self.message2.send(channel, msg); self.message4.send(bob, msg);
debug!("Sent Message2"); debug!("Sent Message 4");
} }
} }

View File

@ -2,7 +2,7 @@ use crate::{
network::{request_response::AliceToBob, transport::SwapTransport, TokioExecutor}, network::{request_response::AliceToBob, transport::SwapTransport, TokioExecutor},
protocol::{ protocol::{
alice, alice,
alice::{Behaviour, OutEvent, SwapResponse}, alice::{Behaviour, Message4, OutEvent, SwapResponse},
bob, bob,
bob::Message5, bob::Message5,
}, },
@ -13,6 +13,7 @@ use libp2p::{
core::Multiaddr, futures::StreamExt, request_response::ResponseChannel, PeerId, Swarm, core::Multiaddr, futures::StreamExt, request_response::ResponseChannel, PeerId, Swarm,
}; };
use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::mpsc::{Receiver, Sender};
use tracing::trace;
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct Channels<T> { pub struct Channels<T> {
@ -37,14 +38,14 @@ impl<T> Default for Channels<T> {
pub struct EventLoopHandle { pub struct EventLoopHandle {
msg0: Receiver<(bob::Message0, ResponseChannel<AliceToBob>)>, msg0: Receiver<(bob::Message0, ResponseChannel<AliceToBob>)>,
msg1: Receiver<(bob::Message1, ResponseChannel<AliceToBob>)>, msg1: Receiver<(bob::Message1, ResponseChannel<AliceToBob>)>,
msg2: Receiver<(bob::Message2, ResponseChannel<AliceToBob>)>, msg2: Receiver<bob::Message2>,
msg5: Receiver<Message5>, msg5: Receiver<Message5>,
request: Receiver<crate::protocol::alice::swap_response::OutEvent>, request: Receiver<crate::protocol::alice::swap_response::OutEvent>,
conn_established: Receiver<PeerId>, conn_established: Receiver<PeerId>,
send_swap_response: Sender<(ResponseChannel<AliceToBob>, SwapResponse)>, send_swap_response: Sender<(ResponseChannel<AliceToBob>, SwapResponse)>,
send_msg0: Sender<(ResponseChannel<AliceToBob>, alice::Message0)>, send_msg0: Sender<(ResponseChannel<AliceToBob>, alice::Message0)>,
send_msg1: Sender<(ResponseChannel<AliceToBob>, alice::Message1)>, send_msg1: Sender<(ResponseChannel<AliceToBob>, alice::Message1)>,
send_msg2: Sender<(ResponseChannel<AliceToBob>, alice::Message2)>, send_msg4: Sender<(PeerId, Message4)>,
} }
impl EventLoopHandle { impl EventLoopHandle {
@ -69,11 +70,11 @@ impl EventLoopHandle {
.ok_or_else(|| anyhow!("Failed to receive message 1 from Bob")) .ok_or_else(|| anyhow!("Failed to receive message 1 from Bob"))
} }
pub async fn recv_message2(&mut self) -> Result<(bob::Message2, ResponseChannel<AliceToBob>)> { pub async fn recv_message2(&mut self) -> Result<bob::Message2> {
self.msg2 self.msg2
.recv() .recv()
.await .await
.ok_or_else(|| anyhow!("Failed o receive message 2 from Bob")) .ok_or_else(|| anyhow!("Failed to receive message 2 from Bob"))
} }
pub async fn recv_message5(&mut self) -> Result<Message5> { pub async fn recv_message5(&mut self) -> Result<Message5> {
@ -122,12 +123,8 @@ impl EventLoopHandle {
Ok(()) Ok(())
} }
pub async fn send_message2( pub async fn send_message4(&mut self, bob: PeerId, msg: Message4) -> Result<()> {
&mut self, let _ = self.send_msg4.send((bob, msg)).await?;
channel: ResponseChannel<AliceToBob>,
msg: alice::Message2,
) -> Result<()> {
let _ = self.send_msg2.send((channel, msg)).await?;
Ok(()) Ok(())
} }
} }
@ -137,14 +134,14 @@ pub struct EventLoop {
swarm: libp2p::Swarm<Behaviour>, swarm: libp2p::Swarm<Behaviour>,
msg0: Sender<(bob::Message0, ResponseChannel<AliceToBob>)>, msg0: Sender<(bob::Message0, ResponseChannel<AliceToBob>)>,
msg1: Sender<(bob::Message1, ResponseChannel<AliceToBob>)>, msg1: Sender<(bob::Message1, ResponseChannel<AliceToBob>)>,
msg2: Sender<(bob::Message2, ResponseChannel<AliceToBob>)>, msg2: Sender<bob::Message2>,
msg5: Sender<Message5>, msg5: Sender<Message5>,
request: Sender<crate::protocol::alice::swap_response::OutEvent>, request: Sender<crate::protocol::alice::swap_response::OutEvent>,
conn_established: Sender<PeerId>, conn_established: Sender<PeerId>,
send_swap_response: Receiver<(ResponseChannel<AliceToBob>, SwapResponse)>, send_swap_response: Receiver<(ResponseChannel<AliceToBob>, SwapResponse)>,
send_msg0: Receiver<(ResponseChannel<AliceToBob>, alice::Message0)>, send_msg0: Receiver<(ResponseChannel<AliceToBob>, alice::Message0)>,
send_msg1: Receiver<(ResponseChannel<AliceToBob>, alice::Message1)>, send_msg1: Receiver<(ResponseChannel<AliceToBob>, alice::Message1)>,
send_msg2: Receiver<(ResponseChannel<AliceToBob>, alice::Message2)>, send_msg4: Receiver<(PeerId, Message4)>,
} }
impl EventLoop { impl EventLoop {
@ -172,7 +169,7 @@ impl EventLoop {
let send_swap_response = Channels::new(); let send_swap_response = Channels::new();
let send_msg0 = Channels::new(); let send_msg0 = Channels::new();
let send_msg1 = Channels::new(); let send_msg1 = Channels::new();
let send_msg2 = Channels::new(); let send_msg4 = Channels::new();
let driver = EventLoop { let driver = EventLoop {
swarm, swarm,
@ -185,7 +182,7 @@ impl EventLoop {
send_swap_response: send_swap_response.receiver, send_swap_response: send_swap_response.receiver,
send_msg0: send_msg0.receiver, send_msg0: send_msg0.receiver,
send_msg1: send_msg1.receiver, send_msg1: send_msg1.receiver,
send_msg2: send_msg2.receiver, send_msg4: send_msg4.receiver,
}; };
let handle = EventLoopHandle { let handle = EventLoopHandle {
@ -198,7 +195,7 @@ impl EventLoop {
send_swap_response: send_swap_response.sender, send_swap_response: send_swap_response.sender,
send_msg0: send_msg0.sender, send_msg0: send_msg0.sender,
send_msg1: send_msg1.sender, send_msg1: send_msg1.sender,
send_msg2: send_msg2.sender, send_msg4: send_msg4.sender,
}; };
Ok((driver, handle)) Ok((driver, handle))
@ -218,9 +215,10 @@ impl EventLoop {
OutEvent::Message1 { msg, channel } => { OutEvent::Message1 { msg, channel } => {
let _ = self.msg1.send((msg, channel)).await; let _ = self.msg1.send((msg, channel)).await;
} }
OutEvent::Message2 { msg, channel } => { OutEvent::Message2 { msg, bob_peer_id : _} => {
let _ = self.msg2.send((msg, channel)).await; let _ = self.msg2.send(*msg).await;
} }
OutEvent::Message4 => trace!("Bob ack'd message 4"),
OutEvent::Message5(msg) => { OutEvent::Message5(msg) => {
let _ = self.msg5.send(msg).await; let _ = self.msg5.send(msg).await;
} }
@ -244,9 +242,9 @@ impl EventLoop {
self.swarm.send_message1(channel, msg); self.swarm.send_message1(channel, msg);
} }
}, },
msg2 = self.send_msg2.next().fuse() => { msg4 = self.send_msg4.next().fuse() => {
if let Some((channel, msg)) = msg2 { if let Some((bob_peer_id, msg)) = msg4 {
self.swarm.send_message2(channel, msg); self.swarm.send_message4(bob_peer_id, msg);
} }
}, },
} }

View File

@ -1,17 +1,15 @@
use crate::{ use crate::{
monero,
network::request_response::{AliceToBob, BobToAlice, Codec, Message2Protocol, TIMEOUT}, network::request_response::{AliceToBob, BobToAlice, Codec, Message2Protocol, TIMEOUT},
protocol::bob, protocol::bob,
}; };
use libp2p::{ use libp2p::{
request_response::{ request_response::{
handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig, handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig,
RequestResponseEvent, RequestResponseMessage, ResponseChannel, RequestResponseEvent, RequestResponseMessage,
}, },
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}, swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters},
NetworkBehaviour, NetworkBehaviour, PeerId,
}; };
use serde::{Deserialize, Serialize};
use std::{ use std::{
collections::VecDeque, collections::VecDeque,
task::{Context, Poll}, task::{Context, Poll},
@ -22,18 +20,11 @@ use tracing::{debug, error};
#[derive(Debug)] #[derive(Debug)]
pub enum OutEvent { pub enum OutEvent {
Msg { Msg {
/// Received message from Bob.
msg: bob::Message2, msg: bob::Message2,
/// Channel to send back Alice's message 2. bob_peer_id: PeerId,
channel: ResponseChannel<AliceToBob>,
}, },
} }
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Message2 {
pub tx_lock_proof: monero::TransferProof,
}
/// A `NetworkBehaviour` that represents receiving of message 2 from Bob. /// A `NetworkBehaviour` that represents receiving of message 2 from Bob.
#[derive(NetworkBehaviour)] #[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", poll_method = "poll")] #[behaviour(out_event = "OutEvent", poll_method = "poll")]
@ -45,11 +36,6 @@ pub struct Behaviour {
} }
impl Behaviour { impl Behaviour {
pub fn send(&mut self, channel: ResponseChannel<AliceToBob>, msg: Message2) {
let msg = AliceToBob::Message2(msg);
self.rr.send_response(channel, msg);
}
fn poll( fn poll(
&mut self, &mut self,
_: &mut Context<'_>, _: &mut Context<'_>,
@ -84,15 +70,20 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>>
fn inject_event(&mut self, event: RequestResponseEvent<BobToAlice, AliceToBob>) { fn inject_event(&mut self, event: RequestResponseEvent<BobToAlice, AliceToBob>) {
match event { match event {
RequestResponseEvent::Message { RequestResponseEvent::Message {
peer,
message: message:
RequestResponseMessage::Request { RequestResponseMessage::Request {
request, channel, .. request, channel, ..
}, },
..
} => { } => {
if let BobToAlice::Message2(msg) = request { if let BobToAlice::Message2(msg) = request {
debug!("Received Message2"); debug!("Received Message 2");
self.events.push_back(OutEvent::Msg { msg, channel }); self.events.push_back(OutEvent::Msg {
msg,
bob_peer_id: peer,
});
// Send back empty response so that the request/response protocol completes.
let _ = self.rr.send_response(channel, AliceToBob::Message2);
} }
} }
RequestResponseEvent::Message { RequestResponseEvent::Message {

View File

@ -8,15 +8,14 @@ use crate::{
}, },
monero, monero,
monero::CreateWalletForOutput, monero::CreateWalletForOutput,
network::request_response::AliceToBob, protocol::{alice, alice::Message4, bob, bob::Message5, SwapAmounts},
protocol::{alice, bob, bob::Message5, SwapAmounts},
}; };
use anyhow::{anyhow, Context, Result}; use anyhow::{anyhow, Context, Result};
use ecdsa_fun::{ use ecdsa_fun::{
adaptor::{Adaptor, EncryptedSignature}, adaptor::{Adaptor, EncryptedSignature},
nonce::Deterministic, nonce::Deterministic,
}; };
use libp2p::request_response::ResponseChannel; use libp2p::PeerId;
use rand::{CryptoRng, RngCore}; use rand::{CryptoRng, RngCore};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sha2::Sha256; use sha2::Sha256;
@ -30,12 +29,14 @@ pub enum AliceState {
state0: State0, state0: State0,
}, },
Negotiated { Negotiated {
channel: Option<ResponseChannel<AliceToBob>>, // TODO: Remove option
bob_peer_id: Option<PeerId>,
amounts: SwapAmounts, amounts: SwapAmounts,
state3: Box<State3>, state3: Box<State3>,
}, },
BtcLocked { BtcLocked {
channel: Option<ResponseChannel<AliceToBob>>, // TODO: Remove option
bob_peer_id: Option<PeerId>,
amounts: SwapAmounts, amounts: SwapAmounts,
state3: Box<State3>, state3: Box<State3>,
}, },
@ -476,8 +477,8 @@ pub struct State5 {
} }
impl State5 { impl State5 {
pub fn next_message(&self) -> alice::Message2 { pub fn next_message(&self) -> Message4 {
alice::Message2 { Message4 {
tx_lock_proof: self.tx_lock_proof.clone(), tx_lock_proof: self.tx_lock_proof.clone(),
} }
} }

View File

@ -10,10 +10,9 @@ use crate::{
config::Config, config::Config,
monero, monero,
monero::Transfer, monero::Transfer,
network::request_response::AliceToBob,
protocol::{ protocol::{
alice, alice,
alice::{event_loop::EventLoopHandle, SwapResponse}, alice::{event_loop::EventLoopHandle, Message4, SwapResponse},
SwapAmounts, SwapAmounts,
}, },
}; };
@ -23,7 +22,7 @@ use futures::{
future::{select, Either}, future::{select, Either},
pin_mut, pin_mut,
}; };
use libp2p::request_response::ResponseChannel; use libp2p::PeerId;
use rand::rngs::OsRng; use rand::rngs::OsRng;
use sha2::Sha256; use sha2::Sha256;
use std::sync::Arc; use std::sync::Arc;
@ -35,11 +34,11 @@ pub async fn negotiate(
xmr_amount: monero::Amount, xmr_amount: monero::Amount,
event_loop_handle: &mut EventLoopHandle, event_loop_handle: &mut EventLoopHandle,
config: Config, config: Config,
) -> Result<(ResponseChannel<AliceToBob>, alice::State3)> { ) -> Result<(PeerId, alice::State3)> {
trace!("Starting negotiate"); trace!("Starting negotiate");
// todo: we can move this out, we dont need to timeout here // todo: we can move this out, we dont need to timeout here
let _peer_id = timeout( let bob_peer_id = timeout(
config.bob_time_to_act, config.bob_time_to_act,
event_loop_handle.recv_conn_established(), event_loop_handle.recv_conn_established(),
) )
@ -73,12 +72,11 @@ pub async fn negotiate(
.send_message1(channel, state2.next_message()) .send_message1(channel, state2.next_message())
.await?; .await?;
let (bob_message2, channel) = let bob_message2 = timeout(config.bob_time_to_act, event_loop_handle.recv_message2()).await??;
timeout(config.bob_time_to_act, event_loop_handle.recv_message2()).await??;
let state3 = state2.receive(bob_message2)?; let state3 = state2.receive(bob_message2)?;
Ok((channel, state3)) Ok((bob_peer_id, state3))
} }
// TODO(Franck): Use helper functions from xmr-btc instead of re-writing them // TODO(Franck): Use helper functions from xmr-btc instead of re-writing them
@ -108,7 +106,7 @@ where
} }
pub async fn lock_xmr<W>( pub async fn lock_xmr<W>(
channel: ResponseChannel<AliceToBob>, bob_peer_id: PeerId,
amounts: SwapAmounts, amounts: SwapAmounts,
state3: alice::State3, state3: alice::State3,
event_loop_handle: &mut EventLoopHandle, event_loop_handle: &mut EventLoopHandle,
@ -134,7 +132,7 @@ where
// Otherwise Alice might publish the lock tx twice! // Otherwise Alice might publish the lock tx twice!
event_loop_handle event_loop_handle
.send_message2(channel, alice::Message2 { .send_message4(bob_peer_id, Message4 {
tx_lock_proof: transfer_proof, tx_lock_proof: transfer_proof,
}) })
.await?; .await?;

View File

@ -91,11 +91,11 @@ async fn run_until_internal(
} else { } else {
match state { match state {
AliceState::Started { amounts, state0 } => { AliceState::Started { amounts, state0 } => {
let (channel, state3) = let (peer_id, state3) =
negotiate(state0, amounts.xmr, &mut event_loop_handle, config).await?; negotiate(state0, amounts.xmr, &mut event_loop_handle, config).await?;
let state = AliceState::Negotiated { let state = AliceState::Negotiated {
channel: Some(channel), bob_peer_id: Some(peer_id),
amounts, amounts,
state3: Box::new(state3), state3: Box::new(state3),
}; };
@ -117,11 +117,11 @@ async fn run_until_internal(
} }
AliceState::Negotiated { AliceState::Negotiated {
state3, state3,
channel, bob_peer_id,
amounts, amounts,
} => { } => {
let state = match channel { let state = match bob_peer_id {
Some(channel) => { Some(bob_peer_id) => {
let _ = wait_for_locked_bitcoin( let _ = wait_for_locked_bitcoin(
state3.tx_lock.txid(), state3.tx_lock.txid(),
bitcoin_wallet.clone(), bitcoin_wallet.clone(),
@ -130,7 +130,7 @@ async fn run_until_internal(
.await?; .await?;
AliceState::BtcLocked { AliceState::BtcLocked {
channel: Some(channel), bob_peer_id: Some(bob_peer_id),
amounts, amounts,
state3, state3,
} }
@ -159,14 +159,14 @@ async fn run_until_internal(
.await .await
} }
AliceState::BtcLocked { AliceState::BtcLocked {
channel, bob_peer_id,
amounts, amounts,
state3, state3,
} => { } => {
let state = match channel { let state = match bob_peer_id {
Some(channel) => { Some(bob_peer_id) => {
lock_xmr( lock_xmr(
channel, bob_peer_id,
amounts, amounts,
*state3.clone(), *state3.clone(),
&mut event_loop_handle, &mut event_loop_handle,

View File

@ -31,11 +31,13 @@ pub use self::{
swap::{run, run_until}, swap::{run, run_until},
swap_request::*, swap_request::*,
}; };
use crate::protocol::alice::Message4;
pub mod event_loop; pub mod event_loop;
mod message0; mod message0;
mod message1; mod message1;
mod message2; mod message2;
mod message4;
mod message5; mod message5;
pub mod state; pub mod state;
pub mod swap; pub mod swap;
@ -210,7 +212,8 @@ pub enum OutEvent {
SwapResponse(alice::SwapResponse), SwapResponse(alice::SwapResponse),
Message0(Box<alice::Message0>), Message0(Box<alice::Message0>),
Message1(Box<alice::Message1>), Message1(Box<alice::Message1>),
Message2(alice::Message2), Message2,
Message4(Box<Message4>),
Message5, Message5,
} }
@ -249,7 +252,15 @@ impl From<message1::OutEvent> for OutEvent {
impl From<message2::OutEvent> for OutEvent { impl From<message2::OutEvent> for OutEvent {
fn from(event: message2::OutEvent) -> Self { fn from(event: message2::OutEvent) -> Self {
match event { match event {
message2::OutEvent::Msg(msg) => OutEvent::Message2(msg), message2::OutEvent::Msg => OutEvent::Message2,
}
}
}
impl From<message4::OutEvent> for OutEvent {
fn from(event: message4::OutEvent) -> Self {
match event {
message4::OutEvent::Msg(msg) => OutEvent::Message4(Box::new(msg)),
} }
} }
} }
@ -272,6 +283,7 @@ pub struct Behaviour {
message0: message0::Behaviour, message0: message0::Behaviour,
message1: message1::Behaviour, message1: message1::Behaviour,
message2: message2::Behaviour, message2: message2::Behaviour,
message4: message4::Behaviour,
message5: message5::Behaviour, message5: message5::Behaviour,
} }

View File

@ -3,7 +3,7 @@ use crate::{
network::{transport::SwapTransport, TokioExecutor}, network::{transport::SwapTransport, TokioExecutor},
protocol::{ protocol::{
alice, alice,
alice::SwapResponse, alice::{Message4, SwapResponse},
bob::{self, Behaviour, OutEvent, SwapRequest}, bob::{self, Behaviour, OutEvent, SwapRequest},
}, },
}; };
@ -40,7 +40,7 @@ pub struct EventLoopHandle {
swap_response: Receiver<SwapResponse>, swap_response: Receiver<SwapResponse>,
msg0: Receiver<alice::Message0>, msg0: Receiver<alice::Message0>,
msg1: Receiver<alice::Message1>, msg1: Receiver<alice::Message1>,
msg2: Receiver<alice::Message2>, msg4: Receiver<Message4>,
conn_established: Receiver<PeerId>, conn_established: Receiver<PeerId>,
dial_alice: Sender<()>, dial_alice: Sender<()>,
send_swap_request: Sender<SwapRequest>, send_swap_request: Sender<SwapRequest>,
@ -72,11 +72,11 @@ impl EventLoopHandle {
.ok_or_else(|| anyhow!("Failed to receive message 1 from Alice")) .ok_or_else(|| anyhow!("Failed to receive message 1 from Alice"))
} }
pub async fn recv_message2(&mut self) -> Result<alice::Message2> { pub async fn recv_message4(&mut self) -> Result<Message4> {
self.msg2 self.msg4
.recv() .recv()
.await .await
.ok_or_else(|| anyhow!("Failed o receive message 2 from Alice")) .ok_or_else(|| anyhow!("Failed to receive message 4 from Alice"))
} }
/// Dials other party and wait for the connection to be established. /// Dials other party and wait for the connection to be established.
@ -126,7 +126,7 @@ pub struct EventLoop {
swap_response: Sender<SwapResponse>, swap_response: Sender<SwapResponse>,
msg0: Sender<alice::Message0>, msg0: Sender<alice::Message0>,
msg1: Sender<alice::Message1>, msg1: Sender<alice::Message1>,
msg2: Sender<alice::Message2>, msg4: Sender<Message4>,
conn_established: Sender<PeerId>, conn_established: Sender<PeerId>,
dial_alice: Receiver<()>, dial_alice: Receiver<()>,
send_swap_request: Receiver<SwapRequest>, send_swap_request: Receiver<SwapRequest>,
@ -155,7 +155,7 @@ impl EventLoop {
let swap_response = Channels::new(); let swap_response = Channels::new();
let msg0 = Channels::new(); let msg0 = Channels::new();
let msg1 = Channels::new(); let msg1 = Channels::new();
let msg2 = Channels::new(); let msg4 = Channels::new();
let conn_established = Channels::new(); let conn_established = Channels::new();
let dial_alice = Channels::new(); let dial_alice = Channels::new();
let send_swap_request = Channels::new(); let send_swap_request = Channels::new();
@ -170,7 +170,7 @@ impl EventLoop {
swap_response: swap_response.sender, swap_response: swap_response.sender,
msg0: msg0.sender, msg0: msg0.sender,
msg1: msg1.sender, msg1: msg1.sender,
msg2: msg2.sender, msg4: msg4.sender,
conn_established: conn_established.sender, conn_established: conn_established.sender,
dial_alice: dial_alice.receiver, dial_alice: dial_alice.receiver,
send_swap_request: send_swap_request.receiver, send_swap_request: send_swap_request.receiver,
@ -184,7 +184,7 @@ impl EventLoop {
swap_response: swap_response.receiver, swap_response: swap_response.receiver,
msg0: msg0.receiver, msg0: msg0.receiver,
msg1: msg1.receiver, msg1: msg1.receiver,
msg2: msg2.receiver, msg4: msg4.receiver,
conn_established: conn_established.receiver, conn_established: conn_established.receiver,
dial_alice: dial_alice.sender, dial_alice: dial_alice.sender,
send_swap_request: send_swap_request.sender, send_swap_request: send_swap_request.sender,
@ -214,8 +214,9 @@ impl EventLoop {
OutEvent::Message1(msg) => { OutEvent::Message1(msg) => {
let _ = self.msg1.send(*msg).await; let _ = self.msg1.send(*msg).await;
} }
OutEvent::Message2(msg) => { OutEvent::Message2 => info!("Alice acknowledged message 2 received"),
let _ = self.msg2.send(msg).await; OutEvent::Message4(msg) => {
let _ = self.msg4.send(*msg).await;
} }
OutEvent::Message5 => info!("Alice acknowledged message 5 received"), OutEvent::Message5 => info!("Alice acknowledged message 5 received"),
} }

View File

@ -1,7 +1,4 @@
use crate::{ use crate::network::request_response::{AliceToBob, BobToAlice, Codec, Message2Protocol, TIMEOUT};
network::request_response::{AliceToBob, BobToAlice, Codec, Message2Protocol, TIMEOUT},
protocol::alice,
};
use ecdsa_fun::Signature; use ecdsa_fun::Signature;
use libp2p::{ use libp2p::{
request_response::{ request_response::{
@ -25,9 +22,9 @@ pub struct Message2 {
pub(crate) tx_cancel_sig: Signature, pub(crate) tx_cancel_sig: Signature,
} }
#[derive(Debug)] #[derive(Clone, Copy, Debug)]
pub enum OutEvent { pub enum OutEvent {
Msg(alice::Message2), Msg,
} }
/// A `NetworkBehaviour` that represents sending message 2 to Alice. /// A `NetworkBehaviour` that represents sending message 2 to Alice.
@ -87,9 +84,9 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>>
message: RequestResponseMessage::Response { response, .. }, message: RequestResponseMessage::Response { response, .. },
.. ..
} => { } => {
if let AliceToBob::Message2(msg) = response { if let AliceToBob::Message2 = response {
debug!("Received Message2"); debug!("Received Message 2 acknowledgement");
self.events.push_back(OutEvent::Msg(msg)); self.events.push_back(OutEvent::Msg);
} }
} }
RequestResponseEvent::InboundFailure { error, .. } => { RequestResponseEvent::InboundFailure { error, .. } => {

View File

@ -0,0 +1,96 @@
use crate::{
network::request_response::{Message4Protocol, OneShotCodec, Request, Response, TIMEOUT},
protocol::alice::Message4,
};
use libp2p::{
request_response::{
handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig,
RequestResponseEvent, RequestResponseMessage,
},
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters},
NetworkBehaviour,
};
use std::{
collections::VecDeque,
task::{Context, Poll},
time::Duration,
};
use tracing::{debug, error};
#[derive(Debug)]
pub enum OutEvent {
Msg(Message4),
}
/// A `NetworkBehaviour` that represents receiving of message 4 from Alice.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", poll_method = "poll")]
#[allow(missing_debug_implementations)]
pub struct Behaviour {
rr: RequestResponse<OneShotCodec<Message4Protocol>>,
#[behaviour(ignore)]
events: VecDeque<OutEvent>,
}
impl Behaviour {
fn poll(
&mut self,
_: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<RequestProtocol<OneShotCodec<Message4Protocol>>, OutEvent>>
{
if let Some(event) = self.events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
Poll::Pending
}
}
impl Default for Behaviour {
fn default() -> Self {
let timeout = Duration::from_secs(TIMEOUT);
let mut config = RequestResponseConfig::default();
config.set_request_timeout(timeout);
Self {
rr: RequestResponse::new(
OneShotCodec::default(),
vec![(Message4Protocol, ProtocolSupport::Full)],
config,
),
events: Default::default(),
}
}
}
impl NetworkBehaviourEventProcess<RequestResponseEvent<Request, Response>> for Behaviour {
fn inject_event(&mut self, event: RequestResponseEvent<Request, Response>) {
match event {
RequestResponseEvent::Message {
message:
RequestResponseMessage::Request {
request, channel, ..
},
..
} => {
if let Request::Message4(msg) = request {
debug!("Received message 4");
self.events.push_back(OutEvent::Msg(*msg));
// Send back empty response so that the request/response protocol completes.
let _ = self.rr.send_response(channel, Response::Message4);
}
}
RequestResponseEvent::Message {
message: RequestResponseMessage::Response { .. },
..
} => panic!("Bob should not get a Response"),
RequestResponseEvent::InboundFailure { error, .. } => {
error!("Inbound failure: {:?}", error);
}
RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error);
}
}
}
}

View File

@ -130,7 +130,7 @@ where
{ {
event_loop_handle.dial().await?; event_loop_handle.dial().await?;
let msg2_watcher = event_loop_handle.recv_message2(); let msg4_watcher = event_loop_handle.recv_message4();
let cancel_timelock_expires = let cancel_timelock_expires =
state3.wait_for_cancel_timelock_to_expire(bitcoin_wallet.as_ref()); state3.wait_for_cancel_timelock_to_expire(bitcoin_wallet.as_ref());
@ -143,13 +143,11 @@ where
monero_wallet.inner.block_height().await?; monero_wallet.inner.block_height().await?;
select! { select! {
msg2 = msg2_watcher => { msg4 = msg4_watcher => {
let msg4 = msg4?;
let msg2 = msg2?;
BobState::XmrLockProofReceived { BobState::XmrLockProofReceived {
state: state3, state: state3,
lock_transfer_proof: msg2.tx_lock_proof, lock_transfer_proof: msg4.tx_lock_proof,
monero_wallet_restore_blockheight monero_wallet_restore_blockheight
} }
}, },