From 3d8866f1a04593476152f75c0f798fe45f4e7222 Mon Sep 17 00:00:00 2001 From: rishflab Date: Wed, 9 Dec 2020 18:08:26 +1100 Subject: [PATCH] Convert event loop to use fused futures select Co-authored-by: Daniel Karzel --- swap/src/alice/swarm_driver.rs | 74 ++++++++++++++------------ swap/src/bob/execution.rs | 1 + swap/src/bob/swarm_driver.rs | 94 ++++++++++++++++++++-------------- swap/tests/e2e.rs | 6 +-- 4 files changed, 102 insertions(+), 73 deletions(-) diff --git a/swap/src/alice/swarm_driver.rs b/swap/src/alice/swarm_driver.rs index 65365aaa..e631457c 100644 --- a/swap/src/alice/swarm_driver.rs +++ b/swap/src/alice/swarm_driver.rs @@ -4,7 +4,10 @@ use crate::{ SwapAmounts, }; use anyhow::{Context, Result}; -use libp2p::{core::Multiaddr, request_response::ResponseChannel, PeerId, Swarm}; +use futures::FutureExt; +use libp2p::{ + core::Multiaddr, futures::StreamExt, request_response::ResponseChannel, PeerId, Swarm, +}; use tokio::sync::mpsc::{Receiver, Sender}; use xmr_btc::{alice, bob}; @@ -178,37 +181,44 @@ impl SwarmDriver { pub async fn run(&mut self) { loop { - match self.swarm.next().await { - OutEvent::ConnectionEstablished(alice) => { - let _ = self.conn_established.send(alice).await; - } - OutEvent::Message0(msg) => { - let _ = self.msg0.send(msg).await; - } - OutEvent::Message1 { msg, channel } => { - let _ = self.msg1.send((msg, channel)).await; - } - OutEvent::Message2 { msg, channel } => { - let _ = self.msg2.send((msg, channel)).await; - } - OutEvent::Message3(msg) => { - let _ = self.msg3.send(msg).await; - } - OutEvent::Request(event) => { - let _ = self.request.send(event).await; - } - }; - - if let Ok((channel, amounts)) = self.send_amounts.try_recv() { - self.swarm.send_amounts(channel, amounts); - } - - if let Ok((channel, msg)) = self.send_msg1.try_recv() { - self.swarm.send_message1(channel, msg); - } - - if let Ok((channel, msg)) = self.send_msg2.try_recv() { - self.swarm.send_message2(channel, msg); + tokio::select! { + swarm_event = self.swarm.next().fuse() => { + match swarm_event { + OutEvent::ConnectionEstablished(alice) => { + let _ = self.conn_established.send(alice).await; + } + OutEvent::Message0(msg) => { + let _ = self.msg0.send(msg).await; + } + OutEvent::Message1 { msg, channel } => { + let _ = self.msg1.send((msg, channel)).await; + } + OutEvent::Message2 { msg, channel } => { + let _ = self.msg2.send((msg, channel)).await; + } + OutEvent::Message3(msg) => { + let _ = self.msg3.send(msg).await; + } + OutEvent::Request(event) => { + let _ = self.request.send(event).await; + } + } + }, + amounts = self.send_amounts.next().fuse() => { + if let Some((channel, amounts)) = amounts { + self.swarm.send_amounts(channel, amounts); + } + }, + msg1 = self.send_msg1.next().fuse() => { + if let Some((channel, msg)) = msg1 { + self.swarm.send_message1(channel, msg); + } + }, + msg2 = self.send_msg2.next().fuse() => { + if let Some((channel, msg)) = msg2 { + self.swarm.send_message2(channel, msg); + } + }, } } } diff --git a/swap/src/bob/execution.rs b/swap/src/bob/execution.rs index 3c0e6adb..f74987b8 100644 --- a/swap/src/bob/execution.rs +++ b/swap/src/bob/execution.rs @@ -16,6 +16,7 @@ pub async fn negotiate( where R: RngCore + CryptoRng + Send, { + tracing::trace!("Starting negotiate"); swarm.dial_alice(addr).await?; let alice = swarm.recv_conn_established().await?; diff --git a/swap/src/bob/swarm_driver.rs b/swap/src/bob/swarm_driver.rs index 70946892..f36a9090 100644 --- a/swap/src/bob/swarm_driver.rs +++ b/swap/src/bob/swarm_driver.rs @@ -3,8 +3,12 @@ use crate::{ network::{transport::SwapTransport, TokioExecutor}, }; use anyhow::Result; +use futures::FutureExt; use libp2p::{core::Multiaddr, PeerId}; -use tokio::sync::mpsc::{Receiver, Sender}; +use tokio::{ + stream::StreamExt, + sync::mpsc::{Receiver, Sender}, +}; use tracing::info; use xmr_btc::{alice, bitcoin::EncryptedSignature, bob}; @@ -69,6 +73,7 @@ impl SwarmDriverHandle { } pub async fn dial_alice(&mut self, addr: Multiaddr) -> Result<()> { + info!("sending msg to ourselves to dial alice: {}", addr); let _ = self.dial_alice.send(addr).await?; Ok(()) } @@ -177,45 +182,58 @@ impl SwarmDriver { pub async fn run(mut self) { loop { - match self.swarm.next().await { - OutEvent::ConnectionEstablished(alice) => { - let _ = self.conn_established.send(alice).await; + tokio::select! { + swarm_event = self.swarm.next().fuse() => { + match swarm_event { + OutEvent::ConnectionEstablished(alice) => { + let _ = self.conn_established.send(alice).await; + } + OutEvent::Amounts(_amounts) => info!("Amounts received from Alice"), + OutEvent::Message0(msg) => { + let _ = self.msg0.send(msg).await; + } + OutEvent::Message1(msg) => { + let _ = self.msg1.send(msg).await; + } + OutEvent::Message2(msg) => { + let _ = self.msg2.send(msg).await; + } + OutEvent::Message3 => info!("Alice acknowledged message 3 received"), + } + }, + addr = self.dial_alice.next().fuse() => { + if let Some(addr) = addr { + info!("dialing alice: {}", addr); + libp2p::Swarm::dial_addr(&mut self.swarm, addr).expect("Could not dial alice"); + } + }, + amounts = self.request_amounts.next().fuse() => { + if let Some((peer_id, btc_amount)) = amounts { + self.swarm.request_amounts(peer_id, btc_amount.as_sat()); + } + }, + + msg0 = self.send_msg0.next().fuse() => { + if let Some((peer_id, msg)) = msg0 { + self.swarm.send_message0(peer_id, msg); + } } - OutEvent::Amounts(_amounts) => info!("Amounts received from Alice"), - OutEvent::Message0(msg) => { - let _ = self.msg0.send(msg).await; + + msg1 = self.send_msg1.next().fuse() => { + if let Some((peer_id, msg)) = msg1 { + self.swarm.send_message1(peer_id, msg); + } + }, + msg2 = self.send_msg2.next().fuse() => { + if let Some((peer_id, msg)) = msg2 { + self.swarm.send_message2(peer_id, msg); + } + }, + msg3 = self.send_msg3.next().fuse() => { + if let Some((peer_id, tx_redeem_encsig)) = msg3 { + self.swarm.send_message3(peer_id, tx_redeem_encsig); + } } - OutEvent::Message1(msg) => { - let _ = self.msg1.send(msg).await; - } - OutEvent::Message2(msg) => { - let _ = self.msg2.send(msg).await; - } - OutEvent::Message3 => info!("Alice acknowledged message 3 received"), - }; - - if let Ok(addr) = self.dial_alice.try_recv() { - libp2p::Swarm::dial_addr(&mut self.swarm, addr).expect("Could not dial alice"); - } - - if let Ok((peer_id, btc_amount)) = self.request_amounts.try_recv() { - self.swarm.request_amounts(peer_id, btc_amount.as_sat()); - } - - if let Ok((peer_id, msg)) = self.send_msg0.try_recv() { - self.swarm.send_message0(peer_id, msg); - } - - if let Ok((peer_id, msg)) = self.send_msg1.try_recv() { - self.swarm.send_message1(peer_id, msg); - } - - if let Ok((peer_id, msg)) = self.send_msg2.try_recv() { - self.swarm.send_message2(peer_id, msg); - } - - if let Ok((peer_id, tx_redeem_encsig)) = self.send_msg3.try_recv() { - self.swarm.send_message3(peer_id, tx_redeem_encsig); } } } diff --git a/swap/tests/e2e.rs b/swap/tests/e2e.rs index f6299268..f277f0b1 100644 --- a/swap/tests/e2e.rs +++ b/swap/tests/e2e.rs @@ -20,7 +20,7 @@ use xmr_btc::{bitcoin, config::Config, cross_curve_dleq}; async fn happy_path() { use tracing_subscriber::util::SubscriberInitExt as _; let _guard = tracing_subscriber::fmt() - .with_env_filter("swap=info,xmr_btc=info") + .with_env_filter("swap=trace,xmr_btc=trace") .with_ansi(false) .set_default(); @@ -48,7 +48,7 @@ async fn happy_path() { let ( alice_state, - mut alice_swarm, + mut alice_swarm_driver, alice_swarm_handle, alice_btc_wallet, alice_xmr_wallet, @@ -85,7 +85,7 @@ async fn happy_path() { Config::regtest(), ); - let _alice_swarm_fut = tokio::spawn(async move { alice_swarm.run().await }); + let _alice_swarm_fut = tokio::spawn(async move { alice_swarm_driver.run().await }); let bob_swap_fut = bob::swap::swap( bob_state,