diff --git a/swap/src/protocol/alice.rs b/swap/src/protocol/alice.rs index 9c9b21c4..1c95eb19 100644 --- a/swap/src/protocol/alice.rs +++ b/swap/src/protocol/alice.rs @@ -339,6 +339,11 @@ impl Behaviour { Ok(()) } + pub fn start_execution_setup(&mut self, bob_peer_id: PeerId, state0: State0) { + self.execution_setup.run(bob_peer_id, state0); + info!("Start execution setup with {}", bob_peer_id); + } + /// Send Message0 to Bob in response to receiving his Message0. pub fn send_message0( &mut self, diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index d2bd2f33..685f4606 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -2,7 +2,7 @@ use crate::{ network::{request_response::AliceToBob, transport::SwapTransport, TokioExecutor}, protocol::{ alice, - alice::{Behaviour, OutEvent, SwapResponse, TransferProof}, + alice::{Behaviour, OutEvent, State0, State3, SwapResponse, TransferProof}, bob, bob::EncryptedSignature, }, @@ -38,10 +38,12 @@ pub struct EventLoopHandle { recv_message0: Receiver<(bob::Message0, ResponseChannel)>, recv_message1: Receiver<(bob::Message1, ResponseChannel)>, recv_message2: Receiver, + done_execution_setup: Receiver>, recv_encrypted_signature: Receiver, request: Receiver, conn_established: Receiver, send_swap_response: Sender<(ResponseChannel, SwapResponse)>, + start_execution_setup: Sender<(PeerId, State0)>, send_message0: Sender<(ResponseChannel, alice::Message0)>, send_message1: Sender<(ResponseChannel, alice::Message1)>, send_transfer_proof: Sender<(PeerId, TransferProof)>, @@ -77,6 +79,18 @@ impl EventLoopHandle { .ok_or_else(|| anyhow!("Failed to receive message 2 from Bob")) } + pub async fn execution_setup(&mut self, bob_peer_id: PeerId, state0: State0) -> Result { + let _ = self + .start_execution_setup + .send((bob_peer_id, state0)) + .await?; + + self.done_execution_setup + .recv() + .await + .ok_or_else(|| anyhow!("Failed to setup execution with Bob"))? + } + pub async fn recv_encrypted_signature(&mut self) -> Result { self.recv_encrypted_signature .recv() @@ -140,6 +154,8 @@ pub struct EventLoop { recv_message0: Sender<(bob::Message0, ResponseChannel)>, recv_message1: Sender<(bob::Message1, ResponseChannel)>, recv_message2: Sender, + start_execution_setup: Receiver<(PeerId, State0)>, + done_execution_setup: Sender>, recv_encrypted_signature: Sender, request: Sender, conn_established: Sender, @@ -169,6 +185,8 @@ impl EventLoop { let recv_message0 = Channels::new(); let recv_message1 = Channels::new(); let recv_message2 = Channels::new(); + let start_execution_setup = Channels::new(); + let done_execution_setup = Channels::new(); let recv_encrypted_signature = Channels::new(); let request = Channels::new(); let conn_established = Channels::new(); @@ -183,6 +201,8 @@ impl EventLoop { recv_message0: recv_message0.sender, recv_message1: recv_message1.sender, recv_message2: recv_message2.sender, + start_execution_setup: start_execution_setup.receiver, + done_execution_setup: done_execution_setup.sender, recv_encrypted_signature: recv_encrypted_signature.sender, request: request.sender, conn_established: conn_established.sender, @@ -197,6 +217,8 @@ impl EventLoop { recv_message0: recv_message0.receiver, recv_message1: recv_message1.receiver, recv_message2: recv_message2.receiver, + start_execution_setup: start_execution_setup.sender, + done_execution_setup: done_execution_setup.receiver, recv_encrypted_signature: recv_encrypted_signature.receiver, request: request.receiver, conn_established: conn_established.receiver, @@ -227,8 +249,8 @@ impl EventLoop { OutEvent::Message2 { msg, bob_peer_id : _} => { let _ = self.recv_message2.send(*msg).await; } - OutEvent::ExecutionSetupDone(_res) => { - todo!() + OutEvent::ExecutionSetupDone(res) => { + let _ = self.done_execution_setup.send(res.map(|state|*state)).await; } OutEvent::TransferProofAcknowledged => { trace!("Bob acknowledged transfer proof"); @@ -250,6 +272,13 @@ impl EventLoop { .map_err(|err|error!("Failed to send swap response: {:#}", err)); } }, + option = self.start_execution_setup.recv().fuse() => { + if let Some((bob_peer_id, state0)) = option { + let _ = self + .swarm + .start_execution_setup(bob_peer_id, state0); + } + }, msg0 = self.send_message0.recv().fuse() => { if let Some((channel, msg)) = msg0 { let _ = self diff --git a/swap/src/protocol/alice/execution_setup.rs b/swap/src/protocol/alice/execution_setup.rs index 38233614..8360c1de 100644 --- a/swap/src/protocol/alice/execution_setup.rs +++ b/swap/src/protocol/alice/execution_setup.rs @@ -39,7 +39,7 @@ impl Default for Behaviour { } impl Behaviour { - fn run(&mut self, bob: PeerId, state0: State0) { + pub fn run(&mut self, bob: PeerId, state0: State0) { self.inner .do_protocol_listener(bob, move |mut substream| async move { let alice_message0 = state0.next_message(); diff --git a/swap/src/protocol/alice/steps.rs b/swap/src/protocol/alice/steps.rs index 3272d616..08f15cc0 100644 --- a/swap/src/protocol/alice/steps.rs +++ b/swap/src/protocol/alice/steps.rs @@ -55,39 +55,12 @@ pub async fn negotiate( .send_swap_response(event.channel, SwapResponse { xmr_amount }) .await?; - let (bob_message0, channel) = timeout( + let state3 = timeout( execution_params.bob_time_to_act, - event_loop_handle.recv_message0(), + event_loop_handle.execution_setup(bob_peer_id, state0), ) .await??; - let alice_message0 = state0.next_message(); - event_loop_handle - .send_message0(channel, alice_message0) - .await?; - - let state1 = state0.receive(bob_message0)?; - - let (bob_message1, channel) = timeout( - execution_params.bob_time_to_act, - event_loop_handle.recv_message1(), - ) - .await??; - - let state2 = state1.receive(bob_message1); - - event_loop_handle - .send_message1(channel, state2.next_message()) - .await?; - - let bob_message2 = timeout( - execution_params.bob_time_to_act, - event_loop_handle.recv_message2(), - ) - .await??; - - let state3 = state2.receive(bob_message2)?; - Ok((bob_peer_id, state3)) } diff --git a/swap/src/protocol/bob.rs b/swap/src/protocol/bob.rs index 6a7ec09f..8c7226c9 100644 --- a/swap/src/protocol/bob.rs +++ b/swap/src/protocol/bob.rs @@ -163,6 +163,7 @@ impl Builder { } } } + fn init_event_loop( &self, ) -> Result<(bob::event_loop::EventLoop, bob::event_loop::EventLoopHandle)> { @@ -175,6 +176,7 @@ impl Builder { self.peer_id, self.alice_peer_id, self.alice_address.clone(), + self.bitcoin_wallet.clone(), ) } @@ -302,6 +304,17 @@ impl Behaviour { info!("Requesting swap from: {}", alice); } + pub fn start_execution_setup( + &mut self, + alice_peer_id: PeerId, + state0: State0, + bitcoin_wallet: Arc, + ) { + self.execution_setup + .run(alice_peer_id, state0, bitcoin_wallet); + info!("Start execution setup with {}", alice_peer_id); + } + /// Sends Bob's first message to Alice. pub fn send_message0(&mut self, alice: PeerId, msg: bob::Message0) { self.message0.send(alice, msg); diff --git a/swap/src/protocol/bob/event_loop.rs b/swap/src/protocol/bob/event_loop.rs index fcb1da17..abc646b6 100644 --- a/swap/src/protocol/bob/event_loop.rs +++ b/swap/src/protocol/bob/event_loop.rs @@ -1,15 +1,17 @@ use crate::{ + bitcoin, bitcoin::EncryptedSignature, network::{transport::SwapTransport, TokioExecutor}, protocol::{ alice, alice::{SwapResponse, TransferProof}, - bob::{self, Behaviour, OutEvent, SwapRequest}, + bob::{self, Behaviour, OutEvent, State0, State2, SwapRequest}, }, }; use anyhow::{anyhow, Result}; use futures::FutureExt; use libp2p::{core::Multiaddr, PeerId}; +use std::sync::Arc; use tokio::sync::mpsc::{Receiver, Sender}; use tracing::{debug, error, info}; @@ -37,6 +39,8 @@ pub struct EventLoopHandle { recv_swap_response: Receiver, recv_message0: Receiver, recv_message1: Receiver, + start_execution_setup: Sender, + done_execution_setup: Receiver>, recv_transfer_proof: Receiver, conn_established: Receiver, dial_alice: Sender<()>, @@ -70,6 +74,15 @@ impl EventLoopHandle { .ok_or_else(|| anyhow!("Failed to receive message 1 from Alice")) } + pub async fn execution_setup(&mut self, state0: State0) -> Result { + let _ = self.start_execution_setup.send(state0).await?; + + self.done_execution_setup + .recv() + .await + .ok_or_else(|| anyhow!("Failed to setup execution with Alice"))? + } + pub async fn recv_transfer_proof(&mut self) -> Result { self.recv_transfer_proof .recv() @@ -128,10 +141,13 @@ impl EventLoopHandle { #[allow(missing_debug_implementations)] pub struct EventLoop { swarm: libp2p::Swarm, + bitcoin_wallet: Arc, alice_peer_id: PeerId, recv_swap_response: Sender, recv_message0: Sender, recv_message1: Sender, + start_execution_setup: Receiver, + done_execution_setup: Sender>, recv_transfer_proof: Sender, dial_alice: Receiver<()>, conn_established: Sender, @@ -150,6 +166,7 @@ impl EventLoop { peer_id: PeerId, alice_peer_id: PeerId, alice_addr: Multiaddr, + bitcoin_wallet: Arc, ) -> Result<(Self, EventLoopHandle)> { let mut swarm = libp2p::swarm::SwarmBuilder::new(transport, behaviour, peer_id) .executor(Box::new(TokioExecutor { @@ -162,6 +179,8 @@ impl EventLoop { let swap_response = Channels::new(); let recv_message0 = Channels::new(); let recv_message1 = Channels::new(); + let start_execution_setup = Channels::new(); + let done_execution_setup = Channels::new(); let recv_transfer_proof = Channels::new(); let dial_alice = Channels::new(); let conn_established = Channels::new(); @@ -175,9 +194,12 @@ impl EventLoop { let event_loop = EventLoop { swarm, alice_peer_id, + bitcoin_wallet, recv_swap_response: swap_response.sender, recv_message0: recv_message0.sender, recv_message1: recv_message1.sender, + start_execution_setup: start_execution_setup.receiver, + done_execution_setup: done_execution_setup.sender, recv_transfer_proof: recv_transfer_proof.sender, conn_established: conn_established.sender, dial_alice: dial_alice.receiver, @@ -193,6 +215,8 @@ impl EventLoop { recv_swap_response: swap_response.receiver, recv_message0: recv_message0.receiver, recv_message1: recv_message1.receiver, + start_execution_setup: start_execution_setup.sender, + done_execution_setup: done_execution_setup.receiver, recv_transfer_proof: recv_transfer_proof.receiver, conn_established: conn_established.receiver, dial_alice: dial_alice.sender, @@ -225,8 +249,8 @@ impl EventLoop { let _ = self.recv_message1.send(*msg).await; } OutEvent::Message2 => info!("Alice acknowledged message 2 received"), - OutEvent::ExecutionSetupDone(_res) => { - todo!() + OutEvent::ExecutionSetupDone(res) => { + let _ = self.done_execution_setup.send(res.map(|state|*state)).await; } OutEvent::TransferProof(msg) => { let _ = self.recv_transfer_proof.send(*msg).await; @@ -275,6 +299,13 @@ impl EventLoop { self.swarm.send_message2(self.alice_peer_id, msg); } }, + option = self.start_execution_setup.recv().fuse() => { + if let Some(state0) = option { + let _ = self + .swarm + .start_execution_setup(self.alice_peer_id, state0, self.bitcoin_wallet.clone()); + } + }, encrypted_signature = self.send_encrypted_signature.recv().fuse() => { if let Some(tx_redeem_encsig) = encrypted_signature { self.swarm.send_encrypted_signature(self.alice_peer_id, tx_redeem_encsig); diff --git a/swap/src/protocol/bob/execution_setup.rs b/swap/src/protocol/bob/execution_setup.rs index ed2de167..3a0f3af0 100644 --- a/swap/src/protocol/bob/execution_setup.rs +++ b/swap/src/protocol/bob/execution_setup.rs @@ -40,7 +40,12 @@ impl Default for Behaviour { } impl Behaviour { - fn run(&mut self, alice: PeerId, state0: State0, bitcoin_wallet: Arc) { + pub fn run( + &mut self, + alice: PeerId, + state0: State0, + bitcoin_wallet: Arc, + ) { self.inner .do_protocol_dialer(alice, move |mut substream| async move { let bob_message0 = state0.next_message(); diff --git a/swap/src/protocol/bob/swap.rs b/swap/src/protocol/bob/swap.rs index 400a5be8..5036b52c 100644 --- a/swap/src/protocol/bob/swap.rs +++ b/swap/src/protocol/bob/swap.rs @@ -70,13 +70,7 @@ async fn run_until_internal( BobState::Started { state0, amounts } => { event_loop_handle.dial().await?; - let state2 = negotiate( - state0, - amounts, - &mut event_loop_handle, - bitcoin_wallet.clone(), - ) - .await?; + let state2 = negotiate(state0, amounts, &mut event_loop_handle).await?; let state = BobState::Negotiated(state2); let db_state = state.clone().into(); @@ -378,7 +372,6 @@ pub async fn negotiate( state0: crate::protocol::bob::state::State0, amounts: SwapAmounts, event_loop_handle: &mut EventLoopHandle, - bitcoin_wallet: Arc, ) -> Result { tracing::trace!("Starting negotiate"); event_loop_handle @@ -391,21 +384,7 @@ pub async fn negotiate( // argument. let _swap_response = event_loop_handle.recv_swap_response().await?; - event_loop_handle - .send_message0(state0.next_message()) - .await?; - let msg0 = event_loop_handle.recv_message0().await?; - let state1 = state0.receive(bitcoin_wallet.as_ref(), msg0).await?; - - event_loop_handle - .send_message1(state1.next_message()) - .await?; - let msg1 = event_loop_handle.recv_message1().await?; - let state2 = state1.receive(msg1)?; - - event_loop_handle - .send_message2(state2.next_message()) - .await?; + let state2 = event_loop_handle.execution_setup(state0).await?; Ok(state2) }