From 6f7408ecce342fbb4328d11a3136f649bfafe1bf Mon Sep 17 00:00:00 2001 From: rishflab Date: Mon, 7 Dec 2020 13:31:14 +1100 Subject: [PATCH] Drive swarm in seperate async task Previously the libp2p swarm had to be manually polled within the protocol execution code to execute actions such as sending a message. The swarm is now wrapped in SwarmDriver which polls the swarm in a seperate task --- swap/src/alice.rs | 3 +- swap/src/alice/amounts.rs | 15 ++-- swap/src/alice/execution.rs | 82 +++++------------- swap/src/alice/swap.rs | 10 +-- swap/src/alice/swarm_driver.rs | 148 +++++++++++++++++++++++++++++++++ swap/src/bob.rs | 1 + swap/src/bob/execution.rs | 37 +++------ swap/src/bob/swap.rs | 32 ++----- swap/src/bob/swarm_driver.rs | 145 ++++++++++++++++++++++++++++++++ swap/tests/e2e.rs | 21 +++-- 10 files changed, 366 insertions(+), 128 deletions(-) create mode 100644 swap/src/alice/swarm_driver.rs create mode 100644 swap/src/bob/swarm_driver.rs diff --git a/swap/src/alice.rs b/swap/src/alice.rs index 0aa0eedf..769e4e6f 100644 --- a/swap/src/alice.rs +++ b/swap/src/alice.rs @@ -43,6 +43,7 @@ mod message1; mod message2; mod message3; pub mod swap; +pub mod swarm_driver; pub async fn swap( bitcoin_wallet: Arc, @@ -117,7 +118,7 @@ pub async fn swap( OutEvent::ConnectionEstablished(bob) => { info!("Connection established with: {}", bob); } - OutEvent::Request(amounts::OutEvent::Btc { btc, channel }) => { + OutEvent::Request(amounts::OutEvent { btc, channel }) => { let amounts = calculate_amounts(btc); last_amounts = Some(amounts); swarm.send_amounts(channel, amounts); diff --git a/swap/src/alice/amounts.rs b/swap/src/alice/amounts.rs index 33e230d5..f1b01cb0 100644 --- a/swap/src/alice/amounts.rs +++ b/swap/src/alice/amounts.rs @@ -13,14 +13,15 @@ use std::{ }; use tracing::{debug, error}; -use crate::network::request_response::{AliceToBob, AmountsProtocol, BobToAlice, Codec, TIMEOUT}; +use crate::{ + alice::amounts, + network::request_response::{AliceToBob, AmountsProtocol, BobToAlice, Codec, TIMEOUT}, +}; #[derive(Debug)] -pub enum OutEvent { - Btc { - btc: ::bitcoin::Amount, - channel: ResponseChannel, - }, +pub struct OutEvent { + pub btc: ::bitcoin::Amount, + pub channel: ResponseChannel, } /// A `NetworkBehaviour` that represents getting the amounts of an XMR/BTC swap. @@ -82,7 +83,7 @@ impl NetworkBehaviourEventProcess> } => { if let BobToAlice::AmountsFromBtc(btc) = request { debug!("Received amounts request"); - self.events.push_back(OutEvent::Btc { btc, channel }) + self.events.push_back(amounts::OutEvent { btc, channel }) } } RequestResponseEvent::Message { diff --git a/swap/src/alice/execution.rs b/swap/src/alice/execution.rs index 7ff638ae..d9f5efe4 100644 --- a/swap/src/alice/execution.rs +++ b/swap/src/alice/execution.rs @@ -1,7 +1,5 @@ use crate::{ - alice::{amounts, OutEvent, Swarm}, - bitcoin, monero, - network::request_response::AliceToBob, + alice::swarm_driver::SwarmDriver, bitcoin, monero, network::request_response::AliceToBob, SwapAmounts, PUNISH_TIMELOCK, REFUND_TIMELOCK, }; use anyhow::{bail, Context, Result}; @@ -33,36 +31,28 @@ pub async fn negotiate( a: bitcoin::SecretKey, s_a: cross_curve_dleq::Scalar, v_a: monero::PrivateViewKey, - swarm: &mut Swarm, + swarm: &mut SwarmDriver, bitcoin_wallet: Arc, config: Config, ) -> Result<(ResponseChannel, State3)> { trace!("Starting negotiate"); - let event = timeout(config.bob_time_to_act, swarm.next()) + let _peer_id = timeout(config.bob_time_to_act, swarm.recv_conn_established()) .await - .context("Failed to receive dial connection from Bob")?; - match event { - OutEvent::ConnectionEstablished(_bob_peer_id) => {} - other => bail!("Unexpected event received: {:?}", other), - } + .context("Failed to receive dial connection from Bob")??; - let event = timeout(config.bob_time_to_act, swarm.next()) + let event = timeout(config.bob_time_to_act, swarm.recv_request()) .await - .context("Failed to receive amounts from Bob")?; - let (btc, channel) = match event { - OutEvent::Request(amounts::OutEvent::Btc { btc, channel }) => (btc, channel), - other => bail!("Unexpected event received: {:?}", other), - }; + .context("Failed to receive amounts from Bob")??; - if btc != amounts.btc { + if event.btc != amounts.btc { bail!( "Bob proposed a different amount; got {}, expected: {}", - btc, + event.btc, amounts.btc ); } - // TODO: get an ack from libp2p2 - swarm.send_amounts(channel, amounts); + + swarm.send_amounts(event.channel, amounts); let redeem_address = bitcoin_wallet.as_ref().new_address().await?; let punish_address = redeem_address.clone(); @@ -80,40 +70,21 @@ pub async fn negotiate( ); // TODO(Franck): Understand why this is needed. - swarm.set_state0(state0.clone()); + swarm.swarm.set_state0(state0.clone()); - let event = timeout(config.bob_time_to_act, swarm.next()) - .await - .context("Failed to receive message 0 from Bob")?; - let message0 = match event { - OutEvent::Message0(msg) => msg, - other => bail!("Unexpected event received: {:?}", other), - }; + let bob_message0 = timeout(config.bob_time_to_act, swarm.recv_message0()).await??; - let state1 = state0.receive(message0)?; + let state1 = state0.receive(bob_message0)?; - let event = timeout(config.bob_time_to_act, swarm.next()) - .await - .context("Failed to receive message 1 from Bob")?; - let (msg, channel) = match event { - OutEvent::Message1 { msg, channel } => (msg, channel), - other => bail!("Unexpected event: {:?}", other), - }; + let (bob_message1, channel) = timeout(config.bob_time_to_act, swarm.recv_message1()).await??; - let state2 = state1.receive(msg); + let state2 = state1.receive(bob_message1); - let message1 = state2.next_message(); - swarm.send_message1(channel, message1); + swarm.send_message1(channel, state2.next_message()); - let event = timeout(config.bob_time_to_act, swarm.next()) - .await - .context("Failed to receive message 2 from Bob")?; - let (msg, channel) = match event { - OutEvent::Message2 { msg, channel } => (msg, channel), - other => bail!("Unexpected event: {:?}", other), - }; + let (bob_message2, channel) = timeout(config.bob_time_to_act, swarm.recv_message2()).await??; - let state3 = state2.receive(msg)?; + let state3 = state2.receive(bob_message2)?; Ok((channel, state3)) } @@ -146,7 +117,7 @@ pub async fn lock_xmr( channel: ResponseChannel, amounts: SwapAmounts, state3: State3, - swarm: &mut Swarm, + swarm: &mut SwarmDriver, monero_wallet: Arc, ) -> Result<()> where @@ -173,20 +144,13 @@ where } pub async fn wait_for_bitcoin_encrypted_signature( - swarm: &mut Swarm, + swarm: &mut SwarmDriver, timeout_duration: Duration, ) -> Result { - let event = timeout(timeout_duration, swarm.next()) + let msg3 = timeout(timeout_duration, swarm.recv_message3()) .await - .context("Failed to receive Bitcoin encrypted signature from Bob")?; - - match event { - OutEvent::Message3(msg) => Ok(msg.tx_redeem_encsig), - other => bail!( - "Expected Bob's Bitcoin redeem encrypted signature, got: {:?}", - other - ), - } + .context("Failed to receive Bitcoin encrypted signature from Bob")??; + Ok(msg3.tx_redeem_encsig) } pub fn build_bitcoin_redeem_transaction( diff --git a/swap/src/alice/swap.rs b/swap/src/alice/swap.rs index 8cac8e6b..66299c5d 100644 --- a/swap/src/alice/swap.rs +++ b/swap/src/alice/swap.rs @@ -8,7 +8,7 @@ use crate::{ publish_bitcoin_redeem_transaction, publish_cancel_transaction, wait_for_bitcoin_encrypted_signature, wait_for_bitcoin_refund, wait_for_locked_bitcoin, }, - Swarm, + swarm_driver::SwarmDriver, }, bitcoin, bitcoin::EncryptedSignature, @@ -109,11 +109,11 @@ impl fmt::Display for AliceState { pub async fn swap( state: AliceState, - swarm: Swarm, + swarm: SwarmDriver, bitcoin_wallet: Arc, monero_wallet: Arc, config: Config, -) -> Result<(AliceState, Swarm)> { +) -> Result<(AliceState, SwarmDriver)> { run_until( state, is_complete, @@ -147,11 +147,11 @@ pub fn is_xmr_locked(state: &AliceState) -> bool { pub async fn run_until( state: AliceState, is_target_state: fn(&AliceState) -> bool, - mut swarm: Swarm, + mut swarm: SwarmDriver, bitcoin_wallet: Arc, monero_wallet: Arc, config: Config, -) -> Result<(AliceState, Swarm)> { +) -> Result<(AliceState, SwarmDriver)> { info!("Current state:{}", state); if is_target_state(&state) { Ok((state, swarm)) diff --git a/swap/src/alice/swarm_driver.rs b/swap/src/alice/swarm_driver.rs new file mode 100644 index 00000000..b0947920 --- /dev/null +++ b/swap/src/alice/swarm_driver.rs @@ -0,0 +1,148 @@ +use crate::{ + alice::{Behaviour, OutEvent}, + network::{request_response::AliceToBob, transport::SwapTransport, TokioExecutor}, + SwapAmounts, +}; +use anyhow::{Context, Result}; +use libp2p::{core::Multiaddr, request_response::ResponseChannel, PeerId, Swarm}; +use tokio::sync::mpsc::{Receiver, Sender}; +use tracing::info; +use xmr_btc::{alice, bob}; + +pub struct Channels { + sender: Sender, + receiver: Receiver, +} + +impl Channels { + pub fn new() -> Channels { + let (sender, receiver) = tokio::sync::mpsc::channel(100); + Channels { sender, receiver } + } +} + +impl Default for Channels { + fn default() -> Self { + Self::new() + } +} + +pub struct SwarmDriver { + pub swarm: libp2p::Swarm, + pub msg0: Channels, + pub msg1: Channels<(bob::Message1, ResponseChannel)>, + pub msg2: Channels<(bob::Message2, ResponseChannel)>, + pub msg3: Channels, + pub request: Channels, + pub conn_established: Channels, +} + +impl SwarmDriver { + pub fn new(transport: SwapTransport, behaviour: Behaviour, listen: Multiaddr) -> Result { + let local_peer_id = behaviour.peer_id(); + + let mut swarm = libp2p::swarm::SwarmBuilder::new(transport, behaviour, local_peer_id) + .executor(Box::new(TokioExecutor { + handle: tokio::runtime::Handle::current(), + })) + .build(); + + Swarm::listen_on(&mut swarm, listen.clone()) + .with_context(|| format!("Address is not supported: {:#}", listen))?; + + Ok(SwarmDriver { + swarm, + msg0: Channels::new(), + msg1: Channels::new(), + msg2: Channels::new(), + msg3: Channels::new(), + request: Channels::new(), + conn_established: Channels::new(), + }) + } + + pub async fn poll_swarm(mut self) { + loop { + match self.swarm.next().await { + OutEvent::ConnectionEstablished(alice) => { + let _ = self.conn_established.sender.send(alice).await; + } + OutEvent::Message0(msg) => { + let _ = self.msg0.sender.send(msg).await; + } + OutEvent::Message1 { msg, channel } => { + let _ = self.msg1.sender.send((msg, channel)).await; + } + OutEvent::Message2 { msg, channel } => { + let _ = self.msg2.sender.send((msg, channel)).await; + } + OutEvent::Message3(msg) => { + let _ = self.msg3.sender.send(msg).await; + } + OutEvent::Request(event) => { + let _ = self.request.sender.send(event).await; + } + }; + } + } + + pub fn send_amounts(&mut self, channel: ResponseChannel, amounts: SwapAmounts) { + let msg = AliceToBob::Amounts(amounts); + self.swarm.amounts.send(channel, msg); + info!("Sent amounts response"); + } + + pub fn send_message1(&mut self, channel: ResponseChannel, msg: alice::Message1) { + self.swarm.send_message1(channel, msg); + } + + pub fn send_message2(&mut self, channel: ResponseChannel, msg: alice::Message2) { + self.swarm.send_message2(channel, msg); + } + + pub async fn recv_conn_established(&mut self) -> Result { + self.conn_established + .receiver + .recv() + .await + .ok_or_else(|| anyhow::Error::msg("Failed to receive connection established from Bob")) + } + + pub async fn recv_message0(&mut self) -> Result { + self.msg0 + .receiver + .recv() + .await + .ok_or_else(|| anyhow::Error::msg("Failed to receive message 0 from Bob")) + } + + pub async fn recv_message1(&mut self) -> Result<(bob::Message1, ResponseChannel)> { + self.msg1 + .receiver + .recv() + .await + .ok_or_else(|| anyhow::Error::msg("Failed to receive message 1 from Bob")) + } + + pub async fn recv_message2(&mut self) -> Result<(bob::Message2, ResponseChannel)> { + self.msg2 + .receiver + .recv() + .await + .ok_or_else(|| anyhow::Error::msg("Failed o receive message 2 from Bob")) + } + + pub async fn recv_message3(&mut self) -> Result { + self.msg3.receiver.recv().await.ok_or_else(|| { + anyhow::Error::msg("Failed to receive Bitcoin encrypted signature from Bob") + }) + } + + pub async fn recv_request(&mut self) -> Result { + self.request + .receiver + .recv() + .await + .ok_or_else(|| anyhow::Error::msg("Failed to receive amounts request from Bob")) + } +} diff --git a/swap/src/bob.rs b/swap/src/bob.rs index f5602b8a..bf7cd48a 100644 --- a/swap/src/bob.rs +++ b/swap/src/bob.rs @@ -23,6 +23,7 @@ mod message1; mod message2; mod message3; pub mod swap; +pub mod swarm_driver; use self::{amounts::*, message0::*, message1::*, message2::*, message3::*}; use crate::{ diff --git a/swap/src/bob/execution.rs b/swap/src/bob/execution.rs index abe31815..8d0babe7 100644 --- a/swap/src/bob/execution.rs +++ b/swap/src/bob/execution.rs @@ -1,7 +1,4 @@ -use crate::{ - bob::{OutEvent, Swarm}, - SwapAmounts, -}; +use crate::{bob::swarm_driver::SwarmDriver, SwapAmounts}; use anyhow::Result; use libp2p::core::Multiaddr; use rand::{CryptoRng, RngCore}; @@ -10,8 +7,8 @@ use xmr_btc::bob::State2; pub async fn negotiate( state0: xmr_btc::bob::State0, - amounts: SwapAmounts, - swarm: &mut Swarm, + _amounts: SwapAmounts, + swarm: &mut SwarmDriver, addr: Multiaddr, mut rng: R, bitcoin_wallet: Arc, @@ -19,32 +16,20 @@ pub async fn negotiate( where R: RngCore + CryptoRng + Send, { - libp2p::Swarm::dial_addr(swarm, addr)?; + swarm.dial_alice(addr)?; - let alice = match swarm.next().await { - OutEvent::ConnectionEstablished(alice) => alice, - other => panic!("unexpected event: {:?}", other), - }; + let alice = swarm.recv_conn_established().await?; - swarm.request_amounts(alice.clone(), amounts.btc.as_sat()); - - // todo: see if we can remove - let (_btc, _xmr) = match swarm.next().await { - OutEvent::Amounts(amounts) => (amounts.btc, amounts.xmr), - other => panic!("unexpected event: {:?}", other), - }; + swarm.request_amounts(alice.clone()); + swarm.recv_amounts().await?; swarm.send_message0(alice.clone(), state0.next_message(&mut rng)); - let state1 = match swarm.next().await { - OutEvent::Message0(msg) => state0.receive(bitcoin_wallet.as_ref(), msg).await?, - other => panic!("unexpected event: {:?}", other), - }; + let msg0 = swarm.recv_message0().await?; + let state1 = state0.receive(bitcoin_wallet.as_ref(), msg0).await?; swarm.send_message1(alice.clone(), state1.next_message()); - let state2 = match swarm.next().await { - OutEvent::Message1(msg) => state1.receive(msg)?, - other => panic!("unexpected event: {:?}", other), - }; + let msg1 = swarm.recv_message1().await?; + let state2 = state1.receive(msg1)?; swarm.send_message2(alice.clone(), state2.next_message()); diff --git a/swap/src/bob/swap.rs b/swap/src/bob/swap.rs index 38e2a7f7..f87bc106 100644 --- a/swap/src/bob/swap.rs +++ b/swap/src/bob/swap.rs @@ -1,5 +1,5 @@ use crate::{ - bob::{execution::negotiate, OutEvent, Swarm}, + bob::{execution::negotiate, swarm_driver::SwarmDriver}, storage::Database, SwapAmounts, }; @@ -8,7 +8,7 @@ use async_recursion::async_recursion; use libp2p::{core::Multiaddr, PeerId}; use rand::{CryptoRng, RngCore}; use std::{fmt, sync::Arc}; -use tracing::{debug, info}; +use tracing::info; use uuid::Uuid; use xmr_btc::bob::{self}; @@ -53,7 +53,7 @@ impl fmt::Display for BobState { pub async fn swap( state: BobState, - swarm: Swarm, + swarm: SwarmDriver, db: Database, bitcoin_wallet: Arc, monero_wallet: Arc, @@ -100,7 +100,7 @@ pub fn is_xmr_locked(state: &BobState) -> bool { pub async fn run_until( state: BobState, is_target_state: fn(&BobState) -> bool, - mut swarm: Swarm, + mut swarm: SwarmDriver, db: Database, bitcoin_wallet: Arc, monero_wallet: Arc, @@ -162,14 +162,11 @@ where // Watch for Alice to Lock Xmr or for t1 to elapse BobState::BtcLocked(state3, alice_peer_id) => { // todo: watch until t1, not indefinetely - let state4 = match swarm.next().await { - OutEvent::Message2(msg) => { - state3 - .watch_for_lock_xmr(monero_wallet.as_ref(), msg) - .await? - } - other => panic!("unexpected event: {:?}", other), - }; + let msg2 = swarm.recv_message2().await?; + let state4 = state3 + .watch_for_lock_xmr(monero_wallet.as_ref(), msg2) + .await?; + run_until( BobState::XmrLocked(state4, alice_peer_id), is_target_state, @@ -192,17 +189,6 @@ where // should happen in this arm? swarm.send_message3(alice_peer_id.clone(), tx_redeem_encsig); - // Sadly we have to poll the swarm to get make sure the message is sent? - // FIXME: Having to wait for Alice's response here is a big problem, because - // we're stuck if she doesn't send her response back. I believe this is - // currently necessary, so we may have to rework this and/or how we use libp2p - match swarm.next().await { - OutEvent::Message3 => { - debug!("Got Message3 empty response"); - } - other => panic!("unexpected event: {:?}", other), - }; - run_until( BobState::EncSigSent(state, alice_peer_id), is_target_state, diff --git a/swap/src/bob/swarm_driver.rs b/swap/src/bob/swarm_driver.rs new file mode 100644 index 00000000..69b42c36 --- /dev/null +++ b/swap/src/bob/swarm_driver.rs @@ -0,0 +1,145 @@ +use crate::{ + bob::{Behaviour, OutEvent}, + network::{transport::SwapTransport, TokioExecutor}, + SwapAmounts, +}; +use anyhow::Result; +use libp2p::{core::Multiaddr, PeerId}; +use tokio::sync::mpsc::{Receiver, Sender}; +use tracing::info; +use xmr_btc::{alice, bitcoin::EncryptedSignature, bob}; + +pub struct Channels { + sender: Sender, + receiver: Receiver, +} + +impl Channels { + pub fn new() -> Channels { + let (sender, receiver) = tokio::sync::mpsc::channel(100); + Channels { sender, receiver } + } +} + +impl Default for Channels { + fn default() -> Self { + Self::new() + } +} + +pub struct SwarmDriver { + pub swarm: libp2p::Swarm, + pub amounts: Channels, + pub msg0: Channels, + pub msg1: Channels, + pub msg2: Channels, + pub conn_established: Channels, +} + +impl SwarmDriver { + pub fn new(transport: SwapTransport, behaviour: Behaviour) -> Self { + let local_peer_id = behaviour.peer_id(); + + let swarm = libp2p::swarm::SwarmBuilder::new(transport, behaviour, local_peer_id) + .executor(Box::new(TokioExecutor { + handle: tokio::runtime::Handle::current(), + })) + .build(); + + SwarmDriver { + swarm, + amounts: Channels::new(), + msg0: Channels::new(), + msg1: Channels::new(), + msg2: Channels::new(), + conn_established: Channels::new(), + } + } + + pub async fn poll_swarm(mut self) { + loop { + match self.swarm.next().await { + OutEvent::ConnectionEstablished(alice) => { + let _ = self.conn_established.sender.send(alice).await; + } + OutEvent::Amounts(amounts) => { + let _ = self.amounts.sender.send(amounts).await; + } + OutEvent::Message0(msg) => { + let _ = self.msg0.sender.send(msg).await; + } + OutEvent::Message1(msg) => { + let _ = self.msg1.sender.send(msg).await; + } + OutEvent::Message2(msg) => { + let _ = self.msg2.sender.send(msg).await; + } + OutEvent::Message3 => info!("Alice acknowledged message 3 received"), + }; + } + } + + // todo: Remove this + pub fn request_amounts(&mut self, alice_peer_id: PeerId) { + self.swarm.request_amounts(alice_peer_id, 0); + } + + pub fn dial_alice(&mut self, addr: Multiaddr) -> Result<()> { + let _ = libp2p::Swarm::dial_addr(&mut self.swarm, addr)?; + Ok(()) + } + + pub fn send_message0(&mut self, peer_id: PeerId, msg: bob::Message0) { + self.swarm.send_message0(peer_id, msg); + } + + pub fn send_message1(&mut self, peer_id: PeerId, msg: bob::Message1) { + self.swarm.send_message1(peer_id, msg); + } + + pub fn send_message2(&mut self, peer_id: PeerId, msg: bob::Message2) { + self.swarm.send_message2(peer_id, msg); + } + + pub fn send_message3(&mut self, peer_id: PeerId, tx_redeem_encsig: EncryptedSignature) { + self.swarm.send_message3(peer_id, tx_redeem_encsig); + } + + pub async fn recv_conn_established(&mut self) -> Result { + self.conn_established.receiver.recv().await.ok_or_else(|| { + anyhow::Error::msg("Failed to receive connection established from Alice") + }) + } + + pub async fn recv_amounts(&mut self) -> Result { + self.amounts + .receiver + .recv() + .await + .ok_or_else(|| anyhow::Error::msg("Failed to receive amounts from Alice")) + } + + pub async fn recv_message0(&mut self) -> Result { + self.msg0 + .receiver + .recv() + .await + .ok_or_else(|| anyhow::Error::msg("Failed to receive message 0 from Alice")) + } + + pub async fn recv_message1(&mut self) -> Result { + self.msg1 + .receiver + .recv() + .await + .ok_or_else(|| anyhow::Error::msg("Failed to receive message 1 from Alice")) + } + + pub async fn recv_message2(&mut self) -> Result { + self.msg2 + .receiver + .recv() + .await + .ok_or_else(|| anyhow::Error::msg("Failed to receive message 2 from Alice")) + } +} diff --git a/swap/tests/e2e.rs b/swap/tests/e2e.rs index 237543b2..13a402c4 100644 --- a/swap/tests/e2e.rs +++ b/swap/tests/e2e.rs @@ -18,8 +18,10 @@ use xmr_btc::{bitcoin, config::Config, cross_curve_dleq}; #[tokio::test] async fn happy_path() { + use tracing_subscriber::util::SubscriberInitExt as _; let _guard = tracing_subscriber::fmt() - .with_env_filter("trace,hyper=warn") + .with_env_filter("swap=info,xmr_btc=info") + .with_ansi(false) .set_default(); let cli = Cli::default(); @@ -109,8 +111,10 @@ async fn happy_path() { /// the encsig and fail to refund or redeem. Alice punishes. #[tokio::test] async fn alice_punishes_if_bob_never_acts_after_fund() { + use tracing_subscriber::util::SubscriberInitExt as _; let _guard = tracing_subscriber::fmt() - .with_env_filter("trace,hyper=warn") + .with_env_filter("swap=info,xmr_btc=info") + .with_ansi(false) .set_default(); let cli = Cli::default(); @@ -192,10 +196,10 @@ async fn init_alice( _btc_starting_balance: bitcoin::Amount, xmr_to_swap: xmr_btc::monero::Amount, xmr_starting_balance: xmr_btc::monero::Amount, - alice_multiaddr: Multiaddr, + listen: Multiaddr, ) -> ( AliceState, - alice::Swarm, + alice::swarm_driver::SwarmDriver, Arc, Arc, PeerId, @@ -236,7 +240,9 @@ async fn init_alice( } }; - let alice_swarm = alice::new_swarm(alice_multiaddr, alice_transport, alice_behaviour).unwrap(); + let alice_swarm = + alice::swarm_driver::SwarmDriver::new(alice_transport, alice_behaviour, listen) + .expect("Could not init alice"); ( alice_state, @@ -259,7 +265,7 @@ async fn init_bob( xmr_stating_balance: xmr_btc::monero::Amount, ) -> ( BobState, - bob::Swarm, + bob::swarm_driver::SwarmDriver, Arc, Arc, Database, @@ -309,7 +315,8 @@ async fn init_bob( peer_id: alice_peer_id, addr: alice_multiaddr, }; - let bob_swarm = bob::new_swarm(bob_transport, bob_behaviour).unwrap(); + + let bob_swarm = bob::swarm_driver::SwarmDriver::new(bob_transport, bob_behaviour); (bob_state, bob_swarm, bob_btc_wallet, bob_xmr_wallet, bob_db) }