Convert event loop to use fused futures select

Co-authored-by: Daniel Karzel <daniel@comit.network>
This commit is contained in:
rishflab 2020-12-09 18:08:26 +11:00
parent 58da1df9dc
commit 3d8866f1a0
4 changed files with 102 additions and 73 deletions

View File

@ -4,7 +4,10 @@ use crate::{
SwapAmounts, SwapAmounts,
}; };
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use libp2p::{core::Multiaddr, request_response::ResponseChannel, PeerId, Swarm}; use futures::FutureExt;
use libp2p::{
core::Multiaddr, futures::StreamExt, request_response::ResponseChannel, PeerId, Swarm,
};
use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::mpsc::{Receiver, Sender};
use xmr_btc::{alice, bob}; use xmr_btc::{alice, bob};
@ -178,7 +181,9 @@ impl SwarmDriver {
pub async fn run(&mut self) { pub async fn run(&mut self) {
loop { loop {
match self.swarm.next().await { tokio::select! {
swarm_event = self.swarm.next().fuse() => {
match swarm_event {
OutEvent::ConnectionEstablished(alice) => { OutEvent::ConnectionEstablished(alice) => {
let _ = self.conn_established.send(alice).await; let _ = self.conn_established.send(alice).await;
} }
@ -197,19 +202,24 @@ impl SwarmDriver {
OutEvent::Request(event) => { OutEvent::Request(event) => {
let _ = self.request.send(event).await; let _ = self.request.send(event).await;
} }
}; }
},
if let Ok((channel, amounts)) = self.send_amounts.try_recv() { amounts = self.send_amounts.next().fuse() => {
if let Some((channel, amounts)) = amounts {
self.swarm.send_amounts(channel, amounts); self.swarm.send_amounts(channel, amounts);
} }
},
if let Ok((channel, msg)) = self.send_msg1.try_recv() { msg1 = self.send_msg1.next().fuse() => {
if let Some((channel, msg)) = msg1 {
self.swarm.send_message1(channel, msg); self.swarm.send_message1(channel, msg);
} }
},
if let Ok((channel, msg)) = self.send_msg2.try_recv() { msg2 = self.send_msg2.next().fuse() => {
if let Some((channel, msg)) = msg2 {
self.swarm.send_message2(channel, msg); self.swarm.send_message2(channel, msg);
} }
},
}
} }
} }
} }

View File

@ -16,6 +16,7 @@ pub async fn negotiate<R>(
where where
R: RngCore + CryptoRng + Send, R: RngCore + CryptoRng + Send,
{ {
tracing::trace!("Starting negotiate");
swarm.dial_alice(addr).await?; swarm.dial_alice(addr).await?;
let alice = swarm.recv_conn_established().await?; let alice = swarm.recv_conn_established().await?;

View File

@ -3,8 +3,12 @@ use crate::{
network::{transport::SwapTransport, TokioExecutor}, network::{transport::SwapTransport, TokioExecutor},
}; };
use anyhow::Result; use anyhow::Result;
use futures::FutureExt;
use libp2p::{core::Multiaddr, PeerId}; use libp2p::{core::Multiaddr, PeerId};
use tokio::sync::mpsc::{Receiver, Sender}; use tokio::{
stream::StreamExt,
sync::mpsc::{Receiver, Sender},
};
use tracing::info; use tracing::info;
use xmr_btc::{alice, bitcoin::EncryptedSignature, bob}; use xmr_btc::{alice, bitcoin::EncryptedSignature, bob};
@ -69,6 +73,7 @@ impl SwarmDriverHandle {
} }
pub async fn dial_alice(&mut self, addr: Multiaddr) -> Result<()> { pub async fn dial_alice(&mut self, addr: Multiaddr) -> Result<()> {
info!("sending msg to ourselves to dial alice: {}", addr);
let _ = self.dial_alice.send(addr).await?; let _ = self.dial_alice.send(addr).await?;
Ok(()) Ok(())
} }
@ -177,7 +182,9 @@ impl SwarmDriver {
pub async fn run(mut self) { pub async fn run(mut self) {
loop { loop {
match self.swarm.next().await { tokio::select! {
swarm_event = self.swarm.next().fuse() => {
match swarm_event {
OutEvent::ConnectionEstablished(alice) => { OutEvent::ConnectionEstablished(alice) => {
let _ = self.conn_established.send(alice).await; let _ = self.conn_established.send(alice).await;
} }
@ -192,31 +199,42 @@ impl SwarmDriver {
let _ = self.msg2.send(msg).await; let _ = self.msg2.send(msg).await;
} }
OutEvent::Message3 => info!("Alice acknowledged message 3 received"), OutEvent::Message3 => info!("Alice acknowledged message 3 received"),
}; }
},
if let Ok(addr) = self.dial_alice.try_recv() { addr = self.dial_alice.next().fuse() => {
if let Some(addr) = addr {
info!("dialing alice: {}", addr);
libp2p::Swarm::dial_addr(&mut self.swarm, addr).expect("Could not dial alice"); libp2p::Swarm::dial_addr(&mut self.swarm, addr).expect("Could not dial alice");
} }
},
if let Ok((peer_id, btc_amount)) = self.request_amounts.try_recv() { amounts = self.request_amounts.next().fuse() => {
if let Some((peer_id, btc_amount)) = amounts {
self.swarm.request_amounts(peer_id, btc_amount.as_sat()); self.swarm.request_amounts(peer_id, btc_amount.as_sat());
} }
},
if let Ok((peer_id, msg)) = self.send_msg0.try_recv() { msg0 = self.send_msg0.next().fuse() => {
if let Some((peer_id, msg)) = msg0 {
self.swarm.send_message0(peer_id, msg); self.swarm.send_message0(peer_id, msg);
} }
}
if let Ok((peer_id, msg)) = self.send_msg1.try_recv() { msg1 = self.send_msg1.next().fuse() => {
if let Some((peer_id, msg)) = msg1 {
self.swarm.send_message1(peer_id, msg); self.swarm.send_message1(peer_id, msg);
} }
},
if let Ok((peer_id, msg)) = self.send_msg2.try_recv() { msg2 = self.send_msg2.next().fuse() => {
if let Some((peer_id, msg)) = msg2 {
self.swarm.send_message2(peer_id, msg); self.swarm.send_message2(peer_id, msg);
} }
},
if let Ok((peer_id, tx_redeem_encsig)) = self.send_msg3.try_recv() { msg3 = self.send_msg3.next().fuse() => {
if let Some((peer_id, tx_redeem_encsig)) = msg3 {
self.swarm.send_message3(peer_id, tx_redeem_encsig); self.swarm.send_message3(peer_id, tx_redeem_encsig);
} }
} }
} }
}
}
} }

View File

@ -20,7 +20,7 @@ use xmr_btc::{bitcoin, config::Config, cross_curve_dleq};
async fn happy_path() { async fn happy_path() {
use tracing_subscriber::util::SubscriberInitExt as _; use tracing_subscriber::util::SubscriberInitExt as _;
let _guard = tracing_subscriber::fmt() let _guard = tracing_subscriber::fmt()
.with_env_filter("swap=info,xmr_btc=info") .with_env_filter("swap=trace,xmr_btc=trace")
.with_ansi(false) .with_ansi(false)
.set_default(); .set_default();
@ -48,7 +48,7 @@ async fn happy_path() {
let ( let (
alice_state, alice_state,
mut alice_swarm, mut alice_swarm_driver,
alice_swarm_handle, alice_swarm_handle,
alice_btc_wallet, alice_btc_wallet,
alice_xmr_wallet, alice_xmr_wallet,
@ -85,7 +85,7 @@ async fn happy_path() {
Config::regtest(), Config::regtest(),
); );
let _alice_swarm_fut = tokio::spawn(async move { alice_swarm.run().await }); let _alice_swarm_fut = tokio::spawn(async move { alice_swarm_driver.run().await });
let bob_swap_fut = bob::swap::swap( let bob_swap_fut = bob::swap::swap(
bob_state, bob_state,