Pass Swarm into EventLoop

This reduces the amount of arguments we need to pass into the eventloop
at the expense of slightly more setup of the swarm.
This commit is contained in:
Thomas Eizinger 2021-03-23 16:56:04 +11:00
parent 2c9ab4f6eb
commit 2200fce3f3
No known key found for this signature in database
GPG Key ID: 651AC83A6C6C8B96
7 changed files with 61 additions and 84 deletions

View File

@ -15,6 +15,7 @@
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use bdk::descriptor::Segwitv0; use bdk::descriptor::Segwitv0;
use bdk::keys::DerivableKey; use bdk::keys::DerivableKey;
use libp2p::Swarm;
use prettytable::{row, Table}; use prettytable::{row, Table};
use std::path::Path; use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
@ -27,7 +28,8 @@ use swap::database::Database;
use swap::env::GetConfig; use swap::env::GetConfig;
use swap::fs::default_config_path; use swap::fs::default_config_path;
use swap::monero::Amount; use swap::monero::Amount;
use swap::protocol::alice::{run, EventLoop}; use swap::network::swarm;
use swap::protocol::alice::{run, Behaviour, EventLoop};
use swap::seed::Seed; use swap::seed::Seed;
use swap::trace::init_tracing; use swap::trace::init_tracing;
use swap::{bitcoin, env, kraken, monero}; use swap::{bitcoin, env, kraken, monero};
@ -93,9 +95,12 @@ async fn main() -> Result<()> {
let kraken_rate_updates = kraken::connect()?; let kraken_rate_updates = kraken::connect()?;
let mut swarm = swarm::new::<Behaviour>(&seed)?;
Swarm::listen_on(&mut swarm, config.network.listen)
.context("Failed to listen network interface")?;
let (event_loop, mut swap_receiver) = EventLoop::new( let (event_loop, mut swap_receiver) = EventLoop::new(
config.network.listen, swarm,
seed,
env_config, env_config,
Arc::new(bitcoin_wallet), Arc::new(bitcoin_wallet),
Arc::new(monero_wallet), Arc::new(monero_wallet),

View File

@ -25,8 +25,9 @@ use swap::cli::command::{AliceConnectParams, Arguments, Command, Data, MoneroPar
use swap::database::Database; use swap::database::Database;
use swap::env::{Config, GetConfig}; use swap::env::{Config, GetConfig};
use swap::network::quote::BidQuote; use swap::network::quote::BidQuote;
use swap::network::swarm;
use swap::protocol::bob; use swap::protocol::bob;
use swap::protocol::bob::{Builder, EventLoop}; use swap::protocol::bob::{Behaviour, Builder, EventLoop};
use swap::seed::Seed; use swap::seed::Seed;
use swap::{bitcoin, env, monero}; use swap::{bitcoin, env, monero};
use tracing::{debug, error, info, warn, Level}; use tracing::{debug, error, info, warn, Level};
@ -105,12 +106,12 @@ async fn main() -> Result<()> {
let (monero_wallet, _process) = let (monero_wallet, _process) =
init_monero_wallet(data_dir, monero_daemon_host, env_config).await?; init_monero_wallet(data_dir, monero_daemon_host, env_config).await?;
let bitcoin_wallet = Arc::new(bitcoin_wallet); let bitcoin_wallet = Arc::new(bitcoin_wallet);
let (event_loop, mut event_loop_handle) = EventLoop::new(
&seed.derive_libp2p_identity(), let mut swarm = swarm::new::<Behaviour>(&seed)?;
alice_peer_id, swarm.add_address(alice_peer_id, alice_addr);
alice_addr,
bitcoin_wallet.clone(), let (event_loop, mut event_loop_handle) =
)?; EventLoop::new(swarm, alice_peer_id, bitcoin_wallet.clone())?;
let event_loop = tokio::spawn(event_loop.run()); let event_loop = tokio::spawn(event_loop.run());
let send_bitcoin = determine_btc_to_swap( let send_bitcoin = determine_btc_to_swap(
@ -189,12 +190,11 @@ async fn main() -> Result<()> {
init_monero_wallet(data_dir, monero_daemon_host, env_config).await?; init_monero_wallet(data_dir, monero_daemon_host, env_config).await?;
let bitcoin_wallet = Arc::new(bitcoin_wallet); let bitcoin_wallet = Arc::new(bitcoin_wallet);
let (event_loop, event_loop_handle) = EventLoop::new( let mut swarm = swarm::new::<Behaviour>(&seed)?;
&seed.derive_libp2p_identity(), swarm.add_address(alice_peer_id, alice_addr);
alice_peer_id,
alice_addr, let (event_loop, event_loop_handle) =
bitcoin_wallet.clone(), EventLoop::new(swarm, alice_peer_id, bitcoin_wallet.clone())?;
)?;
let handle = tokio::spawn(event_loop.run()); let handle = tokio::spawn(event_loop.run());
let swap = Builder::new( let swap = Builder::new(

View File

@ -3,21 +3,6 @@ pub mod encrypted_signature;
pub mod peer_tracker; pub mod peer_tracker;
pub mod quote; pub mod quote;
pub mod spot_price; pub mod spot_price;
pub mod swarm;
pub mod transfer_proof; pub mod transfer_proof;
pub mod transport; pub mod transport;
use libp2p::core::Executor;
use std::future::Future;
use std::pin::Pin;
use tokio::runtime::Handle;
#[allow(missing_debug_implementations)]
pub struct TokioExecutor {
pub handle: Handle,
}
impl Executor for TokioExecutor {
fn exec(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
let _ = self.handle.spawn(future);
}
}

23
swap/src/network/swarm.rs Normal file
View File

@ -0,0 +1,23 @@
use crate::network::transport;
use crate::seed::Seed;
use anyhow::Result;
use libp2p::swarm::{NetworkBehaviour, SwarmBuilder};
use libp2p::Swarm;
pub fn new<B>(seed: &Seed) -> Result<Swarm<B>>
where
B: NetworkBehaviour + Default,
{
let identity = seed.derive_libp2p_identity();
let behaviour = B::default();
let transport = transport::build(&identity)?;
let swarm = SwarmBuilder::new(transport, behaviour, identity.public().into_peer_id())
.executor(Box::new(|f| {
tokio::spawn(f);
}))
.build();
Ok(swarm)
}

View File

@ -3,15 +3,13 @@ use crate::database::Database;
use crate::env::Config; use crate::env::Config;
use crate::monero::BalanceTooLow; use crate::monero::BalanceTooLow;
use crate::network::quote::BidQuote; use crate::network::quote::BidQuote;
use crate::network::{encrypted_signature, spot_price, transfer_proof, transport, TokioExecutor}; use crate::network::{encrypted_signature, spot_price, transfer_proof};
use crate::protocol::alice::{AliceState, Behaviour, OutEvent, State3, Swap}; use crate::protocol::alice::{AliceState, Behaviour, OutEvent, State3, Swap};
use crate::seed::Seed;
use crate::{bitcoin, kraken, monero}; use crate::{bitcoin, kraken, monero};
use anyhow::{bail, Context, Result}; use anyhow::{bail, Context, Result};
use futures::future; use futures::future;
use futures::future::{BoxFuture, FutureExt}; use futures::future::{BoxFuture, FutureExt};
use futures::stream::{FuturesUnordered, StreamExt}; use futures::stream::{FuturesUnordered, StreamExt};
use libp2p::core::Multiaddr;
use libp2p::{PeerId, Swarm}; use libp2p::{PeerId, Swarm};
use rand::rngs::OsRng; use rand::rngs::OsRng;
use std::collections::HashMap; use std::collections::HashMap;
@ -24,7 +22,6 @@ use uuid::Uuid;
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct EventLoop<RS> { pub struct EventLoop<RS> {
swarm: libp2p::Swarm<Behaviour>, swarm: libp2p::Swarm<Behaviour>,
peer_id: PeerId,
env_config: Config, env_config: Config,
bitcoin_wallet: Arc<bitcoin::Wallet>, bitcoin_wallet: Arc<bitcoin::Wallet>,
monero_wallet: Arc<monero::Wallet>, monero_wallet: Arc<monero::Wallet>,
@ -46,10 +43,8 @@ impl<LR> EventLoop<LR>
where where
LR: LatestRate, LR: LatestRate,
{ {
#[allow(clippy::too_many_arguments)]
pub fn new( pub fn new(
listen_address: Multiaddr, swarm: Swarm<Behaviour>,
seed: Seed,
env_config: Config, env_config: Config,
bitcoin_wallet: Arc<bitcoin::Wallet>, bitcoin_wallet: Arc<bitcoin::Wallet>,
monero_wallet: Arc<monero::Wallet>, monero_wallet: Arc<monero::Wallet>,
@ -57,25 +52,10 @@ where
latest_rate: LR, latest_rate: LR,
max_buy: bitcoin::Amount, max_buy: bitcoin::Amount,
) -> Result<(Self, mpsc::Receiver<Swap>)> { ) -> Result<(Self, mpsc::Receiver<Swap>)> {
let identity = seed.derive_libp2p_identity();
let behaviour = Behaviour::default();
let transport = transport::build(&identity)?;
let peer_id = PeerId::from(identity.public());
let mut swarm = libp2p::swarm::SwarmBuilder::new(transport, behaviour, peer_id)
.executor(Box::new(TokioExecutor {
handle: tokio::runtime::Handle::current(),
}))
.build();
Swarm::listen_on(&mut swarm, listen_address.clone())
.with_context(|| format!("Address is not supported: {:#}", listen_address))?;
let swap_channel = MpscChannels::default(); let swap_channel = MpscChannels::default();
let event_loop = EventLoop { let event_loop = EventLoop {
swarm, swarm,
peer_id,
env_config, env_config,
bitcoin_wallet, bitcoin_wallet,
monero_wallet, monero_wallet,
@ -90,7 +70,7 @@ where
} }
pub fn peer_id(&self) -> PeerId { pub fn peer_id(&self) -> PeerId {
self.peer_id *Swarm::local_peer_id(&self.swarm)
} }
pub async fn run(mut self) { pub async fn run(mut self) {

View File

@ -1,12 +1,11 @@
use crate::bitcoin::EncryptedSignature; use crate::bitcoin::EncryptedSignature;
use crate::network::quote::BidQuote; use crate::network::quote::BidQuote;
use crate::network::{spot_price, transfer_proof, transport, TokioExecutor}; use crate::network::{spot_price, transfer_proof};
use crate::protocol::bob::{Behaviour, OutEvent, State0, State2}; use crate::protocol::bob::{Behaviour, OutEvent, State0, State2};
use crate::{bitcoin, monero}; use crate::{bitcoin, monero};
use anyhow::{anyhow, bail, Context, Result}; use anyhow::{anyhow, bail, Context, Result};
use futures::FutureExt; use futures::FutureExt;
use libp2p::core::Multiaddr; use libp2p::{PeerId, Swarm};
use libp2p::PeerId;
use std::convert::Infallible; use std::convert::Infallible;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::mpsc::{Receiver, Sender};
@ -131,26 +130,10 @@ pub struct EventLoop {
impl EventLoop { impl EventLoop {
pub fn new( pub fn new(
identity: &libp2p::core::identity::Keypair, swarm: Swarm<Behaviour>,
alice_peer_id: PeerId, alice_peer_id: PeerId,
alice_addr: Multiaddr,
bitcoin_wallet: Arc<bitcoin::Wallet>, bitcoin_wallet: Arc<bitcoin::Wallet>,
) -> Result<(Self, EventLoopHandle)> { ) -> Result<(Self, EventLoopHandle)> {
let behaviour = Behaviour::default();
let transport = transport::build(identity)?;
let mut swarm = libp2p::swarm::SwarmBuilder::new(
transport,
behaviour,
identity.public().into_peer_id(),
)
.executor(Box::new(TokioExecutor {
handle: tokio::runtime::Handle::current(),
}))
.build();
swarm.add_address(alice_peer_id, alice_addr);
let start_execution_setup = Channels::new(); let start_execution_setup = Channels::new();
let done_execution_setup = Channels::new(); let done_execution_setup = Channels::new();
let recv_transfer_proof = Channels::new(); let recv_transfer_proof = Channels::new();

View File

@ -7,7 +7,7 @@ use bitcoin_harness::{BitcoindRpcApi, Client};
use futures::Future; use futures::Future;
use get_port::get_port; use get_port::get_port;
use libp2p::core::Multiaddr; use libp2p::core::Multiaddr;
use libp2p::PeerId; use libp2p::{PeerId, Swarm};
use monero_harness::{image, Monero}; use monero_harness::{image, Monero};
use std::convert::Infallible; use std::convert::Infallible;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
@ -17,6 +17,7 @@ use swap::asb::FixedRate;
use swap::bitcoin::{CancelTimelock, PunishTimelock}; use swap::bitcoin::{CancelTimelock, PunishTimelock};
use swap::database::Database; use swap::database::Database;
use swap::env::{Config, GetConfig}; use swap::env::{Config, GetConfig};
use swap::network::swarm;
use swap::protocol::alice::{AliceState, Swap}; use swap::protocol::alice::{AliceState, Swap};
use swap::protocol::bob::BobState; use swap::protocol::bob::BobState;
use swap::protocol::{alice, bob}; use swap::protocol::{alice, bob};
@ -70,12 +71,10 @@ impl BobParams {
} }
pub fn new_eventloop(&self) -> Result<(bob::EventLoop, bob::EventLoopHandle)> { pub fn new_eventloop(&self) -> Result<(bob::EventLoop, bob::EventLoopHandle)> {
bob::EventLoop::new( let mut swarm = swarm::new::<bob::Behaviour>(&self.seed)?;
&self.seed.derive_libp2p_identity(), swarm.add_address(self.alice_peer_id, self.alice_address.clone());
self.alice_peer_id,
self.alice_address.clone(), bob::EventLoop::new(swarm, self.alice_peer_id, self.bitcoin_wallet.clone())
self.bitcoin_wallet.clone(),
)
} }
} }
@ -384,9 +383,11 @@ where
) )
.await; .await;
let mut alice_swarm = swarm::new::<alice::Behaviour>(&alice_seed).unwrap();
Swarm::listen_on(&mut alice_swarm, alice_listen_address.clone()).unwrap();
let (alice_event_loop, alice_swap_handle) = alice::EventLoop::new( let (alice_event_loop, alice_swap_handle) = alice::EventLoop::new(
alice_listen_address.clone(), alice_swarm,
alice_seed,
env_config, env_config,
alice_bitcoin_wallet.clone(), alice_bitcoin_wallet.clone(),
alice_monero_wallet.clone(), alice_monero_wallet.clone(),